Merge pull request #996 from romainneutron/better-notifier

[3.9] More improviements in live notifier
This commit is contained in:
Nicolas Le Goff
2014-02-28 12:59:51 +01:00
12 changed files with 108 additions and 31 deletions

View File

@@ -28,23 +28,24 @@ class TaskList extends Command
{ {
$output->writeln("<info>Querying the task manager...</info>"); $output->writeln("<info>Querying the task manager...</info>");
$errors = 0; $errors = 0;
$probe = $this->container['task-manager.live-information']; $tasks = $this->container['manipulator.task']->getRepository()->findAll();
$infos = $this->container['task-manager.live-information']->getTasks($tasks);
$rows = [];
$rows = array_map(function (Task $task) use ($probe, &$errors) { foreach ($tasks as $task) {
$info = $probe->getTask($task); $info = isset($infos[$task->getId()]) ? $infos[$task->getId()] : ['actual' => null];
$error = $info['actual'] !== $task->getStatus(); if (true === $error = $info['actual'] !== $task->getStatus()) {
if ($error) {
$errors ++; $errors ++;
} }
return [ $rows[] = [
$task->getId(), $task->getId(),
$task->getName(), $task->getName(),
$task->getStatus() !== 'started' ? "<comment>".$task->getStatus() . "</comment>" : $task->getStatus(), $task->getStatus() !== 'started' ? "<comment>".$task->getStatus() . "</comment>" : $task->getStatus(),
$error ? "<error>".$info['actual']."</error>" : $info['actual'], $error ? "<error>".$info['actual']."</error>" : $info['actual'],
$info['process-id'], $info['process-id'],
]; ];
}, $this->container['manipulator.task']->getRepository()->findAll()); }
$this $this
->getHelperSet()->get('table') ->getHelperSet()->get('table')

View File

@@ -32,14 +32,7 @@ class LiveInformation
*/ */
public function getManager($throwException = false) public function getManager($throwException = false)
{ {
try { $data = $this->query($throwException);
$data = $this->notifier->notify(Notifier::MESSAGE_INFORMATIONS, 2);
} catch (RuntimeException $e) {
if($throwException) {
throw $e;
}
$data = [];
}
return [ return [
'configuration' => $this->status->getStatus(), 'configuration' => $this->status->getStatus(),
@@ -55,14 +48,33 @@ class LiveInformation
*/ */
public function getTask(Task $task, $throwException = false) public function getTask(Task $task, $throwException = false)
{ {
try { $data = $this->query($throwException);
$data = $this->notifier->notify(Notifier::MESSAGE_INFORMATIONS, 2);
} catch (RuntimeException $e) { return $this->formatTask($task, $data);
if($throwException) { }
throw $e;
} /**
$data = []; * Returns live informations about some tasks.
*
* @param Task[] $tasks
* @param boolean $throwException
*
* @return array
*/
public function getTasks($tasks, $throwException = false)
{
$data = $this->query($throwException);
$ret = [];
foreach ($tasks as $task) {
$ret[$task->getId()] = $this->formatTask($task, $data);
} }
return $ret;
}
private function formatTask(Task $task, $data)
{
$taskData = (isset($data['jobs']) && isset($data['jobs'][$task->getId()])) ? $data['jobs'][$task->getId()] : []; $taskData = (isset($data['jobs']) && isset($data['jobs'][$task->getId()])) ? $data['jobs'][$task->getId()] : [];
return [ return [
@@ -71,4 +83,16 @@ class LiveInformation
'process-id' => isset($taskData['process-id']) ? $taskData['process-id'] : null, 'process-id' => isset($taskData['process-id']) ? $taskData['process-id'] : null,
]; ];
} }
private function query($throwException)
{
try {
return $this->notifier->notify(Notifier::MESSAGE_INFORMATIONS);
} catch (RuntimeException $e) {
if($throwException) {
throw $e;
}
return [];
}
}
} }

View File

@@ -33,40 +33,46 @@ class Notifier
/** @var LoggerInterface */ /** @var LoggerInterface */
private $logger; private $logger;
/** @var integer */
private $timeout = 2;
public function __construct(\ZMQSocket $socket, LoggerInterface $logger) public function __construct(\ZMQSocket $socket, LoggerInterface $logger)
{ {
$this->socket = $socket; $this->socket = $socket;
$this->logger = $logger; $this->logger = $logger;
} }
public function setTimeout($timeout)
{
if ($timeout <= 0) {
throw new \InvalidArgumentException('Timeout must be a positive value');
}
$this->timeout = (float) $timeout;
}
/** /**
* Notifies the task manager given a message constant, see MESSAGE_* constants. * Notifies the task manager given a message constant, see MESSAGE_* constants.
* *
* @param string $message * @param string $message
* @param integer $timeout
* *
* @return mixed|null The return value of the task manager. * @return mixed|null The return value of the task manager.
* *
* @throws RuntimeException in case notification did not occur within the timeout. * @throws RuntimeException in case notification did not occur within the timeout.
*/ */
public function notify($message, $timeout = 1) public function notify($message)
{ {
if ($timeout <= 0) {
throw new \InvalidArgumentException('Timeout must be a positive value');
}
try { try {
$command = $this->createCommand($message); $command = $this->createCommand($message);
$this->socket->send($command); $this->socket->send($command);
$limit = microtime(true) + $timeout; $limit = microtime(true) + $this->timeout;
while (microtime(true) < $limit && false === $result = $this->socket->recv(\ZMQ::MODE_NOBLOCK)) { while (microtime(true) < $limit && false === $result = $this->socket->recv(\ZMQ::MODE_NOBLOCK)) {
usleep(1000); usleep(1000);
} }
if (false === $result) { if (false === $result) {
$this->logger->error(sprintf('Unable to notify the task manager with message "%s" within timeout of %d seconds', $message, $timeout)); $this->logger->error(sprintf('Unable to notify the task manager with message "%s" within timeout of %d seconds', $message, $this->timeout));
throw new RuntimeException('Unable to retrieve information.'); throw new RuntimeException('Unable to retrieve information.');
} }
@@ -80,7 +86,7 @@ class Notifier
return $data['reply']; return $data['reply'];
} catch (\ZMQSocketException $e) { } catch (\ZMQSocketException $e) {
$this->logger->error(sprintf('Unable to notify the task manager with message "%s" within timeout of %d seconds', $message, $timeout), array('exception' => $e)); $this->logger->error(sprintf('Unable to notify the task manager with message "%s" within timeout of %d seconds', $message, $this->timeout), array('exception' => $e));
throw new RuntimeException('Unable to retrieve information.', $e->getCode(), $e); throw new RuntimeException('Unable to retrieve information.', $e->getCode(), $e);
} }
} }

View File

@@ -17,6 +17,10 @@ class SchedulerPauseTest extends \PhraseanetTestCase
$input = $this->getMock('Symfony\Component\Console\Input\InputInterface'); $input = $this->getMock('Symfony\Component\Console\Input\InputInterface');
$output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface');
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$command = new SchedulerPauseTasks(); $command = new SchedulerPauseTasks();
$command->setContainer(self::$DI['cli']); $command->setContainer(self::$DI['cli']);
$command->execute($input, $output); $command->execute($input, $output);

View File

@@ -14,6 +14,10 @@ class SchedulerResumeTest extends \PhraseanetTestCase
self::$DI['cli']['task-manager.status']->expects($this->once()) self::$DI['cli']['task-manager.status']->expects($this->once())
->method('start'); ->method('start');
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$input = $this->getMock('Symfony\Component\Console\Input\InputInterface'); $input = $this->getMock('Symfony\Component\Console\Input\InputInterface');
$output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface');

View File

@@ -17,6 +17,10 @@ class SchedulerRunTest extends \PhraseanetTestCase
self::$DI['cli']['task-manager']->expects($this->once()) self::$DI['cli']['task-manager']->expects($this->once())
->method('start'); ->method('start');
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$input = $this->getMock('Symfony\Component\Console\Input\InputInterface'); $input = $this->getMock('Symfony\Component\Console\Input\InputInterface');
$output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface');

View File

@@ -11,6 +11,10 @@ class SchedulerStateTest extends \PhraseanetTestCase
$input = $this->getMock('Symfony\Component\Console\Input\InputInterface'); $input = $this->getMock('Symfony\Component\Console\Input\InputInterface');
$output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface');
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$command = new SchedulerState(); $command = new SchedulerState();
$command->setContainer(self::$DI['cli']); $command->setContainer(self::$DI['cli']);
$command->execute($input, $output); $command->execute($input, $output);

View File

@@ -14,6 +14,10 @@ class TaskListTest extends \PhraseanetTestCase
->method('getFormatter') ->method('getFormatter')
->will($this->returnValue($this->getMock('Symfony\Component\Console\Formatter\OutputFormatterInterface'))); ->will($this->returnValue($this->getMock('Symfony\Component\Console\Formatter\OutputFormatterInterface')));
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$command = new TaskList(); $command = new TaskList();
$command->setContainer(self::$DI['cli']); $command->setContainer(self::$DI['cli']);

View File

@@ -17,6 +17,10 @@ class TaskStartTest extends \PhraseanetTestCase
->with('task_id') ->with('task_id')
->will($this->returnValue(1)); ->will($this->returnValue(1));
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$command = new TaskStart(); $command = new TaskStart();
$command->setContainer(self::$DI['cli']); $command->setContainer(self::$DI['cli']);
$command->execute($input, $output); $command->execute($input, $output);

View File

@@ -12,6 +12,10 @@ class TaskStateTest extends \PhraseanetTestCase
$input = $this->getMock('Symfony\Component\Console\Input\InputInterface'); $input = $this->getMock('Symfony\Component\Console\Input\InputInterface');
$output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface');
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$input->expects($this->any()) $input->expects($this->any())
->method('getArgument') ->method('getArgument')
->with('task_id') ->with('task_id')

View File

@@ -17,6 +17,10 @@ class TaskStopTest extends \PhraseanetTestCase
->with('task_id') ->with('task_id')
->will($this->returnValue(1)); ->will($this->returnValue(1));
self::$DI['cli']['monolog'] = self::$DI['cli']->share(function () {
return $this->createMonologMock();
});
$command = new TaskStop(); $command = new TaskStop();
$command->setContainer(self::$DI['cli']); $command->setContainer(self::$DI['cli']);
$command->execute($input, $output); $command->execute($input, $output);

View File

@@ -14,6 +14,7 @@ use Symfony\Component\DomCrawler\Crawler;
use Symfony\Component\Routing\RequestContext; use Symfony\Component\Routing\RequestContext;
use Alchemy\Tests\Tools\TranslatorMockTrait; use Alchemy\Tests\Tools\TranslatorMockTrait;
use Alchemy\Phrasea\Authentication\ACLProvider; use Alchemy\Phrasea\Authentication\ACLProvider;
use Alchemy\Phrasea\TaskManager\Notifier;
use Guzzle\Http\Client as Guzzle; use Guzzle\Http\Client as Guzzle;
abstract class PhraseanetTestCase extends WebTestCase abstract class PhraseanetTestCase extends WebTestCase
@@ -294,6 +295,12 @@ abstract class PhraseanetTestCase extends WebTestCase
return $generator; return $generator;
})); }));
$app['task-manager.notifier'] = $app->share($app->extend('task-manager.notifier', function (Notifier $notifier) {
$notifier->setTimeout(0.0001);
return $notifier;
}));
$app['translator'] = $this->createTranslatorMock(); $app['translator'] = $this->createTranslatorMock();
$app['phraseanet.SE.subscriber'] = $this->getMock('Symfony\Component\EventDispatcher\EventSubscriberInterface'); $app['phraseanet.SE.subscriber'] = $this->getMock('Symfony\Component\EventDispatcher\EventSubscriberInterface');
@@ -642,4 +649,11 @@ abstract class PhraseanetTestCase extends WebTestCase
{ {
return $this->getMock('Psr\Log\LoggerInterface'); return $this->getMock('Psr\Log\LoggerInterface');
} }
protected function createMonologMock()
{
return $this->getMockBuilder('Monolog\Logger')
->disableOriginalConstructor()
->getMock();
}
} }