PHRAS-3447_file-lock-in-workers_4.1

fix : exclusive lock for a worker to work on a subdef file
This commit is contained in:
jygaulier
2021-05-20 18:02:57 +02:00
parent efd335207b
commit 7c236a531a
7 changed files with 500 additions and 472 deletions

View File

@@ -6,7 +6,12 @@ use Doctrine\ORM\Mapping as ORM;
use Gedmo\Mapping\Annotation as Gedmo; use Gedmo\Mapping\Annotation as Gedmo;
/** /**
* @ORM\Table(name="WorkerRunningJob") * @ORM\Table(
* name="WorkerRunningJob",
* uniqueConstraints={
* @ORM\uniqueConstraint(name="flock", columns={"databox_id", "record_id", "flock"})
* }
* )
* @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository") * @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository")
*/ */
class WorkerRunningJob class WorkerRunningJob
@@ -14,7 +19,7 @@ class WorkerRunningJob
const FINISHED = 'finished'; const FINISHED = 'finished';
const RUNNING = 'running'; const RUNNING = 'running';
const ERROR = 'error'; const ERROR = 'error';
const INTERRUPT = 'interrupted manually'; const INTERRUPT = 'canceled';
const ATTEMPT = 'attempt '; const ATTEMPT = 'attempt ';
@@ -41,12 +46,17 @@ class WorkerRunningJob
private $recordId; private $recordId;
/** /**
* @ORM\Column(type="string", name="work", nullable=true) * @ORM\Column(type="string", length=64, name="flock", nullable=true)
*/
private $flock;
/**
* @ORM\Column(type="string", length=64, name="work", nullable=true)
*/ */
private $work; private $work;
/** /**
* @ORM\Column(type="string", name="work_on", nullable=true) * @ORM\Column(type="string", length=64, name="work_on", nullable=true)
*/ */
private $workOn; private $workOn;
@@ -138,6 +148,25 @@ class WorkerRunningJob
} }
/**
* @return mixed
*/
public function getFlock()
{
return $this->flock;
}
/**
* @param mixed $flock
* @return WorkerRunningJob
*/
public function setFlock($flock)
{
$this->flock = $flock;
return $this;
}
/** /**
* @param $work * @param $work
* @return $this * @return $this

View File

@@ -2,78 +2,150 @@
namespace Alchemy\Phrasea\Model\Repositories; namespace Alchemy\Phrasea\Model\Repositories;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use DateTime;
use Doctrine\ORM\EntityRepository; use Doctrine\ORM\EntityRepository;
use Doctrine\ORM\OptimisticLockException;
use Exception;
class WorkerRunningJobRepository extends EntityRepository class WorkerRunningJobRepository extends EntityRepository
{ {
/** /**
* return true if we can create subdef * Acquire a "lock" to create a subdef
* @param $subdefName * @param array $payload
* @param $recordId * @return WorkerRunningJob
* @param $databoxId * @throws OptimisticLockException
* @return bool
*/ */
public function canCreateSubdef($subdefName, $recordId, $databoxId) public function canCreateSubdef($payload)
{ {
$rsm = $this->createResultSetMappingBuilder('w'); return $this->getLock($payload, MessagePublisher::SUBDEF_CREATION_TYPE);
$rsm->addScalarResult('work_on','work_on');
$sql = 'SELECT work_on
FROM WorkerRunningJob
WHERE ((work = :write_meta) OR ((work = :make_subdef) AND work_on = :work_on) )
AND record_id = :record_id
AND databox_id = :databox_id
AND status = :status';
$query = $this->_em->createNativeQuery($sql, $rsm);
$query->setParameters([
'write_meta' => MessagePublisher::WRITE_METADATAS_TYPE,
'make_subdef'=> MessagePublisher::SUBDEF_CREATION_TYPE,
'work_on' => $subdefName,
'record_id' => $recordId,
'databox_id' => $databoxId,
'status' => WorkerRunningJob::RUNNING
]
);
return count($query->getResult()) == 0;
} }
/** /**
* return true if we can write meta * Acquire a "lock" to write meta into a subdef
* * @param array $payload
* @param $subdefName * @return WorkerRunningJob
* @param $recordId * @throws OptimisticLockException
* @param $databoxId
* @return bool
*/ */
public function canWriteMetadata($subdefName, $recordId, $databoxId) public function canWriteMetadata($payload)
{ {
$rsm = $this->createResultSetMappingBuilder('w'); return $this->getLock($payload, MessagePublisher::WRITE_METADATAS_TYPE);
$rsm->addScalarResult('work_on','work_on'); }
$sql = 'SELECT work_on /**
FROM WorkerRunningJob * Acquire a "lock" to work on a (sbid + rid + subdef) by inserting a row in WorkerRunningJob table.
WHERE ((work = :make_subdef) OR ((work = :write_meta) AND work_on = :work_on) ) * If it fails that means that another worker is already working on this file.
AND record_id = :record_id *
AND databox_id = :databox_id * nb : this work only for "first try" where workerJobId is null (=insert).
AND status = :status'; * for some retries (lock was acquired but worker job failed), the "count" of existing row is incremented (=update),
* so many workers "could" update the same row...
* __Luckily__, a rabbitmq message is consumed only once by a unique worker,
* and different workers (write-meta, subdef) have their own queues and their own rows on table.
* So working on a file always starts by a "first try", and concurency is not possible.
* todo : do not update, but insert a line for every try ?
*
* @param array $payload
* @param string $type
* @return WorkerRunningJob the entity (created or updated) or null if file is already locked (duplicate key)
* @throws OptimisticLockException
*/
private function getLock(array $payload, string $type)
{
if(!isset($payload['workerJobId'])) {
// insert a new row WorkerRunningJob : will fail if concurency
try {
$this->getEntityManager()->beginTransaction();
$date = new DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($payload['databoxId'])
->setRecordId($payload['recordId'])
->setWork($type)
->setWorkOn($payload['subdefName'])
->setPayload([
'message_type' => $type,
'payload' => $payload
])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
->setFlock($payload['subdefName']);
$this->getEntityManager()->persist($workerRunningJob);
$query = $this->_em->createNativeQuery($sql, $rsm); $this->getEntityManager()->flush();
$query->setParameters([ $this->getEntityManager()->commit();
'make_subdef'=> MessagePublisher::SUBDEF_CREATION_TYPE,
'write_meta' => MessagePublisher::WRITE_METADATAS_TYPE,
'work_on' => $subdefName,
'record_id' => $recordId,
'databox_id' => $databoxId,
'status' => WorkerRunningJob::RUNNING
]
);
return count($query->getResult()) == 0; return $workerRunningJob;
}
catch(Exception $e) {
// duplicate key ?
$this->getEntityManager()->rollback();
// for unpredicted other errors we can still ignore and return null (lock failed),
// because anyway the worker/rabbit retry-system will stop itself after n failures.
}
}
else {
// update an existing row : never fails (except bad id if row was purged)
try {
$this->getEntityManager()->beginTransaction();
$this->getEntityManager()->createQueryBuilder()
->update()
->set('info', ':info')->setParameter('info', WorkerRunningJob::ATTEMPT . $payload['count'])
->set('status', ':status')->setParameter('status', WorkerRunningJob::RUNNING)
->set('flock', ':flock')->setParameter('flock', $payload['subdefName'])
->where('id = :id')->setParameter('id', $payload['workerJobId'])
->getQuery()
->execute();
$this->getEntityManager()->flush();
$this->getEntityManager()->commit();
return $this->find($payload['workerJobId']);
}
catch (Exception $e) {
// really bad ? return null anyway
$this->getEntityManager()->rollback();
//$this->logger->error("Error persisting WorkerRunningJob !");
}
}
return null;
}
/**
* mark a job a "finished"
* nb : after a long job, connection may be lost so we reconnect.
* But sometimes (?) a first commit fails (due to reconnect ?), while the second one is ok.
* So here we try 2 times, just in case...
*
* @param WorkerRunningJob $workerRunningJob
* @param null $info
*/
public function markFinished(WorkerRunningJob $workerRunningJob, $info = null)
{
$this->reconnect();
for($try=1; $try<=2; $try++) {
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new DateTime('now'))
->setStatus(WorkerRunningJob::FINISHED)
->setFlock(null);
if (!is_null($info)) {
$workerRunningJob->setInfo($info);
}
$this->getEntityManager()->beginTransaction();
$this->getEntityManager()->persist($workerRunningJob);
$this->getEntityManager()->flush();
$this->getEntityManager()->commit();
break;
}
catch (Exception $e) {
$this->getEntityManager()->rollback();
}
}
} }
/** /**
@@ -135,7 +207,8 @@ class WorkerRunningJobRepository extends EntityRepository
$this->_em->beginTransaction(); $this->_em->beginTransaction();
try { try {
$connection->executeUpdate($platform->getTruncateTableSQL('WorkerRunningJob')); $connection->executeUpdate($platform->getTruncateTableSQL('WorkerRunningJob'));
} catch (\Exception $e) { }
catch (Exception $e) {
$this->_em->rollback(); $this->_em->rollback();
} }
} }
@@ -146,7 +219,8 @@ class WorkerRunningJobRepository extends EntityRepository
try { try {
$this->_em->getConnection()->delete('WorkerRunningJob', ['status' => WorkerRunningJob::FINISHED]); $this->_em->getConnection()->delete('WorkerRunningJob', ['status' => WorkerRunningJob::FINISHED]);
$this->_em->commit(); $this->_em->commit();
} catch (\Exception $e) { }
catch (Exception $e) {
$this->_em->rollback(); $this->_em->rollback();
} }
} }

View File

@@ -157,10 +157,10 @@ class AdminConfigurationController extends Controller
/** @var WorkerRunningJob $workerRunningJob */ /** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $repoWorker->find($workerId); $workerRunningJob = $repoWorker->find($workerId);
$workerRunningJob $workerRunningJob->setStatus($request->request->get('status'));
->setStatus($request->request->get('status')) if($request->request->get('finished') == '1') {
->setFinished(new \DateTime('now')) $workerRunningJob->setFinished(new \DateTime('now'))->setFlock(null);
; }
$em = $repoWorker->getEntityManager(); $em = $repoWorker->getEntityManager();
$em->persist($workerRunningJob); $em->persist($workerRunningJob);

View File

@@ -119,12 +119,14 @@ class RecordSubscriber implements EventSubscriberInterface
$workerRunningJob $workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1))
->setStatus(WorkerRunningJob::ERROR) ->setStatus(WorkerRunningJob::ERROR)
->setFlock(null) // unlock !
; ;
$em->persist($workerRunningJob); $em->persist($workerRunningJob);
$em->flush(); $em->flush();
$em->commit(); $em->commit();
} catch (Exception $e) { }
catch (Exception $e) {
$em->rollback(); $em->rollback();
} }
} }
@@ -240,12 +242,14 @@ class RecordSubscriber implements EventSubscriberInterface
$workerRunningJob $workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1))
->setStatus(WorkerRunningJob::ERROR) ->setStatus(WorkerRunningJob::ERROR)
->setFlock(null) // unlock !
; ;
$em->persist($workerRunningJob); $em->persist($workerRunningJob);
$em->flush(); $em->flush();
$em->commit(); $em->commit();
} catch (Exception $e) { }
catch (Exception $e) {
$em->rollback(); $em->rollback();
} }
} }
@@ -257,7 +261,8 @@ class RecordSubscriber implements EventSubscriberInterface
$event->getWorkerMessage() $event->getWorkerMessage()
); );
} else { }
else {
$databoxId = $event->getRecord()->getDataboxId(); $databoxId = $event->getRecord()->getDataboxId();
$recordId = $event->getRecord()->getRecordId(); $recordId = $event->getRecord()->getRecordId();

View File

@@ -6,7 +6,6 @@ use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Filesystem\FilesystemService; use Alchemy\Phrasea\Filesystem\FilesystemService;
use Alchemy\Phrasea\Media\SubdefGenerator; use Alchemy\Phrasea\Media\SubdefGenerator;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent; use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent;
@@ -14,8 +13,10 @@ use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Exception;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Throwable;
class SubdefCreationWorker implements WorkerInterface class SubdefCreationWorker implements WorkerInterface
{ {
@@ -53,109 +54,76 @@ class SubdefCreationWorker implements WorkerInterface
public function process(array $payload) public function process(array $payload)
{ {
if (isset($payload['recordId']) && isset($payload['databoxId'])) { if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) {
// bad payload
$this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__));
return;
}
$recordId = $payload['recordId']; $recordId = $payload['recordId'];
$databoxId = $payload['databoxId']; $databoxId = $payload['databoxId'];
$wantedSubdef = [$payload['subdefName']]; $subdefName = $payload['subdefName'];
$databox = $this->findDataboxById($databoxId); $databox = $this->findDataboxById($databoxId);
$record = $databox->get_record($recordId); $record = $databox->get_record($recordId);
if ($record->isStory()) {
return;
}
$oldLogger = $this->subdefGenerator->getLogger(); $oldLogger = $this->subdefGenerator->getLogger();
$message = [ // try to "lock" the file, will return null if already locked (key unicity)
// = insert a row with unqiue sbid + rid + subdefname (todo : replace the subdefname with a subdef_id ?)
$workerRunningJob = $this->repoWorker->canCreateSubdef($payload);
if (is_null($workerRunningJob)) {
// the file was locked by another worker, delay to retry later
$this->messagePublisher->publishDelayedMessage(
[
'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE,
'payload' => $payload 'payload' => $payload
]; ],
MessagePublisher::SUBDEF_CREATION_TYPE
if (!$record->isStory()) { );
// check if there is a write meta running for the record or the same task running
$canCreateSubdef = $this->repoWorker->canCreateSubdef($payload['subdefName'], $recordId, $databoxId);
if (!$canCreateSubdef) {
// the file is in used to write meta
$this->messagePublisher->publishDelayedMessage($message, MessagePublisher::SUBDEF_CREATION_TYPE);
return ; return ;
} }
// tell that a file is in used to create subdef // here the entity is "locked" (unique key)
$em = $this->repoWorker->getEntityManager();
$this->repoWorker->reconnect();
if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->logger->error("Given workerJobId not found !");
return ;
}
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING);
$em->persist($workerRunningJob);
$em->flush();
} else {
$em->beginTransaction();
try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(MessagePublisher::SUBDEF_CREATION_TYPE)
->setWorkOn($payload['subdefName'])
->setPayload($message)
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$this->subdefGenerator->setLogger($this->logger); $this->subdefGenerator->setLogger($this->logger);
try { try {
$this->subdefGenerator->generateSubdefs($record, $wantedSubdef); $this->subdefGenerator->generateSubdefs($record, [$subdefName]);
} catch (\Exception $e) { }
catch (Exception $e) {
$this->logger->error("Exception catched: " . $e->getMessage()); $this->logger->error("Exception catched: " . $e->getMessage());
}
} catch (\Throwable $e) { catch (Throwable $e) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId; $workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId;
$this->logger->error($workerMessage); $this->logger->error($workerMessage);
/** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionCreationFailure() */
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent(
$record, $record,
$payload['subdefName'], $subdefName,
$workerMessage, $workerMessage,
$count, $count,
$workerRunningJob->getId() $workerRunningJob->getId()
)); ));
// the subscriber will "unlock" the row, no need to do it here
return ; return ;
} }
// begin to check if the subdef is successfully generated // begin to check if the subdef is successfully generated
$subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($payload['subdefName']); $subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($subdefName);
$filePathToCheck = null; $filePathToCheck = null;
if ($record->has_subdef($payload['subdefName']) ) { if ($record->has_subdef($subdefName) ) {
$filePathToCheck = $record->get_subdef($payload['subdefName'])->getRealPath(); $filePathToCheck = $record->get_subdef($subdefName)->getRealPath();
} }
$filePathToCheck = $this->filesystem->generateSubdefPathname($record, $subdef, $filePathToCheck); $filePathToCheck = $this->filesystem->generateSubdefPathname($record, $subdef, $filePathToCheck);
@@ -164,23 +132,31 @@ class SubdefCreationWorker implements WorkerInterface
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
/** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionCreationFailure() */
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent(
$record, $record,
$payload['subdefName'], $subdefName,
'Subdef generation failed !', 'Subdef generation failed !',
$count, $count,
$workerRunningJob->getId() $workerRunningJob->getId()
)); ));
$this->subdefGenerator->setLogger($oldLogger); $this->subdefGenerator->setLogger($oldLogger);
// the subscriber will "unlock" the row, no need to do it here
return ; return ;
} }
// checking ended // checking ended
// order to write meta for the subdef if needed // order to write meta for the subdef if needed
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */
$this->dispatcher->dispatch(
WorkerEvents::SUBDEFINITION_WRITE_META,
new SubdefinitionWritemetaEvent(
$record, $record,
$payload['subdefName']) $subdefName
)
); );
$this->subdefGenerator->setLogger($oldLogger); $this->subdefGenerator->setLogger($oldLogger);
@@ -206,30 +182,9 @@ class SubdefCreationWorker implements WorkerInterface
$this->indexer->flushQueue(); $this->indexer->flushQueue();
// tell that we have finished to work on this file // tell that we have finished to work on this file
$this->repoWorker->reconnect(); $this->repoWorker->markFinished($workerRunningJob);
$em->getConnection()->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
try {
$em->getConnection()->beginTransaction();
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$this->messagePublisher->pushLog("rollback on recordID :" . $workerRunningJob->getRecordId());
$em->rollback();
}
} }
}
}
}
public static function checkIfFirstChild(\record_adapter $story, \record_adapter $record) public static function checkIfFirstChild(\record_adapter $story, \record_adapter $record)
{ {

View File

@@ -7,7 +7,6 @@ use Alchemy\Phrasea\Application\Helper\DispatcherAware;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware; use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Metadata\TagFactory; use Alchemy\Phrasea\Metadata\TagFactory;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
@@ -57,110 +56,95 @@ class WriteMetadatasWorker implements WorkerInterface
public function process(array $payload) public function process(array $payload)
{ {
if (isset($payload['recordId']) && isset($payload['databoxId'])) { // mandatory args
$recordId = $payload['recordId']; if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) {
$databoxId = $payload['databoxId']; // bad payload
$this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__));
$MWG = isset($payload['MWG']) ? $payload['MWG'] : false;
$clearDoc = isset($payload['clearDoc']) ? $payload['clearDoc'] : false;
$databox = $this->findDataboxById($databoxId);
// check if there is a make subdef running for the record or the same task running
$canWriteMeta = $this->repoWorker->canWriteMetadata($payload['subdefName'], $recordId, $databoxId);
$message = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => $payload
];
if (!$canWriteMeta) {
// the file is in used to generate subdef
$this->messagePublisher->publishDelayedMessage($message, MessagePublisher::WRITE_METADATAS_TYPE);
return; return;
} }
$recordId = $payload['recordId'];
$databoxId = $payload['databoxId'];
$subdefName = $payload['subdefName'];
$MWG = $payload['MWG'] ?? false;
$clearDoc = $payload['clearDoc'] ?? false;
$databox = $this->findDataboxById($databoxId);
// try to "lock" the file, will return null if already locked (key unicity)
// = insert a row with unqiue sbid + rid + subdefname (todo : replace the subdefname with a subdef_id ?)
$workerRunningJob = $this->repoWorker->canWriteMetadata($payload);
if (is_null($workerRunningJob)) {
// the file was locked by another worker, delay to retry later
$this->messagePublisher->publishDelayedMessage(
[
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => $payload
],
MessagePublisher::WRITE_METADATAS_TYPE
);
return ;
}
// here the entity is "locked" (unique key)
$record = $databox->get_record($recordId); $record = $databox->get_record($recordId);
if ($record->getMimeType() == 'image/svg+xml') { if ($record->getMimeType() == 'image/svg+xml') {
$this->logger->error("Can't write meta on svg file!"); $this->logger->error("Can't write meta on svg file!");
// tell that we have finished to work on this file ("unlock")
$this->repoWorker->markFinished($workerRunningJob, "Can't write meta on svg file!");
return; return;
} }
// tell that a file is in used to create subdef
$em = $this->getEntityManager();
$this->repoWorker->reconnect(); $this->repoWorker->reconnect();
if (isset($payload['workerJobId'])) {
/** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorker->find($payload['workerJobId']);
if ($workerRunningJob == null) {
$this->logger->error("Given workerJobId not found !");
return ;
}
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
} else {
$em->beginTransaction();
try { try {
$date = new DateTime(); $subdef = $record->get_subdef($subdefName);
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(MessagePublisher::WRITE_METADATAS_TYPE)
->setWorkOn($payload['subdefName'])
->setPayload($message)
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (Exception $e) {
$em->rollback();
$this->logger->error("Error persisting WorkerRunningJob !");
return ;
} }
} catch (Exception $e) {
$workerMessage = "Exception catched when try to get subdef " .$subdefName. " from DB for the recordID: " .$recordId;
try {
$subdef = $record->get_subdef($payload['subdefName']);
} catch (Exception $e) {
$workerMessage = "Exception catched when try to get subdef " .$payload['subdefName']. " from DB for the recordID: " .$recordId;
$this->logger->error($workerMessage); $this->logger->error($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
/** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record, $record,
$payload['subdefName'], $subdefName,
SubdefinitionWritemetaEvent::FAILED, SubdefinitionWritemetaEvent::FAILED,
$workerMessage, $workerMessage,
$count, $count,
$workerRunningJob->getId() $workerRunningJob->getId()
)); ));
// the subscriber will "unlock" the row, no need to do it here
return ; return ;
} }
if ($subdef->is_physically_present()) { if (!$subdef->is_physically_present()) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
/** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record,
$subdefName,
SubdefinitionWritemetaEvent::FAILED,
'Subdef is not physically present!',
$count,
$workerRunningJob->getId()
));
// the subscriber will "unlock" the row, no need to do it here
return ;
}
// here we can try to do the job
$metadata = new MetadataBag(); $metadata = new MetadataBag();
// add Uuid in metadatabag // add Uuid in metadatabag
@@ -236,7 +220,8 @@ class WriteMetadatasWorker implements WorkerInterface
$value = new Mono($value); $value = new Mono($value);
} }
} }
} catch(Exception $e) { }
catch(Exception $e) {
// the field is not set in the record, erase it // the field is not set in the record, erase it
if ($fieldStructure->is_multi()) { if ($fieldStructure->is_multi()) {
$value = new Multi(array('')); $value = new Multi(array(''));
@@ -266,53 +251,32 @@ class WriteMetadatasWorker implements WorkerInterface
$this->writer->write($subdef->getRealPath(), $metadata); $this->writer->write($subdef->getRealPath(), $metadata);
$this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() )); $this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() ));
} catch (Exception $e) { }
catch (Exception $e) {
$workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%4$s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage()); $workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%4$s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage());
$this->logger->error($workerMessage); $this->logger->error($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
/** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record, $record,
$payload['subdefName'], $subdefName,
SubdefinitionWritemetaEvent::FAILED, SubdefinitionWritemetaEvent::FAILED,
$workerMessage, $workerMessage,
$count, $count,
$workerRunningJob->getId() $workerRunningJob->getId()
)); ));
// the subscriber will "unlock" the row, no need to do it here
return ;
} }
// mark write metas finished // mark write metas finished
$this->updateJeton($record); $this->updateJeton($record);
} else {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record,
$payload['subdefName'],
SubdefinitionWritemetaEvent::FAILED,
'Subdef is not physically present!',
$count,
$workerRunningJob->getId()
));
}
// tell that we have finished to work on this file // tell that we have finished to work on this file
$this->repoWorker->reconnect(); $this->repoWorker->markFinished($workerRunningJob);
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (Exception $e) {
$em->rollback();
}
}
} }
private function removeNulChar($value) private function removeNulChar($value)

View File

@@ -153,7 +153,8 @@
type: "POST", type: "POST",
dataType: 'json', dataType: 'json',
data: { data: {
status: '{{ constant("\\Alchemy\\Phrasea\\Model\\Entities\\WorkerRunningJob::INTERRUPT") }}' status: '{{ constant("\\Alchemy\\Phrasea\\Model\\Entities\\WorkerRunningJob::INTERRUPT") }}',
finished: '1' // manual interrupt also means "finished", it must update the date and unlock the row
}, },
url: "/admin/worker-manager/"+ workerId +"/change-status", url: "/admin/worker-manager/"+ workerId +"/change-status",
success: function (data) { success: function (data) {