PHRAS-3316 add retry on expose queue and fix purge

This commit is contained in:
aynsix
2021-04-22 13:34:37 +03:00
parent 2b895807b5
commit 43f8d117b0
2 changed files with 86 additions and 29 deletions

View File

@@ -69,8 +69,9 @@ class AMQPConnection
self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE, self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE,
], ],
MessagePublisher::EXPOSE_UPLOAD_TYPE => [ MessagePublisher::EXPOSE_UPLOAD_TYPE => [
'with' => self::WITH_NOTHING, 'with' => self::WITH_RETRY,
self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE, self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE,
self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE,
], ],
MessagePublisher::FTP_TYPE => [ MessagePublisher::FTP_TYPE => [
'with' => self::WITH_RETRY, 'with' => self::WITH_RETRY,
@@ -446,7 +447,7 @@ class AMQPConnection
foreach ($queueNames as $queueName) { foreach ($queueNames as $queueName) {
// re-inject conf values (some may have changed) // re-inject conf values (some may have changed)
$settings = $this->conf->get(['workers', 'queues', $queueName], []); $settings = $this->conf->get(['workers', 'queues', $queueName], []);
if(array_key_exists($queueName, $this->queues)) { if($this->isBaseQueue($queueName) && array_key_exists($queueName, $this->queues)) {
$this->queues[$queueName]['settings'] = array_merge($this->queues[$queueName]['settings'], $settings); $this->queues[$queueName]['settings'] = array_merge($this->queues[$queueName]['settings'], $settings);
} }

View File

@@ -31,42 +31,65 @@ class ExposeUploadWorker implements WorkerInterface
public function process(array $payload) public function process(array $payload)
{ {
$em = $this->repoWorker->getEntityManager(); $em = $this->repoWorker->getEntityManager();
$em->beginTransaction();
$date = new \DateTime();
$message = [ $message = [
'message_type' => MessagePublisher::EXPOSE_UPLOAD_TYPE, 'message_type' => MessagePublisher::EXPOSE_UPLOAD_TYPE,
'payload' => $payload 'payload' => $payload
]; ];
$workerRunningJob = new WorkerRunningJob();
try { if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->messagePublisher->pushLog("Given workerJobId not found !", 'error');
return ;
}
$workerRunningJob $workerRunningJob
->setWork(MessagePublisher::EXPOSE_UPLOAD_TYPE) ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setDataboxId($payload['databoxId']) ->setStatus(WorkerRunningJob::RUNNING);
->setRecordId($payload['recordId'])
->setWorkOn($payload['exposeName'])
->setPayload($message)
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob); $em->persist($workerRunningJob);
$em->flush(); $em->flush();
$em->commit(); } else {
} catch (\Exception $e) { $em->beginTransaction();
$em->rollback(); $date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
try {
$workerRunningJob
->setWork(MessagePublisher::EXPOSE_UPLOAD_TYPE)
->setDataboxId($payload['databoxId'])
->setRecordId($payload['recordId'])
->setWorkOn($payload['exposeName'])
->setPayload($message)
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
} }
$exposeConfiguration = $this->app['conf']->get(['phraseanet-service', 'expose-service', 'exposes'], []);
$exposeConfiguration = $exposeConfiguration[$payload['exposeName']];
$exposeClient = new Client(['base_uri' => $exposeConfiguration['expose_base_uri'], 'http_errors' => false]);
$record = $this->findDataboxById($payload['databoxId'])->get_record($payload['recordId']);
try { try {
$exposeConfiguration = $this->app['conf']->get(['phraseanet-service', 'expose-service', 'exposes'], []);
$exposeConfiguration = $exposeConfiguration[$payload['exposeName']];
$exposeClient = new Client(['base_uri' => $exposeConfiguration['expose_base_uri'], 'http_errors' => false]);
$record = $this->findDataboxById($payload['databoxId'])->get_record($payload['recordId']);
$helpers = new PhraseanetExtension($this->app); $helpers = new PhraseanetExtension($this->app);
$canSeeBusiness = $helpers->isGrantedOnCollection($record->getBaseId(), [\ACL::CANMODIFRECORD]); $canSeeBusiness = $helpers->isGrantedOnCollection($record->getBaseId(), [\ACL::CANMODIFRECORD]);
@@ -131,7 +154,7 @@ class ExposeUploadWorker implements WorkerInterface
if ($response->getStatusCode() !==201) { if ($response->getStatusCode() !==201) {
$this->messagePublisher->pushLog("An error occurred when creating asset: status-code " . $response->getStatusCode()); $this->messagePublisher->pushLog("An error occurred when creating asset: status-code " . $response->getStatusCode());
$this->finishedJob($workerRunningJob, $em); $this->finishedJob($workerRunningJob, $em, WorkerRunningJob::ERROR);
return ; return ;
} }
@@ -171,8 +194,41 @@ class ExposeUploadWorker implements WorkerInterface
$this->messagePublisher->pushLog("Asset ID :". $assetsResponse['id'] ." successfully uploaded! "); $this->messagePublisher->pushLog("Asset ID :". $assetsResponse['id'] ." successfully uploaded! ");
} catch (\Exception $e) { } catch (\Exception $e) {
$this->messagePublisher->pushLog("An error occurred when creating asset!: ". $e->getMessage()); $workerMessage = "An error occurred when creating asset!: ". $e->getMessage();
$this->finishedJob($workerRunningJob, $em);
$this->messagePublisher->pushLog($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($count - 1))
->setStatus(WorkerRunningJob::ERROR)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$payload['workerJobId'] = $workerRunningJob->getId();
$fullPayload = [
'message_type' => MessagePublisher::EXPOSE_UPLOAD_TYPE,
'payload' => $payload
];
$this->messagePublisher->publishRetryMessage(
$fullPayload,
MessagePublisher::EXPOSE_UPLOAD_TYPE,
$count,
$workerMessage
);
return;
} }
// tell that the upload is finished // tell that the upload is finished
@@ -224,12 +280,12 @@ class ExposeUploadWorker implements WorkerInterface
} }
} }
private function finishedJob(WorkerRunningJob $workerRunningJob, EntityManager $em) private function finishedJob(WorkerRunningJob $workerRunningJob, EntityManager $em, $status = WorkerRunningJob::FINISHED)
{ {
$this->repoWorker->reconnect(); $this->repoWorker->reconnect();
$em->getConnection()->beginTransaction(); $em->getConnection()->beginTransaction();
try { try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED); $workerRunningJob->setStatus($status);
$workerRunningJob->setFinished(new \DateTime('now')); $workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob); $em->persist($workerRunningJob);
$em->flush(); $em->flush();