From 43f8d117b02a82a6996728343deb616999ceee4c Mon Sep 17 00:00:00 2001 From: aynsix Date: Thu, 22 Apr 2021 13:34:37 +0300 Subject: [PATCH] PHRAS-3316 add retry on expose queue and fix purge --- .../WorkerManager/Queue/AMQPConnection.php | 5 +- .../Worker/ExposeUploadWorker.php | 110 +++++++++++++----- 2 files changed, 86 insertions(+), 29 deletions(-) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php index a381a9b7d9..e379e42e93 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php @@ -69,8 +69,9 @@ class AMQPConnection self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE, ], MessagePublisher::EXPOSE_UPLOAD_TYPE => [ - 'with' => self::WITH_NOTHING, + 'with' => self::WITH_RETRY, self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE, + self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE, ], MessagePublisher::FTP_TYPE => [ 'with' => self::WITH_RETRY, @@ -446,7 +447,7 @@ class AMQPConnection foreach ($queueNames as $queueName) { // re-inject conf values (some may have changed) $settings = $this->conf->get(['workers', 'queues', $queueName], []); - if(array_key_exists($queueName, $this->queues)) { + if($this->isBaseQueue($queueName) && array_key_exists($queueName, $this->queues)) { $this->queues[$queueName]['settings'] = array_merge($this->queues[$queueName]['settings'], $settings); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php index 2969085148..2a82c8ec5b 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExposeUploadWorker.php @@ -31,42 +31,65 @@ class ExposeUploadWorker implements WorkerInterface public function process(array $payload) { $em = $this->repoWorker->getEntityManager(); - $em->beginTransaction(); - $date = new \DateTime(); $message = [ 'message_type' => MessagePublisher::EXPOSE_UPLOAD_TYPE, 'payload' => $payload ]; - $workerRunningJob = new WorkerRunningJob(); - try { + + if (isset($payload['workerJobId'])) { + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $this->repoWorker->find($payload['workerJobId']); + + if ($workerRunningJob == null) { + $this->messagePublisher->pushLog("Given workerJobId not found !", 'error'); + + return ; + } + $workerRunningJob - ->setWork(MessagePublisher::EXPOSE_UPLOAD_TYPE) - ->setDataboxId($payload['databoxId']) - ->setRecordId($payload['recordId']) - ->setWorkOn($payload['exposeName']) - ->setPayload($message) - ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningJob::RUNNING) - ; + ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) + ->setStatus(WorkerRunningJob::RUNNING); $em->persist($workerRunningJob); $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); + } else { + $em->beginTransaction(); + $date = new \DateTime(); + + $workerRunningJob = new WorkerRunningJob(); + + try { + $workerRunningJob + ->setWork(MessagePublisher::EXPOSE_UPLOAD_TYPE) + ->setDataboxId($payload['databoxId']) + ->setRecordId($payload['recordId']) + ->setWorkOn($payload['exposeName']) + ->setPayload($message) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } } - $exposeConfiguration = $this->app['conf']->get(['phraseanet-service', 'expose-service', 'exposes'], []); - $exposeConfiguration = $exposeConfiguration[$payload['exposeName']]; - - $exposeClient = new Client(['base_uri' => $exposeConfiguration['expose_base_uri'], 'http_errors' => false]); - - $record = $this->findDataboxById($payload['databoxId'])->get_record($payload['recordId']); - try { + $exposeConfiguration = $this->app['conf']->get(['phraseanet-service', 'expose-service', 'exposes'], []); + $exposeConfiguration = $exposeConfiguration[$payload['exposeName']]; + + $exposeClient = new Client(['base_uri' => $exposeConfiguration['expose_base_uri'], 'http_errors' => false]); + + $record = $this->findDataboxById($payload['databoxId'])->get_record($payload['recordId']); + $helpers = new PhraseanetExtension($this->app); $canSeeBusiness = $helpers->isGrantedOnCollection($record->getBaseId(), [\ACL::CANMODIFRECORD]); @@ -131,7 +154,7 @@ class ExposeUploadWorker implements WorkerInterface if ($response->getStatusCode() !==201) { $this->messagePublisher->pushLog("An error occurred when creating asset: status-code " . $response->getStatusCode()); - $this->finishedJob($workerRunningJob, $em); + $this->finishedJob($workerRunningJob, $em, WorkerRunningJob::ERROR); return ; } @@ -171,8 +194,41 @@ class ExposeUploadWorker implements WorkerInterface $this->messagePublisher->pushLog("Asset ID :". $assetsResponse['id'] ." successfully uploaded! "); } catch (\Exception $e) { - $this->messagePublisher->pushLog("An error occurred when creating asset!: ". $e->getMessage()); - $this->finishedJob($workerRunningJob, $em); + $workerMessage = "An error occurred when creating asset!: ". $e->getMessage(); + + $this->messagePublisher->pushLog($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->repoWorker->reconnect(); + $em->beginTransaction(); + try { + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT. ($count - 1)) + ->setStatus(WorkerRunningJob::ERROR) + ; + + $em->persist($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + + $payload['workerJobId'] = $workerRunningJob->getId(); + $fullPayload = [ + 'message_type' => MessagePublisher::EXPOSE_UPLOAD_TYPE, + 'payload' => $payload + ]; + + $this->messagePublisher->publishRetryMessage( + $fullPayload, + MessagePublisher::EXPOSE_UPLOAD_TYPE, + $count, + $workerMessage + ); + + return; } // tell that the upload is finished @@ -224,12 +280,12 @@ class ExposeUploadWorker implements WorkerInterface } } - private function finishedJob(WorkerRunningJob $workerRunningJob, EntityManager $em) + private function finishedJob(WorkerRunningJob $workerRunningJob, EntityManager $em, $status = WorkerRunningJob::FINISHED) { $this->repoWorker->reconnect(); $em->getConnection()->beginTransaction(); try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); + $workerRunningJob->setStatus($status); $workerRunningJob->setFinished(new \DateTime('now')); $em->persist($workerRunningJob); $em->flush();