manage error on subdef an write meta

This commit is contained in:
aynsix
2020-05-15 21:04:30 +03:00
parent 6a7ca08000
commit 5f9fb0224e
4 changed files with 210 additions and 206 deletions

View File

@@ -73,7 +73,6 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface
$app['elasticsearch.indexer']
))
->setApplicationBox($app['phraseanet.appbox'])
->setEntityManagerLocator(new LazyLocator($app, 'orm.em'))
;
}));

View File

@@ -9,7 +9,10 @@ use Alchemy\Phrasea\Core\Event\Record\MetadataChangedEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvents;
use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Databox\Subdef\MediaSubdefRepository;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
@@ -35,6 +38,10 @@ class RecordSubscriber implements EventSubscriberInterface
* @var callable
*/
private $appboxLocator;
/**
* @var WorkerRunningJobRepository
*/
private $repoWorker;
public function __construct(Application $app, callable $appboxLocator)
{
@@ -42,6 +49,7 @@ class RecordSubscriber implements EventSubscriberInterface
$this->workerResolver = $app['alchemy_worker.type_based_worker_resolver'];
$this->app = $app;
$this->appboxLocator = $appboxLocator;
$this->repoWorker = $app['repo.worker-running-job'];
}
public function onSubdefinitionCreate(SubdefinitionCreateEvent $event)
@@ -96,6 +104,23 @@ class RecordSubscriber implements EventSubscriberInterface
]
];
$em = $this->repoWorker->getEntityManager();
$workerRunningJob = $this->repoWorker->findOneBy([
'databoxId' => $event->getRecord()->getDataboxId(),
'recordId' => $event->getRecord()->getRecordId(),
'work' => PhraseaTokens::MAKE_SUBDEF,
'workOn' => $event->getSubdefName()
]);
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_SUBDEF_QUEUE,
@@ -198,6 +223,25 @@ class RecordSubscriber implements EventSubscriberInterface
);
$this->messagePublisher->pushLog($logMessage);
$jeton = ($event->getSubdefName() == "document") ? PhraseaTokens::WRITE_META_DOC : PhraseaTokens::WRITE_META_SUBDEF;
$em = $this->repoWorker->getEntityManager();
$workerRunningJob = $this->repoWorker->findOneBy([
'databoxId' => $event->getRecord()->getDataboxId(),
'recordId' => $event->getRecord()->getRecordId(),
'work' => $jeton,
'workOn' => $event->getSubdefName()
]);
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_METADATAS_QUEUE,

View File

@@ -21,7 +21,6 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class SubdefCreationWorker implements WorkerInterface
{
use ApplicationBoxAware;
use EntityManagerAware;
private $subdefGenerator;
@@ -84,7 +83,7 @@ class SubdefCreationWorker implements WorkerInterface
}
// tell that a file is in used to create subdef
$em = $this->getEntityManager();
$em = $this->repoWorker->getEntityManager();
$em->beginTransaction();
try {
@@ -107,7 +106,6 @@ class SubdefCreationWorker implements WorkerInterface
$em->rollback();
}
try {
$this->subdefGenerator->setLogger($this->logger);
$this->subdefGenerator->generateSubdefs($record, $wantedSubdef);
@@ -159,24 +157,6 @@ class SubdefCreationWorker implements WorkerInterface
}
}
}
} catch (\Exception $e) {
$payload = [
'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE,
'payload' => $payload
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_SUBDEF_QUEUE);
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
// update elastic
$this->indexer->flushQueue();

View File

@@ -107,7 +107,6 @@ class WriteMetadatasWorker implements WorkerInterface
$em->rollback();
}
try {
$record = $databox->get_record($recordId);
$subdef = $record->get_subdef($payload['subdefName']);
@@ -247,24 +246,6 @@ class WriteMetadatasWorker implements WorkerInterface
));
}
} catch(\Exception $e) {
$payload = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => $payload
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_METADATAS_QUEUE);
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
// tell that we have finished to work on this file
$em->beginTransaction();