PHRAS-3447_file-lock-in-workers_4.1

fix : prevent file access with rules depending on job, wrap rules with mutex
This commit is contained in:
jygaulier
2021-05-25 20:48:06 +02:00
parent 7c236a531a
commit 5f5ee4af95
4 changed files with 303 additions and 67 deletions

View File

@@ -12,50 +12,157 @@ use Exception;
class WorkerRunningJobRepository extends EntityRepository
{
/**
* Acquire a "lock" to create a subdef
* Check and declare that we want to create a subdef from a document
*
* - if it's possible : return WorkerRunningJob entity (created or updated) for the job
* - if not (needed resource(s) already in use by other job(s)) : return null
*
* rules :
* - if someone else is already writing the document, we can't create a subdef from it
* - if someone else is already writing this subdef, we can't re-create it
*
* @param array $payload
* @return WorkerRunningJob
* @return WorkerRunningJob | null
* @throws OptimisticLockException
*/
public function canCreateSubdef($payload)
public function canCreateSubdef(array $payload)
{
return $this->getLock($payload, MessagePublisher::SUBDEF_CREATION_TYPE);
$databoxId = $payload['databoxId'];
$recordId = $payload['recordId'];
$subdefName = $payload['subdefName'];
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf('canCreateSubdef for %s.%s.%s ?', $databoxId, $recordId, $subdefName)
), FILE_APPEND|LOCK_EX);
// first protect sql by a critical section
if( !( $recordMutex = $this->getRecordMutex($databoxId, $recordId)) ) {
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
'getRecordMutex() failed'
), FILE_APPEND|LOCK_EX);
return null;
}
// here we can do sql atomically
$workerRunningJob = null;
// check the rules
/** @var WorkerRunningJob $r */
$r = $this->createQueryBuilder('w')
->select('w')
->where('w.status = :status')->setParameter('status', WorkerRunningJob::RUNNING)
->andWhere('w.databoxId = :databox_id')->setParameter('databox_id', $databoxId)
->andWhere('w.recordId = :record_id')->setParameter('record_id', $recordId)
->andWhere('w.workOn = \'document\' OR w.work_on = :work_on')->setParameter(':work_on', $subdefName)
->andWhere('w.work = :work_1 OR w.work = :work_2')
->setParameter('work_1', MessagePublisher::WRITE_METADATAS_TYPE)
->setParameter('work_2', MessagePublisher::SUBDEF_CREATION_TYPE)
->setMaxResults(1)
->getQuery()
->getFirstResult()
;
if(!$r) {
// no conflict, create (or update) the job
$workerRunningJob = $this->creteOrUpdateJob($payload, MessagePublisher::SUBDEF_CREATION_TYPE);
}
else {
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("job %s already running on %s.%s.%s", $r->getId(), $r->getDataboxId(), $r->getRecordId(), $r->getWorkOn())
), FILE_APPEND|LOCK_EX);
}
// end of critical section
$this->releaseRecordMutex($recordMutex);
return $workerRunningJob;
}
/**
* Acquire a "lock" to write meta into a subdef
* Check and declare that we want to write meta into a subdef
*
* - if it's possible : return WorkerRunningJob entity (created or updated) for the job
* - if not (needed resource(s) already in use by other job(s)) : return null
*
* rule :
* - if someone is already working on the file, we can't write
*
* @param array $payload
* @return WorkerRunningJob
* @return WorkerRunningJob | null
* @throws OptimisticLockException
*/
public function canWriteMetadata($payload)
public function canWriteMetadata(array $payload)
{
return $this->getLock($payload, MessagePublisher::WRITE_METADATAS_TYPE);
$databoxId = $payload['databoxId'];
$recordId = $payload['recordId'];
$subdefName = $payload['subdefName'];
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf('canWriteMetadata for %s.%s.%s ?', $databoxId, $recordId, $subdefName)
), FILE_APPEND|LOCK_EX);
// first protect sql by a critical section
if( !( $recordMutex = $this->getRecordMutex($databoxId, $recordId)) ) {
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
'getRecordMutex() failed'
), FILE_APPEND|LOCK_EX);
return null;
}
// here we can do sql atomically
$workerRunningJob = null;
// check the rule
$r = $this->createQueryBuilder('w')
->select('w')
->where('w.status = :status')->setParameter('status', WorkerRunningJob::RUNNING)
->andWhere('w.databox_id = :databox_id')->setParameter('databox_id', $databoxId)
->andWhere('w.record_id = :record_id')->setParameter('record_id', $recordId)
->andWhere('w.work_on = :work_on')->setParameter(':work_on', $subdefName)
->andWhere('w.work = :work_1 OR w.work = :work_2')
->setParameter('work_1', MessagePublisher::WRITE_METADATAS_TYPE)
->setParameter('work_2', MessagePublisher::SUBDEF_CREATION_TYPE)
->setMaxResults(1)
->getQuery()
->getFirstResult()
;
if(!$r) {
// no conflict, create (or update) the job
$workerRunningJob = $this->creteOrUpdateJob($payload, MessagePublisher::WRITE_METADATAS_TYPE);
}
else {
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("job %s already running on %s.%s.%s", $r->getId(), $r->getDataboxId(), $r->getRecordId(), $r->getWorkOn())
), FILE_APPEND|LOCK_EX);
}
// end of critical section
$this->releaseRecordMutex($recordMutex);
return $workerRunningJob;
}
/**
* Acquire a "lock" to work on a (sbid + rid + subdef) by inserting a row in WorkerRunningJob table.
* If it fails that means that another worker is already working on this file.
*
* nb : this work only for "first try" where workerJobId is null (=insert).
* for some retries (lock was acquired but worker job failed), the "count" of existing row is incremented (=update),
* so many workers "could" update the same row...
* __Luckily__, a rabbitmq message is consumed only once by a unique worker,
* and different workers (write-meta, subdef) have their own queues and their own rows on table.
* So working on a file always starts by a "first try", and concurency is not possible.
* todo : do not update, but insert a line for every try ?
*
* @param array $payload
* @param string $type
* @return WorkerRunningJob the entity (created or updated) or null if file is already locked (duplicate key)
* @return WorkerRunningJob|null
* @throws OptimisticLockException
*/
private function getLock(array $payload, string $type)
private function creteOrUpdateJob(array $payload, string $type)
{
if(!isset($payload['workerJobId'])) {
// insert a new row WorkerRunningJob : will fail if concurency
// for unpredicted sql errors we can still ignore and return null (lock failed),
// because anyway the worker/rabbit retry system will stop itself after n failures.
if (!isset($payload['workerJobId'])) {
// new job
$this->getEntityManager()->beginTransaction();
try {
$this->getEntityManager()->beginTransaction();
$date = new DateTime();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
@@ -64,54 +171,120 @@ class WorkerRunningJobRepository extends EntityRepository
->setWork($type)
->setWorkOn($payload['subdefName'])
->setPayload([
'message_type' => $type,
'payload' => $payload
'message_type' => $type,
'payload' => $payload
])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
->setFlock($payload['subdefName']);
$this->getEntityManager()->persist($workerRunningJob);
;
$this->getEntityManager()->persist($workerRunningJob);
$this->getEntityManager()->flush();
$this->getEntityManager()->commit();
return $workerRunningJob;
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("created job %s for %s.%s.%s", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'])
), FILE_APPEND|LOCK_EX);
}
catch(Exception $e) {
// duplicate key ?
catch (Exception $e) {
// bad case : we return false anyway
$this->getEntityManager()->rollback();
// for unpredicted other errors we can still ignore and return null (lock failed),
// because anyway the worker/rabbit retry-system will stop itself after n failures.
// $this->logger->error("Error persisting WorkerRunningJob !");
$workerRunningJob = null;
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("!!! error creating job %s for %s.%s.%s", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'])
), FILE_APPEND|LOCK_EX);
}
}
else {
// update an existing row : never fails (except bad id if row was purged)
try {
$this->getEntityManager()->beginTransaction();
$this->getEntityManager()->createQueryBuilder()
->update()
->set('info', ':info')->setParameter('info', WorkerRunningJob::ATTEMPT . $payload['count'])
->set('status', ':status')->setParameter('status', WorkerRunningJob::RUNNING)
->set('flock', ':flock')->setParameter('flock', $payload['subdefName'])
->where('id = :id')->setParameter('id', $payload['workerJobId'])
->getQuery()
->execute();
// retry from delayed
/** @var WorkerRunningJob $workerRunningJob */
if(!is_null($workerRunningJob = $this->find($payload['workerJobId']))) {
// update retry count (value is already incremented in payload)
$workerRunningJob
->setInfo(WorkerRunningJob::ATTEMPT . $payload['count'])
->setStatus(WorkerRunningJob::RUNNING);
$this->getEntityManager()->persist($workerRunningJob);
$this->getEntityManager()->flush();
$this->getEntityManager()->commit();
return $this->find($payload['workerJobId']);
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("incremented job %s for %s.%s.%s (count=%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $payload['count'])
), FILE_APPEND|LOCK_EX);
}
catch (Exception $e) {
// really bad ? return null anyway
$this->getEntityManager()->rollback();
//$this->logger->error("Error persisting WorkerRunningJob !");
else {
// the row has been deleted by purge ?
// bad case : we return false anyway
// $this->logger->error("Given workerJobId not found !");
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("!!! error incrementing job %s for %s.%s.%s (count=%s)", $type, $payload['databoxId'], $payload['recordId'], $payload['subdefName'], $payload['count'])
), FILE_APPEND|LOCK_EX);
}
}
return null;
return $workerRunningJob;
}
/**
* Acquire a "mutex" to protect critical section on a (sbid + rid) by trying to insert a row in WorkerRunningJob table.
* If it fails that means that another critical section is already running on this record.
*
* @param int $databoxId
* @param int $recordId
* @return WorkerRunningJob|null // the created mutex entity, or null if mutex already exists
*/
private function getRecordMutex(int $databoxId, int $recordId)
{
try {
$this->getEntityManager()->beginTransaction();
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setPublished(new DateTime())
->setStatus('')
->setFlock("_mutex_");
$this->getEntityManager()->persist($workerRunningJob);
$this->getEntityManager()->flush();
$this->getEntityManager()->commit();
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("getMutex for %s.%s OK (%s)", $databoxId, $recordId, $workerRunningJob->getId())
), FILE_APPEND|LOCK_EX);
return $workerRunningJob;
}
catch(Exception $e) {
// duplicate key ?
$this->getEntityManager()->rollback();
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("getMutex for %s.%s FAILED", $databoxId, $recordId)
), FILE_APPEND|LOCK_EX);
return null;
}
}
private function releaseRecordMutex(WorkerRunningJob $workerRunningJob)
{
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("releaseMutex (%s) for %s.%s", $workerRunningJob->getId(), $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId())
), FILE_APPEND|LOCK_EX);
$this->getEntityManager()->remove($workerRunningJob);
}
/**
* mark a job a "finished"
@@ -129,8 +302,7 @@ class WorkerRunningJobRepository extends EntityRepository
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED)
->setFinished(new DateTime('now'))
->setStatus(WorkerRunningJob::FINISHED)
->setFlock(null);
->setStatus(WorkerRunningJob::FINISHED);
if (!is_null($info)) {
$workerRunningJob->setInfo($info);
}
@@ -140,9 +312,17 @@ class WorkerRunningJobRepository extends EntityRepository
$this->getEntityManager()->flush();
$this->getEntityManager()->commit();
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("job %s (%d) finished for %s.%s.%s", $workerRunningJob->getWork(), $workerRunningJob->getId(), $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId(), $workerRunningJob->getWorkOn())
), FILE_APPEND|LOCK_EX);
break;
}
catch (Exception $e) {
file_put_contents(dirname(__FILE__).'/../../../../../logs/trace.txt', sprintf("%s [%s] : %s (%s); %s\n", (\DateTime::createFromFormat('U.u', microtime(TRUE)))->format('Y-m-d\TH:i:s.u'), getmypid(), __FILE__, __LINE__,
sprintf("!!! failed to mark job %s (%d) as finished (try %s/2) for %s.%s.%s", $workerRunningJob->getWork(), $workerRunningJob->getId(), $try, $workerRunningJob->getDataboxId(), $workerRunningJob->getRecordId(), $workerRunningJob->getWorkOn())
), FILE_APPEND|LOCK_EX);
$this->getEntityManager()->rollback();
}
}