diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php index bf65a26389..82de6f2d93 100644 --- a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php +++ b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php @@ -2,7 +2,6 @@ namespace Alchemy\Phrasea\Model\Entities; -use Alchemy\Phrasea\Core\PhraseaTokens; use Doctrine\ORM\Mapping as ORM; use Gedmo\Mapping\Annotation as Gedmo; @@ -14,7 +13,8 @@ class WorkerRunningJob { const FINISHED = 'finished'; const RUNNING = 'running'; - const ERROR = 'error attempt '; + const ERROR = 'error'; + const ATTEMPT = 'attempt '; const TYPE_PULL = 'uploader pull'; const TYPE_PUSH = 'uploader push'; @@ -58,6 +58,11 @@ class WorkerRunningJob */ private $assetId; + /** + * @ORM\Column(type="string", name="info", nullable=true) + */ + private $info; + /** * @Gedmo\Timestampable(on="create") * @ORM\Column(type="datetime") @@ -202,6 +207,25 @@ class WorkerRunningJob return $this->assetId; } + /** + * @param $info + * @return $this + */ + public function setInfo($info) + { + $this->info = $info; + + return $this; + } + + /** + * @return mixed + */ + public function getInfo() + { + return $this->info; + } + /** * @return \DateTime */ diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php index 8e76fc4394..bccf447317 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php @@ -10,12 +10,14 @@ class AssetsCreationRecordFailureEvent extends SfEvent private $payload; private $workerMessage; private $count; + private $workerJobId; - public function __construct($payload, $workerMessage = '', $count = 2) + public function __construct($payload, $workerMessage = '', $count = 2, $workerJobId = 0) { $this->payload = $payload; $this->workerMessage = $workerMessage; $this->count = $count; + $this->workerJobId = $workerJobId; } public function getPayload() @@ -32,4 +34,9 @@ class AssetsCreationRecordFailureEvent extends SfEvent { return $this->count; } + + public function getWorkerJobId() + { + return $this->workerJobId; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php index 34c112e227..7f9305773a 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php @@ -12,8 +12,9 @@ class PopulateIndexFailureEvent extends SfEvent private $databoxId; private $workerMessage; private $count; + private $workerJobId; - public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2) + public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2, $workerJobId = 0) { $this->host = $host; $this->port = $port; @@ -21,6 +22,7 @@ class PopulateIndexFailureEvent extends SfEvent $this->databoxId = $databoxId; $this->workerMessage = $workerMessage; $this->count = $count; + $this->workerJobId = $workerJobId; } public function getHost() @@ -52,4 +54,9 @@ class PopulateIndexFailureEvent extends SfEvent { return $this->count; } + + public function getWorkerJobId() + { + return $this->workerJobId; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php index 6bdf421fbf..a4e4596d73 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php @@ -66,8 +66,8 @@ class QueueWorkerServiceProvider implements PluginProviderInterface new RecordSubscriber($app, new LazyLocator($app, 'phraseanet.appbox')) ); $dispatcher->addSubscriber(new ExportSubscriber($app['alchemy_worker.message.publisher'])); - $dispatcher->addSubscriber(new AssetsIngestSubscriber($app['alchemy_worker.message.publisher'])); - $dispatcher->addSubscriber(new SearchengineSubscriber($app['alchemy_worker.message.publisher'])); + $dispatcher->addSubscriber(new AssetsIngestSubscriber($app['alchemy_worker.message.publisher'], new LazyLocator($app, 'repo.worker-running-job'))); + $dispatcher->addSubscriber(new SearchengineSubscriber($app['alchemy_worker.message.publisher'], new LazyLocator($app, 'repo.worker-running-job'))); $dispatcher->addSubscriber(new WebhookSubscriber($app['alchemy_worker.message.publisher'])); $dispatcher->addSubscriber(new SubtitleSubscriber(new LazyLocator($app, 'repo.worker-job'), $app['alchemy_worker.message.publisher'])); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php index 079ac535f3..0ebd0274fa 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\WorkerManager\Subscriber; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent; @@ -15,9 +16,13 @@ class AssetsIngestSubscriber implements EventSubscriberInterface /** @var MessagePublisher $messagePublisher */ private $messagePublisher; - public function __construct(MessagePublisher $messagePublisher) + /** @var callable */ + private $repoWorkerJobLocator; + + public function __construct(MessagePublisher $messagePublisher, callable $repoWorkerJobLocator) { - $this->messagePublisher = $messagePublisher; + $this->messagePublisher = $messagePublisher; + $this->repoWorkerJobLocator = $repoWorkerJobLocator; } public function onAssetsCreate(AssetsCreateEvent $event) @@ -28,7 +33,6 @@ class AssetsIngestSubscriber implements EventSubscriberInterface 'payload' => array_merge($event->getData(), ['type' => WorkerRunningJob::TYPE_PUSH]) ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); } @@ -49,11 +53,36 @@ class AssetsIngestSubscriber implements EventSubscriberInterface public function onAssetsCreationRecordFailure(AssetsCreationRecordFailureEvent $event) { + $repoWorker = $this->getRepoWorkerJob(); + $payload = [ 'message_type' => MessagePublisher::CREATE_RECORD_TYPE, 'payload' => $event->getPayload() ]; + $em = $repoWorker->getEntityManager(); + // check connection an re-connect if needed + $repoWorker->reconnect(); + + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $repoWorker->find($event->getWorkerJobId()); + + if ($workerRunningJob) { + $em->beginTransaction(); + try { + // count-1 for the number of finished attempt + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) + ; + + $em->persist($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + } + $this->messagePublisher->publishMessage( $payload, MessagePublisher::RETRY_CREATE_RECORD_QUEUE, @@ -70,4 +99,14 @@ class AssetsIngestSubscriber implements EventSubscriberInterface WorkerEvents::ASSETS_CREATION_RECORD_FAILURE => 'onAssetsCreationRecordFailure' ]; } + + /** + * @return WorkerRunningJobRepository + */ + private function getRepoWorkerJob() + { + $callable = $this->repoWorkerJobLocator; + + return $callable(); + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php index 565c7304de..b24aad933b 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -9,7 +9,6 @@ use Alchemy\Phrasea\Core\Event\Record\MetadataChangedEvent; use Alchemy\Phrasea\Core\Event\Record\RecordEvent; use Alchemy\Phrasea\Core\Event\Record\RecordEvents; use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent; -use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Databox\Subdef\MediaSubdefRepository; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; @@ -97,7 +96,8 @@ class RecordSubscriber implements EventSubscriberInterface 'recordId' => $event->getRecord()->getRecordId(), 'databoxId' => $event->getRecord()->getDataboxId(), 'subdefName' => $event->getSubdefName(), - 'status' => '' + 'status' => '', + 'workerJobId' => $event->getWorkerJobId() ] ]; @@ -106,13 +106,17 @@ class RecordSubscriber implements EventSubscriberInterface // check connection an re-connect if needed $repoWorker->reconnect(); + /** @var WorkerRunningJob $workerRunningJob */ $workerRunningJob = $repoWorker->find($event->getWorkerJobId()); if ($workerRunningJob) { $em->beginTransaction(); try { // count-1 for the number of finished attempt - $workerRunningJob->setStatus(WorkerRunningJob::ERROR. ($event->getCount() - 1)); + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) + ->setStatus(WorkerRunningJob::ERROR) + ; $em->persist($workerRunningJob); $em->flush(); @@ -212,7 +216,8 @@ class RecordSubscriber implements EventSubscriberInterface 'payload' => [ 'recordId' => $event->getRecord()->getRecordId(), 'databoxId' => $event->getRecord()->getDataboxId(), - 'subdefName' => $event->getSubdefName() + 'subdefName' => $event->getSubdefName(), + 'workerJobId' => $event->getWorkerJobId() ] ]; @@ -229,13 +234,17 @@ class RecordSubscriber implements EventSubscriberInterface // check connection an re-connect if needed $repoWorker->reconnect(); + /** @var WorkerRunningJob $workerRunningJob */ $workerRunningJob = $repoWorker->find($event->getWorkerJobId()); if ($workerRunningJob) { $em->beginTransaction(); try { // count-1 for the number of finished attempt - $workerRunningJob->setStatus(WorkerRunningJob::ERROR. ($event->getCount() - 1)); + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) + ->setStatus(WorkerRunningJob::ERROR) + ; $em->persist($workerRunningJob); $em->flush(); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php index 7f73f6ff44..b5bd748f2c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php @@ -2,6 +2,8 @@ namespace Alchemy\Phrasea\WorkerManager\Subscriber; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent; use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; @@ -13,9 +15,13 @@ class SearchengineSubscriber implements EventSubscriberInterface /** @var MessagePublisher $messagePublisher */ private $messagePublisher; - public function __construct(MessagePublisher $messagePublisher) + /** @var callable */ + private $repoWorkerJobLocator; + + public function __construct(MessagePublisher $messagePublisher, callable $repoWorkerJobLocator) { - $this->messagePublisher = $messagePublisher; + $this->messagePublisher = $messagePublisher; + $this->repoWorkerJobLocator = $repoWorkerJobLocator; } public function onPopulateIndex(PopulateIndexEvent $event) @@ -40,16 +46,43 @@ class SearchengineSubscriber implements EventSubscriberInterface public function onPopulateIndexFailure(PopulateIndexFailureEvent $event) { + $repoWorker = $this->getRepoWorkerJob(); + $payload = [ 'message_type' => MessagePublisher::POPULATE_INDEX_TYPE, 'payload' => [ - 'host' => $event->getHost(), - 'port' => $event->getPort(), - 'indexName' => $event->getIndexName(), - 'databoxId' => $event->getDataboxId(), + 'host' => $event->getHost(), + 'port' => $event->getPort(), + 'indexName' => $event->getIndexName(), + 'databoxId' => $event->getDataboxId(), + 'workerJobId' => $event->getWorkerJobId() ] ]; + $em = $repoWorker->getEntityManager(); + // check connection an re-connect if needed + $repoWorker->reconnect(); + + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $repoWorker->find($event->getWorkerJobId()); + + if ($workerRunningJob) { + $em->beginTransaction(); + try { + // count-1 for the number of finished attempt + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) + ->setStatus(WorkerRunningJob::ERROR) + ; + + $em->persist($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + } + $this->messagePublisher->publishMessage( $payload, MessagePublisher::RETRY_POPULATE_INDEX_QUEUE, @@ -65,5 +98,15 @@ class SearchengineSubscriber implements EventSubscriberInterface WorkerEvents::POPULATE_INDEX_FAILURE => 'onPopulateIndexFailure' ]; } + + /** + * @return WorkerRunningJobRepository + */ + private function getRepoWorkerJob() + { + $callable = $this->repoWorkerJobLocator; + + return $callable(); + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php index 635577fc5b..0482533101 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php @@ -92,15 +92,10 @@ class CreateRecordWorker implements WorkerInterface $this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent( $payload, 'Error when downloading assets!', - $count + $count, + $workerRunningJob->getId() )); - if ($workerRunningJob != null) { - $em->remove($workerRunningJob); - - $em->flush(); - } - return; } @@ -115,15 +110,10 @@ class CreateRecordWorker implements WorkerInterface $this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent( $payload, $workerMessage, - $count + $count, + $workerRunningJob->getId() )); - if ($workerRunningJob != null) { - $em->remove($workerRunningJob); - - $em->flush(); - } - return; } @@ -236,11 +226,8 @@ class CreateRecordWorker implements WorkerInterface $this->getBorderManager()->process($lazaretSession, $packageFile, $callback); - - $recordId = null; if ($elementCreated instanceof \record_adapter) { $this->dispatch(PhraseaEvents::RECORD_UPLOAD, new RecordEdit($elementCreated)); - $recordId = $elementCreated->getRecordId(); } else { $this->messagePublisher->pushLog(sprintf('The file was moved to the quarantine: %s', json_encode($reasons))); /** @var LazaretFile $elementCreated */ diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php index 6d54a7dc3d..f10f6d12af 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php @@ -36,26 +36,47 @@ class PopulateIndexWorker implements WorkerInterface public function process(array $payload) { $em = $this->repoWorker->getEntityManager(); - $em->beginTransaction(); - $date = new \DateTime(); - try { - $workerRunningJob = new WorkerRunningJob(); + if (isset($payload['workerJobId'])) { + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); + + if ($workerRunningJob == null) { + $this->messagePublisher->pushLog("Given workerJobId not found !", "error"); + + return ; + } + $workerRunningJob - ->setWork(MessagePublisher::POPULATE_INDEX_TYPE) - ->setWorkOn($payload['indexName']) - ->setDataboxId($payload['databoxId']) - ->setPublished($date->setTimestamp($payload['published'])) + ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) ->setStatus(WorkerRunningJob::RUNNING) ; $em->persist($workerRunningJob); $em->flush(); + } else { + $em->beginTransaction(); + $date = new \DateTime(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); + try { + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setWork(MessagePublisher::POPULATE_INDEX_TYPE) + ->setWorkOn($payload['indexName']) + ->setDataboxId($payload['databoxId']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } } /** @var ElasticsearchOptions $options */ @@ -82,7 +103,8 @@ class PopulateIndexWorker implements WorkerInterface $payload['indexName'], $payload['databoxId'], $workerMessage, - $count + $count, + $workerRunningJob->getId() )); } else { $databox = $this->findDataboxById($databoxId); @@ -97,13 +119,6 @@ class PopulateIndexWorker implements WorkerInterface $r['memory']/1048576 )); } catch(\Exception $e) { - if ($workerRunningJob != null) { - - $em->remove($workerRunningJob); - - $em->flush(); - } - $workerMessage = sprintf("Error on indexing : %s ", $e->getMessage()); $this->messagePublisher->pushLog($workerMessage); @@ -116,7 +131,8 @@ class PopulateIndexWorker implements WorkerInterface $payload['indexName'], $payload['databoxId'], $workerMessage, - $count + $count, + $workerRunningJob->getId() )); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index 885102ab7e..6177a0cadb 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -81,26 +81,46 @@ class SubdefCreationWorker implements WorkerInterface // tell that a file is in used to create subdef $em = $this->repoWorker->getEntityManager(); $this->repoWorker->reconnect(); - $em->beginTransaction(); - try { - $date = new \DateTime(); - $workerRunningJob = new WorkerRunningJob(); + if (isset($payload['workerJobId'])) { + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); + + if ($workerRunningJob == null) { + $this->logger->error("Given workerJobId not found !"); + + return ; + } + $workerRunningJob - ->setDataboxId($databoxId) - ->setRecordId($recordId) - ->setWork(MessagePublisher::SUBDEF_CREATION_TYPE) - ->setWorkOn($payload['subdefName']) - ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningJob::RUNNING) - ; + ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) + ->setStatus(WorkerRunningJob::RUNNING); $em->persist($workerRunningJob); + $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); + } else { + $em->beginTransaction(); + try { + $date = new \DateTime(); + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($databoxId) + ->setRecordId($recordId) + ->setWork(MessagePublisher::SUBDEF_CREATION_TYPE) + ->setWorkOn($payload['subdefName']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } } $this->subdefGenerator->setLogger($this->logger); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index b2b498f54a..d23d3ca646 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -18,7 +18,6 @@ use PHPExiftool\Driver\Metadata\MetadataBag; use PHPExiftool\Driver\Tag; use PHPExiftool\Driver\Value\Mono; use PHPExiftool\Driver\Value\Multi; -use PHPExiftool\Exception\ExceptionInterface as PHPExiftoolException; use PHPExiftool\Exception\TagUnknown; use PHPExiftool\Writer; use Psr\Log\LoggerInterface; @@ -80,26 +79,47 @@ class WriteMetadatasWorker implements WorkerInterface // tell that a file is in used to create subdef $em = $this->getEntityManager(); $this->repoWorker->reconnect(); - $em->beginTransaction(); - try { - $date = new \DateTime(); - $workerRunningJob = new WorkerRunningJob(); + if (isset($payload['workerJobId'])) { + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); + + if ($workerRunningJob == null) { + $this->logger->error("Given workerJobId not found !"); + + return ; + } + $workerRunningJob - ->setDataboxId($databoxId) - ->setRecordId($recordId) - ->setWork(MessagePublisher::WRITE_METADATAS_TYPE) - ->setWorkOn($payload['subdefName']) - ->setPublished($date->setTimestamp($payload['published'])) + ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) ->setStatus(WorkerRunningJob::RUNNING) ; $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); + $em->flush(); + } else { + $em->beginTransaction(); + + try { + $date = new \DateTime(); + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($databoxId) + ->setRecordId($recordId) + ->setWork(MessagePublisher::WRITE_METADATAS_TYPE) + ->setWorkOn($payload['subdefName']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } } $record = $databox->get_record($recordId); diff --git a/templates/web/admin/worker-manager/worker_info.html.twig b/templates/web/admin/worker-manager/worker_info.html.twig index a002eee5aa..e6c20e491a 100644 --- a/templates/web/admin/worker-manager/worker_info.html.twig +++ b/templates/web/admin/worker-manager/worker_info.html.twig @@ -58,7 +58,7 @@ {% endif %} {% for workerRow in workerRunningJob | sort | reverse %} - + {% if workerRow.databoxId %} {{ workerRow.databoxId | sbas_labels(app) }}