diff --git a/.env b/.env index c88e687227..4b15fe233f 100644 --- a/.env +++ b/.env @@ -139,13 +139,13 @@ PHRASEANET_EXPLODE_WORKER=1 PHRASEANET_WORKER_assetsIngest=1 PHRASEANET_WORKER_createRecord=2 PHRASEANET_WORKER_deleteRecord=2 +PHRASEANET_WORKER_editRecord=2 PHRASEANET_WORKER_exportMail=2 PHRASEANET_WORKER_exposeUpload=2 PHRASEANET_WORKER_ftp=1 PHRASEANET_WORKER_mainQueue=3 PHRASEANET_WORKER_populateIndex=1 PHRASEANET_WORKER_pullAssets=1 -PHRASEANET_WORKER_recordEdit=2 PHRASEANET_WORKER_subdefCreation=1 PHRASEANET_WORKER_subtitle=1 PHRASEANET_WORKER_validationReminder=1 diff --git a/docker-compose.yml b/docker-compose.yml index 195eed5146..25e1e4a1dd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -156,13 +156,13 @@ services: - PHRASEANET_WORKER_assetsIngest - PHRASEANET_WORKER_createRecord - PHRASEANET_WORKER_deleteRecord + - PHRASEANET_WORKER_editRecord - PHRASEANET_WORKER_exportMail - PHRASEANET_WORKER_exposeUpload - PHRASEANET_WORKER_ftp - PHRASEANET_WORKER_mainQueue - PHRASEANET_WORKER_populateIndex - PHRASEANET_WORKER_pullAssets - - PHRASEANET_WORKER_recordEdit - PHRASEANET_WORKER_subdefCreation - PHRASEANET_WORKER_subtitle - PHRASEANET_WORKER_validationReminder diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php index 1a3cc213fb..3443020251 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php @@ -17,7 +17,7 @@ use Alchemy\Phrasea\WorkerManager\Worker\MainQueueWorker; use Alchemy\Phrasea\WorkerManager\Worker\PopulateIndexWorker; use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool; use Alchemy\Phrasea\WorkerManager\Worker\PullAssetsWorker; -use Alchemy\Phrasea\WorkerManager\Worker\RecordEditWorker; +use Alchemy\Phrasea\WorkerManager\Worker\EditRecordWorker; use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver; use Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker; use Alchemy\Phrasea\WorkerManager\Worker\SubtitleWorker; @@ -158,8 +158,8 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface return new ValidationReminderWorker($app); })); - $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::RECORD_EDIT_TYPE, new CallableWorkerFactory(function () use ($app) { - return (new RecordEditWorker($app['repo.worker-running-job'], $app['dispatcher'])) + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::EDIT_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new EditRecordWorker($app['repo.worker-running-job'], $app['dispatcher'], $app['alchemy_worker.message.publisher'])) ->setApplicationBox($app['phraseanet.appbox']) ->setDataboxLoggerLocator($app['phraseanet.logger']) ; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php index d6529c775a..8477664493 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php @@ -93,9 +93,10 @@ class AMQPConnection self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE, self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE, ], - MessagePublisher::RECORD_EDIT_TYPE => [ - 'with' => self::WITH_NOTHING, + MessagePublisher::EDIT_RECORD_TYPE => [ + 'with' => self::WITH_RETRY, self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE, + self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE, ], MessagePublisher::SUBDEF_CREATION_TYPE => [ 'with' => self::WITH_RETRY | self::WITH_DELAYED, diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php index bd62a20e42..bc8d8dada4 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php @@ -127,6 +127,6 @@ class MessageHandler // give prefetch message to a worker consumer at a time $channel->basic_qos(null, $prefetchCount, null); - $channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback); + $channel->basic_consume($queueName, Uuid::uuid4()->toString(), false, false, false, false, $callback); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php index d58a5a7711..63c6ea8c65 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php @@ -19,7 +19,7 @@ class MessagePublisher const FTP_TYPE = 'ftp'; const POPULATE_INDEX_TYPE = 'populateIndex'; const PULL_ASSETS_TYPE = 'pullAssets'; - const RECORD_EDIT_TYPE = 'recordEdit'; + const EDIT_RECORD_TYPE = 'editRecord'; const SUBDEF_CREATION_TYPE = 'subdefCreation'; const VALIDATION_REMINDER_TYPE = 'validationReminder'; const WRITE_METADATAS_TYPE = 'writeMetadatas'; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php index deca83fb89..0bb2065a35 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -288,7 +288,7 @@ class RecordSubscriber implements EventSubscriberInterface { // publish payload to queue $payload = [ - 'message_type' => MessagePublisher::RECORD_EDIT_TYPE, + 'message_type' => MessagePublisher::EDIT_RECORD_TYPE, 'payload' => [ 'dataType' => $event->getDataType(), 'data' => $event->getData(), @@ -297,7 +297,7 @@ class RecordSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::RECORD_EDIT_TYPE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::EDIT_RECORD_TYPE); } public static function getSubscribedEvents() diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php new file mode 100644 index 0000000000..9d275b2f96 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/EditRecordWorker.php @@ -0,0 +1,237 @@ +repoWorker = $repoWorker; + $this->dispatcher = $dispatcher; + $this->messagePublisher = $messagePublisher; + } + + public function process(array $payload) + { + try { + $databox = $this->findDataboxById($payload['databoxId']); + } catch(\Exception $e) { + return; + } + + $recordIds = []; + + $workerRunningJob = null; + $em = $this->repoWorker->getEntityManager(); + + if (isset($payload['workerJobId'])) { + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); + + if ($workerRunningJob == null) { + $this->messagePublisher->pushLog("Given workerJobId not found !", 'error'); + + return ; + } + + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) + ->setStatus(WorkerRunningJob::RUNNING); + + $em->persist($workerRunningJob); + + $em->flush(); + } else { + $this->repoWorker->reconnect(); + + $em->beginTransaction(); + + try { + $message = [ + 'message_type' => MessagePublisher::EDIT_RECORD_TYPE, + 'payload' => $payload + ]; + + $date = new \DateTime(); + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($payload['databoxId']) + ->setWork(MessagePublisher::EDIT_RECORD_TYPE) + ->setWorkOn("record") + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ->setPayload($message) + ; + + $em->persist($workerRunningJob); + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + } + + try { + if ($payload['dataType'] === RecordEditInWorkerEvent::MDS_TYPE) { + if (isset($payload['data']) && isset($payload['elementIds'])) { + $recordIds = array_column($payload['data'], 'record_id'); + + foreach ($payload['data'] as $rec) { + try { + /** @var \record_adapter $record */ + $record = $databox->get_record($rec['record_id']); + } catch (\Exception $e) { + continue; + } + + $key = $record->getId(); + + if (!in_array($key, $payload['elementIds'])) { + continue; + } + + $statbits = $rec['status']; + $editDirty = $rec['edit']; + + if ($editDirty == '0') { + $editDirty = false; + } else { + $editDirty = true; + } + + if (isset($rec['metadatas']) && is_array($rec['metadatas'])) { + try { + $record->set_metadatas($rec['metadatas']); + $this->dispatcher->dispatch(PhraseaEvents::RECORD_EDIT, new RecordEdit($record)); + } catch (\Exception $e) { + continue; + } + } + + if (isset($rec['technicalsdatas']) && is_array($rec['technicalsdatas'])) { + $record->insertOrUpdateTechnicalDatas($rec['technicalsdatas']); + } + + $newstat = $record->getStatus(); + $statbits = ltrim($statbits, 'x'); + if (!in_array($statbits, ['', 'null'])) { + $mask_and = ltrim(str_replace(['x', '0', '1', 'z'], ['1', 'z', '0', '1'], $statbits), '0'); + if ($mask_and != '') { + $newstat = \databox_status::operation_and_not($newstat, $mask_and); + } + + $mask_or = ltrim(str_replace('x', '0', $statbits), '0'); + + if ($mask_or != '') { + $newstat = \databox_status::operation_or($newstat, $mask_or); + } + + $record->setStatus($newstat); + } + + $record->write_metas(); + + if ($statbits != '') { + $this->getDataboxLogger($databox) + ->log($record, \Session_Logger::EVENT_STATUS, '', ''); + } + if ($editDirty) { + $this->getDataboxLogger($databox) + ->log($record, \Session_Logger::EVENT_EDIT, '', ''); + } + } + } + } else { + $arg = json_decode($payload['data']); + $recordIds = array_column($arg->records, 'record_id'); + + // for now call record_adapter. no check, no acl, ... + foreach($arg->records as $rec) { + try { + /** @var \record_adapter $r */ + $r = $databox->get_record($rec->record_id); + $r->setMetadatasByActions($arg->actions); + } catch(\Exception $e) { + continue; + } + } + } + } catch (\Exception $e) { + $workerMessage = "An error occurred when editing record!: ". $e->getMessage(); + + $this->messagePublisher->pushLog($workerMessage); + + // if payload count doesn't exist + // count is 2 because it's to be the second time the message will be treated + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->repoWorker->reconnect(); + $em->beginTransaction(); + try { + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT. ($count - 1)) + ->setStatus(WorkerRunningJob::ERROR) + ; + + $em->persist($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + + $payload['workerJobId'] = $workerRunningJob->getId(); + $fullPayload = [ + 'message_type' => MessagePublisher::EDIT_RECORD_TYPE, + 'payload' => $payload + ]; + + $this->messagePublisher->publishRetryMessage( + $fullPayload, + MessagePublisher::EDIT_RECORD_TYPE, + $count, + $workerMessage + ); + + return; + } + + // order to write metas for those records + $this->dispatcher->dispatch(WorkerEvents::RECORDS_WRITE_META, + new RecordsWriteMetaEvent($recordIds, $payload['databoxId']) + ); + + // tell that we have finished to work on edit + $this->repoWorker->reconnect(); + $em->getConnection()->beginTransaction(); + try { + $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); + $workerRunningJob->setFinished(new \DateTime('now')); + $em->persist($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/RecordEditWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/RecordEditWorker.php deleted file mode 100644 index 2e7f03ab1a..0000000000 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/RecordEditWorker.php +++ /dev/null @@ -1,176 +0,0 @@ -repoWorker = $repoWorker; - $this->dispatcher = $dispatcher; - } - - public function process(array $payload) - { - try { - $databox = $this->findDataboxById($payload['databoxId']); - } catch(\Exception $e) { - return; - } - - $recordIds = []; - - $workerRunningJob = null; - - $em = $this->repoWorker->getEntityManager(); - $this->repoWorker->reconnect(); - - $em->beginTransaction(); - - try { - $message = [ - 'message_type' => MessagePublisher::RECORD_EDIT_TYPE, - 'payload' => $payload - ]; - - $date = new \DateTime(); - $workerRunningJob = new WorkerRunningJob(); - $workerRunningJob - ->setDataboxId($payload['databoxId']) - ->setWork(MessagePublisher::RECORD_EDIT_TYPE) - ->setWorkOn("record") - ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningJob::RUNNING) - ->setPayload($message) - ; - - $em->persist($workerRunningJob); - $em->flush(); - - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); - } - - if ($payload['dataType'] === RecordEditInWorkerEvent::MDS_TYPE) { - if (isset($payload['data']) && isset($payload['elementIds'])) { - $recordIds = array_column($payload['data'], 'record_id'); - - foreach ($payload['data'] as $rec) { - try { - /** @var \record_adapter $record */ - $record = $databox->get_record($rec['record_id']); - } catch (\Exception $e) { - continue; - } - - $key = $record->getId(); - - if (!in_array($key, $payload['elementIds'])) { - continue; - } - - $statbits = $rec['status']; - $editDirty = $rec['edit']; - - if ($editDirty == '0') { - $editDirty = false; - } else { - $editDirty = true; - } - - if (isset($rec['metadatas']) && is_array($rec['metadatas'])) { - try { - $record->set_metadatas($rec['metadatas']); - $this->dispatcher->dispatch(PhraseaEvents::RECORD_EDIT, new RecordEdit($record)); - } catch (\Exception $e) { - continue; - } - } - - if (isset($rec['technicalsdatas']) && is_array($rec['technicalsdatas'])) { - $record->insertOrUpdateTechnicalDatas($rec['technicalsdatas']); - } - - $newstat = $record->getStatus(); - $statbits = ltrim($statbits, 'x'); - if (!in_array($statbits, ['', 'null'])) { - $mask_and = ltrim(str_replace(['x', '0', '1', 'z'], ['1', 'z', '0', '1'], $statbits), '0'); - if ($mask_and != '') { - $newstat = \databox_status::operation_and_not($newstat, $mask_and); - } - - $mask_or = ltrim(str_replace('x', '0', $statbits), '0'); - - if ($mask_or != '') { - $newstat = \databox_status::operation_or($newstat, $mask_or); - } - - $record->setStatus($newstat); - } - - $record->write_metas(); - - if ($statbits != '') { - $this->getDataboxLogger($databox) - ->log($record, \Session_Logger::EVENT_STATUS, '', ''); - } - if ($editDirty) { - $this->getDataboxLogger($databox) - ->log($record, \Session_Logger::EVENT_EDIT, '', ''); - } - } - } - } else { - $arg = json_decode($payload['data']); - $recordIds = array_column($arg->records, 'record_id'); - - // for now call record_adapter. no check, no acl, ... - foreach($arg->records as $rec) { - try { - /** @var \record_adapter $r */ - $r = $databox->get_record($rec->record_id); - $r->setMetadatasByActions($arg->actions); - } catch(\Exception $e) { - continue; - } - } - } - - // order to write metas for those records - $this->dispatcher->dispatch(WorkerEvents::RECORDS_WRITE_META, - new RecordsWriteMetaEvent($recordIds, $payload['databoxId']) - ); - - // tell that we have finished to work on edit - $this->repoWorker->reconnect(); - $em->getConnection()->beginTransaction(); - try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); - $workerRunningJob->setFinished(new \DateTime('now')); - $em->persist($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); - } - } -} diff --git a/lib/classes/patch/413.php b/lib/classes/patch/413.php index f8a86afc91..e3c56c55a1 100644 --- a/lib/classes/patch/413.php +++ b/lib/classes/patch/413.php @@ -15,7 +15,7 @@ class patch_413 implements patchInterface 'ftp' => MessagePublisher::FTP_TYPE, 'populateIndex' => MessagePublisher::POPULATE_INDEX_TYPE, 'pullAssets' => MessagePublisher::PULL_ASSETS_TYPE, - 'recordEdit' => MessagePublisher::RECORD_EDIT_TYPE, + 'editRecord' => MessagePublisher::EDIT_RECORD_TYPE, 'subdefCreation' => MessagePublisher::SUBDEF_CREATION_TYPE, 'validationReminder' => MessagePublisher::VALIDATION_REMINDER_TYPE, 'writeMetadatas' => MessagePublisher::WRITE_METADATAS_TYPE,