PHRAS-3445 add edit retry and failed Q

This commit is contained in:
aynsix
2021-05-25 17:35:03 +03:00
parent 12166e8533
commit 6437023b21
10 changed files with 250 additions and 188 deletions

2
.env
View File

@@ -139,13 +139,13 @@ PHRASEANET_EXPLODE_WORKER=1
PHRASEANET_WORKER_assetsIngest=1 PHRASEANET_WORKER_assetsIngest=1
PHRASEANET_WORKER_createRecord=2 PHRASEANET_WORKER_createRecord=2
PHRASEANET_WORKER_deleteRecord=2 PHRASEANET_WORKER_deleteRecord=2
PHRASEANET_WORKER_editRecord=2
PHRASEANET_WORKER_exportMail=2 PHRASEANET_WORKER_exportMail=2
PHRASEANET_WORKER_exposeUpload=2 PHRASEANET_WORKER_exposeUpload=2
PHRASEANET_WORKER_ftp=1 PHRASEANET_WORKER_ftp=1
PHRASEANET_WORKER_mainQueue=3 PHRASEANET_WORKER_mainQueue=3
PHRASEANET_WORKER_populateIndex=1 PHRASEANET_WORKER_populateIndex=1
PHRASEANET_WORKER_pullAssets=1 PHRASEANET_WORKER_pullAssets=1
PHRASEANET_WORKER_recordEdit=2
PHRASEANET_WORKER_subdefCreation=1 PHRASEANET_WORKER_subdefCreation=1
PHRASEANET_WORKER_subtitle=1 PHRASEANET_WORKER_subtitle=1
PHRASEANET_WORKER_validationReminder=1 PHRASEANET_WORKER_validationReminder=1

View File

@@ -156,13 +156,13 @@ services:
- PHRASEANET_WORKER_assetsIngest - PHRASEANET_WORKER_assetsIngest
- PHRASEANET_WORKER_createRecord - PHRASEANET_WORKER_createRecord
- PHRASEANET_WORKER_deleteRecord - PHRASEANET_WORKER_deleteRecord
- PHRASEANET_WORKER_editRecord
- PHRASEANET_WORKER_exportMail - PHRASEANET_WORKER_exportMail
- PHRASEANET_WORKER_exposeUpload - PHRASEANET_WORKER_exposeUpload
- PHRASEANET_WORKER_ftp - PHRASEANET_WORKER_ftp
- PHRASEANET_WORKER_mainQueue - PHRASEANET_WORKER_mainQueue
- PHRASEANET_WORKER_populateIndex - PHRASEANET_WORKER_populateIndex
- PHRASEANET_WORKER_pullAssets - PHRASEANET_WORKER_pullAssets
- PHRASEANET_WORKER_recordEdit
- PHRASEANET_WORKER_subdefCreation - PHRASEANET_WORKER_subdefCreation
- PHRASEANET_WORKER_subtitle - PHRASEANET_WORKER_subtitle
- PHRASEANET_WORKER_validationReminder - PHRASEANET_WORKER_validationReminder

View File

@@ -17,7 +17,7 @@ use Alchemy\Phrasea\WorkerManager\Worker\MainQueueWorker;
use Alchemy\Phrasea\WorkerManager\Worker\PopulateIndexWorker; use Alchemy\Phrasea\WorkerManager\Worker\PopulateIndexWorker;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool; use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\PullAssetsWorker; use Alchemy\Phrasea\WorkerManager\Worker\PullAssetsWorker;
use Alchemy\Phrasea\WorkerManager\Worker\RecordEditWorker; use Alchemy\Phrasea\WorkerManager\Worker\EditRecordWorker;
use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver; use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver;
use Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker; use Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker;
use Alchemy\Phrasea\WorkerManager\Worker\SubtitleWorker; use Alchemy\Phrasea\WorkerManager\Worker\SubtitleWorker;
@@ -158,8 +158,8 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface
return new ValidationReminderWorker($app); return new ValidationReminderWorker($app);
})); }));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::RECORD_EDIT_TYPE, new CallableWorkerFactory(function () use ($app) { $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::EDIT_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new RecordEditWorker($app['repo.worker-running-job'], $app['dispatcher'])) return (new EditRecordWorker($app['repo.worker-running-job'], $app['dispatcher'], $app['alchemy_worker.message.publisher']))
->setApplicationBox($app['phraseanet.appbox']) ->setApplicationBox($app['phraseanet.appbox'])
->setDataboxLoggerLocator($app['phraseanet.logger']) ->setDataboxLoggerLocator($app['phraseanet.logger'])
; ;

View File

@@ -93,9 +93,10 @@ class AMQPConnection
self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE, self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE,
self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE, self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE,
], ],
MessagePublisher::RECORD_EDIT_TYPE => [ MessagePublisher::EDIT_RECORD_TYPE => [
'with' => self::WITH_NOTHING, 'with' => self::WITH_RETRY,
self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE, self::MAX_RETRY => self::DEFAULT_MAX_RETRY_VALUE,
self::TTL_RETRY => self::DEFAULT_RETRY_DELAY_VALUE,
], ],
MessagePublisher::SUBDEF_CREATION_TYPE => [ MessagePublisher::SUBDEF_CREATION_TYPE => [
'with' => self::WITH_RETRY | self::WITH_DELAYED, 'with' => self::WITH_RETRY | self::WITH_DELAYED,

View File

@@ -127,6 +127,6 @@ class MessageHandler
// give prefetch message to a worker consumer at a time // give prefetch message to a worker consumer at a time
$channel->basic_qos(null, $prefetchCount, null); $channel->basic_qos(null, $prefetchCount, null);
$channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback); $channel->basic_consume($queueName, Uuid::uuid4()->toString(), false, false, false, false, $callback);
} }
} }

View File

@@ -19,7 +19,7 @@ class MessagePublisher
const FTP_TYPE = 'ftp'; const FTP_TYPE = 'ftp';
const POPULATE_INDEX_TYPE = 'populateIndex'; const POPULATE_INDEX_TYPE = 'populateIndex';
const PULL_ASSETS_TYPE = 'pullAssets'; const PULL_ASSETS_TYPE = 'pullAssets';
const RECORD_EDIT_TYPE = 'recordEdit'; const EDIT_RECORD_TYPE = 'editRecord';
const SUBDEF_CREATION_TYPE = 'subdefCreation'; const SUBDEF_CREATION_TYPE = 'subdefCreation';
const VALIDATION_REMINDER_TYPE = 'validationReminder'; const VALIDATION_REMINDER_TYPE = 'validationReminder';
const WRITE_METADATAS_TYPE = 'writeMetadatas'; const WRITE_METADATAS_TYPE = 'writeMetadatas';

View File

@@ -288,7 +288,7 @@ class RecordSubscriber implements EventSubscriberInterface
{ {
// publish payload to queue // publish payload to queue
$payload = [ $payload = [
'message_type' => MessagePublisher::RECORD_EDIT_TYPE, 'message_type' => MessagePublisher::EDIT_RECORD_TYPE,
'payload' => [ 'payload' => [
'dataType' => $event->getDataType(), 'dataType' => $event->getDataType(),
'data' => $event->getData(), 'data' => $event->getData(),
@@ -297,7 +297,7 @@ class RecordSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::RECORD_EDIT_TYPE); $this->messagePublisher->publishMessage($payload, MessagePublisher::EDIT_RECORD_TYPE);
} }
public static function getSubscribedEvents() public static function getSubscribedEvents()

View File

@@ -0,0 +1,237 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware;
use Alchemy\Phrasea\Core\Event\RecordEdit;
use Alchemy\Phrasea\Core\PhraseaEvents;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\RecordEditInWorkerEvent;
use Alchemy\Phrasea\WorkerManager\Event\RecordsWriteMetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class EditRecordWorker implements WorkerInterface
{
use ApplicationBoxAware;
use DataboxLoggerAware;
private $repoWorker;
private $dispatcher;
private $messagePublisher;
public function __construct(WorkerRunningJobRepository $repoWorker, EventDispatcherInterface $dispatcher, MessagePublisher $messagePublisher)
{
$this->repoWorker = $repoWorker;
$this->dispatcher = $dispatcher;
$this->messagePublisher = $messagePublisher;
}
public function process(array $payload)
{
try {
$databox = $this->findDataboxById($payload['databoxId']);
} catch(\Exception $e) {
return;
}
$recordIds = [];
$workerRunningJob = null;
$em = $this->repoWorker->getEntityManager();
if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->messagePublisher->pushLog("Given workerJobId not found !", 'error');
return ;
}
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING);
$em->persist($workerRunningJob);
$em->flush();
} else {
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$message = [
'message_type' => MessagePublisher::EDIT_RECORD_TYPE,
'payload' => $payload
];
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($payload['databoxId'])
->setWork(MessagePublisher::EDIT_RECORD_TYPE)
->setWorkOn("record")
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
->setPayload($message)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
try {
if ($payload['dataType'] === RecordEditInWorkerEvent::MDS_TYPE) {
if (isset($payload['data']) && isset($payload['elementIds'])) {
$recordIds = array_column($payload['data'], 'record_id');
foreach ($payload['data'] as $rec) {
try {
/** @var \record_adapter $record */
$record = $databox->get_record($rec['record_id']);
} catch (\Exception $e) {
continue;
}
$key = $record->getId();
if (!in_array($key, $payload['elementIds'])) {
continue;
}
$statbits = $rec['status'];
$editDirty = $rec['edit'];
if ($editDirty == '0') {
$editDirty = false;
} else {
$editDirty = true;
}
if (isset($rec['metadatas']) && is_array($rec['metadatas'])) {
try {
$record->set_metadatas($rec['metadatas']);
$this->dispatcher->dispatch(PhraseaEvents::RECORD_EDIT, new RecordEdit($record));
} catch (\Exception $e) {
continue;
}
}
if (isset($rec['technicalsdatas']) && is_array($rec['technicalsdatas'])) {
$record->insertOrUpdateTechnicalDatas($rec['technicalsdatas']);
}
$newstat = $record->getStatus();
$statbits = ltrim($statbits, 'x');
if (!in_array($statbits, ['', 'null'])) {
$mask_and = ltrim(str_replace(['x', '0', '1', 'z'], ['1', 'z', '0', '1'], $statbits), '0');
if ($mask_and != '') {
$newstat = \databox_status::operation_and_not($newstat, $mask_and);
}
$mask_or = ltrim(str_replace('x', '0', $statbits), '0');
if ($mask_or != '') {
$newstat = \databox_status::operation_or($newstat, $mask_or);
}
$record->setStatus($newstat);
}
$record->write_metas();
if ($statbits != '') {
$this->getDataboxLogger($databox)
->log($record, \Session_Logger::EVENT_STATUS, '', '');
}
if ($editDirty) {
$this->getDataboxLogger($databox)
->log($record, \Session_Logger::EVENT_EDIT, '', '');
}
}
}
} else {
$arg = json_decode($payload['data']);
$recordIds = array_column($arg->records, 'record_id');
// for now call record_adapter. no check, no acl, ...
foreach($arg->records as $rec) {
try {
/** @var \record_adapter $r */
$r = $databox->get_record($rec->record_id);
$r->setMetadatasByActions($arg->actions);
} catch(\Exception $e) {
continue;
}
}
}
} catch (\Exception $e) {
$workerMessage = "An error occurred when editing record!: ". $e->getMessage();
$this->messagePublisher->pushLog($workerMessage);
// if payload count doesn't exist
// count is 2 because it's to be the second time the message will be treated
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($count - 1))
->setStatus(WorkerRunningJob::ERROR)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$payload['workerJobId'] = $workerRunningJob->getId();
$fullPayload = [
'message_type' => MessagePublisher::EDIT_RECORD_TYPE,
'payload' => $payload
];
$this->messagePublisher->publishRetryMessage(
$fullPayload,
MessagePublisher::EDIT_RECORD_TYPE,
$count,
$workerMessage
);
return;
}
// order to write metas for those records
$this->dispatcher->dispatch(WorkerEvents::RECORDS_WRITE_META,
new RecordsWriteMetaEvent($recordIds, $payload['databoxId'])
);
// tell that we have finished to work on edit
$this->repoWorker->reconnect();
$em->getConnection()->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
}

View File

@@ -1,176 +0,0 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\DataboxLoggerAware;
use Alchemy\Phrasea\Core\Event\RecordEdit;
use Alchemy\Phrasea\Core\PhraseaEvents;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\RecordEditInWorkerEvent;
use Alchemy\Phrasea\WorkerManager\Event\RecordsWriteMetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class RecordEditWorker implements WorkerInterface
{
use ApplicationBoxAware;
use DataboxLoggerAware;
private $repoWorker;
private $dispatcher;
public function __construct(WorkerRunningJobRepository $repoWorker, EventDispatcherInterface $dispatcher)
{
$this->repoWorker = $repoWorker;
$this->dispatcher = $dispatcher;
}
public function process(array $payload)
{
try {
$databox = $this->findDataboxById($payload['databoxId']);
} catch(\Exception $e) {
return;
}
$recordIds = [];
$workerRunningJob = null;
$em = $this->repoWorker->getEntityManager();
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$message = [
'message_type' => MessagePublisher::RECORD_EDIT_TYPE,
'payload' => $payload
];
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($payload['databoxId'])
->setWork(MessagePublisher::RECORD_EDIT_TYPE)
->setWorkOn("record")
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
->setPayload($message)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
if ($payload['dataType'] === RecordEditInWorkerEvent::MDS_TYPE) {
if (isset($payload['data']) && isset($payload['elementIds'])) {
$recordIds = array_column($payload['data'], 'record_id');
foreach ($payload['data'] as $rec) {
try {
/** @var \record_adapter $record */
$record = $databox->get_record($rec['record_id']);
} catch (\Exception $e) {
continue;
}
$key = $record->getId();
if (!in_array($key, $payload['elementIds'])) {
continue;
}
$statbits = $rec['status'];
$editDirty = $rec['edit'];
if ($editDirty == '0') {
$editDirty = false;
} else {
$editDirty = true;
}
if (isset($rec['metadatas']) && is_array($rec['metadatas'])) {
try {
$record->set_metadatas($rec['metadatas']);
$this->dispatcher->dispatch(PhraseaEvents::RECORD_EDIT, new RecordEdit($record));
} catch (\Exception $e) {
continue;
}
}
if (isset($rec['technicalsdatas']) && is_array($rec['technicalsdatas'])) {
$record->insertOrUpdateTechnicalDatas($rec['technicalsdatas']);
}
$newstat = $record->getStatus();
$statbits = ltrim($statbits, 'x');
if (!in_array($statbits, ['', 'null'])) {
$mask_and = ltrim(str_replace(['x', '0', '1', 'z'], ['1', 'z', '0', '1'], $statbits), '0');
if ($mask_and != '') {
$newstat = \databox_status::operation_and_not($newstat, $mask_and);
}
$mask_or = ltrim(str_replace('x', '0', $statbits), '0');
if ($mask_or != '') {
$newstat = \databox_status::operation_or($newstat, $mask_or);
}
$record->setStatus($newstat);
}
$record->write_metas();
if ($statbits != '') {
$this->getDataboxLogger($databox)
->log($record, \Session_Logger::EVENT_STATUS, '', '');
}
if ($editDirty) {
$this->getDataboxLogger($databox)
->log($record, \Session_Logger::EVENT_EDIT, '', '');
}
}
}
} else {
$arg = json_decode($payload['data']);
$recordIds = array_column($arg->records, 'record_id');
// for now call record_adapter. no check, no acl, ...
foreach($arg->records as $rec) {
try {
/** @var \record_adapter $r */
$r = $databox->get_record($rec->record_id);
$r->setMetadatasByActions($arg->actions);
} catch(\Exception $e) {
continue;
}
}
}
// order to write metas for those records
$this->dispatcher->dispatch(WorkerEvents::RECORDS_WRITE_META,
new RecordsWriteMetaEvent($recordIds, $payload['databoxId'])
);
// tell that we have finished to work on edit
$this->repoWorker->reconnect();
$em->getConnection()->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
}

View File

@@ -15,7 +15,7 @@ class patch_413 implements patchInterface
'ftp' => MessagePublisher::FTP_TYPE, 'ftp' => MessagePublisher::FTP_TYPE,
'populateIndex' => MessagePublisher::POPULATE_INDEX_TYPE, 'populateIndex' => MessagePublisher::POPULATE_INDEX_TYPE,
'pullAssets' => MessagePublisher::PULL_ASSETS_TYPE, 'pullAssets' => MessagePublisher::PULL_ASSETS_TYPE,
'recordEdit' => MessagePublisher::RECORD_EDIT_TYPE, 'editRecord' => MessagePublisher::EDIT_RECORD_TYPE,
'subdefCreation' => MessagePublisher::SUBDEF_CREATION_TYPE, 'subdefCreation' => MessagePublisher::SUBDEF_CREATION_TYPE,
'validationReminder' => MessagePublisher::VALIDATION_REMINDER_TYPE, 'validationReminder' => MessagePublisher::VALIDATION_REMINDER_TYPE,
'writeMetadatas' => MessagePublisher::WRITE_METADATAS_TYPE, 'writeMetadatas' => MessagePublisher::WRITE_METADATAS_TYPE,