diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index a3b584058d..8d8655ec6a 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -102,4 +102,12 @@ class WorkerRunningJobRepository extends EntityRepository { return parent::getEntityManager(); } + + public function reconnect() + { + if($this->_em->getConnection()->ping() === false) { + $this->_em->getConnection()->close(); + $this->_em->getConnection()->connect(); + } + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php index d225a0c37a..c0df49717d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -101,6 +101,9 @@ class RecordSubscriber implements EventSubscriberInterface $repoWorker = $this->getRepoWorker(); $em = $repoWorker->getEntityManager(); + // check connection an re-connect if needed + $repoWorker->reconnect(); + $workerRunningJob = $repoWorker->findOneBy([ 'databoxId' => $event->getRecord()->getDataboxId(), 'recordId' => $event->getRecord()->getRecordId(), @@ -108,13 +111,15 @@ class RecordSubscriber implements EventSubscriberInterface 'workOn' => $event->getSubdefName() ]); - $em->beginTransaction(); - try { - $em->remove($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); + if ($workerRunningJob) { + $em->beginTransaction(); + try { + $em->remove($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } } $this->messagePublisher->publishMessage( @@ -223,6 +228,9 @@ class RecordSubscriber implements EventSubscriberInterface $repoWorker = $this->getRepoWorker(); $em = $repoWorker->getEntityManager(); + // check connection an re-connect if needed + $repoWorker->reconnect(); + $workerRunningJob = $repoWorker->findOneBy([ 'databoxId' => $event->getRecord()->getDataboxId(), 'recordId' => $event->getRecord()->getRecordId(), @@ -230,13 +238,15 @@ class RecordSubscriber implements EventSubscriberInterface 'workOn' => $event->getSubdefName() ]); - $em->beginTransaction(); - try { - $em->remove($workerRunningJob); - $em->flush(); - $em->commit(); - } catch (\Exception $e) { - $em->rollback(); + if ($workerRunningJob) { + $em->beginTransaction(); + try { + $em->remove($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } } $this->messagePublisher->publishMessage( diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index a05d8a1aa7..1bf324943f 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -81,6 +81,7 @@ class SubdefCreationWorker implements WorkerInterface // tell that a file is in used to create subdef $em = $this->repoWorker->getEntityManager(); + $this->repoWorker->reconnect(); $em->beginTransaction(); try { @@ -108,14 +109,28 @@ class SubdefCreationWorker implements WorkerInterface try { $this->subdefGenerator->generateSubdefs($record, $wantedSubdef); } catch (\Exception $e) { - $em->beginTransaction(); try { + $this->repoWorker->reconnect(); + $em->getConnection()->beginTransaction(); $em->remove($workerRunningJob); $em->flush(); $em->commit(); } catch (\Exception $e) { - $em->rollback(); } + } catch (\Throwable $e) { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + $workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId; + + $this->logger->error($workerMessage); + + $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( + $record, + $payload['subdefName'], + $workerMessage, + $count + )); + + return ; } // begin to check if the subdef is successfully generated @@ -170,7 +185,8 @@ class SubdefCreationWorker implements WorkerInterface $this->indexer->flushQueue(); // tell that we have finished to work on this file - $em->beginTransaction(); + $this->repoWorker->reconnect(); + $em->getConnection()->beginTransaction(); try { $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); $workerRunningJob->setFinished(new \DateTime('now')); @@ -178,7 +194,17 @@ class SubdefCreationWorker implements WorkerInterface $em->flush(); $em->commit(); } catch (\Exception $e) { - $em->rollback(); + try { + $em->getConnection()->beginTransaction(); + $workerRunningJob->setStatus(WorkerRunningJob::FINISHED); + $em->persist($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $this->messagePublisher->pushLog("rollback on recordID :" . $workerRunningJob->getRecordId()); + $em->rollback(); + } + } } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index 6c02078ac1..8ec819cb77 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -82,6 +82,7 @@ class WriteMetadatasWorker implements WorkerInterface // tell that a file is in used to create subdef $em = $this->getEntityManager(); + $this->repoWorker->reconnect(); $em->beginTransaction(); try { @@ -106,7 +107,24 @@ class WriteMetadatasWorker implements WorkerInterface $record = $databox->get_record($recordId); - $subdef = $record->get_subdef($payload['subdefName']); + try { + $subdef = $record->get_subdef($payload['subdefName']); + } catch (\Exception $e) { + $workerMessage = "Exception catched when try to get subdef " .$payload['subdefName']. " from DB for the recordID: " .$recordId; + $this->logger->error($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $payload['subdefName'], + SubdefinitionWritemetaEvent::FAILED, + $workerMessage, + $count + )); + + return ; + } if ($subdef->is_physically_present()) { $metadata = new MetadataBag(); @@ -245,6 +263,7 @@ class WriteMetadatasWorker implements WorkerInterface // tell that we have finished to work on this file + $this->repoWorker->reconnect(); $em->beginTransaction(); try { $workerRunningJob->setStatus(WorkerRunningJob::FINISHED);