Merge pull request #3506 from alchemy-fr/PHRAS-3088-mysql-gone-away

PHRAS-3088 #comment merge  fix mysql server gone away and dead lock on table worker
This commit is contained in:
Nicolas Maillat
2020-05-29 12:16:47 +02:00
committed by GitHub
4 changed files with 82 additions and 19 deletions

View File

@@ -102,4 +102,12 @@ class WorkerRunningJobRepository extends EntityRepository
{
return parent::getEntityManager();
}
public function reconnect()
{
if($this->_em->getConnection()->ping() === false) {
$this->_em->getConnection()->close();
$this->_em->getConnection()->connect();
}
}
}

View File

@@ -101,6 +101,9 @@ class RecordSubscriber implements EventSubscriberInterface
$repoWorker = $this->getRepoWorker();
$em = $repoWorker->getEntityManager();
// check connection an re-connect if needed
$repoWorker->reconnect();
$workerRunningJob = $repoWorker->findOneBy([
'databoxId' => $event->getRecord()->getDataboxId(),
'recordId' => $event->getRecord()->getRecordId(),
@@ -108,13 +111,15 @@ class RecordSubscriber implements EventSubscriberInterface
'workOn' => $event->getSubdefName()
]);
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
if ($workerRunningJob) {
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$this->messagePublisher->publishMessage(
@@ -223,6 +228,9 @@ class RecordSubscriber implements EventSubscriberInterface
$repoWorker = $this->getRepoWorker();
$em = $repoWorker->getEntityManager();
// check connection an re-connect if needed
$repoWorker->reconnect();
$workerRunningJob = $repoWorker->findOneBy([
'databoxId' => $event->getRecord()->getDataboxId(),
'recordId' => $event->getRecord()->getRecordId(),
@@ -230,13 +238,15 @@ class RecordSubscriber implements EventSubscriberInterface
'workOn' => $event->getSubdefName()
]);
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
if ($workerRunningJob) {
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$this->messagePublisher->publishMessage(

View File

@@ -81,6 +81,7 @@ class SubdefCreationWorker implements WorkerInterface
// tell that a file is in used to create subdef
$em = $this->repoWorker->getEntityManager();
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
@@ -108,14 +109,28 @@ class SubdefCreationWorker implements WorkerInterface
try {
$this->subdefGenerator->generateSubdefs($record, $wantedSubdef);
} catch (\Exception $e) {
$em->beginTransaction();
try {
$this->repoWorker->reconnect();
$em->getConnection()->beginTransaction();
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
} catch (\Throwable $e) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$workerMessage = "Exception throwable catched when create subdef for the recordID: " .$recordId;
$this->logger->error($workerMessage);
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent(
$record,
$payload['subdefName'],
$workerMessage,
$count
));
return ;
}
// begin to check if the subdef is successfully generated
@@ -170,7 +185,8 @@ class SubdefCreationWorker implements WorkerInterface
$this->indexer->flushQueue();
// tell that we have finished to work on this file
$em->beginTransaction();
$this->repoWorker->reconnect();
$em->getConnection()->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
@@ -178,7 +194,17 @@ class SubdefCreationWorker implements WorkerInterface
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
try {
$em->getConnection()->beginTransaction();
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$this->messagePublisher->pushLog("rollback on recordID :" . $workerRunningJob->getRecordId());
$em->rollback();
}
}
}
}

View File

@@ -82,6 +82,7 @@ class WriteMetadatasWorker implements WorkerInterface
// tell that a file is in used to create subdef
$em = $this->getEntityManager();
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
@@ -106,7 +107,24 @@ class WriteMetadatasWorker implements WorkerInterface
$record = $databox->get_record($recordId);
$subdef = $record->get_subdef($payload['subdefName']);
try {
$subdef = $record->get_subdef($payload['subdefName']);
} catch (\Exception $e) {
$workerMessage = "Exception catched when try to get subdef " .$payload['subdefName']. " from DB for the recordID: " .$recordId;
$this->logger->error($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record,
$payload['subdefName'],
SubdefinitionWritemetaEvent::FAILED,
$workerMessage,
$count
));
return ;
}
if ($subdef->is_physically_present()) {
$metadata = new MetadataBag();
@@ -245,6 +263,7 @@ class WriteMetadatasWorker implements WorkerInterface
// tell that we have finished to work on this file
$this->repoWorker->reconnect();
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);