diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php index 226e60142c..d6f13782f2 100644 --- a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php +++ b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\Model\Entities; use Doctrine\ORM\Mapping as ORM; +use Gedmo\Mapping\Annotation as Gedmo; /** * @ORM\Table(name="WorkerRunningJob", @@ -15,6 +16,9 @@ use Doctrine\ORM\Mapping as ORM; */ class WorkerRunningJob { + const FINISHED = 'finished'; + const RUNNING = 'running'; + /** * @ORM\Column(type="integer") * @ORM\Id @@ -43,6 +47,22 @@ class WorkerRunningJob */ private $workOn; + /** + * @Gedmo\Timestampable(on="create") + * @ORM\Column(type="datetime") + */ + private $created; + + /** + * @ORM\Column(type="datetime") + */ + private $published; + + /** + * @ORM\Column(type="string", name="status") + */ + private $status; + /** * @return integer */ @@ -130,4 +150,50 @@ class WorkerRunningJob { return $this->workOn; } + + /** + * @return \DateTime + */ + public function getCreated() + { + return $this->created; + } + + /** + * @param \DateTime $published + * @return $this + */ + public function setPublished(\DateTime $published) + { + $this->published = $published; + + return $this; + } + + /** + * @return mixed + */ + public function getPublished() + { + return $this->published; + } + + /** + * @param $status + * @return $this + */ + public function setStatus($status) + { + $this->status = $status; + + return $this; + } + + /** + * @return mixed + */ + public function getStatus() + { + return $this->status; + } } diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index 03e159abfa..1937774143 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\Model\Repositories; use Alchemy\Phrasea\Core\PhraseaTokens; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Doctrine\ORM\EntityRepository; class WorkerRunningJobRepository extends EntityRepository @@ -23,7 +24,8 @@ class WorkerRunningJobRepository extends EntityRepository FROM WorkerRunningJob WHERE ((work & :write_meta) > 0 OR ((work & :make_subdef) > 0 AND work_on = :work_on) ) AND record_id = :record_id - AND databox_id = :databox_id'; + AND databox_id = :databox_id + AND status = :status'; $query = $this->_em->createNativeQuery($sql, $rsm); $query->setParameters([ @@ -31,7 +33,8 @@ class WorkerRunningJobRepository extends EntityRepository 'make_subdef'=> PhraseaTokens::MAKE_SUBDEF, 'work_on' => $subdefName, 'record_id' => $recordId, - 'databox_id' => $databoxId + 'databox_id' => $databoxId, + 'status' => WorkerRunningJob::RUNNING ] ); @@ -55,7 +58,8 @@ class WorkerRunningJobRepository extends EntityRepository FROM WorkerRunningJob WHERE ((work & :make_subdef) > 0 OR ((work & :write_meta) > 0 AND work_on = :work_on) ) AND record_id = :record_id - AND databox_id = :databox_id'; + AND databox_id = :databox_id + AND status = :status'; $query = $this->_em->createNativeQuery($sql, $rsm); $query->setParameters([ @@ -63,10 +67,34 @@ class WorkerRunningJobRepository extends EntityRepository 'write_meta' => PhraseaTokens::WRITE_META, 'work_on' => $subdefName, 'record_id' => $recordId, - 'databox_id' => $databoxId + 'databox_id' => $databoxId, + 'status' => WorkerRunningJob::RUNNING ] ); return count($query->getResult()) == 0; } + + public function truncateWorkerTable() + { + $connection = $this->_em->getConnection(); + $platform = $connection->getDatabasePlatform(); + $this->_em->beginTransaction(); + try { + $connection->executeUpdate($platform->getTruncateTableSQL('WorkerRunningJob')); + } catch (\Exception $e) { + $this->_em->rollback(); + } + } + + public function deleteFinishedWorks() + { + $this->_em->beginTransaction(); + try { + $this->_em->getConnection()->delete('WorkerRunningJob', ['status' => WorkerRunningJob::FINISHED]); + $this->_em->commit(); + } catch (\Exception $e) { + $this->_em->rollback(); + } + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index 02e91ffe76..37c18bbd1c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -4,6 +4,7 @@ namespace Alchemy\Phrasea\WorkerManager\Controller; use Alchemy\Phrasea\Application as PhraseaApplication; use Alchemy\Phrasea\Controller\Controller; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions; use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; @@ -63,6 +64,34 @@ class AdminConfigurationController extends Controller ]); } + public function infoAction(PhraseaApplication $app, Request $request) + { + /** @var WorkerRunningJobRepository $repoWorker */ + $repoWorker = $app['repo.worker-running-job']; + + return $this->render('admin/worker-manager/worker_info.html.twig', [ + 'workerRunningJob' => $repoWorker->findAll() + ]); + } + + public function truncateTableAction(PhraseaApplication $app, Request $request) + { + /** @var WorkerRunningJobRepository $repoWorker */ + $repoWorker = $app['repo.worker-running-job']; + $repoWorker->truncateWorkerTable(); + + return $app->redirectPath('worker_admin'); + } + + public function deleteFinishedAction(PhraseaApplication $app, Request $request) + { + /** @var WorkerRunningJobRepository $repoWorker */ + $repoWorker = $app['repo.worker-running-job']; + $repoWorker->deleteFinishedWorks(); + + return $app->redirectPath('worker_admin'); + } + public function searchengineAction(PhraseaApplication $app, Request $request) { $options = $this->getElasticsearchOptions(); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php index fe9e62c5d4..22b20b8be2 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php @@ -69,7 +69,8 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface $app['alchemy_worker.logger'], $app['dispatcher'], $app['phraseanet.filesystem'], - $app['repo.worker-running-job'] + $app['repo.worker-running-job'], + $app['elasticsearch.indexer'] )) ->setApplicationBox($app['phraseanet.appbox']) ->setEntityManagerLocator(new LazyLocator($app, 'orm.em')) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php index c7b247d73b..7fdc988b93 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php @@ -53,6 +53,18 @@ class ControllerServiceProvider implements ControllerProviderInterface, ServiceP ->method('GET|POST') ->bind('worker_admin_configuration'); + $controllers->match('/info', 'controller.worker.admin.configuration:infoAction') + ->method('GET') + ->bind('worker_admin_info'); + + $controllers->match('/truncate', 'controller.worker.admin.configuration:truncateTableAction') + ->method('POST') + ->bind('worker_admin_truncate'); + + $controllers->match('/delete-finished', 'controller.worker.admin.configuration:deleteFinishedAction') + ->method('POST') + ->bind('worker_admin_delete_finished'); + $controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction') ->method('GET|POST') ->bind('worker_admin_searchengine'); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php index 62cc23ae52..c191017bf8 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php @@ -72,6 +72,8 @@ class MessagePublisher public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '') { + // add published timestamp to all message payload + $payload['payload']['published'] = time(); $msg = new AMQPMessage(json_encode($payload)); $routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index 635ff7e3cc..0a0dc99996 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -9,6 +9,7 @@ use Alchemy\Phrasea\Filesystem\FilesystemService; use Alchemy\Phrasea\Media\SubdefGenerator; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; +use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent; @@ -31,6 +32,7 @@ class SubdefCreationWorker implements WorkerInterface private $dispatcher; private $filesystem; private $repoWorker; + private $indexer; public function __construct( SubdefGenerator $subdefGenerator, @@ -38,7 +40,8 @@ class SubdefCreationWorker implements WorkerInterface LoggerInterface $logger, EventDispatcherInterface $dispatcher, FilesystemService $filesystem, - WorkerRunningJobRepository $repoWorker + WorkerRunningJobRepository $repoWorker, + Indexer $indexer ) { $this->subdefGenerator = $subdefGenerator; @@ -47,6 +50,7 @@ class SubdefCreationWorker implements WorkerInterface $this->dispatcher = $dispatcher; $this->filesystem = $filesystem; $this->repoWorker = $repoWorker; + $this->indexer = $indexer; } public function process(array $payload) @@ -84,12 +88,15 @@ class SubdefCreationWorker implements WorkerInterface $em->beginTransaction(); try { + $date = new \DateTime(); $workerRunningJob = new WorkerRunningJob(); $workerRunningJob ->setDataboxId($databoxId) ->setRecordId($recordId) ->setWork(PhraseaTokens::MAKE_SUBDEF) ->setWorkOn($payload['subdefName']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) ; $em->persist($workerRunningJob); @@ -152,10 +159,14 @@ class SubdefCreationWorker implements WorkerInterface } } + // update elastic + $this->indexer->flushQueue(); + // tell that we have finished to work on this file $em->beginTransaction(); try { - $em->remove($workerRunningJob); + $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); + $em->persist($workerRunningJob); $em->flush(); $em->commit(); } catch (\Exception $e) { diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index ccaacdef0c..bfed663deb 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -88,12 +88,15 @@ class WriteMetadatasWorker implements WorkerInterface $em->beginTransaction(); try { + $date = new \DateTime(); $workerRunningJob = new WorkerRunningJob(); $workerRunningJob ->setDataboxId($databoxId) ->setRecordId($recordId) ->setWork($param) ->setWorkOn($payload['subdefName']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) ; $em->persist($workerRunningJob); @@ -246,7 +249,8 @@ class WriteMetadatasWorker implements WorkerInterface // tell that we have finished to work on this file $em->beginTransaction(); try { - $em->remove($workerRunningJob); + $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); + $em->persist($workerRunningJob); $em->flush(); $em->commit(); } catch (\Exception $e) { diff --git a/templates/web/admin/worker-manager/index.html.twig b/templates/web/admin/worker-manager/index.html.twig index ff10e205dc..00ef95e3cd 100644 --- a/templates/web/admin/worker-manager/index.html.twig +++ b/templates/web/admin/worker-manager/index.html.twig @@ -10,6 +10,11 @@ {{ "Configuration" | trans }} +
databox_id | +record_id | +work | +work_on | +created | +published | +status | +
---|---|---|---|---|---|---|
{{ workerRow.databoxId }} | +{{ workerRow.recordId }} | +{{ workerRow.work }} | +{{ workerRow.workOn }} | +{{ workerRow.created|date('Y-m-d H:i:s') }} | +{{ workerRow.published|date('Y-m-d H:i:s') }} | +{{ workerRow.status }} | + +