one line for one build job

This commit is contained in:
aynsix
2020-07-27 14:00:27 +03:00
parent f85ea02434
commit 4585f34996
12 changed files with 258 additions and 86 deletions

View File

@@ -2,7 +2,6 @@
namespace Alchemy\Phrasea\Model\Entities;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Doctrine\ORM\Mapping as ORM;
use Gedmo\Mapping\Annotation as Gedmo;
@@ -14,7 +13,8 @@ class WorkerRunningJob
{
const FINISHED = 'finished';
const RUNNING = 'running';
const ERROR = 'error attempt ';
const ERROR = 'error';
const ATTEMPT = 'attempt ';
const TYPE_PULL = 'uploader pull';
const TYPE_PUSH = 'uploader push';
@@ -58,6 +58,11 @@ class WorkerRunningJob
*/
private $assetId;
/**
* @ORM\Column(type="string", name="info", nullable=true)
*/
private $info;
/**
* @Gedmo\Timestampable(on="create")
* @ORM\Column(type="datetime")
@@ -202,6 +207,25 @@ class WorkerRunningJob
return $this->assetId;
}
/**
* @param $info
* @return $this
*/
public function setInfo($info)
{
$this->info = $info;
return $this;
}
/**
* @return mixed
*/
public function getInfo()
{
return $this->info;
}
/**
* @return \DateTime
*/

View File

@@ -10,12 +10,14 @@ class AssetsCreationRecordFailureEvent extends SfEvent
private $payload;
private $workerMessage;
private $count;
private $workerJobId;
public function __construct($payload, $workerMessage = '', $count = 2)
public function __construct($payload, $workerMessage = '', $count = 2, $workerJobId = 0)
{
$this->payload = $payload;
$this->workerMessage = $workerMessage;
$this->count = $count;
$this->workerJobId = $workerJobId;
}
public function getPayload()
@@ -32,4 +34,9 @@ class AssetsCreationRecordFailureEvent extends SfEvent
{
return $this->count;
}
public function getWorkerJobId()
{
return $this->workerJobId;
}
}

View File

@@ -12,8 +12,9 @@ class PopulateIndexFailureEvent extends SfEvent
private $databoxId;
private $workerMessage;
private $count;
private $workerJobId;
public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2)
public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2, $workerJobId = 0)
{
$this->host = $host;
$this->port = $port;
@@ -21,6 +22,7 @@ class PopulateIndexFailureEvent extends SfEvent
$this->databoxId = $databoxId;
$this->workerMessage = $workerMessage;
$this->count = $count;
$this->workerJobId = $workerJobId;
}
public function getHost()
@@ -52,4 +54,9 @@ class PopulateIndexFailureEvent extends SfEvent
{
return $this->count;
}
public function getWorkerJobId()
{
return $this->workerJobId;
}
}

View File

@@ -66,8 +66,8 @@ class QueueWorkerServiceProvider implements PluginProviderInterface
new RecordSubscriber($app, new LazyLocator($app, 'phraseanet.appbox'))
);
$dispatcher->addSubscriber(new ExportSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new AssetsIngestSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new SearchengineSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new AssetsIngestSubscriber($app['alchemy_worker.message.publisher'], new LazyLocator($app, 'repo.worker-running-job')));
$dispatcher->addSubscriber(new SearchengineSubscriber($app['alchemy_worker.message.publisher'], new LazyLocator($app, 'repo.worker-running-job')));
$dispatcher->addSubscriber(new WebhookSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new SubtitleSubscriber(new LazyLocator($app, 'repo.worker-job'), $app['alchemy_worker.message.publisher']));

View File

@@ -3,6 +3,7 @@
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent;
@@ -15,9 +16,13 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
/** @var callable */
private $repoWorkerJobLocator;
public function __construct(MessagePublisher $messagePublisher, callable $repoWorkerJobLocator)
{
$this->messagePublisher = $messagePublisher;
$this->messagePublisher = $messagePublisher;
$this->repoWorkerJobLocator = $repoWorkerJobLocator;
}
public function onAssetsCreate(AssetsCreateEvent $event)
@@ -28,7 +33,6 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
'payload' => array_merge($event->getData(), ['type' => WorkerRunningJob::TYPE_PUSH])
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE);
}
@@ -49,11 +53,36 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
public function onAssetsCreationRecordFailure(AssetsCreationRecordFailureEvent $event)
{
$repoWorker = $this->getRepoWorkerJob();
$payload = [
'message_type' => MessagePublisher::CREATE_RECORD_TYPE,
'payload' => $event->getPayload()
];
$em = $repoWorker->getEntityManager();
// check connection an re-connect if needed
$repoWorker->reconnect();
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $repoWorker->find($event->getWorkerJobId());
if ($workerRunningJob) {
$em->beginTransaction();
try {
// count-1 for the number of finished attempt
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1))
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_CREATE_RECORD_QUEUE,
@@ -70,4 +99,14 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
WorkerEvents::ASSETS_CREATION_RECORD_FAILURE => 'onAssetsCreationRecordFailure'
];
}
/**
* @return WorkerRunningJobRepository
*/
private function getRepoWorkerJob()
{
$callable = $this->repoWorkerJobLocator;
return $callable();
}
}

View File

@@ -9,7 +9,6 @@ use Alchemy\Phrasea\Core\Event\Record\MetadataChangedEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvents;
use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Databox\Subdef\MediaSubdefRepository;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
@@ -97,7 +96,8 @@ class RecordSubscriber implements EventSubscriberInterface
'recordId' => $event->getRecord()->getRecordId(),
'databoxId' => $event->getRecord()->getDataboxId(),
'subdefName' => $event->getSubdefName(),
'status' => ''
'status' => '',
'workerJobId' => $event->getWorkerJobId()
]
];
@@ -106,13 +106,17 @@ class RecordSubscriber implements EventSubscriberInterface
// check connection an re-connect if needed
$repoWorker->reconnect();
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $repoWorker->find($event->getWorkerJobId());
if ($workerRunningJob) {
$em->beginTransaction();
try {
// count-1 for the number of finished attempt
$workerRunningJob->setStatus(WorkerRunningJob::ERROR. ($event->getCount() - 1));
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1))
->setStatus(WorkerRunningJob::ERROR)
;
$em->persist($workerRunningJob);
$em->flush();
@@ -212,7 +216,8 @@ class RecordSubscriber implements EventSubscriberInterface
'payload' => [
'recordId' => $event->getRecord()->getRecordId(),
'databoxId' => $event->getRecord()->getDataboxId(),
'subdefName' => $event->getSubdefName()
'subdefName' => $event->getSubdefName(),
'workerJobId' => $event->getWorkerJobId()
]
];
@@ -229,13 +234,17 @@ class RecordSubscriber implements EventSubscriberInterface
// check connection an re-connect if needed
$repoWorker->reconnect();
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $repoWorker->find($event->getWorkerJobId());
if ($workerRunningJob) {
$em->beginTransaction();
try {
// count-1 for the number of finished attempt
$workerRunningJob->setStatus(WorkerRunningJob::ERROR. ($event->getCount() - 1));
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1))
->setStatus(WorkerRunningJob::ERROR)
;
$em->persist($workerRunningJob);
$em->flush();

View File

@@ -2,6 +2,8 @@
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
@@ -13,9 +15,13 @@ class SearchengineSubscriber implements EventSubscriberInterface
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
/** @var callable */
private $repoWorkerJobLocator;
public function __construct(MessagePublisher $messagePublisher, callable $repoWorkerJobLocator)
{
$this->messagePublisher = $messagePublisher;
$this->messagePublisher = $messagePublisher;
$this->repoWorkerJobLocator = $repoWorkerJobLocator;
}
public function onPopulateIndex(PopulateIndexEvent $event)
@@ -40,16 +46,43 @@ class SearchengineSubscriber implements EventSubscriberInterface
public function onPopulateIndexFailure(PopulateIndexFailureEvent $event)
{
$repoWorker = $this->getRepoWorkerJob();
$payload = [
'message_type' => MessagePublisher::POPULATE_INDEX_TYPE,
'payload' => [
'host' => $event->getHost(),
'port' => $event->getPort(),
'indexName' => $event->getIndexName(),
'databoxId' => $event->getDataboxId(),
'host' => $event->getHost(),
'port' => $event->getPort(),
'indexName' => $event->getIndexName(),
'databoxId' => $event->getDataboxId(),
'workerJobId' => $event->getWorkerJobId()
]
];
$em = $repoWorker->getEntityManager();
// check connection an re-connect if needed
$repoWorker->reconnect();
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $repoWorker->find($event->getWorkerJobId());
if ($workerRunningJob) {
$em->beginTransaction();
try {
// count-1 for the number of finished attempt
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1))
->setStatus(WorkerRunningJob::ERROR)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_POPULATE_INDEX_QUEUE,
@@ -65,5 +98,15 @@ class SearchengineSubscriber implements EventSubscriberInterface
WorkerEvents::POPULATE_INDEX_FAILURE => 'onPopulateIndexFailure'
];
}
/**
* @return WorkerRunningJobRepository
*/
private function getRepoWorkerJob()
{
$callable = $this->repoWorkerJobLocator;
return $callable();
}
}

View File

@@ -92,15 +92,10 @@ class CreateRecordWorker implements WorkerInterface
$this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent(
$payload,
'Error when downloading assets!',
$count
$count,
$workerRunningJob->getId()
));
if ($workerRunningJob != null) {
$em->remove($workerRunningJob);
$em->flush();
}
return;
}
@@ -115,15 +110,10 @@ class CreateRecordWorker implements WorkerInterface
$this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent(
$payload,
$workerMessage,
$count
$count,
$workerRunningJob->getId()
));
if ($workerRunningJob != null) {
$em->remove($workerRunningJob);
$em->flush();
}
return;
}
@@ -236,11 +226,8 @@ class CreateRecordWorker implements WorkerInterface
$this->getBorderManager()->process($lazaretSession, $packageFile, $callback);
$recordId = null;
if ($elementCreated instanceof \record_adapter) {
$this->dispatch(PhraseaEvents::RECORD_UPLOAD, new RecordEdit($elementCreated));
$recordId = $elementCreated->getRecordId();
} else {
$this->messagePublisher->pushLog(sprintf('The file was moved to the quarantine: %s', json_encode($reasons)));
/** @var LazaretFile $elementCreated */

View File

@@ -36,26 +36,47 @@ class PopulateIndexWorker implements WorkerInterface
public function process(array $payload)
{
$em = $this->repoWorker->getEntityManager();
$em->beginTransaction();
$date = new \DateTime();
try {
$workerRunningJob = new WorkerRunningJob();
if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->messagePublisher->pushLog("Given workerJobId not found !", "error");
return ;
}
$workerRunningJob
->setWork(MessagePublisher::POPULATE_INDEX_TYPE)
->setWorkOn($payload['indexName'])
->setDataboxId($payload['databoxId'])
->setPublished($date->setTimestamp($payload['published']))
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
} else {
$em->beginTransaction();
$date = new \DateTime();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
try {
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setWork(MessagePublisher::POPULATE_INDEX_TYPE)
->setWorkOn($payload['indexName'])
->setDataboxId($payload['databoxId'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
/** @var ElasticsearchOptions $options */
@@ -82,7 +103,8 @@ class PopulateIndexWorker implements WorkerInterface
$payload['indexName'],
$payload['databoxId'],
$workerMessage,
$count
$count,
$workerRunningJob->getId()
));
} else {
$databox = $this->findDataboxById($databoxId);
@@ -97,13 +119,6 @@ class PopulateIndexWorker implements WorkerInterface
$r['memory']/1048576
));
} catch(\Exception $e) {
if ($workerRunningJob != null) {
$em->remove($workerRunningJob);
$em->flush();
}
$workerMessage = sprintf("Error on indexing : %s ", $e->getMessage());
$this->messagePublisher->pushLog($workerMessage);
@@ -116,7 +131,8 @@ class PopulateIndexWorker implements WorkerInterface
$payload['indexName'],
$payload['databoxId'],
$workerMessage,
$count
$count,
$workerRunningJob->getId()
));
}
}

View File

@@ -81,26 +81,46 @@ class SubdefCreationWorker implements WorkerInterface
// tell that a file is in used to create subdef
$em = $this->repoWorker->getEntityManager();
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->logger->error("Given workerJobId not found !");
return ;
}
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(MessagePublisher::SUBDEF_CREATION_TYPE)
->setWorkOn($payload['subdefName'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING);
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
} else {
$em->beginTransaction();
try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(MessagePublisher::SUBDEF_CREATION_TYPE)
->setWorkOn($payload['subdefName'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$this->subdefGenerator->setLogger($this->logger);

View File

@@ -18,7 +18,6 @@ use PHPExiftool\Driver\Metadata\MetadataBag;
use PHPExiftool\Driver\Tag;
use PHPExiftool\Driver\Value\Mono;
use PHPExiftool\Driver\Value\Multi;
use PHPExiftool\Exception\ExceptionInterface as PHPExiftoolException;
use PHPExiftool\Exception\TagUnknown;
use PHPExiftool\Writer;
use Psr\Log\LoggerInterface;
@@ -80,26 +79,47 @@ class WriteMetadatasWorker implements WorkerInterface
// tell that a file is in used to create subdef
$em = $this->getEntityManager();
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->logger->error("Given workerJobId not found !");
return ;
}
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(MessagePublisher::WRITE_METADATAS_TYPE)
->setWorkOn($payload['subdefName'])
->setPublished($date->setTimestamp($payload['published']))
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
$em->flush();
} else {
$em->beginTransaction();
try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(MessagePublisher::WRITE_METADATAS_TYPE)
->setWorkOn($payload['subdefName'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$record = $databox->get_record($recordId);