Cleaning opened connections

This commit is contained in:
Romain Neutron
2012-01-30 12:00:34 +01:00
parent 56fc13c5bb
commit 190b5992ff

View File

@@ -19,9 +19,11 @@ use Symfony\Component\Console\Output\OutputInterface;
class task_Scheduler class task_Scheduler
{ {
const TASKDELAYTOQUIT = 60; const TASKDELAYTOQUIT = 60;
protected $output; protected $output;
protected static $connection;
protected function log($message) protected function log($message)
{ {
@@ -31,12 +33,12 @@ class task_Scheduler
} }
$registry = registry::get_instance(); $registry = registry::get_instance();
$logdir = $registry->get('GV_RootPath') . 'logs/'; $logdir = $registry->get('GV_RootPath') . 'logs/';
logs::rotate($logdir . "scheduler.log"); logs::rotate($logdir . "scheduler.log");
$date_obj = new DateTime(); $date_obj = new DateTime();
$message = sprintf("%s %s \n", $date_obj->format(DATE_ATOM), $message); $message = sprintf("%s %s \n", $date_obj->format(DATE_ATOM), $message);
file_put_contents($logdir . "scheduler.log", $message, FILE_APPEND); file_put_contents($logdir . "scheduler.log", $message, FILE_APPEND);
@@ -47,14 +49,19 @@ class task_Scheduler
{ {
require dirname(__FILE__) . '/../../../config/connexion.inc'; require dirname(__FILE__) . '/../../../config/connexion.inc';
return new connection_pdo('appbox', $hostname, $port, $user, $password, $dbname); if (!self::$connection)
{
self::$connection = new connection_pdo ('appbox', $hostname, $port, $user, $password, $dbname);
}
return self::$connection;
} }
public function run(OutputInterface $output = null, $log_tasks = true) public function run(OutputInterface $output = null, $log_tasks = true)
{ {
require_once dirname(__FILE__) . '/../../bootstrap.php'; require_once dirname(__FILE__) . '/../../bootstrap.php';
$this->output = $output; $this->output = $output;
$appbox = appbox::get_instance(); $appbox = appbox::get_instance();
$registry = $appbox->get_registry(); $registry = $appbox->get_registry();
$system = system_server::get_platform(); $system = system_server::get_platform();
@@ -63,7 +70,7 @@ class task_Scheduler
for ($try = 0; true; $try++) for ($try = 0; true; $try++)
{ {
$schedlock = fopen(($lockfile = ($lockdir . 'scheduler.lock')), 'a+'); $schedlock = fopen(($lockfile = ($lockdir . 'scheduler.lock')), 'a+');
if (flock($schedlock, LOCK_EX | LOCK_NB) != true) if (flock($schedlock, LOCK_EX | LOCK_NB) != true)
{ {
$this->log(sprintf("failed to lock '%s' (try=%s/4)", $lockfile, $try)); $this->log(sprintf("failed to lock '%s' (try=%s/4)", $lockfile, $try));
@@ -96,7 +103,7 @@ class task_Scheduler
$sleeptime = 3; $sleeptime = 3;
$sql = "UPDATE sitepreff SET schedstatus='started', schedpid = :pid"; $sql = "UPDATE sitepreff SET schedstatus='started', schedpid = :pid";
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(array(':pid' => getmypid())); $stmt->execute(array(':pid' => getmypid()));
$stmt->closeCursor(); $stmt->closeCursor();
@@ -139,14 +146,14 @@ class task_Scheduler
$this->log(sprintf("Warning : abox connection lost, restarting in 10 min.")); $this->log(sprintf("Warning : abox connection lost, restarting in 10 min."));
} }
sleep(60 * 10); sleep(60 * 10);
$conn = self::get_connection(); $conn = self::get_connection();
$connwaslost = true; $connwaslost = true;
} }
if ($connwaslost) if ($connwaslost)
{ {
$this->log("abox connection restored"); $this->log("abox connection restored");
$sql = 'UPDATE task SET crashed=0'; $sql = 'UPDATE task SET crashed=0';
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(); $stmt->execute();
$stmt->closeCursor(); $stmt->closeCursor();
@@ -155,10 +162,10 @@ class task_Scheduler
} }
$schedstatus = ''; $schedstatus = '';
$sql = "SELECT schedstatus FROM sitepreff"; $sql = "SELECT schedstatus FROM sitepreff";
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(); $stmt->execute();
$row = $stmt->fetch(PDO::FETCH_ASSOC); $row = $stmt->fetch(PDO::FETCH_ASSOC);
$stmt->closeCursor(); $stmt->closeCursor();
if ($row) if ($row)
@@ -168,13 +175,13 @@ class task_Scheduler
if ($schedstatus == 'tostop') if ($schedstatus == 'tostop')
{ {
$sql = 'UPDATE sitepreff SET schedstatus = "stopping"'; $sql = 'UPDATE sitepreff SET schedstatus = "stopping"';
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(); $stmt->execute();
$stmt->closeCursor(); $stmt->closeCursor();
// if scheduler is stopped, stop the tasks // if scheduler is stopped, stop the tasks
$sql = 'UPDATE task2 SET status="tostop" WHERE status != "stopped" and status != "manual"'; $sql = 'UPDATE task2 SET status="tostop" WHERE status != "stopped" and status != "manual"';
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(); $stmt->execute();
$stmt->closeCursor(); $stmt->closeCursor();
@@ -194,10 +201,10 @@ class task_Scheduler
$ttask[$tkey]["todel"] = true; $ttask[$tkey]["todel"] = true;
} }
$sql = "SELECT * FROM task2"; $sql = "SELECT * FROM task2";
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(); $stmt->execute();
$rs = $stmt->fetchAll(PDO::FETCH_ASSOC); $rs = $stmt->fetchAll(PDO::FETCH_ASSOC);
$stmt->closeCursor(); $stmt->closeCursor();
foreach ($task_manager->get_tasks(true) as $task) foreach ($task_manager->get_tasks(true) as $task)
@@ -218,27 +225,27 @@ class task_Scheduler
case "WINDOWS": case "WINDOWS":
case "LINUX": case "LINUX":
$cmd = $phpcli . ' -f ' $cmd = $phpcli . ' -f '
. $registry->get('GV_RootPath') . $registry->get('GV_RootPath')
. "bin/console task:run " . "bin/console task:run "
. $task->get_task_id() . $task->get_task_id()
. " --runner=scheduler "; . " --runner=scheduler ";
break; break;
} }
$ttask[$tkey] = array( $ttask[$tkey] = array(
"task" => $task, "task" => $task,
"current_status" => $task->get_status(), "current_status" => $task->get_status(),
"process" => null, "process" => null,
"cmd" => $cmd, "cmd" => $cmd,
"killat" => null, "killat" => null,
"pipes" => null "pipes" => null
); );
$this->log( $this->log(
sprintf( sprintf(
"new Task %s, status=%s" "new Task %s, status=%s"
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $task->get_status() , $task->get_status()
) )
); );
} }
else else
@@ -246,17 +253,17 @@ class task_Scheduler
if ($ttask[$tkey]["current_status"] != $task->get_status()) if ($ttask[$tkey]["current_status"] != $task->get_status())
{ {
$this->log( $this->log(
sprintf( sprintf(
"Task %s, oldstatus=%s, newstatus=%s" "Task %s, oldstatus=%s, newstatus=%s"
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $ttask[$tkey]["current_status"] , $ttask[$tkey]["current_status"]
, $task->get_status() , $task->get_status()
) )
); );
$ttask[$tkey]["current_status"] = $task->get_status(); $ttask[$tkey]["current_status"] = $task->get_status();
} }
$ttask[$tkey]["task"] = $task; $ttask[$tkey]["task"] = $task;
} }
$ttask[$tkey]["todel"] = false; $ttask[$tkey]["todel"] = false;
} }
@@ -275,32 +282,32 @@ class task_Scheduler
* Launch task that are not yet launched * Launch task that are not yet launched
*/ */
$runningtask = 0; $runningtask = 0;
$common_status = array( $common_status = array(
task_abstract::STATUS_STARTED task_abstract::STATUS_STARTED
, task_abstract::RETURNSTATUS_STOPPED , task_abstract::RETURNSTATUS_STOPPED
); );
foreach ($ttask as $tkey => $tv) foreach ($ttask as $tkey => $tv)
{ {
if (!in_array($ttask[$tkey]["task"]->get_status(), $common_status)) if (!in_array($ttask[$tkey]["task"]->get_status(), $common_status))
{ {
$this->log( $this->log(
sprintf( sprintf(
'task %s has status %s' 'task %s has status %s'
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $ttask[$tkey]["task"]->get_status() , $ttask[$tkey]["task"]->get_status()
) )
); );
} }
switch ($ttask[$tkey]["task"]->get_status()) switch ($ttask[$tkey]["task"]->get_status())
{ {
default: default:
$this->log( $this->log(
sprintf( sprintf(
'Unknow status `%s`' 'Unknow status `%s`'
, $ttask[$tkey]["task"]->get_status() , $ttask[$tkey]["task"]->get_status()
) )
); );
break; break;
case task_abstract::RETURNSTATUS_TORESTART: case task_abstract::RETURNSTATUS_TORESTART:
@@ -319,31 +326,31 @@ class task_Scheduler
if ($schedstatus == 'started' && !$ttask[$tkey]["process"]) if ($schedstatus == 'started' && !$ttask[$tkey]["process"])
{ {
$descriptors = array( $descriptors = array(
1 => array("pipe", "w") 1 => array("pipe", "w")
, 2 => array("pipe", "w") , 2 => array("pipe", "w")
); );
if ($log_tasks === true) if ($log_tasks === true)
{ {
$descriptors[1] = array( $descriptors[1] = array(
"file" "file"
, $logdir . "task_$tkey.log" , $logdir . "task_$tkey.log"
, "a+" , "a+"
); );
$descriptors[2] = array( $descriptors[2] = array(
"file" "file"
, $logdir . "task_$tkey.error.log" , $logdir . "task_$tkey.error.log"
, "a+" , "a+"
); );
} }
$ttask[$tkey]["process"] = proc_open( $ttask[$tkey]["process"] = proc_open(
$ttask[$tkey]["cmd"] $ttask[$tkey]["cmd"]
, $descriptors , $descriptors
, $ttask[$tkey]["pipes"] , $ttask[$tkey]["pipes"]
, $registry->get('GV_RootPath') . "bin/" , $registry->get('GV_RootPath') . "bin/"
, null , null
, array('bypass_shell' => true) , array('bypass_shell' => true)
); );
if (is_resource($ttask[$tkey]["process"])) if (is_resource($ttask[$tkey]["process"]))
@@ -356,12 +363,12 @@ class task_Scheduler
if ($ttask[$tkey]['task']->get_pid() !== null) if ($ttask[$tkey]['task']->get_pid() !== null)
{ {
$this->log( $this->log(
sprintf( sprintf(
"Task %s '%s' started (pid=%s)" "Task %s '%s' started (pid=%s)"
, $ttask[$tkey]['task']->get_task_id() , $ttask[$tkey]['task']->get_task_id()
, $ttask[$tkey]["cmd"] , $ttask[$tkey]["cmd"]
, $ttask[$tkey]['task']->get_pid() , $ttask[$tkey]['task']->get_pid()
) )
); );
$runningtask++; $runningtask++;
} }
@@ -375,12 +382,12 @@ class task_Scheduler
$ttask[$tkey]["process"] = null; $ttask[$tkey]["process"] = null;
$this->log( $this->log(
sprintf( sprintf(
"Task %s '%s' failed to start %d times" "Task %s '%s' failed to start %d times"
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $ttask[$tkey]["cmd"] , $ttask[$tkey]["cmd"]
, $ttask[$tkey]["task"]->get_crash_counter() , $ttask[$tkey]["task"]->get_crash_counter()
) )
); );
$ttask[$tkey]["task"]->increment_crash_counter(); $ttask[$tkey]["task"]->increment_crash_counter();
@@ -411,7 +418,7 @@ class task_Scheduler
if ($proc_status['running']) if ($proc_status['running'])
$runningtask++; $runningtask++;
else else
$crashed = true; $crashed = true;
} }
else else
{ {
@@ -433,11 +440,11 @@ class task_Scheduler
$ttask[$tkey]["process"] = null; $ttask[$tkey]["process"] = null;
$this->log( $this->log(
sprintf( sprintf(
"Task %s crashed %d times" "Task %s crashed %d times"
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $ttask[$tkey]["task"]->get_crash_counter() , $ttask[$tkey]["task"]->get_crash_counter()
) )
); );
@@ -459,7 +466,7 @@ class task_Scheduler
{ {
if ($ttask[$tkey]["killat"] === NULL) if ($ttask[$tkey]["killat"] === NULL)
$ttask[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT; $ttask[$tkey]["killat"] = time() + self::TASKDELAYTOQUIT;
if (($dt = $ttask[$tkey]["killat"] - time()) < 0) if (($dt = $ttask[$tkey]["killat"] - time()) < 0)
{ {
$ppid = $ttask[$tkey]['task']->get_pid(); $ppid = $ttask[$tkey]['task']->get_pid();
$pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $ppid`); $pids = preg_split('/\s+/', `ps -o pid --no-heading --ppid $ppid`);
@@ -473,11 +480,11 @@ class task_Scheduler
} }
$this->log( $this->log(
sprintf( sprintf(
"SIGKILL sent to task %s (pid=%s)" "SIGKILL sent to task %s (pid=%s)"
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $ttask[$tkey]["task"]->get_pid() , $ttask[$tkey]["task"]->get_pid()
) )
); );
proc_terminate($ttask[$tkey]["process"], 9); proc_terminate($ttask[$tkey]["process"], 9);
@@ -493,11 +500,11 @@ class task_Scheduler
else else
{ {
$this->log( $this->log(
sprintf( sprintf(
"waiting task %s to quit (kill in %d seconds)" "waiting task %s to quit (kill in %d seconds)"
, $ttask[$tkey]["task"]->get_task_id() , $ttask[$tkey]["task"]->get_task_id()
, $dt , $dt
) )
); );
$runningtask++; $runningtask++;
} }
@@ -523,6 +530,7 @@ class task_Scheduler
{ {
$conn->close(); $conn->close();
unset($conn); unset($conn);
self::$connection = null;
$to_reopen = true; $to_reopen = true;
} }
sleep($sleeptime); sleep($sleeptime);
@@ -532,7 +540,7 @@ class task_Scheduler
} }
} }
$sql = "UPDATE sitepreff SET schedstatus='stopped', schedpid='0'"; $sql = "UPDATE sitepreff SET schedstatus='stopped', schedpid='0'";
$stmt = $conn->prepare($sql); $stmt = $conn->prepare($sql);
$stmt->execute(); $stmt->execute();
$stmt->closeCursor(); $stmt->closeCursor();