diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php index 7f778252f1..f313a99d59 100644 --- a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php +++ b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php @@ -6,7 +6,12 @@ use Doctrine\ORM\Mapping as ORM; use Gedmo\Mapping\Annotation as Gedmo; /** - * @ORM\Table(name="WorkerRunningJob") + * @ORM\Table( + * name="WorkerRunningJob", + * uniqueConstraints={ + * @ORM\uniqueConstraint(name="flock", columns={"databox_id", "record_id", "flock"}) + * } + * ) * @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository") */ class WorkerRunningJob @@ -14,7 +19,7 @@ class WorkerRunningJob const FINISHED = 'finished'; const RUNNING = 'running'; const ERROR = 'error'; - const INTERRUPT = 'interrupted manually'; + const INTERRUPT = 'canceled'; const ATTEMPT = 'attempt '; @@ -41,12 +46,17 @@ class WorkerRunningJob private $recordId; /** - * @ORM\Column(type="string", name="work", nullable=true) + * @ORM\Column(type="string", length=64, name="flock", nullable=true) + */ + private $flock; + + /** + * @ORM\Column(type="string", length=64, name="work", nullable=true) */ private $work; /** - * @ORM\Column(type="string", name="work_on", nullable=true) + * @ORM\Column(type="string", length=64, name="work_on", nullable=true) */ private $workOn; @@ -138,6 +148,25 @@ class WorkerRunningJob } + /** + * @return mixed + */ + public function getFlock() + { + return $this->flock; + } + + /** + * @param mixed $flock + * @return WorkerRunningJob + */ + public function setFlock($flock) + { + $this->flock = $flock; + + return $this; + } + /** * @param $work * @return $this diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index c37c58bc3c..1bde5b1275 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -2,78 +2,150 @@ namespace Alchemy\Phrasea\Model\Repositories; -use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; +use DateTime; use Doctrine\ORM\EntityRepository; +use Doctrine\ORM\OptimisticLockException; +use Exception; class WorkerRunningJobRepository extends EntityRepository { /** - * return true if we can create subdef - * @param $subdefName - * @param $recordId - * @param $databoxId - * @return bool + * Acquire a "lock" to create a subdef + * @param array $payload + * @return WorkerRunningJob + * @throws OptimisticLockException */ - public function canCreateSubdef($subdefName, $recordId, $databoxId) + public function canCreateSubdef($payload) { - $rsm = $this->createResultSetMappingBuilder('w'); - $rsm->addScalarResult('work_on','work_on'); - - $sql = 'SELECT work_on - FROM WorkerRunningJob - WHERE ((work = :write_meta) OR ((work = :make_subdef) AND work_on = :work_on) ) - AND record_id = :record_id - AND databox_id = :databox_id - AND status = :status'; - - $query = $this->_em->createNativeQuery($sql, $rsm); - $query->setParameters([ - 'write_meta' => MessagePublisher::WRITE_METADATAS_TYPE, - 'make_subdef'=> MessagePublisher::SUBDEF_CREATION_TYPE, - 'work_on' => $subdefName, - 'record_id' => $recordId, - 'databox_id' => $databoxId, - 'status' => WorkerRunningJob::RUNNING - ] - ); - - return count($query->getResult()) == 0; + return $this->getLock($payload, MessagePublisher::SUBDEF_CREATION_TYPE); } /** - * return true if we can write meta - * - * @param $subdefName - * @param $recordId - * @param $databoxId - * @return bool + * Acquire a "lock" to write meta into a subdef + * @param array $payload + * @return WorkerRunningJob + * @throws OptimisticLockException */ - public function canWriteMetadata($subdefName, $recordId, $databoxId) + public function canWriteMetadata($payload) { - $rsm = $this->createResultSetMappingBuilder('w'); - $rsm->addScalarResult('work_on','work_on'); + return $this->getLock($payload, MessagePublisher::WRITE_METADATAS_TYPE); + } - $sql = 'SELECT work_on - FROM WorkerRunningJob - WHERE ((work = :make_subdef) OR ((work = :write_meta) AND work_on = :work_on) ) - AND record_id = :record_id - AND databox_id = :databox_id - AND status = :status'; + /** + * Acquire a "lock" to work on a (sbid + rid + subdef) by inserting a row in WorkerRunningJob table. + * If it fails that means that another worker is already working on this file. + * + * nb : this work only for "first try" where workerJobId is null (=insert). + * for some retries (lock was acquired but worker job failed), the "count" of existing row is incremented (=update), + * so many workers "could" update the same row... + * __Luckily__, a rabbitmq message is consumed only once by a unique worker, + * and different workers (write-meta, subdef) have their own queues and their own rows on table. + * So working on a file always starts by a "first try", and concurency is not possible. + * todo : do not update, but insert a line for every try ? + * + * @param array $payload + * @param string $type + * @return WorkerRunningJob the entity (created or updated) or null if file is already locked (duplicate key) + * @throws OptimisticLockException + */ + private function getLock(array $payload, string $type) + { + if(!isset($payload['workerJobId'])) { + // insert a new row WorkerRunningJob : will fail if concurency + try { + $this->getEntityManager()->beginTransaction(); + $date = new DateTime(); + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($payload['databoxId']) + ->setRecordId($payload['recordId']) + ->setWork($type) + ->setWorkOn($payload['subdefName']) + ->setPayload([ + 'message_type' => $type, + 'payload' => $payload + ]) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ->setFlock($payload['subdefName']); + $this->getEntityManager()->persist($workerRunningJob); - $query = $this->_em->createNativeQuery($sql, $rsm); - $query->setParameters([ - 'make_subdef'=> MessagePublisher::SUBDEF_CREATION_TYPE, - 'write_meta' => MessagePublisher::WRITE_METADATAS_TYPE, - 'work_on' => $subdefName, - 'record_id' => $recordId, - 'databox_id' => $databoxId, - 'status' => WorkerRunningJob::RUNNING - ] - ); + $this->getEntityManager()->flush(); + $this->getEntityManager()->commit(); - return count($query->getResult()) == 0; + return $workerRunningJob; + } + catch(Exception $e) { + // duplicate key ? + $this->getEntityManager()->rollback(); + // for unpredicted other errors we can still ignore and return null (lock failed), + // because anyway the worker/rabbit retry-system will stop itself after n failures. + } + } + else { + // update an existing row : never fails (except bad id if row was purged) + try { + $this->getEntityManager()->beginTransaction(); + $this->getEntityManager()->createQueryBuilder() + ->update() + ->set('info', ':info')->setParameter('info', WorkerRunningJob::ATTEMPT . $payload['count']) + ->set('status', ':status')->setParameter('status', WorkerRunningJob::RUNNING) + ->set('flock', ':flock')->setParameter('flock', $payload['subdefName']) + ->where('id = :id')->setParameter('id', $payload['workerJobId']) + ->getQuery() + ->execute(); + + $this->getEntityManager()->flush(); + $this->getEntityManager()->commit(); + + return $this->find($payload['workerJobId']); + } + catch (Exception $e) { + // really bad ? return null anyway + $this->getEntityManager()->rollback(); + //$this->logger->error("Error persisting WorkerRunningJob !"); + } + } + + return null; + } + + + /** + * mark a job a "finished" + * nb : after a long job, connection may be lost so we reconnect. + * But sometimes (?) a first commit fails (due to reconnect ?), while the second one is ok. + * So here we try 2 times, just in case... + * + * @param WorkerRunningJob $workerRunningJob + * @param null $info + */ + public function markFinished(WorkerRunningJob $workerRunningJob, $info = null) + { + $this->reconnect(); + for($try=1; $try<=2; $try++) { + try { + $workerRunningJob->setStatus(WorkerRunningJob::FINISHED) + ->setFinished(new DateTime('now')) + ->setStatus(WorkerRunningJob::FINISHED) + ->setFlock(null); + if (!is_null($info)) { + $workerRunningJob->setInfo($info); + } + + $this->getEntityManager()->beginTransaction(); + $this->getEntityManager()->persist($workerRunningJob); + $this->getEntityManager()->flush(); + $this->getEntityManager()->commit(); + + break; + } + catch (Exception $e) { + $this->getEntityManager()->rollback(); + } + } } /** @@ -134,8 +206,9 @@ class WorkerRunningJobRepository extends EntityRepository $platform = $connection->getDatabasePlatform(); $this->_em->beginTransaction(); try { - $connection->executeUpdate($platform->getTruncateTableSQL('WorkerRunningJob')); - } catch (\Exception $e) { + $connection->executeUpdate($platform->getTruncateTableSQL('WorkerRunningJob')); + } + catch (Exception $e) { $this->_em->rollback(); } } @@ -146,7 +219,8 @@ class WorkerRunningJobRepository extends EntityRepository try { $this->_em->getConnection()->delete('WorkerRunningJob', ['status' => WorkerRunningJob::FINISHED]); $this->_em->commit(); - } catch (\Exception $e) { + } + catch (Exception $e) { $this->_em->rollback(); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index 5d2f6d9ecc..8c8ba70c26 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -157,10 +157,10 @@ class AdminConfigurationController extends Controller /** @var WorkerRunningJob $workerRunningJob */ $workerRunningJob = $repoWorker->find($workerId); - $workerRunningJob - ->setStatus($request->request->get('status')) - ->setFinished(new \DateTime('now')) - ; + $workerRunningJob->setStatus($request->request->get('status')); + if($request->request->get('finished') == '1') { + $workerRunningJob->setFinished(new \DateTime('now'))->setFlock(null); + } $em = $repoWorker->getEntityManager(); $em->persist($workerRunningJob); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php index a7b3793c48..3551548383 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -119,12 +119,14 @@ class RecordSubscriber implements EventSubscriberInterface $workerRunningJob ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) ->setStatus(WorkerRunningJob::ERROR) + ->setFlock(null) // unlock ! ; $em->persist($workerRunningJob); $em->flush(); $em->commit(); - } catch (Exception $e) { + } + catch (Exception $e) { $em->rollback(); } } @@ -240,12 +242,14 @@ class RecordSubscriber implements EventSubscriberInterface $workerRunningJob ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) ->setStatus(WorkerRunningJob::ERROR) + ->setFlock(null) // unlock ! ; $em->persist($workerRunningJob); $em->flush(); $em->commit(); - } catch (Exception $e) { + } + catch (Exception $e) { $em->rollback(); } } @@ -257,7 +261,8 @@ class RecordSubscriber implements EventSubscriberInterface $event->getWorkerMessage() ); - } else { + } + else { $databoxId = $event->getRecord()->getDataboxId(); $recordId = $event->getRecord()->getRecordId(); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index 12dc1cfcd9..d487509486 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -6,7 +6,6 @@ use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware; use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Filesystem\FilesystemService; use Alchemy\Phrasea\Media\SubdefGenerator; -use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent; @@ -14,8 +13,10 @@ use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; +use Exception; use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Throwable; class SubdefCreationWorker implements WorkerInterface { @@ -53,182 +54,136 @@ class SubdefCreationWorker implements WorkerInterface public function process(array $payload) { - if (isset($payload['recordId']) && isset($payload['databoxId'])) { - $recordId = $payload['recordId']; - $databoxId = $payload['databoxId']; - $wantedSubdef = [$payload['subdefName']]; + if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) { + // bad payload + $this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__)); + return; + } - $databox = $this->findDataboxById($databoxId); - $record = $databox->get_record($recordId); + $recordId = $payload['recordId']; + $databoxId = $payload['databoxId']; + $subdefName = $payload['subdefName']; - $oldLogger = $this->subdefGenerator->getLogger(); + $databox = $this->findDataboxById($databoxId); + $record = $databox->get_record($recordId); - $message = [ - 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, - 'payload' => $payload - ]; + if ($record->isStory()) { + return; + } - if (!$record->isStory()) { - // check if there is a write meta running for the record or the same task running - $canCreateSubdef = $this->repoWorker->canCreateSubdef($payload['subdefName'], $recordId, $databoxId); + $oldLogger = $this->subdefGenerator->getLogger(); - if (!$canCreateSubdef) { - // the file is in used to write meta + // try to "lock" the file, will return null if already locked (key unicity) + // = insert a row with unqiue sbid + rid + subdefname (todo : replace the subdefname with a subdef_id ?) + $workerRunningJob = $this->repoWorker->canCreateSubdef($payload); - $this->messagePublisher->publishDelayedMessage($message, MessagePublisher::SUBDEF_CREATION_TYPE); + if (is_null($workerRunningJob)) { + // the file was locked by another worker, delay to retry later + $this->messagePublisher->publishDelayedMessage( + [ + 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, + 'payload' => $payload + ], + MessagePublisher::SUBDEF_CREATION_TYPE + ); + return ; + } - return ; - } + // here the entity is "locked" (unique key) - // tell that a file is in used to create subdef - $em = $this->repoWorker->getEntityManager(); - $this->repoWorker->reconnect(); + $this->subdefGenerator->setLogger($this->logger); - if (isset($payload['workerJobId'])) { - /** @var WorkerRunningJob $workerRunningJob */ - $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); + try { + $this->subdefGenerator->generateSubdefs($record, [$subdefName]); + } + catch (Exception $e) { + $this->logger->error("Exception catched: " . $e->getMessage()); + } + catch (Throwable $e) { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + $workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId; - if ($workerRunningJob == null) { - $this->logger->error("Given workerJobId not found !"); + $this->logger->error($workerMessage); - return ; - } - - $workerRunningJob - ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) - ->setStatus(WorkerRunningJob::RUNNING); + /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionCreationFailure() */ + $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( + $record, + $subdefName, + $workerMessage, + $count, + $workerRunningJob->getId() + )); - $em->persist($workerRunningJob); + // the subscriber will "unlock" the row, no need to do it here + return ; + } - $em->flush(); + // begin to check if the subdef is successfully generated + $subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($subdefName); + $filePathToCheck = null; - } else { - $em->beginTransaction(); - try { - $date = new \DateTime(); - $workerRunningJob = new WorkerRunningJob(); - $workerRunningJob - ->setDataboxId($databoxId) - ->setRecordId($recordId) - ->setWork(MessagePublisher::SUBDEF_CREATION_TYPE) - ->setWorkOn($payload['subdefName']) - ->setPayload($message) - ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningJob::RUNNING) - ; + if ($record->has_subdef($subdefName) ) { + $filePathToCheck = $record->get_subdef($subdefName)->getRealPath(); + } - $em->persist($workerRunningJob); - $em->flush(); + $filePathToCheck = $this->filesystem->generateSubdefPathname($record, $subdef, $filePathToCheck); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); - } - } + if (!$this->filesystem->exists($filePathToCheck)) { - $this->subdefGenerator->setLogger($this->logger); + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; - try { - $this->subdefGenerator->generateSubdefs($record, $wantedSubdef); - } catch (\Exception $e) { - $this->logger->error("Exception catched: " . $e->getMessage()); + /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionCreationFailure() */ + $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( + $record, + $subdefName, + 'Subdef generation failed !', + $count, + $workerRunningJob->getId() + )); - } catch (\Throwable $e) { - $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; - $workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId; + $this->subdefGenerator->setLogger($oldLogger); - $this->logger->error($workerMessage); + // the subscriber will "unlock" the row, no need to do it here + return ; + } - $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( - $record, - $payload['subdefName'], - $workerMessage, - $count, - $workerRunningJob->getId() - )); + // checking ended - return ; - } + // order to write meta for the subdef if needed + /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */ + $this->dispatcher->dispatch( + WorkerEvents::SUBDEFINITION_WRITE_META, + new SubdefinitionWritemetaEvent( + $record, + $subdefName + ) + ); - // begin to check if the subdef is successfully generated - $subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($payload['subdefName']); - $filePathToCheck = null; + $this->subdefGenerator->setLogger($oldLogger); - if ($record->has_subdef($payload['subdefName']) ) { - $filePathToCheck = $record->get_subdef($payload['subdefName'])->getRealPath(); - } + // update jeton when subdef is created + $this->updateJeton($record); - $filePathToCheck = $this->filesystem->generateSubdefPathname($record, $subdef, $filePathToCheck); + $parents = $record->get_grouping_parents(); - if (!$this->filesystem->exists($filePathToCheck)) { - - $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; - - $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( - $record, - $payload['subdefName'], - 'Subdef generation failed !', - $count, - $workerRunningJob->getId() - )); - - $this->subdefGenerator->setLogger($oldLogger); - return ; - } - // checking ended - - // order to write meta for the subdef if needed - $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( - $record, - $payload['subdefName']) - ); - - $this->subdefGenerator->setLogger($oldLogger); - - // update jeton when subdef is created - $this->updateJeton($record); - - $parents = $record->get_grouping_parents(); - - // create a cover for a story - // used when uploaded via uploader-service and grouped as a story - if (!$parents->is_empty() && isset($payload['status']) && $payload['status'] == MessagePublisher::NEW_RECORD_MESSAGE && in_array($payload['subdefName'], array('thumbnail', 'preview'))) { - foreach ($parents->get_elements() as $story) { - if (self::checkIfFirstChild($story, $record)) { - $data = implode('_', [$databoxId, $story->getRecordId(), $recordId, $payload['subdefName']]); - - $this->dispatcher->dispatch(WorkerEvents::STORY_CREATE_COVER, new StoryCreateCoverEvent($data)); - } - } - } - - // update elastic - $this->indexer->flushQueue(); - - // tell that we have finished to work on this file - $this->repoWorker->reconnect(); - $em->getConnection()->beginTransaction(); - try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); - $workerRunningJob->setFinished(new \DateTime('now')); - $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - try { - $em->getConnection()->beginTransaction(); - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); - $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $this->messagePublisher->pushLog("rollback on recordID :" . $workerRunningJob->getRecordId()); - $em->rollback(); - } + // create a cover for a story + // used when uploaded via uploader-service and grouped as a story + if (!$parents->is_empty() && isset($payload['status']) && $payload['status'] == MessagePublisher::NEW_RECORD_MESSAGE && in_array($payload['subdefName'], array('thumbnail', 'preview'))) { + foreach ($parents->get_elements() as $story) { + if (self::checkIfFirstChild($story, $record)) { + $data = implode('_', [$databoxId, $story->getRecordId(), $recordId, $payload['subdefName']]); + $this->dispatcher->dispatch(WorkerEvents::STORY_CREATE_COVER, new StoryCreateCoverEvent($data)); } } } + + // update elastic + $this->indexer->flushQueue(); + + // tell that we have finished to work on this file + $this->repoWorker->markFinished($workerRunningJob); + } public static function checkIfFirstChild(\record_adapter $story, \record_adapter $record) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index dac5ae527c..4a3a998fe5 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -7,7 +7,6 @@ use Alchemy\Phrasea\Application\Helper\DispatcherAware; use Alchemy\Phrasea\Application\Helper\EntityManagerAware; use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Metadata\TagFactory; -use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; @@ -57,262 +56,227 @@ class WriteMetadatasWorker implements WorkerInterface public function process(array $payload) { - if (isset($payload['recordId']) && isset($payload['databoxId'])) { - $recordId = $payload['recordId']; - $databoxId = $payload['databoxId']; - - $MWG = isset($payload['MWG']) ? $payload['MWG'] : false; - $clearDoc = isset($payload['clearDoc']) ? $payload['clearDoc'] : false; - $databox = $this->findDataboxById($databoxId); - - // check if there is a make subdef running for the record or the same task running - $canWriteMeta = $this->repoWorker->canWriteMetadata($payload['subdefName'], $recordId, $databoxId); - - $message = [ - 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, - 'payload' => $payload - ]; - - if (!$canWriteMeta) { - // the file is in used to generate subdef - - $this->messagePublisher->publishDelayedMessage($message, MessagePublisher::WRITE_METADATAS_TYPE); - - return ; - } - - $record = $databox->get_record($recordId); - - if ($record->getMimeType() == 'image/svg+xml') { - - $this->logger->error("Can't write meta on svg file!"); - - return; - } - - // tell that a file is in used to create subdef - $em = $this->getEntityManager(); - $this->repoWorker->reconnect(); - - if (isset($payload['workerJobId'])) { - /** @var WorkerRunningJob $workerRunningJob */ - $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); - - if ($workerRunningJob == null) { - $this->logger->error("Given workerJobId not found !"); - - return ; - } - - $workerRunningJob - ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) - ->setStatus(WorkerRunningJob::RUNNING) - ; - - $em->persist($workerRunningJob); - - $em->flush(); - } else { - $em->beginTransaction(); - - try { - $date = new DateTime(); - $workerRunningJob = new WorkerRunningJob(); - $workerRunningJob - ->setDataboxId($databoxId) - ->setRecordId($recordId) - ->setWork(MessagePublisher::WRITE_METADATAS_TYPE) - ->setWorkOn($payload['subdefName']) - ->setPayload($message) - ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningJob::RUNNING) - ; - - $em->persist($workerRunningJob); - $em->flush(); - - $em->commit(); - } catch (Exception $e) { - $em->rollback(); - $this->logger->error("Error persisting WorkerRunningJob !"); - - return ; - } - } - - try { - $subdef = $record->get_subdef($payload['subdefName']); - } catch (Exception $e) { - $workerMessage = "Exception catched when try to get subdef " .$payload['subdefName']. " from DB for the recordID: " .$recordId; - $this->logger->error($workerMessage); - - $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; - - $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( - $record, - $payload['subdefName'], - SubdefinitionWritemetaEvent::FAILED, - $workerMessage, - $count, - $workerRunningJob->getId() - )); - - return ; - } - - if ($subdef->is_physically_present()) { - $metadata = new MetadataBag(); - - // add Uuid in metadatabag - if ($record->getUuid()) { - $metadata->add( - new Metadata( - new Tag\XMPExif\ImageUniqueID(), - new Mono($record->getUuid()) - ) - ); - $metadata->add( - new Metadata( - new Tag\ExifIFD\ImageUniqueID(), - new Mono($record->getUuid()) - ) - ); - $metadata->add( - new Metadata( - new Tag\IPTC\UniqueDocumentID(), - new Mono($record->getUuid()) - ) - ); - } - - // read document fields and add to metadatabag - $caption = $record->get_caption(); - foreach ($databox->get_meta_structure() as $fieldStructure) { - - $tagName = $fieldStructure->get_tag()->getTagname(); - $fieldName = $fieldStructure->get_name(); - - // skip fields with no src - if ($tagName == '' || $tagName == 'Phraseanet:no-source') { - continue; - } - - // check exiftool known tags to skip Phraseanet:tf-* - try { - $tag = TagFactory::getFromRDFTagname($tagName); - if(!$tag->isWritable()) { - continue; - } - } catch (TagUnknown $e) { - continue; - } - - try { - $field = $caption->get_field($fieldName); - $fieldValues = $field->get_values(); - - if ($fieldStructure->is_multi()) { - $values = array(); - foreach ($fieldValues as $value) { - $values[] = $this->removeNulChar($value->getValue()); - } - - $value = new Multi($values); - } else { - $fieldValue = array_pop($fieldValues); - $value = $this->removeNulChar($fieldValue->getValue()); - - // fix the dates edited into phraseanet - if($fieldStructure->get_type() === $fieldStructure::TYPE_DATE) { - try { - $value = self::fixDate($value); // will return NULL if the date is not valid - } - catch (Exception $e) { - $value = null; // do NOT write back to iptc - } - } - - if($value !== null) { // do not write invalid dates - $value = new Mono($value); - } - } - } catch(Exception $e) { - // the field is not set in the record, erase it - if ($fieldStructure->is_multi()) { - $value = new Multi(array('')); - } - else { - $value = new Mono(''); - } - } - - if($value !== null) { // do not write invalid data - $metadata->add( - new Metadata($fieldStructure->get_tag(), $value) - ); - } - } - - $this->writer->reset(); - - if ($MWG) { - $this->writer->setModule(Writer::MODULE_MWG, true); - } - - $this->writer->erase($subdef->get_name() != 'document' || $clearDoc, true); - - // write meta in file - try { - $this->writer->write($subdef->getRealPath(), $metadata); - - $this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() )); - } catch (Exception $e) { - $workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%4$s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage()); - $this->logger->error($workerMessage); - - $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; - - $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( - $record, - $payload['subdefName'], - SubdefinitionWritemetaEvent::FAILED, - $workerMessage, - $count, - $workerRunningJob->getId() - )); - } - - // mark write metas finished - $this->updateJeton($record); - } else { - $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; - - $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( - $record, - $payload['subdefName'], - SubdefinitionWritemetaEvent::FAILED, - 'Subdef is not physically present!', - $count, - $workerRunningJob->getId() - )); - } - - - // tell that we have finished to work on this file - $this->repoWorker->reconnect(); - $em->beginTransaction(); - try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); - $workerRunningJob->setFinished(new DateTime('now')); - $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (Exception $e) { - $em->rollback(); - } - + // mandatory args + if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) { + // bad payload + $this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__)); + return; } + $recordId = $payload['recordId']; + $databoxId = $payload['databoxId']; + $subdefName = $payload['subdefName']; + + $MWG = $payload['MWG'] ?? false; + $clearDoc = $payload['clearDoc'] ?? false; + $databox = $this->findDataboxById($databoxId); + + // try to "lock" the file, will return null if already locked (key unicity) + // = insert a row with unqiue sbid + rid + subdefname (todo : replace the subdefname with a subdef_id ?) + $workerRunningJob = $this->repoWorker->canWriteMetadata($payload); + + if (is_null($workerRunningJob)) { + // the file was locked by another worker, delay to retry later + $this->messagePublisher->publishDelayedMessage( + [ + 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, + 'payload' => $payload + ], + MessagePublisher::WRITE_METADATAS_TYPE + ); + return ; + } + + // here the entity is "locked" (unique key) + + $record = $databox->get_record($recordId); + + if ($record->getMimeType() == 'image/svg+xml') { + + $this->logger->error("Can't write meta on svg file!"); + + // tell that we have finished to work on this file ("unlock") + $this->repoWorker->markFinished($workerRunningJob, "Can't write meta on svg file!"); + + return; + } + + $this->repoWorker->reconnect(); + + try { + $subdef = $record->get_subdef($subdefName); + } + catch (Exception $e) { + $workerMessage = "Exception catched when try to get subdef " .$subdefName. " from DB for the recordID: " .$recordId; + $this->logger->error($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */ + $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $subdefName, + SubdefinitionWritemetaEvent::FAILED, + $workerMessage, + $count, + $workerRunningJob->getId() + )); + + // the subscriber will "unlock" the row, no need to do it here + return ; + } + + if (!$subdef->is_physically_present()) { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */ + $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $subdefName, + SubdefinitionWritemetaEvent::FAILED, + 'Subdef is not physically present!', + $count, + $workerRunningJob->getId() + )); + + // the subscriber will "unlock" the row, no need to do it here + return ; + } + + // here we can try to do the job + + $metadata = new MetadataBag(); + + // add Uuid in metadatabag + if ($record->getUuid()) { + $metadata->add( + new Metadata( + new Tag\XMPExif\ImageUniqueID(), + new Mono($record->getUuid()) + ) + ); + $metadata->add( + new Metadata( + new Tag\ExifIFD\ImageUniqueID(), + new Mono($record->getUuid()) + ) + ); + $metadata->add( + new Metadata( + new Tag\IPTC\UniqueDocumentID(), + new Mono($record->getUuid()) + ) + ); + } + + // read document fields and add to metadatabag + $caption = $record->get_caption(); + foreach ($databox->get_meta_structure() as $fieldStructure) { + + $tagName = $fieldStructure->get_tag()->getTagname(); + $fieldName = $fieldStructure->get_name(); + + // skip fields with no src + if ($tagName == '' || $tagName == 'Phraseanet:no-source') { + continue; + } + + // check exiftool known tags to skip Phraseanet:tf-* + try { + $tag = TagFactory::getFromRDFTagname($tagName); + if(!$tag->isWritable()) { + continue; + } + } catch (TagUnknown $e) { + continue; + } + + try { + $field = $caption->get_field($fieldName); + $fieldValues = $field->get_values(); + + if ($fieldStructure->is_multi()) { + $values = array(); + foreach ($fieldValues as $value) { + $values[] = $this->removeNulChar($value->getValue()); + } + + $value = new Multi($values); + } else { + $fieldValue = array_pop($fieldValues); + $value = $this->removeNulChar($fieldValue->getValue()); + + // fix the dates edited into phraseanet + if($fieldStructure->get_type() === $fieldStructure::TYPE_DATE) { + try { + $value = self::fixDate($value); // will return NULL if the date is not valid + } + catch (Exception $e) { + $value = null; // do NOT write back to iptc + } + } + + if($value !== null) { // do not write invalid dates + $value = new Mono($value); + } + } + } + catch(Exception $e) { + // the field is not set in the record, erase it + if ($fieldStructure->is_multi()) { + $value = new Multi(array('')); + } + else { + $value = new Mono(''); + } + } + + if($value !== null) { // do not write invalid data + $metadata->add( + new Metadata($fieldStructure->get_tag(), $value) + ); + } + } + + $this->writer->reset(); + + if ($MWG) { + $this->writer->setModule(Writer::MODULE_MWG, true); + } + + $this->writer->erase($subdef->get_name() != 'document' || $clearDoc, true); + + // write meta in file + try { + $this->writer->write($subdef->getRealPath(), $metadata); + + $this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() )); + } + catch (Exception $e) { + $workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%4$s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage()); + $this->logger->error($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */ + $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $subdefName, + SubdefinitionWritemetaEvent::FAILED, + $workerMessage, + $count, + $workerRunningJob->getId() + )); + + // the subscriber will "unlock" the row, no need to do it here + return ; + } + + // mark write metas finished + $this->updateJeton($record); + + // tell that we have finished to work on this file + $this->repoWorker->markFinished($workerRunningJob); } private function removeNulChar($value) diff --git a/templates/web/admin/worker-manager/worker_info.html.twig b/templates/web/admin/worker-manager/worker_info.html.twig index fb03fd37eb..067f00aca6 100644 --- a/templates/web/admin/worker-manager/worker_info.html.twig +++ b/templates/web/admin/worker-manager/worker_info.html.twig @@ -153,7 +153,8 @@ type: "POST", dataType: 'json', data: { - status: '{{ constant("\\Alchemy\\Phrasea\\Model\\Entities\\WorkerRunningJob::INTERRUPT") }}' + status: '{{ constant("\\Alchemy\\Phrasea\\Model\\Entities\\WorkerRunningJob::INTERRUPT") }}', + finished: '1' // manual interrupt also means "finished", it must update the date and unlock the row }, url: "/admin/worker-manager/"+ workerId +"/change-status", success: function (data) {