diff --git a/lib/classes/module/console/taskrun.class.php b/lib/classes/module/console/taskrun.class.php
index 6e22d6fd24..fe7da52197 100644
--- a/lib/classes/module/console/taskrun.class.php
+++ b/lib/classes/module/console/taskrun.class.php
@@ -25,11 +25,16 @@ use Symfony\Component\Console\Command\Command;
class module_console_taskrun extends Command
{
+ private $task;
+ private $shedulerPID;
public function __construct($name = null)
{
parent::__construct($name);
+ $this->task = NULL;
+ $this->shedulerPID = NULL;
+
$this->addArgument('task_id', InputArgument::REQUIRED, 'The task_id to run');
$this->addOption(
'runner'
@@ -54,21 +59,56 @@ class module_console_taskrun extends Command
$task_id = (int) $input->getArgument('task_id');
- if ($task_id <= 0 || strlen($task_id) !== strlen($input->getArgument('task_id')))
+ if($task_id <= 0 || strlen($task_id) !== strlen($input->getArgument('task_id')))
throw new \RuntimeException('Argument must be an Id.');
$appbox = appbox::get_instance();
$task_manager = new task_manager($appbox);
- $task = $task_manager->get_task($task_id);
+ $this->task = $task_manager->get_task($task_id);
- $runner = task_abstract::RUNNER_SCHEDULER;
- if ($input->getOption('runner') === task_abstract::RUNNER_MANUAL)
+ if($input->getOption('runner') === task_abstract::RUNNER_MANUAL)
+ {
$runner = task_abstract::RUNNER_MANUAL;
+ }
+ else
+ {
+ $runner = task_abstract::RUNNER_SCHEDULER;
+ $registry = $appbox->get_registry();
+ $schedFile = $registry->get('GV_RootPath') . 'tmp/locks/scheduler.lock';
+ if(file_exists($schedFile))
+ $this->shedulerPID = (int) (trim(file_get_contents($schedFile)));
+ }
- $task->run($runner);
+ register_tick_function(array($this, 'tick_handler'), true);
+ declare(ticks=1);
+ $this->task->run($runner);
+ printf("TASK QUIT\n");
return $this;
}
+ public function tick_handler()
+ {
+ static $start = FALSE;
+ if($start === FALSE)
+ $start = time();
+ if(time() - $start > 0)
+ {
+ if($this->shedulerPID)
+ {
+ if(!posix_kill($this->shedulerPID, 0))
+ {
+ if(method_exists($this->task, 'signal'))
+ $this->task->signal('SIGNAL_SCHEDULER_DIED');
+ else
+ $this->task->set_status(task_abstract::STATUS_TOSTOP);
+ }
+ }
+
+ $start = time();
+ }
+ }
+
}
+
\ No newline at end of file
diff --git a/lib/classes/task/Scheduler.class.php b/lib/classes/task/Scheduler.class.php
index 95808b07e7..deae331966 100644
--- a/lib/classes/task/Scheduler.class.php
+++ b/lib/classes/task/Scheduler.class.php
@@ -21,25 +21,31 @@ class task_Scheduler
{
const TASKDELAYTOQUIT = 60;
+ // how to schedule tasks (choose in 'run' method)
+ const METHOD_FORK = 'METHOD_FORK';
+ const METHOD_PROC_OPEN = 'METHOD_PROC_OPEN';
+
+ private $method;
protected $output;
protected function log($message)
{
- if ($this->output instanceof OutputInterface)
- {
- $this->output->writeln($message);
- }
-
$registry = registry::get_instance();
$logdir = $registry->get('GV_RootPath') . 'logs/';
logs::rotate($logdir . "scheduler.log");
$date_obj = new DateTime();
- $message = sprintf("%s %s \n", $date_obj->format(DATE_ATOM), $message);
-
- file_put_contents($logdir . "scheduler.log", $message, FILE_APPEND);
+ $message = sprintf("%s\t%s", $date_obj->format(DATE_ATOM), $message);
+ if($this->output instanceof OutputInterface)
+ {
+ $this->output->writeln($message);
+ }
+ // else
+ {
+ file_put_contents($logdir . "scheduler.log", $message."\n", FILE_APPEND);
+ }
return $this;
}
@@ -52,6 +58,8 @@ class task_Scheduler
public function run(OutputInterface $output = null, $log_tasks = true)
{
+ $this->method = self::METHOD_FORK;
+
require_once dirname(__FILE__) . '/../../bootstrap.php';
$this->output = $output;
$appbox = appbox::get_instance();
@@ -61,67 +69,74 @@ class task_Scheduler
$lockdir = $registry->get('GV_RootPath') . 'tmp/locks/';
- for ($try = 0; true; $try++)
+ for($try = 0; true; $try++)
{
- $schedlock = fopen(($lockfile = ($lockdir . 'scheduler.lock')), 'a+');
- if (flock($schedlock, LOCK_EX | LOCK_NB) != true)
+ if(($schedlock = fopen(($lockfile = ($lockdir . 'scheduler.lock')), 'a+')))
{
- $this->log(sprintf("failed to lock '%s' (try=%s/4)", $lockfile, $try));
- if ($try == 4)
+ if(flock($schedlock, LOCK_EX | LOCK_NB) === FALSE)
{
- $this->log("scheduler already running.");
- fclose($schedlock);
+ $this->log(sprintf("failed to lock '%s' (try=%s/4)", $lockfile, $try));
+ if($try == 4)
+ {
+ $this->log("scheduler already running.");
+ fclose($schedlock);
- return;
+ return;
+ }
+ else
+ {
+ sleep(2);
+ }
}
else
{
- sleep(2);
+ // locked
+ ftruncate($schedlock, 0);
+ fwrite($schedlock, '' . getmypid());
+ fflush($schedlock);
+ break;
}
}
- else
- {
- ftruncate($schedlock, 0);
- fwrite($schedlock, '' . getmypid());
- fflush($schedlock);
- break;
- }
}
+
+ $this->log(sprintf("running scheduler with method %s", $this->method));
+
+
+ if($this->method == self::METHOD_FORK)
+ pcntl_signal(SIGCHLD, SIG_IGN);
$logdir = $registry->get('GV_RootPath') . 'logs/';
$conn = self::get_connection();
- $ttask = array();
+ $taskPoll = array(); // the poll of tasks
$sleeptime = 3;
- $sql = "UPDATE sitepreff SET schedstatus='started', schedpid = :pid";
- $stmt = $conn->prepare($sql);
- $stmt->execute(array(':pid' => getmypid()));
- $stmt->closeCursor();
-
+ $sql = "UPDATE sitepreff SET schedstatus='started'";
+ $conn->exec($sql);
$task_manager = new task_manager($appbox);
- $tlist = array();
- foreach ($task_manager->get_tasks() as $task)
+ // set every 'auto-start' task to start
+ foreach($task_manager->get_tasks() as $task)
{
- if (!$task->is_active())
+ if($task->is_active())
{
- continue;
- }
+ $tid = $task->get_task_id();
- $tid = $task->get_task_id();
-
- if (!$task->is_running())
- {
- /* @var $task task_abstract */
- $task->reset_crash_counter();
- $task->set_status(task_abstract::STATUS_TOSTART);
+ if(!$task->get_pid())
+ {
+ /* @var $task task_abstract */
+ $task->reset_crash_counter();
+ $task->set_status(task_abstract::STATUS_TOSTART);
+ }
}
}
+ $tlist = array();
+
+
$schedstatus = 'started';
$runningtask = 0;
@@ -129,12 +144,12 @@ class task_Scheduler
$last_log_check = array();
- while ($schedstatus == 'started' || $runningtask > 0)
+ while($schedstatus == 'started' || $runningtask > 0)
{
- while (!$conn->ping())
+ while(!$conn->ping())
{
unset($conn);
- if (!$connwaslost)
+ if(!$connwaslost)
{
$this->log(sprintf("Warning : abox connection lost, restarting in 10 min."));
}
@@ -142,7 +157,7 @@ class task_Scheduler
$conn = self::get_connection();
$connwaslost = true;
}
- if ($connwaslost)
+ if($connwaslost)
{
$this->log("abox connection restored");
@@ -161,12 +176,12 @@ class task_Scheduler
$row = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
- if ($row)
+ if($row)
{
$schedstatus = $row["schedstatus"];
}
- if ($schedstatus == 'tostop')
+ if($schedstatus == 'tostop')
{
$sql = 'UPDATE sitepreff SET schedstatus = "stopping"';
$stmt = $conn->prepare($sql);
@@ -186,87 +201,86 @@ class task_Scheduler
logs::rotate($logdir . "scheduler.error.log");
- /**
- * potentially, all tasks are supposed to be removed
- */
- foreach ($ttask as $tkey => $tv)
- {
- $ttask[$tkey]["todel"] = true;
- }
+ // initialy, all tasks are supposed to be removed from the poll
+ foreach($taskPoll as $tkey => $task)
+ $taskPoll[$tkey]["todel"] = true;
- $sql = "SELECT * FROM task2";
- $stmt = $conn->prepare($sql);
- $stmt->execute();
- $rs = $stmt->fetchAll(PDO::FETCH_ASSOC);
- $stmt->closeCursor();
-
- foreach ($task_manager->get_tasks(true) as $task)
+ foreach($task_manager->get_tasks(true) as $task)
{
$tkey = "t_" . $task->get_task_id();
logs::rotate($logdir . "task_$tkey.log");
logs::rotate($logdir . "task_$tkey.error.log");
- if (!isset($ttask[$tkey]))
+ if(!isset($taskPoll[$tkey]))
{
+ // the task is not in the poll, add it
$phpcli = $registry->get('GV_cli');
- switch ($system)
+ switch($system)
{
default:
case "DARWIN":
case "WINDOWS":
case "LINUX":
- $cmd = $phpcli . ' -f '
- . $registry->get('GV_RootPath')
- . "bin/console task:run "
- . $task->get_task_id()
- . " --runner=scheduler ";
+ $cmd = $phpcli;
+ $args = array('-f', $registry->get('GV_RootPath') . 'bin/console', 'task:run', $task->get_task_id(), '--runner=scheduler');
break;
}
- $ttask[$tkey] = array(
+ $taskPoll[$tkey] = array(
"task" => $task,
"current_status" => $task->get_status(),
- "process" => null,
"cmd" => $cmd,
- "killat" => null,
- "pipes" => null
+ "args" => $args,
+ "killat" => null
);
+ if($this->method == self::METHOD_PROC_OPEN)
+ {
+ $taskPoll[$tkey]['process'] = NULL;
+ $taskPoll[$tkey]['pipes'] = NULL;
+ }
+
$this->log(
sprintf(
"new Task %s, status=%s"
- , $ttask[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["task"]->get_task_id()
, $task->get_status()
)
);
}
else
{
- if ($ttask[$tkey]["current_status"] != $task->get_status())
+ // the task is already in the poll, update its status
+ if($taskPoll[$tkey]["current_status"] != $task->get_status())
{
$this->log(
sprintf(
"Task %s, oldstatus=%s, newstatus=%s"
- , $ttask[$tkey]["task"]->get_task_id()
- , $ttask[$tkey]["current_status"]
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["current_status"]
, $task->get_status()
)
);
- $ttask[$tkey]["current_status"] = $task->get_status();
+ $taskPoll[$tkey]["current_status"] = $task->get_status();
}
-
- $ttask[$tkey]["task"] = $task;
+ // update the whole task object
+ unset($taskPoll[$tkey]["task"]);
+ $taskPoll[$tkey]["task"] = $task;
}
- $ttask[$tkey]["todel"] = false;
+
+ unset($task);
+
+ $taskPoll[$tkey]["todel"] = false; // this task exists, do not remove from poll
}
- foreach ($ttask as $tkey => $tv)
+ // remove not-existing task from poll
+ foreach($taskPoll as $tkey => $task)
{
- if ($tv["todel"])
+ if($task["todel"])
{
- $this->log(sprintf("Task %s deleted", $ttask[$tkey]["task"]->get_task_id()));
- unset($ttask[$tkey]);
+ $this->log(sprintf("Task %s deleted", $taskPoll[$tkey]["task"]->get_task_id()));
+ unset($taskPoll[$tkey]);
}
}
@@ -275,257 +289,588 @@ class task_Scheduler
* Launch task that are not yet launched
*/
$runningtask = 0;
-
- $common_status = array(
- task_abstract::STATUS_STARTED
- , task_abstract::RETURNSTATUS_STOPPED
- );
-
- foreach ($ttask as $tkey => $tv)
+
+ foreach($taskPoll as $tkey => $tv)
{
- if (!in_array($ttask[$tkey]["task"]->get_status(), $common_status))
- {
- $this->log(
- sprintf(
- 'task %s has status %s'
- , $ttask[$tkey]["task"]->get_task_id()
- , $ttask[$tkey]["task"]->get_status()
- )
- );
- }
- switch ($ttask[$tkey]["task"]->get_status())
+ switch($tv['task']->get_status())
{
default:
- $this->log(
- sprintf(
- 'Unknow status `%s`'
- , $ttask[$tkey]["task"]->get_status()
- )
- );
+ $this->log(sprintf('Unknow status `%s`', $tv['task']->get_status()));
break;
+
case task_abstract::RETURNSTATUS_TORESTART:
- @fclose($ttask[$tkey]["pipes"][1]);
- @fclose($ttask[$tkey]["pipes"][2]);
- @proc_close($ttask[$tkey]["process"]);
-
- $ttask[$tkey]["process"] = null;
- if ($schedstatus == 'started')
+ if(!$taskPoll[$tkey]['task']->get_pid())
{
- $ttask[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
+ if($this->method == self::METHOD_PROC_OPEN)
+ {
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+
+ $taskPoll[$tkey]["process"] = null;
+ }
+ if($schedstatus == 'started')
+ {
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
+ }
+ // trick to start the task immediatly : DON'T break if ending with 'tostart'
+ // so it will continue with 'tostart' case !
}
- break;
- case task_abstract::STATUS_TOSTART:
- $ttask[$tkey]["killat"] = NULL;
- if ($schedstatus == 'started' && !$ttask[$tkey]["process"])
+ else
{
- $descriptors = array(
- 1 => array("pipe", "w")
- , 2 => array("pipe", "w")
- );
+ break;
+ }
- if ($log_tasks === true)
+ case task_abstract::STATUS_TOSTART:
+ // if scheduler is 'tostop', don't launch a new task !
+ if($schedstatus != 'started')
+ break;
+
+ $taskPoll[$tkey]["killat"] = NULL;
+
+ if($this->method == self::METHOD_PROC_OPEN)
+ {
+ if(!$taskPoll[$tkey]["process"])
{
- $descriptors[1] = array(
- "file"
- , $logdir . "task_$tkey.log"
- , "a+"
- );
- $descriptors[2] = array(
- "file"
- , $logdir . "task_$tkey.error.log"
- , "a+"
- );
- }
-
- $ttask[$tkey]["process"] = proc_open(
- $ttask[$tkey]["cmd"]
- , $descriptors
- , $ttask[$tkey]["pipes"]
- , $registry->get('GV_RootPath') . "bin/"
- , null
- , array('bypass_shell' => true)
- );
-
- if (is_resource($ttask[$tkey]["process"]))
- {
-// proc_status['pid'] if the pid of the sh !!!
-// $proc_status = proc_get_status($ttask[$tkey]["process"]);
-// if ($proc_status['running'])
-// $ttask[$tkey]['task']->set_pid($proc_status['pid']);
- }
-
- if ($ttask[$tkey]['task']->get_pid() !== null)
- {
- $this->log(
- sprintf(
- "Task %s '%s' started (pid=%s)"
- , $ttask[$tkey]['task']->get_task_id()
- , $ttask[$tkey]["cmd"]
- , $ttask[$tkey]['task']->get_pid()
- )
- );
- $runningtask++;
- }
- else
- {
- $ttask[$tkey]["task"]->increment_crash_counter();
-
- @fclose($ttask[$tkey]["pipes"][1]);
- @fclose($ttask[$tkey]["pipes"][2]);
- @proc_close($ttask[$tkey]["process"]);
- $ttask[$tkey]["process"] = null;
-
- $this->log(
- sprintf(
- "Task %s '%s' failed to start %d times"
- , $ttask[$tkey]["task"]->get_task_id()
- , $ttask[$tkey]["cmd"]
- , $ttask[$tkey]["task"]->get_crash_counter()
- )
+ $descriptors = array(
+ 1 => array("pipe", "w"),
+ 2 => array("pipe", "w")
);
- if ($ttask[$tkey]["task"]->get_crash_counter() > 5)
+ if($log_tasks === true)
{
- $ttask[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
+ $descriptors[1] = array(
+ "file"
+ , $logdir . "task_$tkey.log"
+ , "a+"
+ );
+ $descriptors[2] = array(
+ "file"
+ , $logdir . "task_$tkey.error.log"
+ , "a+"
+ );
+ }
+
+ $taskPoll[$tkey]["process"] = proc_open(
+ $taskPoll[$tkey]["cmd"] . ' ' . implode(' ', $taskPoll[$tkey]["args"])
+ , $descriptors
+ , $taskPoll[$tkey]["pipes"]
+ , $registry->get('GV_RootPath') . "bin/"
+ , null
+ , array('bypass_shell' => true)
+ );
+
+ if(is_resource($taskPoll[$tkey]["process"]))
+ {
+ sleep(2); // let the process lock and write it's pid
+ }
+
+ if(is_resource($taskPoll[$tkey]["process"]) && $taskPoll[$tkey]['task']->get_pid() !== null)
+ {
+ $this->log(
+ sprintf(
+ "Task %s '%s' started (pid=%s)"
+ , $taskPoll[$tkey]['task']->get_task_id()
+ , $taskPoll[$tkey]["cmd"]
+ , $taskPoll[$tkey]['task']->get_pid()
+ )
+ );
+ $runningtask++;
}
else
{
- $ttask[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
+ $taskPoll[$tkey]["task"]->increment_crash_counter();
+
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+ $taskPoll[$tkey]["process"] = null;
+
+ $this->log(
+ sprintf(
+ "Task %s '%s' failed to start %d times"
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["cmd"]
+ , $taskPoll[$tkey]["task"]->get_crash_counter()
+ )
+ );
+
+ if($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
+ else
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
}
}
}
+ elseif($this->method == self::METHOD_FORK)
+ {
+ // printf("forking pid %d\n", getmypid());
+ $pid = pcntl_fork();
+ if($pid == -1)
+ {
+ die("failed to fork");
+ }
+ elseif($pid == 0)
+ {
+ // child
+ // printf("hello i am child pid=%d\n", getmypid());
+ // printf("%s %s \n", $taskPoll[$tkey]["cmd"], implode(' ', $taskPoll[$tkey]["args"]));
+ // ;
+ umask(0);
+ openlog('MyLog', LOG_PID | LOG_PERROR, LOG_LOCAL0);
+ if(posix_setsid() < 0)
+ die("Forked process could not detach from terminal\n");
+ //chdir(dirname(__FILE__));
+ fclose(STDIN);
+ fclose(STDOUT);
+ fclose(STDERR);
+ $fdIN = fopen('/dev/null', 'r');
+ $fdOUT = fopen($logdir . "task_$tkey.log", 'a+');
+ $fdERR = fopen($logdir . "task_$tkey.error.log", 'a+');
+
+ pcntl_exec($taskPoll[$tkey]["cmd"], $taskPoll[$tkey]["args"]);
+
+ sleep(2);
+ }
+ else
+ {
+ // parent
+ // printf("hello i am parent pid=%d\n", getmypid());
+ }
+ }
break;
case task_abstract::STATUS_STARTED:
+
$crashed = false;
- /**
- * If no process, the task is probably manually ran
- */
- if ($ttask[$tkey]["process"])
+ // If no process, the task is probably manually ran
+
+ if($this->method == self::METHOD_PROC_OPEN)
{
- $ttask[$tkey]["killat"] = NULL;
- if (is_resource($ttask[$tkey]["process"]))
+ if($taskPoll[$tkey]["process"])
{
- $proc_status = proc_get_status($ttask[$tkey]["process"]);
- if ($proc_status['running'])
- $runningtask++;
+ $taskPoll[$tkey]["killat"] = NULL;
+
+ if(is_resource($taskPoll[$tkey]["process"]))
+ {
+ $proc_status = proc_get_status($taskPoll[$tkey]["process"]);
+ if($proc_status['running'])
+ $runningtask++;
+ else
+ $crashed = true;
+ }
else
+ {
$crashed = true;
+ }
}
- else
+ }
+
+ if(!$crashed && !$taskPoll[$tkey]['task']->get_pid())
+ $crashed = true;
+
+ if(!$crashed)
+ {
+ $taskPoll[$tkey]["killat"] = NULL;
+ $runningtask++;
+ }
+ else
+ {
+ // crashed !
+ $taskPoll[$tkey]["task"]->increment_crash_counter();
+
+ if($this->method == self::METHOD_PROC_OPEN)
{
- $crashed = true;
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+ $taskPoll[$tkey]["process"] = null;
}
- }
-
- if ($crashed === true && $ttask[$tkey]["task"]->get_status() === task_abstract::RETURNSTATUS_TORESTART)
- {
- $crashed = false;
- }
- if ($crashed)
- {
- $ttask[$tkey]["task"]->increment_crash_counter();
-
- @fclose($ttask[$tkey]["pipes"][1]);
- @fclose($ttask[$tkey]["pipes"][2]);
- @proc_close($ttask[$tkey]["process"]);
- $ttask[$tkey]["process"] = null;
-
$this->log(
sprintf(
"Task %s crashed %d times"
- , $ttask[$tkey]["task"]->get_task_id()
- , $ttask[$tkey]["task"]->get_crash_counter()
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["task"]->get_crash_counter()
)
);
-
- $ttask[$tkey]["task"]->increment_crash_counter();
-
- if ($ttask[$tkey]["task"]->get_crash_counter() > 5)
- {
- $ttask[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
- }
+ if($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
else
- {
- $ttask[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
- }
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
}
break;
case task_abstract::STATUS_TOSTOP:
- if ($ttask[$tkey]["process"])
+
+ if($taskPoll[$tkey]["killat"] === NULL)
+ $taskPoll[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT;
+
+ $tpid = $taskPoll[$tkey]['task']->get_pid();
+ if($tpid)
{
- if ($ttask[$tkey]["killat"] === NULL)
- $ttask[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT;
- if (($dt = $ttask[$tkey]["killat"] - time()) < 0)
+ if(($dt = $taskPoll[$tkey]["killat"] - time()) < 0)
{
- $ppid = $ttask[$tkey]['task']->get_pid();
- $pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $ppid`);
- foreach ($pids as $pid)
+ if($this->method == self::METHOD_PROC_OPEN)
{
- if (is_numeric($pid))
+ $pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $tpid`);
+ foreach($pids as $pid)
{
- $this->log("Killing pid %d", $pid);
- posix_kill($pid, 9);
+ if(is_numeric($pid))
+ {
+ $this->log("Killing pid %d", $pid);
+ posix_kill($pid, 9);
+ }
}
}
+ elseif($this->method == self::METHOD_FORK)
+ {
+ posix_kill($tpid, 9);
+ }
$this->log(
sprintf(
"SIGKILL sent to task %s (pid=%s)"
- , $ttask[$tkey]["task"]->get_task_id()
- , $ttask[$tkey]["task"]->get_pid()
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $tpid
)
);
- proc_terminate($ttask[$tkey]["process"], 9);
- @fclose($ttask[$tkey]["pipes"][1]);
- @fclose($ttask[$tkey]["pipes"][2]);
- proc_close($ttask[$tkey]["process"]);
- unlink($lockdir . 'task_' . $ttask[$tkey]['task']->get_task_id() . '.lock');
+ if($this->method == self::METHOD_PROC_OPEN)
+ {
+ proc_terminate($taskPoll[$tkey]["process"], 9);
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ proc_close($taskPoll[$tkey]["process"]);
+ }
+ unlink($lockdir . 'task_' . $taskPoll[$tkey]['task']->get_task_id() . '.lock');
- $ttask[$tkey]["task"]->increment_crash_counter();
-// $ttask[$tkey]["task"]->set_pid(null);
- $ttask[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
+ $taskPoll[$tkey]["task"]->increment_crash_counter();
+ // $taskPoll[$tkey]["task"]->set_pid(null);
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
}
else
{
$this->log(
sprintf(
"waiting task %s to quit (kill in %d seconds)"
- , $ttask[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["task"]->get_task_id()
, $dt
)
);
$runningtask++;
}
}
+ else
+ {
+ $this->log(
+ sprintf(
+ "task %s has quit"
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ )
+ );
+ }
+
break;
case task_abstract::RETURNSTATUS_STOPPED:
case task_abstract::RETURNSTATUS_TODELETE:
- if ($ttask[$tkey]["process"])
+ if($this->method == self::METHOD_PROC_OPEN)
{
- @fclose($ttask[$tkey]["pipes"][1]);
- @fclose($ttask[$tkey]["pipes"][2]);
- @proc_close($ttask[$tkey]["process"]);
+ if($taskPoll[$tkey]["process"])
+ {
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
- $ttask[$tkey]["process"] = null;
+ $taskPoll[$tkey]["process"] = null;
+ }
}
break;
}
}
+
+ /*
+
+
+ $common_status = array(
+ task_abstract::STATUS_STARTED
+ , task_abstract::RETURNSTATUS_STOPPED
+ );
+
+
+ foreach($taskPoll as $tkey => $tv)
+ {
+ // if (!in_array($taskPoll[$tkey]["task"]->get_status(), $common_status))
+ // {
+ // $this->log(
+ // sprintf(
+ // 'task %s has status %s'
+ // , $taskPoll[$tkey]["task"]->get_task_id()
+ // , $taskPoll[$tkey]["task"]->get_status()
+ // )
+ // );
+ // }
+ switch($tv['task']->get_status())
+ {
+ default:
+ $this->log(sprintf('Unknow status `%s`', $tv['task']->get_status()));
+ break;
+
+ case task_abstract::RETURNSTATUS_TORESTART:
+ if(!$taskPoll[$tkey]['task']->get_pid())
+ {
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+
+ $taskPoll[$tkey]["process"] = null;
+ if($schedstatus == 'started')
+ {
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
+ }
+ // trick to start the task immediatly : DON'T break if ending with 'tostart'
+ // so it will continue with 'tostart' case !
+ }
+ else
+ {
+ break;
+ }
+
+ case task_abstract::STATUS_TOSTART:
+ // if scheduler is 'tostop', don't launch a new task !
+ if($schedstatus != 'started')
+ break;
+
+ $taskPoll[$tkey]["killat"] = NULL;
+ if(!$taskPoll[$tkey]["process"])
+ {
+ $descriptors = array(
+ 1 => array("pipe", "w"),
+ 2 => array("pipe", "w")
+ );
+
+ if($log_tasks === true)
+ {
+ $descriptors[1] = array(
+ "file"
+ , $logdir . "task_$tkey.log"
+ , "a+"
+ );
+ $descriptors[2] = array(
+ "file"
+ , $logdir . "task_$tkey.error.log"
+ , "a+"
+ );
+ }
+
+ $taskPoll[$tkey]["process"] = proc_open(
+ $taskPoll[$tkey]["cmd"].' '.implode(' ', $taskPoll[$tkey]["args"])
+ , $descriptors
+ , $taskPoll[$tkey]["pipes"]
+ , $registry->get('GV_RootPath') . "bin/"
+ , null
+ , array('bypass_shell' => true)
+ );
+
+ if(is_resource($taskPoll[$tkey]["process"]))
+ {
+ sleep(2); // let the process lock and write it's pid
+ }
+
+ if(is_resource($taskPoll[$tkey]["process"]) && $taskPoll[$tkey]['task']->get_pid() !== null)
+ {
+
+ // ************************************************
+ // file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : RUNNING ? : pid=%s \n", __FILE__, __LINE__, $taskPoll[$tkey]['task']->get_pid()), FILE_APPEND);
+
+ $this->log(
+ sprintf(
+ "Task %s '%s' started (pid=%s)"
+ , $taskPoll[$tkey]['task']->get_task_id()
+ , $taskPoll[$tkey]["cmd"]
+ , $taskPoll[$tkey]['task']->get_pid()
+ )
+ );
+ $runningtask++;
+ }
+ else
+ {
+ // ************************************************
+ // file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : NOT RUNNING ? : pid=%s \n", __FILE__, __LINE__, $taskPoll[$tkey]['task']->get_pid()), FILE_APPEND);
+ $taskPoll[$tkey]["task"]->increment_crash_counter();
+
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+ $taskPoll[$tkey]["process"] = null;
+
+ $this->log(
+ sprintf(
+ "Task %s '%s' failed to start %d times"
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["cmd"]
+ , $taskPoll[$tkey]["task"]->get_crash_counter()
+ )
+ );
+
+ if($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
+ {
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
+ }
+ else
+ {
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
+ }
+ }
+ }
+ break;
+
+ case task_abstract::STATUS_STARTED:
+ $crashed = false;
+ // If no process, the task is probably manually ran
+ if($taskPoll[$tkey]["process"])
+ {
+ $taskPoll[$tkey]["killat"] = NULL;
+
+ // if(is_resource($taskPoll[$tkey]["process"]))
+ // {
+ // $proc_status = proc_get_status($taskPoll[$tkey]["process"]);
+ // if($proc_status['running'])
+ // $runningtask++;
+ // else
+ // $crashed = true;
+ // }
+ // else
+ // {
+ // $crashed = true;
+ // }
+
+ if($taskPoll[$tkey]['task']->get_pid())
+ $runningtask++;
+ else
+ $crashed = true;
+ }
+
+ if($crashed === true && $taskPoll[$tkey]["task"]->get_status() === task_abstract::RETURNSTATUS_TORESTART)
+ {
+ $crashed = false;
+ }
+ if($crashed)
+ {
+ $taskPoll[$tkey]["task"]->increment_crash_counter();
+
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+ $taskPoll[$tkey]["process"] = null;
+
+ $this->log(
+ sprintf(
+ "Task %s crashed %d times"
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["task"]->get_crash_counter()
+ )
+ );
+
+
+ if($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
+ {
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
+ }
+ else
+ {
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
+ }
+ }
+ break;
+
+ case task_abstract::STATUS_TOSTOP:
+ if($taskPoll[$tkey]["process"])
+ {
+ if($taskPoll[$tkey]["killat"] === NULL)
+ $taskPoll[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT;
+
+ if(($dt = $taskPoll[$tkey]["killat"] - time()) < 0)
+ {
+ $ppid = $taskPoll[$tkey]['task']->get_pid();
+ $pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $ppid`);
+ foreach($pids as $pid)
+ {
+ if(is_numeric($pid))
+ {
+ $this->log("Killing pid %d", $pid);
+ posix_kill($pid, 9);
+ }
+ }
+
+ $this->log(
+ sprintf(
+ "SIGKILL sent to task %s (pid=%s)"
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $taskPoll[$tkey]["task"]->get_pid()
+ )
+ );
+
+ proc_terminate($taskPoll[$tkey]["process"], 9);
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ proc_close($taskPoll[$tkey]["process"]);
+ unlink($lockdir . 'task_' . $taskPoll[$tkey]['task']->get_task_id() . '.lock');
+
+ $taskPoll[$tkey]["task"]->increment_crash_counter();
+ // $taskPoll[$tkey]["task"]->set_pid(null);
+ $taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
+ }
+ else
+ {
+ $this->log(
+ sprintf(
+ "waiting task %s to quit (kill in %d seconds)"
+ , $taskPoll[$tkey]["task"]->get_task_id()
+ , $dt
+ )
+ );
+ $runningtask++;
+ }
+ }
+ break;
+
+ case task_abstract::RETURNSTATUS_STOPPED:
+ case task_abstract::RETURNSTATUS_TODELETE:
+ if($taskPoll[$tkey]["process"])
+ {
+ @fclose($taskPoll[$tkey]["pipes"][1]);
+ @fclose($taskPoll[$tkey]["pipes"][2]);
+ @proc_close($taskPoll[$tkey]["process"]);
+
+ $taskPoll[$tkey]["process"] = null;
+ }
+ break;
+ }
+ }
+
+ */
+
+
+
+
+
+
+
+
+
+
$to_reopen = false;
- if ($conn->ping())
+ if($conn->ping())
{
$conn->close();
unset($conn);
$to_reopen = true;
}
sleep($sleeptime);
- if ($to_reopen)
+ if($to_reopen)
{
$conn = self::get_connection();
}
diff --git a/lib/classes/task/abstract.class.php b/lib/classes/task/abstract.class.php
index a032c51213..801696d992 100644
--- a/lib/classes/task/abstract.class.php
+++ b/lib/classes/task/abstract.class.php
@@ -20,6 +20,8 @@ abstract class task_abstract
const STATE_MAXMEGSREACHED = 'STATE_MAXMEGS';
const STATE_MAXRECSDONE = 'STATE_MAXRECS';
const STATE_FINISHED = 'STATE_FINISHED';
+
+ const SIGNAL_SCHEDULER_DIED = 'SIGNAL_SCHEDULER_DIED';
protected $suicidable = false;
protected $launched_by = 0;
@@ -129,7 +131,7 @@ abstract class task_abstract
, self::RETURNSTATUS_TORESTART
, self::STATUS_TOSTART
);
- if (!in_array($status, $av_status))
+ if(!in_array($status, $av_status))
throw new Exception(sprintf('unknown status `%s`', $status));
@@ -145,18 +147,17 @@ abstract class task_abstract
return $this->task_status;
}
- public function set_pid($pid)
- {
- $conn = connection::getPDOConnection();
-
- $sql = 'UPDATE task2 SET pid = :pid WHERE task_id = :taskid';
- $stmt = $conn->prepare($sql);
- $stmt->execute(array(':pid' => $pid, ':taskid' => $this->get_task_id()));
- $stmt->closeCursor();
-
- return $this;
- }
-
+// public function set_pid($pid)
+// {
+// $conn = connection::getPDOConnection();
+//
+// $sql = 'UPDATE task2 SET pid = :pid WHERE task_id = :taskid';
+// $stmt = $conn->prepare($sql);
+// $stmt->execute(array(':pid' => $pid, ':taskid' => $this->get_task_id()));
+// $stmt->closeCursor();
+//
+// return $this;
+// }
// 'active' means 'auto-start when scheduler starts'
public function set_active($boolean)
{
@@ -221,6 +222,7 @@ abstract class task_abstract
{
return $this->crash_counter;
}
+
public function increment_crash_counter()
{
$conn = connection::getPDOConnection();
@@ -268,9 +270,9 @@ abstract class task_abstract
$this->system = system_server::get_platform();
$this->launched_by = array_key_exists("REQUEST_URI", $_SERVER) ? self::LAUCHED_BY_BROWSER : self::LAUCHED_BY_COMMANDLINE;
- if ($this->system != "DARWIN" && $this->system != "WINDOWS" && $this->system != "LINUX")
+ if($this->system != "DARWIN" && $this->system != "WINDOWS" && $this->system != "LINUX")
{
- if ($this->launched_by == self::LAUCHED_BY_COMMANDLINE)
+ if($this->launched_by == self::LAUCHED_BY_COMMANDLINE)
{
// printf("Desole, ce programme ne fonctionne pas sous '" . $this->system . "'.\n");
flush();
@@ -279,7 +281,7 @@ abstract class task_abstract
}
else
{
- if ($this->launched_by == self::LAUCHED_BY_COMMANDLINE)
+ if($this->launched_by == self::LAUCHED_BY_COMMANDLINE)
{
flush();
}
@@ -289,11 +291,14 @@ abstract class task_abstract
{
$conn = connection::getPDOConnection();
}
- catch (Exception $e)
+ catch(Exception $e)
{
$this->log($e->getMessage());
$this->log(("Warning : abox connection lost, restarting in 10 min."));
- sleep(60 * 10);
+
+ for($t = 60 * 10; $t > 0; $t--) // DON'T do sleep(600) because it prevents ticks !
+ sleep(1);
+
$this->running = false;
return('');
@@ -304,7 +309,7 @@ abstract class task_abstract
$stmt->execute(array(':taskid' => $this->get_task_id()));
$row = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
- if (!$row)
+ if(!$row)
throw new Exception('Unknown task id');
$this->title = $row['name'];
$this->crash_counter = (int) $row['crashed'];
@@ -357,7 +362,7 @@ abstract class task_abstract
$stmt->closeCursor();
$lock_file = $registry->get('GV_RootPath') . 'tmp/locks/task_' . $this->get_task_id() . '.lock';
- if (is_writable($lock_file))
+ if(is_writable($lock_file))
unlink($lock_file);
return;
@@ -366,7 +371,7 @@ abstract class task_abstract
protected function check_memory_usage()
{
$current_memory = memory_get_usage();
- if ($current_memory >> 20 >= $this->maxmegs)
+ if($current_memory >> 20 >= $this->maxmegs)
{
$this->log(sprintf(
"Max memory (%s M) reached (current is %s M)"
@@ -381,7 +386,7 @@ abstract class task_abstract
protected function check_records_done()
{
- if ($this->records_done >= (int) ($this->maxrecs))
+ if($this->records_done >= (int) ($this->maxrecs))
{
$this->current_state = self::STATE_MAXRECSDONE;
}
@@ -402,26 +407,68 @@ abstract class task_abstract
public function get_pid()
{
- $conn = connection::getPDOConnection();
- $sql = 'SELECT pid FROM task2 WHERE task_id = :taskid';
- $stmt = $conn->prepare($sql);
- $stmt->execute(array(':taskid' => $this->get_task_id()));
- $row = $stmt->fetch(PDO::FETCH_ASSOC);
- $stmt->closeCursor();
+ $pid = NULL;
- $pid = $row ? $row['pid'] : null;
+ $taskid = $this->get_task_id();
+ $registry = registry::get_instance();
+ system_file::mkdir($lockdir = $registry->get('GV_RootPath') . 'tmp/locks/');
+
+ if(($fd = fopen(($lockfile = ($lockdir . 'task_' . $taskid . '.lock')), 'a+')))
+ {
+// ************************************************
+// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : fopen(%s) \n", __FILE__, __LINE__, $lockfile), FILE_APPEND);
+ if(flock($fd, LOCK_EX | LOCK_NB) === FALSE)
+ {
+ // already locked ? : task running
+ $pid = fgets($fd);
+// ************************************************
+// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : can't flock() : pid=%s \n", __FILE__, __LINE__, $pid), FILE_APPEND);
+ }
+ else
+ {
+ // can lock : not running
+// ************************************************
+// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : NOT RUNNING can flock() : pid=%s \n", __FILE__, __LINE__, file_get_contents($lockfile)), FILE_APPEND);
+ flock($fd, LOCK_UN);
+ }
+ fclose($fd);
+ }
return $pid;
}
+ /*
+ public function is_running()
+ {
+ $retval = false;
+ $registry = registry::get_instance();
+ $lockdir = $registry->get('GV_RootPath') . 'tmp/locks/';
+ $tasklock = fopen($lockfile = ($lockdir . '/task_' . $this->get_task_id() . '.lock'), 'a+');
+
+ if (flock($tasklock, LOCK_SH | LOCK_NB) != true)
+ {
+ $retval = true;
+ }
+ else
+ {
+ ftruncate($tasklock, 0);
+ flock($tasklock, LOCK_UN | LOCK_NB);
+ fclose($tasklock);
+ unlink($lockfile);
+ }
+
+ return $retval;
+ }
+ */
+
protected function check_current_state()
{
- switch ($this->current_state)
+ switch($this->current_state)
{
case self::STATE_MAXMEGSREACHED:
case self::STATE_MAXRECSDONE:
default:
- if ($this->get_runner() == self::RUNNER_SCHEDULER)
+ if($this->get_runner() == self::RUNNER_SCHEDULER)
{
$this->task_status = self::STATUS_TOSTOP;
$this->return_value = self::RETURNSTATUS_TORESTART;
@@ -429,7 +476,7 @@ abstract class task_abstract
break;
case self::STATE_FINISHED:
$this->task_status = self::STATUS_TOSTOP;
- if ($this->suicidable === true)
+ if($this->suicidable === true)
{
$this->return_value = self::RETURNSTATUS_TODELETE;
$this->log('will hang myself');
@@ -458,13 +505,13 @@ abstract class task_abstract
$stmt->execute(array(':taskid' => $this->get_task_id()));
$row = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
- if (!$row || $row['status'] == 'tostop')
+ if(!$row || $row['status'] == 'tostop')
{
$this->task_status = self::STATUS_TOSTOP;
$this->return_value = self::RETURNSTATUS_STOPPED;
}
}
- catch (Exception $e)
+ catch(Exception $e)
{
$this->return_value = self::RETURNSTATUS_STOPPED;
$this->task_status = self::STATUS_TOSTOP;
@@ -477,21 +524,26 @@ abstract class task_abstract
protected function pause($when_started=0)
{
$this->log($this->records_done . ' records done');
- if ($this->running)// && $this->records_done == 0)
+ if($this->running)// && $this->records_done == 0)
{
$when_started = time() - $when_started;
- if ($when_started < $this->period)
+ if($when_started < $this->period)
{
$conn = connection::getPDOConnection();
$conn->close();
unset($conn);
- sleep($this->period - $when_started);
+// sleep($this->period - $when_started);
+ for($t = $this->period - $when_started; $t > 0; $t--) // DON'T do sleep($this->period - $when_started) because it prevents ticks !
+ sleep(1);
}
}
}
final public function run($runner)
{
+
+// ************************************************
+// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : LAUNCHING : tid=%s \n", __FILE__, __LINE__, $this->get_task_id()), FILE_APPEND);
$taskid = $this->get_task_id();
$conn = connection::getPDOConnection();
@@ -499,8 +551,10 @@ abstract class task_abstract
system_file::mkdir($lockdir = $registry->get('GV_RootPath') . 'tmp/locks/');
$locker = true;
$tasklock = fopen(($lockfile = ($lockdir . 'task_' . $taskid . '.lock')), 'a+');
- if (flock($tasklock, LOCK_EX | LOCK_NB, $locker) != true)
+ if(flock($tasklock, LOCK_EX | LOCK_NB, $locker) === FALSE)
{
+// ************************************************
+// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : LAUNCH OPENED AND CANT LOCK : pid=%s \n", __FILE__, __LINE__, getmypid()), FILE_APPEND);
printf(("runtask::ERROR : task already running.\n"), $taskid);
fclose($tasklock);
@@ -511,6 +565,8 @@ abstract class task_abstract
ftruncate($tasklock, 0);
fwrite($tasklock, '' . getmypid());
fflush($tasklock);
+// ************************************************
+// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : LAUNCH OPENED AND LOCKED : pid=%s \n", __FILE__, __LINE__, getmypid()), FILE_APPEND);
}
$this->set_runner($runner);
@@ -526,7 +582,7 @@ abstract class task_abstract
fclose($tasklock);
@unlink($lockfile);
- if ($this->return_value == self::RETURNSTATUS_TODELETE)
+ if($this->return_value == self::RETURNSTATUS_TODELETE)
$this->delete();
else
$this->set_status($this->return_value);
@@ -536,42 +592,20 @@ abstract class task_abstract
abstract protected function run2();
- public function is_running()
- {
- $retval = false;
- $registry = registry::get_instance();
- $lockdir = $registry->get('GV_RootPath') . 'tmp/locks/';
- $tasklock = fopen($lockfile = ($lockdir . '/task_' . $this->get_task_id() . '.lock'), 'a+');
-
- if (flock($tasklock, LOCK_SH | LOCK_NB) != true)
- {
- $retval = true;
- }
- else
- {
- ftruncate($tasklock, 0);
- flock($tasklock, LOCK_UN | LOCK_NB);
- fclose($tasklock);
- unlink($lockfile);
- }
-
- return $retval;
- }
-
protected function load_settings(SimpleXMLElement $sx_task_settings)
{
$this->period = (int) $sx_task_settings->period;
- if ($this->period <= 0 || $this->period >= 60 * 60)
+ if($this->period <= 0 || $this->period >= 60 * 60)
$this->period = 60;
$this->maxrecs = (int) $sx_task_settings->maxrecs;
- if ($sx_task_settings->maxrecs < 10 || $sx_task_settings->maxrecs > 1000)
+ if($sx_task_settings->maxrecs < 10 || $sx_task_settings->maxrecs > 1000)
$this->maxrecs = 100;
$this->maxmegs = (int) $sx_task_settings->maxmegs;
- if ($sx_task_settings->maxmegs < 16 || $sx_task_settings->maxmegs > 512)
+ if($sx_task_settings->maxmegs < 16 || $sx_task_settings->maxmegs > 512)
$this->maxmegs = 24;
$this->record_buffer_size = (int) $sx_task_settings->flush;
- if ($sx_task_settings->flush < 1 || $sx_task_settings->flush > 100)
+ if($sx_task_settings->flush < 1 || $sx_task_settings->flush > 100)
$this->record_buffer_size = 10;
return $this;
@@ -579,7 +613,7 @@ abstract class task_abstract
protected function increment_loops()
{
- if ($this->get_runner() == self::RUNNER_SCHEDULER && $this->loop > $this->maxloops)
+ if($this->get_runner() == self::RUNNER_SCHEDULER && $this->loop > $this->maxloops)
{
$this->log(sprintf(('%d loops done, restarting'), $this->loop));
$this->task_status = self::STATUS_TOSTOP;
@@ -593,7 +627,7 @@ abstract class task_abstract
public function apply_task_status()
{
- if ($this->task_status == self::STATUS_TOSTOP)
+ if($this->task_status == self::STATUS_TOSTOP)
{
$this->running = false;
}
@@ -605,7 +639,7 @@ abstract class task_abstract
{
static $lastt = null;
$t = explode(' ', ($ut = microtime()));
- if ($lastt === null)
+ if($lastt === null)
$lastt = $t;
$dt = ($t[0] - $lastt[0]) + ($t[1] - $lastt[1]);
@@ -638,7 +672,7 @@ abstract class task_abstract
*/
public static function create(appbox $appbox, $class_name, $settings = null)
{
- if (!class_exists($class_name))
+ if(!class_exists($class_name))
throw new Exception('Unknown task class');
$sql = 'INSERT INTO task2
@@ -649,9 +683,9 @@ abstract class task_abstract
:name, "0000/00/00 00:00:00", :class, :settings)';
- if ($settings && !DOMDocument::loadXML($settings))
+ if($settings && !DOMDocument::loadXML($settings))
throw new Exception('settings invalide');
- elseif (!$settings)
+ elseif(!$settings)
$settings = "\n