This commit is contained in:
jygaulier
2012-01-31 19:21:06 +01:00
parent 5a5f86a97b
commit 72078aa836
13 changed files with 1040 additions and 1305 deletions

View File

@@ -26,6 +26,7 @@ class task_Scheduler
const METHOD_PROC_OPEN = 'METHOD_PROC_OPEN';
private $method;
private $input;
protected $output;
protected function log($message)
@@ -33,18 +34,21 @@ class task_Scheduler
$registry = registry::get_instance();
$logdir = $registry->get('GV_RootPath') . 'logs/';
logs::rotate($logdir . "scheduler.log");
logs::rotate($logdir . "scheduler_l.log");
logs::rotate($logdir . "scheduler_o.log");
logs::rotate($logdir . "scheduler_e.log");
$date_obj = new DateTime();
$message = sprintf("%s\t%s", $date_obj->format(DATE_ATOM), $message);
if($this->output instanceof OutputInterface)
{
$this->output->writeln($message);
// $this->output->writeln($message);
}
// else
// $this->output->writeln($this->input->getOption('nolog'));
if($this->input && !($this->input->getOption('nolog')))
{
file_put_contents($logdir . "scheduler.log", $message."\n", FILE_APPEND);
file_put_contents($logdir . "scheduler_l.log", $message . "\n", FILE_APPEND);
}
return $this;
}
@@ -52,20 +56,32 @@ class task_Scheduler
protected static function get_connection()
{
require dirname(__FILE__) . '/../../../config/connexion.inc';
return new connection_pdo('appbox', $hostname, $port, $user, $password, $dbname);
return(appbox::get_instance()->get_connection());
}
public function run(OutputInterface $output = null, $log_tasks = true)
public function run($input=null, OutputInterface $output = null) //, $log = true, $log_tasks = true)
{
$this->method = self::METHOD_FORK;
require_once dirname(__FILE__) . '/../../bootstrap.php';
$this->input = $input;
$this->output = $output;
$appbox = appbox::get_instance();
$registry = $appbox->get_registry();
$nullfile = '';
$system = system_server::get_platform();
switch($system)
{
case "WINDOWS":
$nullfile = 'NUL';
break;
default:
case "DARWIN":
case "LINUX":
$nullfile = '/dev/null';
break;
}
$lockdir = $registry->get('GV_RootPath') . 'tmp/locks/';
@@ -98,16 +114,15 @@ class task_Scheduler
}
}
}
$this->log(sprintf("running scheduler with method %s", $this->method));
if($this->method == self::METHOD_FORK)
pcntl_signal(SIGCHLD, SIG_IGN);
$logdir = $registry->get('GV_RootPath') . 'logs/';
$conn = self::get_connection();
$conn = appbox::get_instance()->get_connection();
$taskPoll = array(); // the poll of tasks
@@ -146,15 +161,36 @@ class task_Scheduler
while($schedstatus == 'started' || $runningtask > 0)
{
while(!$conn->ping())
while(1)
{
try
{
assert(is_object($conn));
$ping = @$conn->ping();
}
catch(ErrorException $e)
{
$ping = false;
}
if($ping)
break;
unset($conn);
if(!$connwaslost)
{
$this->log(sprintf("Warning : abox connection lost, restarting in 10 min."));
}
sleep(60 * 10);
$conn = self::get_connection();
for($i = 0; $i < 60 * 10; $i++)
sleep(1);
try
{
$conn = appbox::get_instance()->get_connection();
}
catch(ErrorException $e)
{
$ping = false;
}
$connwaslost = true;
}
if($connwaslost)
@@ -168,13 +204,21 @@ class task_Scheduler
$connwaslost = false;
}
// printf("%d \n", __LINE__);
$schedstatus = '';
$sql = "SELECT schedstatus FROM sitepreff";
$stmt = $conn->prepare($sql);
$stmt->execute();
$row = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
$row = NULL;
try
{
$sql = "SELECT schedstatus FROM sitepreff";
$stmt = $conn->prepare($sql);
$stmt->execute();
$row = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
}
catch(ErrorException $e)
{
continue;
}
if($row)
{
@@ -193,14 +237,14 @@ class task_Scheduler
$stmt = $conn->prepare($sql);
$stmt->execute();
$stmt->closeCursor();
$this->log("schedstatus == 'stopping', waiting tasks to end");
}
logs::rotate($logdir . "scheduler.log");
logs::rotate($logdir . "scheduler.error.log");
logs::rotate($logdir . "scheduler_t.log");
logs::rotate($logdir . "scheduler_o.log");
logs::rotate($logdir . "scheduler_e.log");
// printf("%d \n", __LINE__);
// initialy, all tasks are supposed to be removed from the poll
foreach($taskPoll as $tkey => $task)
$taskPoll[$tkey]["todel"] = true;
@@ -208,32 +252,41 @@ class task_Scheduler
foreach($task_manager->get_tasks(true) as $task)
{
$tkey = "t_" . $task->get_task_id();
$status = $task->get_status();
logs::rotate($logdir . "task_$tkey.log");
logs::rotate($logdir . "task_$tkey.error.log");
logs::rotate($logdir . "task_t_" . $task->get_task_id() . ".log");
logs::rotate($logdir . "task_o_" . $task->get_task_id() . ".log");
logs::rotate($logdir . "task_e_" . $task->get_task_id() . ".log");
if(!isset($taskPoll[$tkey]))
{
// the task is not in the poll, add it
$phpcli = $registry->get('GV_cli');
switch($system)
{
case "WINDOWS":
$cmd = $phpcli;
$args = array('-f', $registry->get('GV_RootPath') . 'bin/console', '--', '-q', 'task:run', $task->get_task_id(), '--runner=scheduler');
if($this->input && ($this->input->getOption('notasklog')))
$args[] = 'notasklog';
break;
default:
case "DARWIN":
case "WINDOWS":
case "LINUX":
$cmd = $phpcli;
$args = array('-f', $registry->get('GV_RootPath') . 'bin/console', 'task:run', $task->get_task_id(), '--runner=scheduler');
$args = array('-f', $registry->get('GV_RootPath') . 'bin/console', '--', '-q', 'task:run', $task->get_task_id(), '--runner=scheduler');
if($this->input && ($this->input->getOption('notasklog')))
$args[] = 'notasklog';
break;
}
$taskPoll[$tkey] = array(
"task" => $task,
"current_status" => $task->get_status(),
"current_status" => $status,
"cmd" => $cmd,
"args" => $args,
"killat" => null
"killat" => null,
"sigterm_sent" => false
);
if($this->method == self::METHOD_PROC_OPEN)
{
@@ -245,24 +298,24 @@ class task_Scheduler
sprintf(
"new Task %s, status=%s"
, $taskPoll[$tkey]["task"]->get_task_id()
, $task->get_status()
, $status
)
);
}
else
{
// the task is already in the poll, update its status
if($taskPoll[$tkey]["current_status"] != $task->get_status())
if($taskPoll[$tkey]["current_status"] != $status)
{
$this->log(
sprintf(
"Task %s, oldstatus=%s, newstatus=%s"
, $taskPoll[$tkey]["task"]->get_task_id()
, $taskPoll[$tkey]["current_status"]
, $task->get_status()
, $status
)
);
$taskPoll[$tkey]["current_status"] = $task->get_status();
$taskPoll[$tkey]["current_status"] = $status;
}
// update the whole task object
unset($taskPoll[$tkey]["task"]);
@@ -284,18 +337,16 @@ class task_Scheduler
}
}
/**
* Launch task that are not yet launched
*/
// Launch task that are not yet launched
$runningtask = 0;
foreach($taskPoll as $tkey => $tv)
{
switch($tv['task']->get_status())
$status = $tv['task']->get_status();
switch($status)
{
default:
$this->log(sprintf('Unknow status `%s`', $tv['task']->get_status()));
$this->log(sprintf('Unknow status `%s`', $status));
break;
case task_abstract::RETURNSTATUS_TORESTART:
@@ -332,24 +383,8 @@ class task_Scheduler
{
if(!$taskPoll[$tkey]["process"])
{
$descriptors = array(
1 => array("pipe", "w"),
2 => array("pipe", "w")
);
if($log_tasks === true)
{
$descriptors[1] = array(
"file"
, $logdir . "task_$tkey.log"
, "a+"
);
$descriptors[2] = array(
"file"
, $logdir . "task_$tkey.error.log"
, "a+"
);
}
$descriptors[1] = array('file', $logdir . "task_o_" . $task->get_task_id() . ".log", 'a+');
$descriptors[2] = array('file', $logdir . "task_e_" . $task->get_task_id() . ".log", 'a+');
$taskPoll[$tkey]["process"] = proc_open(
$taskPoll[$tkey]["cmd"] . ' ' . implode(' ', $taskPoll[$tkey]["args"])
@@ -415,27 +450,25 @@ class task_Scheduler
// child
// printf("hello i am child pid=%d\n", getmypid());
// printf("%s %s \n", $taskPoll[$tkey]["cmd"], implode(' ', $taskPoll[$tkey]["args"]));
// ;
umask(0);
openlog('MyLog', LOG_PID | LOG_PERROR, LOG_LOCAL0);
if(posix_setsid() < 0)
die("Forked process could not detach from terminal\n");
//chdir(dirname(__FILE__));
fclose(STDIN);
fclose(STDOUT);
fclose(STDERR);
$fdIN = fopen('/dev/null', 'r');
$fdOUT = fopen($logdir . "task_$tkey.log", 'a+');
$fdERR = fopen($logdir . "task_$tkey.error.log", 'a+');
$fdIN = fopen($nullfile, 'r');
$fdOUT = fopen($logdir . "task_o_" . $taskPoll[$tkey]["task"]->get_task_id() . ".log", 'a+');
$fdERR = fopen($logdir . "task_e_" . $taskPoll[$tkey]["task"]->get_task_id() . ".log", 'a+');
$this->log(sprintf("exec('%s %s')", $taskPoll[$tkey]["cmd"], implode(' ', $taskPoll[$tkey]["args"])));
pcntl_exec($taskPoll[$tkey]["cmd"], $taskPoll[$tkey]["args"]);
sleep(2);
}
else
{
// parent
// printf("hello i am parent pid=%d\n", getmypid());
// sleep(2);
}
}
break;
@@ -506,33 +539,30 @@ class task_Scheduler
if($taskPoll[$tkey]["killat"] === NULL)
$taskPoll[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT;
$tpid = $taskPoll[$tkey]['task']->get_pid();
if($tpid)
$pid = $taskPoll[$tkey]['task']->get_pid();
if($pid)
{
if(!$taskPoll[$tkey]['sigterm_sent'])
{
posix_kill($pid, SIGTERM);
$this->log(
sprintf(
"SIGTERM sent to task %s (pid=%s)"
, $taskPoll[$tkey]["task"]->get_task_id()
, $pid
)
);
}
if(($dt = $taskPoll[$tkey]["killat"] - time()) < 0)
{
if($this->method == self::METHOD_PROC_OPEN)
{
$pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $tpid`);
foreach($pids as $pid)
{
if(is_numeric($pid))
{
$this->log("Killing pid %d", $pid);
posix_kill($pid, 9);
}
}
}
elseif($this->method == self::METHOD_FORK)
{
posix_kill($tpid, 9);
}
posix_kill($pid, 9);
$this->log(
sprintf(
"SIGKILL sent to task %s (pid=%s)"
, $taskPoll[$tkey]["task"]->get_task_id()
, $tpid
, $pid
)
);
@@ -569,6 +599,7 @@ class task_Scheduler
, $taskPoll[$tkey]["task"]->get_task_id()
)
);
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
}
break;
@@ -590,290 +621,8 @@ class task_Scheduler
}
}
/*
$common_status = array(
task_abstract::STATUS_STARTED
, task_abstract::RETURNSTATUS_STOPPED
);
foreach($taskPoll as $tkey => $tv)
{
// if (!in_array($taskPoll[$tkey]["task"]->get_status(), $common_status))
// {
// $this->log(
// sprintf(
// 'task %s has status %s'
// , $taskPoll[$tkey]["task"]->get_task_id()
// , $taskPoll[$tkey]["task"]->get_status()
// )
// );
// }
switch($tv['task']->get_status())
{
default:
$this->log(sprintf('Unknow status `%s`', $tv['task']->get_status()));
break;
case task_abstract::RETURNSTATUS_TORESTART:
if(!$taskPoll[$tkey]['task']->get_pid())
{
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
@proc_close($taskPoll[$tkey]["process"]);
$taskPoll[$tkey]["process"] = null;
if($schedstatus == 'started')
{
$taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
}
// trick to start the task immediatly : DON'T break if ending with 'tostart'
// so it will continue with 'tostart' case !
}
else
{
break;
}
case task_abstract::STATUS_TOSTART:
// if scheduler is 'tostop', don't launch a new task !
if($schedstatus != 'started')
break;
$taskPoll[$tkey]["killat"] = NULL;
if(!$taskPoll[$tkey]["process"])
{
$descriptors = array(
1 => array("pipe", "w"),
2 => array("pipe", "w")
);
if($log_tasks === true)
{
$descriptors[1] = array(
"file"
, $logdir . "task_$tkey.log"
, "a+"
);
$descriptors[2] = array(
"file"
, $logdir . "task_$tkey.error.log"
, "a+"
);
}
$taskPoll[$tkey]["process"] = proc_open(
$taskPoll[$tkey]["cmd"].' '.implode(' ', $taskPoll[$tkey]["args"])
, $descriptors
, $taskPoll[$tkey]["pipes"]
, $registry->get('GV_RootPath') . "bin/"
, null
, array('bypass_shell' => true)
);
if(is_resource($taskPoll[$tkey]["process"]))
{
sleep(2); // let the process lock and write it's pid
}
if(is_resource($taskPoll[$tkey]["process"]) && $taskPoll[$tkey]['task']->get_pid() !== null)
{
// ************************************************
// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : RUNNING ? : pid=%s \n", __FILE__, __LINE__, $taskPoll[$tkey]['task']->get_pid()), FILE_APPEND);
$this->log(
sprintf(
"Task %s '%s' started (pid=%s)"
, $taskPoll[$tkey]['task']->get_task_id()
, $taskPoll[$tkey]["cmd"]
, $taskPoll[$tkey]['task']->get_pid()
)
);
$runningtask++;
}
else
{
// ************************************************
// file_put_contents("/tmp/scheduler2.log", sprintf("%s [%d] : NOT RUNNING ? : pid=%s \n", __FILE__, __LINE__, $taskPoll[$tkey]['task']->get_pid()), FILE_APPEND);
$taskPoll[$tkey]["task"]->increment_crash_counter();
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
@proc_close($taskPoll[$tkey]["process"]);
$taskPoll[$tkey]["process"] = null;
$this->log(
sprintf(
"Task %s '%s' failed to start %d times"
, $taskPoll[$tkey]["task"]->get_task_id()
, $taskPoll[$tkey]["cmd"]
, $taskPoll[$tkey]["task"]->get_crash_counter()
)
);
if($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
{
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
}
else
{
$taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
}
}
}
break;
case task_abstract::STATUS_STARTED:
$crashed = false;
// If no process, the task is probably manually ran
if($taskPoll[$tkey]["process"])
{
$taskPoll[$tkey]["killat"] = NULL;
// if(is_resource($taskPoll[$tkey]["process"]))
// {
// $proc_status = proc_get_status($taskPoll[$tkey]["process"]);
// if($proc_status['running'])
// $runningtask++;
// else
// $crashed = true;
// }
// else
// {
// $crashed = true;
// }
if($taskPoll[$tkey]['task']->get_pid())
$runningtask++;
else
$crashed = true;
}
if($crashed === true && $taskPoll[$tkey]["task"]->get_status() === task_abstract::RETURNSTATUS_TORESTART)
{
$crashed = false;
}
if($crashed)
{
$taskPoll[$tkey]["task"]->increment_crash_counter();
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
@proc_close($taskPoll[$tkey]["process"]);
$taskPoll[$tkey]["process"] = null;
$this->log(
sprintf(
"Task %s crashed %d times"
, $taskPoll[$tkey]["task"]->get_task_id()
, $taskPoll[$tkey]["task"]->get_crash_counter()
)
);
if($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
{
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
}
else
{
$taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
}
}
break;
case task_abstract::STATUS_TOSTOP:
if($taskPoll[$tkey]["process"])
{
if($taskPoll[$tkey]["killat"] === NULL)
$taskPoll[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT;
if(($dt = $taskPoll[$tkey]["killat"] - time()) < 0)
{
$ppid = $taskPoll[$tkey]['task']->get_pid();
$pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $ppid`);
foreach($pids as $pid)
{
if(is_numeric($pid))
{
$this->log("Killing pid %d", $pid);
posix_kill($pid, 9);
}
}
$this->log(
sprintf(
"SIGKILL sent to task %s (pid=%s)"
, $taskPoll[$tkey]["task"]->get_task_id()
, $taskPoll[$tkey]["task"]->get_pid()
)
);
proc_terminate($taskPoll[$tkey]["process"], 9);
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
proc_close($taskPoll[$tkey]["process"]);
unlink($lockdir . 'task_' . $taskPoll[$tkey]['task']->get_task_id() . '.lock');
$taskPoll[$tkey]["task"]->increment_crash_counter();
// $taskPoll[$tkey]["task"]->set_pid(null);
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED);
}
else
{
$this->log(
sprintf(
"waiting task %s to quit (kill in %d seconds)"
, $taskPoll[$tkey]["task"]->get_task_id()
, $dt
)
);
$runningtask++;
}
}
break;
case task_abstract::RETURNSTATUS_STOPPED:
case task_abstract::RETURNSTATUS_TODELETE:
if($taskPoll[$tkey]["process"])
{
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
@proc_close($taskPoll[$tkey]["process"]);
$taskPoll[$tkey]["process"] = null;
}
break;
}
}
*/
$to_reopen = false;
if($conn->ping())
{
$conn->close();
unset($conn);
$to_reopen = true;
}
sleep($sleeptime);
if($to_reopen)
{
$conn = self::get_connection();
}
for($i=0; $i<$sleeptime; $i++)
sleep(1);
}
$sql = "UPDATE sitepreff SET schedstatus='stopped', schedpid='0'";