diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index be89a3589a..193f5d42be 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -5,9 +5,10 @@ namespace Alchemy\Phrasea\Model\Repositories; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use DateTime; +use Doctrine\DBAL\Driver\Connection; use Doctrine\ORM\EntityRepository; -use Doctrine\ORM\OptimisticLockException; use Exception; +use PDO; class WorkerRunningJobRepository extends EntityRepository { @@ -15,7 +16,7 @@ class WorkerRunningJobRepository extends EntityRepository /** * Check and declare that we want to create a subdef from a document * - * - if it's possible : return WorkerRunningJob entity (created or updated) for the job + * - if it's possible : return WorkerRunningJobId (created or updated) for the job * - if not (needed resource(s) already in use by other job(s)) : return null * * rules : @@ -23,89 +24,68 @@ class WorkerRunningJobRepository extends EntityRepository * - if someone else is already writing this subdef, we can't re-create it * * @param array $payload - * @return WorkerRunningJob | null - * @throws OptimisticLockException + * @return int | null workerRunningJobId */ public function canCreateSubdef(array $payload) { - $databoxId = $payload['databoxId']; - $recordId = $payload['recordId']; - $subdefName = $payload['subdefName']; + $this->reconnect(); + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf('canCreateSubdef for %s.%s.%s ?', $databoxId, $recordId, $subdefName) - ), FILE_APPEND | LOCK_EX); + $sqlclause = "(`work_on` = " . $cnx->quote('document') . " OR `work_on` = " . $cnx->quote($payload['subdefName']) . ")"; - // first protect sql by a critical section - if( !( $recordMutex = $this->getRecordMutex($databoxId, $recordId)) ) { - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - 'getRecordMutex() failed' - ), FILE_APPEND | LOCK_EX); - - return null; - } - - // here we can do sql atomically - $workerRunningJob = null; - - // check the rules - /** @var WorkerRunningJob[] $r */ - $r = $this->createQueryBuilder('w') - ->select('w') - ->where('w.status = :status')->setParameter('status', WorkerRunningJob::RUNNING) - ->andWhere('w.databoxId = :databox_id')->setParameter('databox_id', $databoxId) - ->andWhere('w.recordId = :record_id')->setParameter('record_id', $recordId) - ->andWhere('w.workOn = \'document\' OR w.workOn = :work_on')->setParameter(':work_on', $subdefName) - ->andWhere('w.work = :work_1 OR w.work = :work_2') - ->setParameter('work_1', MessagePublisher::WRITE_METADATAS_TYPE) - ->setParameter('work_2', MessagePublisher::SUBDEF_CREATION_TYPE) - ->setMaxResults(1) - ->getQuery() - ->getResult() - ; - - if(count($r) == 0) { - // no conflict, create (or update) the job - $workerRunningJob = $this->creteOrUpdateJob($payload, MessagePublisher::SUBDEF_CREATION_TYPE); - } - else { - $r = $r[0]; - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("job %s already running on %s.%s.%s", $r->getId(), $r->getDataboxId(), $r->getRecordId(), $r->getWorkOn()) - ), FILE_APPEND | LOCK_EX); - } - - // end of critical section - $this->releaseRecordMutex($databoxId, $recordId); - - return $workerRunningJob; + return $this->canDoJob($payload, MessagePublisher::SUBDEF_CREATION_TYPE, $sqlclause); } + /** * Check and declare that we want to write meta into a subdef * - * - if it's possible : return WorkerRunningJob entity (created or updated) for the job + * - if it's possible : return WorkerRunningJobId (created or updated) for the job * - if not (needed resource(s) already in use by other job(s)) : return null * * rule : * - if someone is already working on the file, we can't write * * @param array $payload - * @return WorkerRunningJob | null - * @throws OptimisticLockException + * @return int | null workerRunningJobId */ public function canWriteMetadata(array $payload) { + $this->reconnect(); + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); + + $sqlclause = "(`work_on` = " . $cnx->quote($payload['subdefName']) . ")"; + + return $this->canDoJob($payload, MessagePublisher::WRITE_METADATAS_TYPE, $sqlclause); + } + + /** + * Check and declare that we want to do jon for a subdef + * + * - if it's possible : return WorkerRunningJobId (created or updated) for the job + * - if not (needed resource(s) already in use by other job(s)) : return null + * + * The rule depends on caller / jobType (canCreateSubdef or canWriteMetadata) + * + * @param array $payload + * @param string $jobType // MessagePublisher::WRITE_METADATAS_TYPE | MessagePublisher::SUBDEF_CREATION_TYPE + * @param string $sqlClause + * @return int | null workerRunningJobId + */ + private function canDoJob(array $payload, string $jobType, string $sqlClause) + { + $workerRunningJobId = null; // returned + $databoxId = $payload['databoxId']; $recordId = $payload['recordId']; $subdefName = $payload['subdefName']; file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf('canWriteMetadata for %s.%s.%s ?', $databoxId, $recordId, $subdefName) + sprintf('canDoJob("%s") for %s.%s.%s ?', $jobType, $databoxId, $recordId, $subdefName) ), FILE_APPEND | LOCK_EX); // first protect sql by a critical section - if( !( $recordMutex = $this->getRecordMutex($databoxId, $recordId)) ) { + if( !( $recordMutexId = $this->getRecordMutex($databoxId, $recordId)) ) { file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, 'getRecordMutex() failed' ), FILE_APPEND | LOCK_EX); @@ -114,127 +94,143 @@ class WorkerRunningJobRepository extends EntityRepository } // here we can do sql atomically - $workerRunningJob = null; - // check the rule - /** @var WorkerRunningJob[] $r */ - $r = $this->createQueryBuilder('w') - ->select('w') - ->where('w.status = :status')->setParameter('status', WorkerRunningJob::RUNNING) - ->andWhere('w.databoxId = :databox_id')->setParameter('databox_id', $databoxId) - ->andWhere('w.recordId = :record_id')->setParameter('record_id', $recordId) - ->andWhere('w.workOn = :work_on')->setParameter(':work_on', $subdefName) - ->andWhere('w.work = :work_1 OR w.work = :work_2') - ->setParameter('work_1', MessagePublisher::WRITE_METADATAS_TYPE) - ->setParameter('work_2', MessagePublisher::SUBDEF_CREATION_TYPE) - ->setMaxResults(1) - ->getQuery() - ->getResult() - ; + $this->reconnect(); + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); - if(count($r) == 0) { - // no conflict, create (or update) the job - $workerRunningJob = $this->creteOrUpdateJob($payload, MessagePublisher::WRITE_METADATAS_TYPE); + if($cnx->beginTransaction() === true) { + + try { + $row = null; + $sql = "SELECT * FROM `WorkerRunningJob` WHERE\n" + . " `status` = " . $cnx->quote(WorkerRunningJob::RUNNING) . " AND\n" + . " `databox_id` = " . $cnx->quote($databoxId, PDO::PARAM_INT) . " AND\n" + . " `record_id` = " . $cnx->quote($recordId, PDO::PARAM_INT) . " AND\n" + . " " . $sqlClause . " AND\n" + . " (`work` = " . $cnx->quote(MessagePublisher::WRITE_METADATAS_TYPE) . " OR `work` = " . $cnx->quote(MessagePublisher::SUBDEF_CREATION_TYPE) . ")\n" + . " LIMIT 1"; + + $stmt = $cnx->prepare($sql); + if ($stmt->execute() === true) { + $row = $stmt->fetch(PDO::FETCH_ASSOC); + } + else { + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED select on %s.%s.%s because (%s)", $databoxId, $recordId, $subdefName, $stmt->errorCode()) + ), FILE_APPEND | LOCK_EX); + } + $stmt->closeCursor(); + + if(!$row) { + // no job running : create or update (may return false) if error + $workerRunningJobId = $this->creteOrUpdateJob($cnx, $payload, $jobType); + } + else { + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("job %s (id=%s) already running on %s.%s.%s", $row['work'], $row['id'], $databoxId, $recordId, $subdefName) + ), FILE_APPEND | LOCK_EX); + } + + $cnx->commit(); + } + catch (Exception $e) { + $cnx->rollBack(); + + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED in transaction to select/create on %s.%s.%s because (%s)", $databoxId, $recordId, $subdefName, $e->getMessage()) + ), FILE_APPEND | LOCK_EX); + } } else { - $r = $r[0]; - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("job %s already running on %s.%s.%s", $r->getId(), $r->getDataboxId(), $r->getRecordId(), $r->getWorkOn()) + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED to create transaction to select/create on %s.%s.%s", $databoxId, $recordId, $subdefName) ), FILE_APPEND | LOCK_EX); - } // end of critical section - $this->releaseRecordMutex($databoxId, $recordId); + $this->releaseMutex($recordMutexId); - return $workerRunningJob; + return $workerRunningJobId; } /** + * @param Connection $cnx * @param array $payload * @param string $type - * @return WorkerRunningJob|null - * @throws OptimisticLockException + * @return int|null // workerJobId */ - private function creteOrUpdateJob(array $payload, string $type) + private function creteOrUpdateJob(Connection $cnx, array $payload, string $type) { // for unpredicted sql errors we can still ignore and return null (lock failed), // because anyway the worker/rabbit retry system will stop itself after n failures. - if (!isset($payload['workerJobId'])) { + $workerJobId = null; - // new job + try { + if (!isset($payload['workerJobId'])) { - $this->getEntityManager()->beginTransaction(); - try { - $date = new DateTime(); - $workerRunningJob = new WorkerRunningJob(); - $workerRunningJob - ->setDataboxId($payload['databoxId']) - ->setRecordId($payload['recordId']) - ->setWork($type) - ->setWorkOn($payload['subdefName']) - ->setPayload([ - 'message_type' => $type, - 'payload' => $payload - ]) - ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningJob::RUNNING) - ; + // new job - $this->getEntityManager()->persist($workerRunningJob); - $this->getEntityManager()->flush(); - $this->getEntityManager()->commit(); + $datePublished = new DateTime(); + $datePublished->setTimestamp($payload['published']); + $datePublished->format('Y-m-d H:i:s'); - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("created job %s for %s.%s.%s", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName']) - ), FILE_APPEND | LOCK_EX); + $pl = json_encode([ + 'message_type' => $type, + 'payload' => $payload + ]); - } - catch (Exception $e) { - // bad case : we return false anyway - $this->getEntityManager()->rollback(); - // $this->logger->error("Error persisting WorkerRunningJob !"); - $workerRunningJob = null; - - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("!!! error creating job %s for %s.%s.%s", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName']) - ), FILE_APPEND | LOCK_EX); - - } - } - else { - - // retry from delayed - - /** @var WorkerRunningJob $workerRunningJob */ - if(!is_null($workerRunningJob = $this->find($payload['workerJobId']))) { - // update retry count (value is already incremented in payload) - $workerRunningJob - ->setInfo(WorkerRunningJob::ATTEMPT . $payload['count']) - ->setStatus(WorkerRunningJob::RUNNING); - - $this->getEntityManager()->persist($workerRunningJob); - $this->getEntityManager()->flush(); - - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("incremented job %s for %s.%s.%s (count=%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $payload['count']) - ), FILE_APPEND | LOCK_EX); + $sql = "INSERT INTO `WorkerRunningJob` SET \n" + . " `databox_id` = " . $cnx->quote($payload['databoxId'], PDO::PARAM_INT) . ",\n" + . " `record_id` = " . $cnx->quote($payload['recordId'], PDO::PARAM_INT) . ",\n" + . " `work` = " . $cnx->quote($type) . ",\n" + . " `work_on` = " . $cnx->quote($payload['subdefName']) . ",\n" + . " `payload` = " . $cnx->quote($pl) . ",\n" + . " `published` = " . $cnx->quote($datePublished->format('Y-m-d H:i:s')) . ",\n" + . " `status` = " . $cnx->quote(WorkerRunningJob::RUNNING); + if ($cnx->exec($sql) === 1) { + // went well, the row is inserted + $workerJobId = $cnx->lastInsertId(); + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("created job %s (id=%s) for %s.%s.%s", $type, $workerJobId, $payload['databoxId'], $payload['recordId'], $payload['subdefName']) + ), FILE_APPEND | LOCK_EX); + } + else { + // row not inserted ? + throw new Exception("Failed to insert into WorkerRunningJob"); + } } else { - // the row has been deleted by purge ? - // bad case : we return false anyway + // retry from delayed : update retry count (value is already incremented in payload) + $sql = "UPDATE `WorkerRunningJob` SET \n" + . " `info` = " . $cnx->quote(WorkerRunningJob::ATTEMPT . $payload['count']) . ",\n" + . " `status` = " . $cnx->quote(WorkerRunningJob::RUNNING) + . " WHERE `id` = " . $cnx->quote($payload['workerJobId'], PDO::PARAM_INT); - // $this->logger->error("Given workerJobId not found !"); - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("!!! error incrementing job %s for %s.%s.%s (count=%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $payload['count']) - ), FILE_APPEND | LOCK_EX); + if ($cnx->exec($sql) === 1) { + // went well, the row is updated + $workerJobId = $payload['workerJobId']; + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("updated job %s (id=%s) for %s.%s.%s", $type, $workerJobId, $payload['databoxId'], $payload['recordId'], $payload['subdefName']) + ), FILE_APPEND | LOCK_EX); + } + else { + // row not inserted ? + throw new Exception(sprintf("Failed to update WorkerRunningJob with id=%s", $payload['workerJobId'])); + } } } + catch (Exception $e) { + // bad case : we return null anyway - return $workerRunningJob; + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED creating/updating job %s for %s.%s.%s because (%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $e->getMessage()) + ), FILE_APPEND | LOCK_EX); + } + + return $workerJobId; } /** @@ -244,24 +240,58 @@ class WorkerRunningJobRepository extends EntityRepository * when many q-messages are consumed at the same time, many process may ask the same mutex immediatly, many fails. * so we retry after a short random delay which gives a good chance to ok, and avoids unnecessary "delayed" q-messages. * + * + * !!! IMPORTANT !!! + * we CAN'T use the entity manager to insert, because if this fails with exception (possible case), + * the EM will be closed and we will have no other chance for anothe tryout. + * So we play plain sql everywhere here. + * * @param int $databoxId * @param int $recordId * @return bool */ private function getRecordMutex(int $databoxId, int $recordId) { - $e = null; // exception if failed + // First we delete old unreleased mutex (which should never happen). + // A mutex is supposed to last only for a very short time (select + insert-or-update). + // 60s is considered as a dead mutex + // + try { + $this->reconnect(); + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); + + $sql = "DELETE FROM `WorkerRunningJob` WHERE\n" + . " `databox_id` = " . $cnx->quote($databoxId) . " AND\n" + . " `record_id` = " . $cnx->quote($recordId) . " AND\n" + . " `flock` = " . $cnx->quote('_mutex_') . " AND\n" + . " TIMESTAMPDIFF(SECOND, `published`, NOW()) > 60"; + + if ($cnx->exec($sql) > 0) { + // affected rows is 1 since by definition this key is unique + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! old mutex for %s.%s deleted (!!! SHOULD NOT HAPPEN !!!)", $databoxId, $recordId) + ), FILE_APPEND | LOCK_EX); + } + } + catch(Exception $e) { + // here something went very wrong, like sql death + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED while trying to delete old mutex for %s.%s (!!! SHOULD NOT HAPPEN !!!)", $databoxId, $recordId) + ), FILE_APPEND | LOCK_EX); + + return false; // we could choose to continue, but if we end up here... better to stop + } + + // here we create a mutex, which CAN fail if another process did the same right before us + // + $e = null; // last exception if failed + for($tryout=1; $tryout<=3; $tryout++) { try { $this->reconnect(); - /** - * !!! IMPORTANT !!! - * we CAN'T use the entity manager to insert, because if this fails with exception (possible case), - * the EM will be closed and we will have no other chance for anothe tryout. - * So we do plain sql here. - */ - $cnx = $this->getEntityManager()->getConnection(); + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); + $sql = "INSERT INTO WorkerRunningJob (`databox_id`, `record_id`, `published`, `status`, `flock`) VALUES (\n" . $cnx->quote($databoxId) . ",\n" . $cnx->quote($recordId) . ",\n" @@ -270,59 +300,88 @@ class WorkerRunningJobRepository extends EntityRepository . $cnx->quote('_mutex_') . "\n" . ")"; - $cnx->exec($sql); + if(($a = $cnx->exec($sql)) === 1) { - file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("getMutex tryout %s for %s.%s OK", $tryout, $databoxId, $recordId) - ), FILE_APPEND | LOCK_EX); + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("getMutex tryout %s for %s.%s OK", $tryout, $databoxId, $recordId) + ), FILE_APPEND | LOCK_EX); - return true; + return $cnx->lastInsertId(); + } + + throw new Exception(sprintf("inserting mutex should return 1 row affected, got %s", $a)); } catch (Exception $e) { /** * with plain sql, EM should still be opened here */ - // duplicate key ? + // duplicate key is possible, we retry on any kind of error if($tryout < 3) { - //sleep(1); - $rnd = rand(10, 50) * 10; + $rnd = rand(10, 50) * 10; // 100 ms ... 500 ms with 10 ms steps file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, sprintf("getMutex retry in %d msec", $rnd) ), FILE_APPEND | LOCK_EX); - usleep($rnd * 1000); // 100 ms ... 500 ms with 10 ms steps + usleep($rnd * 1000); } } } file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("getMutex tryout %s for %s.%s FAILED because (%s)", $tryout, $databoxId, $recordId, $e->getMessage()) + sprintf("!!! FAILED getMutex for %s.%s because (%s)", $databoxId, $recordId, $e->getMessage()) ), FILE_APPEND | LOCK_EX); return false; } - private function releaseRecordMutex(int $databoxId, int $recordId) + /** + * Release a mutex by deleting it. + * This should not fail, but -as for creation-, we will try N times + * + * @param int $recordMutexId + */ + private function releaseMutex(int $recordMutexId) { - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("releaseMutex for %s.%s", $databoxId, $recordId) + $e = null; // last exception if failed + for($tryout=1; $tryout<=3; $tryout++) { + try { + $this->reconnect(); + + /** + * because we did not create an entity for mutex row, + * we must use plain sql also to delete it + */ + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); + $sql = "DELETE FROM WorkerRunningJob WHERE `id` = " . $cnx->quote($recordMutexId); + + $cnx->exec($sql); + + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("releaseMutex (id=%s) DONE", $recordMutexId) + ), FILE_APPEND | LOCK_EX); + + return; + } + catch (Exception $e) { + if($tryout < 3) { + $rnd = rand(10, 50) * 10; // 100 ms ... 500 ms with 10 ms steps + + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("releaseMutex (id=%s) retry in %d msec", $recordMutexId, $rnd) + ), FILE_APPEND | LOCK_EX); + + usleep($rnd * 1000); + } + } + } + + // Here we were not able to release a mutex (bad) + // The last chance will be later, when old mutex (60s) is deleted + file_put_contents(dirname(__FILE__) . '/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(true)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED release mutex (id=%s) because (%s)", $recordMutexId, $e->getMessage()) ), FILE_APPEND | LOCK_EX); - - $this->reconnect(); - - /** - * because we did not create an entity for mutex row, - * we must use plain sql also to delete it - */ - $cnx = $this->getEntityManager()->getConnection(); - $sql = "DELETE FROM WorkerRunningJob\n" - . " WHERE `databox_id` = " . $cnx->quote($databoxId) - . " AND `record_id` = " . $cnx->quote($recordId) - . " AND `flock` = " . $cnx->quote("_mutex_"); - - $cnx->exec($sql); } /** @@ -331,40 +390,46 @@ class WorkerRunningJobRepository extends EntityRepository * But sometimes (?) a first commit fails (due to reconnect ?), while the second one is ok. * So here we try 2 times, just in case... * - * @param WorkerRunningJob $workerRunningJob + * @param int $workerRunningJobId * @param null $info */ - public function markFinished(WorkerRunningJob $workerRunningJob, $info = null) + public function markFinished(int $workerRunningJobId, $info = null) { - $this->reconnect(); - for($try=1; $try<=2; $try++) { + for($tryout=1; $tryout<=2; $tryout++) { try { - $workerRunningJob->setStatus(WorkerRunningJob::FINISHED) - ->setFinished(new DateTime('now')) - ->setStatus(WorkerRunningJob::FINISHED); - if (!is_null($info)) { - $workerRunningJob->setInfo($info); + $this->reconnect(); + $cnx = $this->getEntityManager()->getConnection()->getWrappedConnection(); + $sql = "UPDATE `WorkerRunningJob` SET \n" + . " `finished` = NOW(),\n" + . " `status` = " . $cnx->quote(WorkerRunningJob::FINISHED); + if(!is_null($info)) { + $sql .= ",\n `info` = " . $cnx->quote(WorkerRunningJob::FINISHED); } + $sql .= "\n WHERE `id` = " . $cnx->quote($workerRunningJobId, PDO::PARAM_INT); - $this->getEntityManager()->beginTransaction(); - $this->getEntityManager()->persist($workerRunningJob); - $this->getEntityManager()->flush(); - $this->getEntityManager()->commit(); + if(($a = $cnx->exec($sql) )=== 1) { + // ok + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("job (id=%d) marked as finished", $workerRunningJobId) + ), FILE_APPEND | LOCK_EX); - file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("job %s (%d) finished for %s.%s.%s", $workerRunningJob->getWork(), $workerRunningJob->getId(), $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId(), $workerRunningJob->getWorkOn()) - ), FILE_APPEND | LOCK_EX); - - break; + return; + } + // not ok ? retry + throw new Exception(sprintf("updating WorkerRunningJob should return 1 row affected, got %s", $a)); } catch (Exception $e) { file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf("!!! failed to mark job %s (%d) as finished (try %s/2) for %s.%s.%s", $workerRunningJob->getWork(), $workerRunningJob->getId(), $try, $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId(), $workerRunningJob->getWorkOn()) + sprintf("failed to mark job (id=%d) as finished (tryout %s, retry in 1 sec) because (%s)", $workerRunningJobId, $tryout, $e->getMessage()) ), FILE_APPEND | LOCK_EX); - - $this->getEntityManager()->rollback(); + if($tryout < 2) { + sleep(1); // retry in 1 sec + } } } + file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, + sprintf("!!! FAILED to mark job (id=%d) as finished", $workerRunningJobId) + ), FILE_APPEND | LOCK_EX); } /** diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index dd9a16af6c..c6a18c3e64 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -79,9 +79,9 @@ class SubdefCreationWorker implements WorkerInterface $oldLogger = $this->subdefGenerator->getLogger(); // try to "lock" the file, will return null if already locked - $workerRunningJob = $this->repoWorker->canCreateSubdef($payload); + $workerRunningJobId = $this->repoWorker->canCreateSubdef($payload); - if (is_null($workerRunningJob)) { + if (is_null($workerRunningJobId)) { // the file is written by another worker, delay to retry later $this->messagePublisher->publishDelayedMessage( [ @@ -136,7 +136,7 @@ class SubdefCreationWorker implements WorkerInterface $subdefName, $workerMessage, $count, - $workerRunningJob->getId() + $workerRunningJobId )); // the subscriber will "unlock" the row, no need to do it here @@ -171,7 +171,7 @@ class SubdefCreationWorker implements WorkerInterface $subdefName, 'Subdef generation failed !', $count, - $workerRunningJob->getId() + $workerRunningJobId )); $this->subdefGenerator->setLogger($oldLogger); @@ -219,7 +219,7 @@ class SubdefCreationWorker implements WorkerInterface $this->indexer->flushQueue(); // tell that we have finished to work on this file (=unlock) - $this->repoWorker->markFinished($workerRunningJob); + $this->repoWorker->markFinished($workerRunningJobId); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index 7b8347cb18..e3fc7b2650 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -76,9 +76,9 @@ class WriteMetadatasWorker implements WorkerInterface $databox = $this->findDataboxById($databoxId); // try to "lock" the file, will return null if already locked - $workerRunningJob = $this->repoWorker->canWriteMetadata($payload); + $workerRunningJobId = $this->repoWorker->canWriteMetadata($payload); - if (is_null($workerRunningJob)) { + if (is_null($workerRunningJobId)) { // the file is written by another worker, delay to retry later $this->messagePublisher->publishDelayedMessage( [ @@ -103,7 +103,7 @@ class WriteMetadatasWorker implements WorkerInterface $record = $databox->get_record($recordId); file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf(" --!!!-- recordid = %s", $record->getRecordId()) + sprintf(" - recordid = %s", $record->getRecordId()) ), FILE_APPEND | LOCK_EX); if ($record->getMimeType() == 'image/svg+xml') { @@ -111,7 +111,7 @@ class WriteMetadatasWorker implements WorkerInterface $this->logger->error("Can't write meta on svg file!"); // tell that we have finished to work on this file ("unlock") - $this->repoWorker->markFinished($workerRunningJob, "Can't write meta on svg file!"); + $this->repoWorker->markFinished($workerRunningJobId, "Can't write meta on svg file!"); return; } @@ -120,14 +120,14 @@ class WriteMetadatasWorker implements WorkerInterface try { file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf(" --!!!-- getsubdef %s", $subdefName) + sprintf(" - getsubdef %s", $subdefName) ), FILE_APPEND | LOCK_EX); $subdef = $record->get_subdef($subdefName); } catch (Exception $e) { file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf(" --!!!-- %s", $e->getMessage()) + sprintf(" - %s", $e->getMessage()) ), FILE_APPEND | LOCK_EX); $workerMessage = "Exception catched when try to get subdef " .$subdefName. " from DB for the recordID: " .$recordId; @@ -142,7 +142,7 @@ class WriteMetadatasWorker implements WorkerInterface SubdefinitionWritemetaEvent::FAILED, $workerMessage, $count, - $workerRunningJob->getId() + $workerRunningJobId )); // the subscriber will mark the job as errored, no need to do it here @@ -151,7 +151,7 @@ class WriteMetadatasWorker implements WorkerInterface if (!$subdef->is_physically_present()) { file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf(" --!!!-- not present") + sprintf(" - not present") ), FILE_APPEND | LOCK_EX); $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; @@ -163,7 +163,7 @@ class WriteMetadatasWorker implements WorkerInterface SubdefinitionWritemetaEvent::FAILED, 'Subdef is not physically present!', $count, - $workerRunningJob->getId() + $workerRunningJobId )); // the subscriber will mark the job as errored, no need to do it here @@ -266,7 +266,7 @@ class WriteMetadatasWorker implements WorkerInterface } file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf(" --!!!-- reset") + sprintf(" - reset") ), FILE_APPEND | LOCK_EX); $this->writer->reset(); @@ -278,7 +278,7 @@ class WriteMetadatasWorker implements WorkerInterface $this->writer->erase($subdef->get_name() != 'document' || $clearDoc, true); file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__, - sprintf(" --!!!-- erased") + sprintf(" - erased") ), FILE_APPEND | LOCK_EX); // write meta in file @@ -309,7 +309,7 @@ class WriteMetadatasWorker implements WorkerInterface SubdefinitionWritemetaEvent::FAILED, $workerMessage, $count, - $workerRunningJob->getId() + $workerRunningJobId )); // the subscriber will "unlock" the row, no need to do it here @@ -320,7 +320,7 @@ class WriteMetadatasWorker implements WorkerInterface $this->updateJeton($record); // tell that we have finished to work on this file (=unlock) - $this->repoWorker->markFinished($workerRunningJob); + $this->repoWorker->markFinished($workerRunningJobId); } private function removeNulChar($value)