fixes for windows

This commit is contained in:
jygaulier
2012-04-25 11:51:46 +02:00
parent b4e691c1db
commit 5b442da08b
2 changed files with 77 additions and 58 deletions

View File

@@ -24,6 +24,7 @@ class task_Scheduler
const METHOD_FORK = 'METHOD_FORK'; const METHOD_FORK = 'METHOD_FORK';
const METHOD_PROC_OPEN = 'METHOD_PROC_OPEN'; const METHOD_PROC_OPEN = 'METHOD_PROC_OPEN';
const ERR_ALREADY_RUNNING = 114; // aka EALREADY (Operation already in progress)
private $method; private $method;
private $input; private $input;
protected $output; protected $output;
@@ -43,7 +44,6 @@ class task_Scheduler
if ($this->input && ! ($this->input->getOption('nolog'))) { if ($this->input && ! ($this->input->getOption('nolog'))) {
file_put_contents($logdir . "scheduler_l.log", $message . "\n", FILE_APPEND); file_put_contents($logdir . "scheduler_l.log", $message . "\n", FILE_APPEND);
} }
return $this; return $this;
} }
@@ -54,8 +54,6 @@ class task_Scheduler
public function run($input=null, OutputInterface $output = null) //, $log = true, $log_tasks = true) public function run($input=null, OutputInterface $output = null) //, $log = true, $log_tasks = true)
{ {
$this->method = self::METHOD_FORK;
require_once dirname(__FILE__) . '/../../bootstrap.php'; require_once dirname(__FILE__) . '/../../bootstrap.php';
$this->input = $input; $this->input = $input;
$this->output = $output; $this->output = $output;
@@ -67,17 +65,19 @@ class task_Scheduler
switch ($system) { switch ($system) {
case "WINDOWS": case "WINDOWS":
$nullfile = 'NUL'; $nullfile = 'NUL';
$this->method = self::METHOD_PROC_OPEN;
break; break;
default: default:
case "DARWIN": case "DARWIN":
case "LINUX": case "LINUX":
$nullfile = '/dev/null'; $nullfile = '/dev/null';
$this->method = self::METHOD_FORK;
break; break;
} }
$lockdir = $registry->get('GV_RootPath') . 'tmp/locks/'; $lockdir = $registry->get('GV_RootPath') . 'tmp/locks/';
for ($try = 0; true; $try ++ ) { for ($try = 1; true; $try ++ ) {
if (($schedlock = fopen(($lockfile = ($lockdir . 'scheduler.lock')), 'a+'))) { if (($schedlock = fopen(($lockfile = ($lockdir . 'scheduler.lock')), 'a+'))) {
if (flock($schedlock, LOCK_EX | LOCK_NB) === FALSE) { if (flock($schedlock, LOCK_EX | LOCK_NB) === FALSE) {
$this->log(sprintf("failed to lock '%s' (try=%s/4)", $lockfile, $try)); $this->log(sprintf("failed to lock '%s' (try=%s/4)", $lockfile, $try));
@@ -85,6 +85,7 @@ class task_Scheduler
$this->log("scheduler already running."); $this->log("scheduler already running.");
fclose($schedlock); fclose($schedlock);
throw new Exception('scheduler already running.', self::ERR_ALREADY_RUNNING);
return; return;
} else { } else {
sleep(2); sleep(2);
@@ -94,6 +95,11 @@ class task_Scheduler
ftruncate($schedlock, 0); ftruncate($schedlock, 0);
fwrite($schedlock, '' . getmypid()); fwrite($schedlock, '' . getmypid());
fflush($schedlock); fflush($schedlock);
// for windows : unlock then lock shared to allow OTHER processes to read the file
// too bad : no critical section nor atomicity
flock($schedlock, LOCK_UN);
flock($schedlock, LOCK_SH);
break; break;
} }
} }
@@ -304,7 +310,7 @@ class task_Scheduler
$this->log(sprintf('Unknow status `%s`', $status)); $this->log(sprintf('Unknow status `%s`', $status));
break; break;
case task_abstract::RETURNSTATUS_TORESTART: case task_abstract::STATUS_TORESTART:
if ( ! $taskPoll[$tkey]['task']->get_pid()) { if ( ! $taskPoll[$tkey]['task']->get_pid()) {
if ($this->method == self::METHOD_PROC_OPEN) { if ($this->method == self::METHOD_PROC_OPEN) {
@fclose($taskPoll[$tkey]["pipes"][1]); @fclose($taskPoll[$tkey]["pipes"][1]);
@@ -331,8 +337,8 @@ class task_Scheduler
if ($this->method == self::METHOD_PROC_OPEN) { if ($this->method == self::METHOD_PROC_OPEN) {
if ( ! $taskPoll[$tkey]["process"]) { if ( ! $taskPoll[$tkey]["process"]) {
$descriptors[1] = array('file', $logdir . "task_o_" . $task->get_task_id() . ".log", 'a+'); $descriptors[1] = array('file', $logdir . "task_o_" . $taskPoll[$tkey]['task']->get_task_id() . ".log", 'a+');
$descriptors[2] = array('file', $logdir . "task_e_" . $task->get_task_id() . ".log", 'a+'); $descriptors[2] = array('file', $logdir . "task_e_" . $taskPoll[$tkey]['task']->get_task_id() . ".log", 'a+');
$taskPoll[$tkey]["process"] = proc_open( $taskPoll[$tkey]["process"] = proc_open(
$taskPoll[$tkey]["cmd"] . ' ' . implode(' ', $taskPoll[$tkey]["args"]) $taskPoll[$tkey]["cmd"] . ' ' . implode(' ', $taskPoll[$tkey]["args"])
@@ -352,7 +358,7 @@ class task_Scheduler
sprintf( sprintf(
"Task %s '%s' started (pid=%s)" "Task %s '%s' started (pid=%s)"
, $taskPoll[$tkey]['task']->get_task_id() , $taskPoll[$tkey]['task']->get_task_id()
, $taskPoll[$tkey]["cmd"] , $taskPoll[$tkey]["cmd"] . ' ' . implode(' ', $taskPoll[$tkey]["args"])
, $taskPoll[$tkey]['task']->get_pid() , $taskPoll[$tkey]['task']->get_pid()
) )
); );
@@ -375,7 +381,7 @@ class task_Scheduler
); );
if ($taskPoll[$tkey]["task"]->get_crash_counter() > 5) if ($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED); $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_STOPPED);
else else
$taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART); $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
} }
@@ -433,8 +439,9 @@ class task_Scheduler
} }
} }
if ( ! $crashed && ! $taskPoll[$tkey]['task']->get_pid()) if ( ! $crashed && ! $taskPoll[$tkey]['task']->get_pid()) {
$crashed = true; $crashed = true;
}
if ( ! $crashed) { if ( ! $crashed) {
$taskPoll[$tkey]["killat"] = NULL; $taskPoll[$tkey]["killat"] = NULL;
@@ -458,7 +465,7 @@ class task_Scheduler
); );
if ($taskPoll[$tkey]["task"]->get_crash_counter() > 5) if ($taskPoll[$tkey]["task"]->get_crash_counter() > 5)
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED); $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_STOPPED);
else else
$taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART); $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_TOSTART);
} }
@@ -471,6 +478,9 @@ class task_Scheduler
$pid = $taskPoll[$tkey]['task']->get_pid(); $pid = $taskPoll[$tkey]['task']->get_pid();
if ($pid) { if ($pid) {
// send ctrl-c to tell the task to CLEAN quit
// (just in case the task doesn't pool his status 'tostop' fast enough)
if (function_exists('posix_kill')) {
if ( ! $taskPoll[$tkey]['sigterm_sent']) { if ( ! $taskPoll[$tkey]['sigterm_sent']) {
posix_kill($pid, SIGTERM); posix_kill($pid, SIGTERM);
$this->log( $this->log(
@@ -481,10 +491,24 @@ class task_Scheduler
) )
); );
} }
}
if (($dt = $taskPoll[$tkey]["killat"] - time()) < 0) { if (($dt = $taskPoll[$tkey]["killat"] - time()) < 0) {
// task still alive, time to kill
if ($this->method == self::METHOD_PROC_OPEN) {
proc_terminate($taskPoll[$tkey]["process"], 9);
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
proc_close($taskPoll[$tkey]["process"]);
$this->log(
sprintf(
"proc_terminate(...) done on task %s (pid=%s)"
, $taskPoll[$tkey]["task"]->get_task_id()
, $pid
)
);
} else { // METHOD_FORK, I guess we have posix
posix_kill($pid, 9); posix_kill($pid, 9);
$this->log( $this->log(
sprintf( sprintf(
"SIGKILL sent to task %s (pid=%s)" "SIGKILL sent to task %s (pid=%s)"
@@ -492,18 +516,14 @@ class task_Scheduler
, $pid , $pid
) )
); );
if ($this->method == self::METHOD_PROC_OPEN) {
proc_terminate($taskPoll[$tkey]["process"], 9);
@fclose($taskPoll[$tkey]["pipes"][1]);
@fclose($taskPoll[$tkey]["pipes"][2]);
proc_close($taskPoll[$tkey]["process"]);
} }
/*
unlink($lockdir . 'task_' . $taskPoll[$tkey]['task']->get_task_id() . '.lock'); unlink($lockdir . 'task_' . $taskPoll[$tkey]['task']->get_task_id() . '.lock');
$taskPoll[$tkey]["task"]->increment_crash_counter(); $taskPoll[$tkey]["task"]->increment_crash_counter();
// $taskPoll[$tkey]["task"]->set_pid(null); // $taskPoll[$tkey]["task"]->set_pid(null);
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED); $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_STOPPED);
*/
} else { } else {
$this->log( $this->log(
sprintf( sprintf(
@@ -521,13 +541,13 @@ class task_Scheduler
, $taskPoll[$tkey]["task"]->get_task_id() , $taskPoll[$tkey]["task"]->get_task_id()
) )
); );
$taskPoll[$tkey]["task"]->set_status(task_abstract::RETURNSTATUS_STOPPED); $taskPoll[$tkey]["task"]->set_status(task_abstract::STATUS_STOPPED);
} }
break; break;
case task_abstract::RETURNSTATUS_STOPPED: case task_abstract::STATUS_STOPPED:
case task_abstract::RETURNSTATUS_TODELETE: case task_abstract::STATUS_TODELETE:
if ($this->method == self::METHOD_PROC_OPEN) { if ($this->method == self::METHOD_PROC_OPEN) {
if ($taskPoll[$tkey]["process"]) { if ($taskPoll[$tkey]["process"]) {
@fclose($taskPoll[$tkey]["pipes"][1]); @fclose($taskPoll[$tkey]["pipes"][1]);

View File

@@ -33,10 +33,8 @@ class task_manager
public function old_get_tasks($refresh = false) public function old_get_tasks($refresh = false)
{ {
if ($this->tasks && ! $refresh) { if ($this->tasks && ! $refresh)
return $this->tasks; return $this->tasks;
}
$sql = "SELECT task2.* FROM task2 ORDER BY task_id ASC"; $sql = "SELECT task2.* FROM task2 ORDER BY task_id ASC";
$stmt = $this->appbox->get_connection()->prepare($sql); $stmt = $this->appbox->get_connection()->prepare($sql);
$stmt->execute(); $stmt->execute();
@@ -63,9 +61,8 @@ class task_manager
public function get_tasks($refresh = false) public function get_tasks($refresh = false)
{ {
if ($this->tasks && ! $refresh) { if ($this->tasks && ! $refresh)
return $this->tasks; return $this->tasks;
}
$sql = "SELECT task2.* FROM task2 ORDER BY task_id ASC"; $sql = "SELECT task2.* FROM task2 ORDER BY task_id ASC";
$stmt = $this->appbox->get_connection()->prepare($sql); $stmt = $this->appbox->get_connection()->prepare($sql);
@@ -148,11 +145,20 @@ class task_manager
public function get_scheduler_state() public function get_scheduler_state()
{ {
$pid = NULL;
$appbox = appbox::get_instance(\bootstrap::getCore()); $appbox = appbox::get_instance(\bootstrap::getCore());
$sql = "SELECT UNIX_TIMESTAMP()-UNIX_TIMESTAMP(schedqtime) AS qdelay
, schedstatus AS status FROM sitepreff";
$stmt = $this->appbox->get_connection()->prepare($sql);
$stmt->execute();
$ret = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
$pid = NULL;
$lockdir = $appbox->get_registry()->get('GV_RootPath') . 'tmp/locks/'; $lockdir = $appbox->get_registry()->get('GV_RootPath') . 'tmp/locks/';
if (($schedlock = fopen($lockdir . 'scheduler.lock', 'a+'))) { if (($schedlock = fopen($lockdir . 'scheduler.lock', 'a+'))) {
if (flock($schedlock, LOCK_SH | LOCK_NB) === FALSE) { if (flock($schedlock, LOCK_EX | LOCK_NB) === FALSE) {
// already locked : running ! // already locked : running !
$pid = trim(fgets($schedlock, 512)); $pid = trim(fgets($schedlock, 512));
} else { } else {
@@ -162,13 +168,6 @@ class task_manager
fclose($schedlock); fclose($schedlock);
} }
$sql = "SELECT UNIX_TIMESTAMP()-UNIX_TIMESTAMP(schedqtime) AS qdelay
, schedstatus AS status FROM sitepreff";
$stmt = $this->appbox->get_connection()->prepare($sql);
$stmt->execute();
$ret = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor();
if ($pid === NULL && $ret['status'] !== 'stopped') { if ($pid === NULL && $ret['status'] !== 'stopped') {
// auto fix // auto fix
$this->appbox->get_connection()->exec('UPDATE sitepreff SET schedstatus=\'stopped\''); $this->appbox->get_connection()->exec('UPDATE sitepreff SET schedstatus=\'stopped\'');