diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index 1bde5b1275..0e22c656b7 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -12,50 +12,157 @@ use Exception; class WorkerRunningJobRepository extends EntityRepository { /** - * Acquire a "lock" to create a subdef + * Check and declare that we want to create a subdef from a document + * + * - if it's possible : return WorkerRunningJob entity (created or updated) for the job + * - if not (needed resource(s) already in use by other job(s)) : return null + * + * rules : + * - if someone else is already writing the document, we can't create a subdef from it + * - if someone else is already writing this subdef, we can't re-create it + * * @param array $payload - * @return WorkerRunningJob + * @return WorkerRunningJob | null * @throws OptimisticLockException */ - public function canCreateSubdef($payload) + public function canCreateSubdef(array $payload) { - return $this->getLock($payload, MessagePublisher::SUBDEF_CREATION_TYPE); + $databoxId = $payload['databoxId']; + $recordId = $payload['recordId']; + $subdefName = $payload['subdefName']; + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf('canCreateSubdef for %s.%s.%s ?', $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + + // first protect sql by a critical section + if( !( $recordMutex = $this->getRecordMutex($databoxId, $recordId)) ) { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + 'getRecordMutex() failed' + ), FILE_APPEND|LOCK_EX); + + return null; + } + + // here we can do sql atomically + $workerRunningJob = null; + + // check the rules + /** @var WorkerRunningJob $r */ + $r = $this->createQueryBuilder('w') + ->select('w') + ->where('w.status = :status')->setParameter('status', WorkerRunningJob::RUNNING) + ->andWhere('w.databoxId = :databox_id')->setParameter('databox_id', $databoxId) + ->andWhere('w.recordId = :record_id')->setParameter('record_id', $recordId) + ->andWhere('w.workOn = \'document\' OR w.work_on = :work_on')->setParameter(':work_on', $subdefName) + ->andWhere('w.work = :work_1 OR w.work = :work_2') + ->setParameter('work_1', MessagePublisher::WRITE_METADATAS_TYPE) + ->setParameter('work_2', MessagePublisher::SUBDEF_CREATION_TYPE) + ->setMaxResults(1) + ->getQuery() + ->getFirstResult() + ; + + if(!$r) { + // no conflict, create (or update) the job + $workerRunningJob = $this->creteOrUpdateJob($payload, MessagePublisher::SUBDEF_CREATION_TYPE); + } + else { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("job %s already running on %s.%s.%s", $r->getId(), $r->getDataboxId(), $r->getRecordId(), $r->getWorkOn()) + ), FILE_APPEND|LOCK_EX); + } + + // end of critical section + $this->releaseRecordMutex($recordMutex); + + return $workerRunningJob; } /** - * Acquire a "lock" to write meta into a subdef + * Check and declare that we want to write meta into a subdef + * + * - if it's possible : return WorkerRunningJob entity (created or updated) for the job + * - if not (needed resource(s) already in use by other job(s)) : return null + * + * rule : + * - if someone is already working on the file, we can't write + * * @param array $payload - * @return WorkerRunningJob + * @return WorkerRunningJob | null * @throws OptimisticLockException */ - public function canWriteMetadata($payload) + public function canWriteMetadata(array $payload) { - return $this->getLock($payload, MessagePublisher::WRITE_METADATAS_TYPE); + $databoxId = $payload['databoxId']; + $recordId = $payload['recordId']; + $subdefName = $payload['subdefName']; + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf('canWriteMetadata for %s.%s.%s ?', $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + + // first protect sql by a critical section + if( !( $recordMutex = $this->getRecordMutex($databoxId, $recordId)) ) { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + 'getRecordMutex() failed' + ), FILE_APPEND|LOCK_EX); + + return null; + } + + // here we can do sql atomically + $workerRunningJob = null; + + // check the rule + $r = $this->createQueryBuilder('w') + ->select('w') + ->where('w.status = :status')->setParameter('status', WorkerRunningJob::RUNNING) + ->andWhere('w.databox_id = :databox_id')->setParameter('databox_id', $databoxId) + ->andWhere('w.record_id = :record_id')->setParameter('record_id', $recordId) + ->andWhere('w.work_on = :work_on')->setParameter(':work_on', $subdefName) + ->andWhere('w.work = :work_1 OR w.work = :work_2') + ->setParameter('work_1', MessagePublisher::WRITE_METADATAS_TYPE) + ->setParameter('work_2', MessagePublisher::SUBDEF_CREATION_TYPE) + ->setMaxResults(1) + ->getQuery() + ->getFirstResult() + ; + + if(!$r) { + // no conflict, create (or update) the job + $workerRunningJob = $this->creteOrUpdateJob($payload, MessagePublisher::WRITE_METADATAS_TYPE); + } + else { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("job %s already running on %s.%s.%s", $r->getId(), $r->getDataboxId(), $r->getRecordId(), $r->getWorkOn()) + ), FILE_APPEND|LOCK_EX); + + } + + // end of critical section + $this->releaseRecordMutex($recordMutex); + + return $workerRunningJob; } /** - * Acquire a "lock" to work on a (sbid + rid + subdef) by inserting a row in WorkerRunningJob table. - * If it fails that means that another worker is already working on this file. - * - * nb : this work only for "first try" where workerJobId is null (=insert). - * for some retries (lock was acquired but worker job failed), the "count" of existing row is incremented (=update), - * so many workers "could" update the same row... - * __Luckily__, a rabbitmq message is consumed only once by a unique worker, - * and different workers (write-meta, subdef) have their own queues and their own rows on table. - * So working on a file always starts by a "first try", and concurency is not possible. - * todo : do not update, but insert a line for every try ? - * * @param array $payload * @param string $type - * @return WorkerRunningJob the entity (created or updated) or null if file is already locked (duplicate key) + * @return WorkerRunningJob|null * @throws OptimisticLockException */ - private function getLock(array $payload, string $type) + private function creteOrUpdateJob(array $payload, string $type) { - if(!isset($payload['workerJobId'])) { - // insert a new row WorkerRunningJob : will fail if concurency + // for unpredicted sql errors we can still ignore and return null (lock failed), + // because anyway the worker/rabbit retry system will stop itself after n failures. + + if (!isset($payload['workerJobId'])) { + + // new job + + $this->getEntityManager()->beginTransaction(); try { - $this->getEntityManager()->beginTransaction(); $date = new DateTime(); $workerRunningJob = new WorkerRunningJob(); $workerRunningJob @@ -64,54 +171,120 @@ class WorkerRunningJobRepository extends EntityRepository ->setWork($type) ->setWorkOn($payload['subdefName']) ->setPayload([ - 'message_type' => $type, - 'payload' => $payload + 'message_type' => $type, + 'payload' => $payload ]) ->setPublished($date->setTimestamp($payload['published'])) ->setStatus(WorkerRunningJob::RUNNING) - ->setFlock($payload['subdefName']); - $this->getEntityManager()->persist($workerRunningJob); + ; + $this->getEntityManager()->persist($workerRunningJob); $this->getEntityManager()->flush(); $this->getEntityManager()->commit(); - return $workerRunningJob; + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("created job %s for %s.%s.%s", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName']) + ), FILE_APPEND|LOCK_EX); + } - catch(Exception $e) { - // duplicate key ? + catch (Exception $e) { + // bad case : we return false anyway $this->getEntityManager()->rollback(); - // for unpredicted other errors we can still ignore and return null (lock failed), - // because anyway the worker/rabbit retry-system will stop itself after n failures. + // $this->logger->error("Error persisting WorkerRunningJob !"); + $workerRunningJob = null; + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! error creating job %s for %s.%s.%s", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName']) + ), FILE_APPEND|LOCK_EX); + } } else { - // update an existing row : never fails (except bad id if row was purged) - try { - $this->getEntityManager()->beginTransaction(); - $this->getEntityManager()->createQueryBuilder() - ->update() - ->set('info', ':info')->setParameter('info', WorkerRunningJob::ATTEMPT . $payload['count']) - ->set('status', ':status')->setParameter('status', WorkerRunningJob::RUNNING) - ->set('flock', ':flock')->setParameter('flock', $payload['subdefName']) - ->where('id = :id')->setParameter('id', $payload['workerJobId']) - ->getQuery() - ->execute(); + // retry from delayed + + /** @var WorkerRunningJob $workerRunningJob */ + if(!is_null($workerRunningJob = $this->find($payload['workerJobId']))) { + // update retry count (value is already incremented in payload) + $workerRunningJob + ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) + ->setStatus(WorkerRunningJob::RUNNING); + + $this->getEntityManager()->persist($workerRunningJob); $this->getEntityManager()->flush(); - $this->getEntityManager()->commit(); - return $this->find($payload['workerJobId']); + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("incremented job %s for %s.%s.%s (count=%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $payload['count']) + ), FILE_APPEND|LOCK_EX); + } - catch (Exception $e) { - // really bad ? return null anyway - $this->getEntityManager()->rollback(); - //$this->logger->error("Error persisting WorkerRunningJob !"); + else { + // the row has been deleted by purge ? + // bad case : we return false anyway + + // $this->logger->error("Given workerJobId not found !"); + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! error incrementing job %s for %s.%s.%s (count=%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $payload['count']) + ), FILE_APPEND|LOCK_EX); + } } - return null; + return $workerRunningJob; } + /** + * Acquire a "mutex" to protect critical section on a (sbid + rid) by trying to insert a row in WorkerRunningJob table. + * If it fails that means that another critical section is already running on this record. + * + * @param int $databoxId + * @param int $recordId + * @return WorkerRunningJob|null // the created mutex entity, or null if mutex already exists + */ + private function getRecordMutex(int $databoxId, int $recordId) + { + try { + $this->getEntityManager()->beginTransaction(); + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($databoxId) + ->setRecordId($recordId) + ->setPublished(new DateTime()) + ->setStatus('') + ->setFlock("_mutex_"); + $this->getEntityManager()->persist($workerRunningJob); + + $this->getEntityManager()->flush(); + $this->getEntityManager()->commit(); + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("getMutex for %s.%s OK (%s)", $databoxId, $recordId, $workerRunningJob->getId()) + ), FILE_APPEND|LOCK_EX); + + return $workerRunningJob; + } + catch(Exception $e) { + // duplicate key ? + $this->getEntityManager()->rollback(); + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("getMutex for %s.%s FAILED", $databoxId, $recordId) + ), FILE_APPEND|LOCK_EX); + + return null; + } + } + + private function releaseRecordMutex(WorkerRunningJob $workerRunningJob) + { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("releaseMutex (%s) for %s.%s", $workerRunningJob->getId(), $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId()) + ), FILE_APPEND|LOCK_EX); + + $this->getEntityManager()->remove($workerRunningJob); + } + + /** * mark a job a "finished" @@ -129,8 +302,7 @@ class WorkerRunningJobRepository extends EntityRepository try { $workerRunningJob->setStatus(WorkerRunningJob::FINISHED) ->setFinished(new DateTime('now')) - ->setStatus(WorkerRunningJob::FINISHED) - ->setFlock(null); + ->setStatus(WorkerRunningJob::FINISHED); if (!is_null($info)) { $workerRunningJob->setInfo($info); } @@ -140,9 +312,17 @@ class WorkerRunningJobRepository extends EntityRepository $this->getEntityManager()->flush(); $this->getEntityManager()->commit(); + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("job %s (%d) finished for %s.%s.%s", $workerRunningJob->getWork(), $workerRunningJob->getId(), $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId(), $workerRunningJob->getWorkOn()) + ), FILE_APPEND|LOCK_EX); + break; } catch (Exception $e) { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! failed to mark job %s (%d) as finished (try %s/2) for %s.%s.%s", $workerRunningJob->getWork(), $workerRunningJob->getId(), $try, $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId(), $workerRunningJob->getWorkOn()) + ), FILE_APPEND|LOCK_EX); + $this->getEntityManager()->rollback(); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php index 3551548383..0e9c4b4df9 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -242,7 +242,6 @@ class RecordSubscriber implements EventSubscriberInterface $workerRunningJob ->setInfo(WorkerRunningJob::ATTEMPT. ($event->getCount() - 1)) ->setStatus(WorkerRunningJob::ERROR) - ->setFlock(null) // unlock ! ; $em->persist($workerRunningJob); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index d487509486..7701e1796b 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -54,6 +54,7 @@ class SubdefCreationWorker implements WorkerInterface public function process(array $payload) { + // mandatory args if (!isset($payload['recordId']) || !isset($payload['databoxId']) || !isset($payload['subdefName'])) { // bad payload $this->logger->error(sprintf("%s (%s) : bad payload", __FILE__, __LINE__)); @@ -64,6 +65,10 @@ class SubdefCreationWorker implements WorkerInterface $databoxId = $payload['databoxId']; $subdefName = $payload['subdefName']; + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("process SubdefCreation for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + $databox = $this->findDataboxById($databoxId); $record = $databox->get_record($recordId); @@ -73,12 +78,11 @@ class SubdefCreationWorker implements WorkerInterface $oldLogger = $this->subdefGenerator->getLogger(); - // try to "lock" the file, will return null if already locked (key unicity) - // = insert a row with unqiue sbid + rid + subdefname (todo : replace the subdefname with a subdef_id ?) + // try to "lock" the file, will return null if already locked $workerRunningJob = $this->repoWorker->canCreateSubdef($payload); if (is_null($workerRunningJob)) { - // the file was locked by another worker, delay to retry later + // the file is written by another worker, delay to retry later $this->messagePublisher->publishDelayedMessage( [ 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, @@ -86,20 +90,41 @@ class SubdefCreationWorker implements WorkerInterface ], MessagePublisher::SUBDEF_CREATION_TYPE ); + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("cannot CreateSubdef for %s.%s.%s, delayed", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); return ; } - // here the entity is "locked" (unique key) + // here we can work + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("ready to CreateSubdef for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); $this->subdefGenerator->setLogger($this->logger); try { $this->subdefGenerator->generateSubdefs($record, [$subdefName]); + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("subdef generated for %s.%s.%s (?)", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + } catch (Exception $e) { $this->logger->error("Exception catched: " . $e->getMessage()); + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! subdef generation failed (ignored) for %s.%s.%s : %s", $databoxId, $recordId, $subdefName, $e->getMessage()) + ), FILE_APPEND|LOCK_EX); + } catch (Throwable $e) { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("subdef generation failed, retry delayed for %s.%s.%s : %s", $databoxId, $recordId, $subdefName, $e->getMessage()) + ), FILE_APPEND|LOCK_EX); + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; $workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId; @@ -118,6 +143,10 @@ class SubdefCreationWorker implements WorkerInterface return ; } + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("checking subdef file for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + // begin to check if the subdef is successfully generated $subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($subdefName); $filePathToCheck = null; @@ -130,6 +159,10 @@ class SubdefCreationWorker implements WorkerInterface if (!$this->filesystem->exists($filePathToCheck)) { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! subdef file missing, retry delayed for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionCreationFailure() */ @@ -149,6 +182,10 @@ class SubdefCreationWorker implements WorkerInterface // checking ended + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("subdef file exists, order to write meta for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + // order to write meta for the subdef if needed /** @uses \Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber::onSubdefinitionWritemeta() */ $this->dispatcher->dispatch( @@ -181,7 +218,7 @@ class SubdefCreationWorker implements WorkerInterface // update elastic $this->indexer->flushQueue(); - // tell that we have finished to work on this file + // tell that we have finished to work on this file (=unlock) $this->repoWorker->markFinished($workerRunningJob); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index 4a3a998fe5..f6122c2e6f 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -63,20 +63,23 @@ class WriteMetadatasWorker implements WorkerInterface return; } - $recordId = $payload['recordId']; $databoxId = $payload['databoxId']; + $recordId = $payload['recordId']; $subdefName = $payload['subdefName']; + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("process WriteMeta for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + $MWG = $payload['MWG'] ?? false; $clearDoc = $payload['clearDoc'] ?? false; $databox = $this->findDataboxById($databoxId); - // try to "lock" the file, will return null if already locked (key unicity) - // = insert a row with unqiue sbid + rid + subdefname (todo : replace the subdefname with a subdef_id ?) + // try to "lock" the file, will return null if already locked $workerRunningJob = $this->repoWorker->canWriteMetadata($payload); if (is_null($workerRunningJob)) { - // the file was locked by another worker, delay to retry later + // the file is written by another worker, delay to retry later $this->messagePublisher->publishDelayedMessage( [ 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, @@ -84,10 +87,18 @@ class WriteMetadatasWorker implements WorkerInterface ], MessagePublisher::WRITE_METADATAS_TYPE ); + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("cannot WriteMeta for %s.%s.%s, delayed", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + return ; } - // here the entity is "locked" (unique key) + // here we can work + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("ready to WriteMeta for %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); $record = $databox->get_record($recordId); @@ -122,7 +133,7 @@ class WriteMetadatasWorker implements WorkerInterface $workerRunningJob->getId() )); - // the subscriber will "unlock" the row, no need to do it here + // the subscriber will mark the job as errored, no need to do it here return ; } @@ -139,7 +150,7 @@ class WriteMetadatasWorker implements WorkerInterface $workerRunningJob->getId() )); - // the subscriber will "unlock" the row, no need to do it here + // the subscriber will mark the job as errored, no need to do it here return ; } @@ -251,8 +262,17 @@ class WriteMetadatasWorker implements WorkerInterface $this->writer->write($subdef->getRealPath(), $metadata); $this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() )); + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("meta written in %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + } catch (Exception $e) { + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("meta NOT written in %s.%s.%s", $databoxId, $recordId, $subdefName) + ), FILE_APPEND|LOCK_EX); + $workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%4$s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage()); $this->logger->error($workerMessage); @@ -275,7 +295,7 @@ class WriteMetadatasWorker implements WorkerInterface // mark write metas finished $this->updateJeton($record); - // tell that we have finished to work on this file + // tell that we have finished to work on this file (=unlock) $this->repoWorker->markFinished($workerRunningJob); }