diff --git a/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php b/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php index b81835842e..875d592978 100644 --- a/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php @@ -153,12 +153,6 @@ class RepositoriesServiceProvider implements ServiceProviderInterface $app['repo.worker-job'] = $app->share(function (PhraseaApplication $app) { return $app['orm.em']->getRepository('Phraseanet:WorkerJob'); }); - $app['repo.worker-running-populate'] = $app->share(function (PhraseaApplication $app) { - return $app['orm.em']->getRepository('Phraseanet:WorkerRunningPopulate'); - }); - $app['repo.worker-running-uploader'] = $app->share(function (PhraseaApplication $app) { - return $app['orm.em']->getRepository('Phraseanet:WorkerRunningUploader'); - }); $app['repo.databoxes'] = $app->share(function (PhraseaApplication $app) { $appbox = $app->getApplicationBox(); diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php index 4d3b9093ed..5ab8b3fde5 100644 --- a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php +++ b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php @@ -7,12 +7,7 @@ use Doctrine\ORM\Mapping as ORM; use Gedmo\Mapping\Annotation as Gedmo; /** - * @ORM\Table(name="WorkerRunningJob", - * indexes={ - * @ORM\index(name="databox_id", columns={"databox_id"}), - * @ORM\index(name="record_id", columns={"record_id"}), - * } - * ) + * @ORM\Table(name="WorkerRunningJob") * @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository") */ class WorkerRunningJob @@ -20,6 +15,9 @@ class WorkerRunningJob const FINISHED = 'finished'; const RUNNING = 'running'; + const TYPE_PULL = 'uploader pull'; + const TYPE_PUSH = 'uploader push'; + const MAX_RESULT = 500; /** @@ -27,29 +25,38 @@ class WorkerRunningJob * @ORM\Id * @ORM\GeneratedValue */ - private $id; /** - * @ORM\Column(type="integer", name="databox_id") + * @ORM\Column(type="integer", name="databox_id", nullable=true) */ private $databoxId; /** - * @ORM\Column(type="integer", name="record_id") + * @ORM\Column(type="integer", name="record_id", nullable=true) */ private $recordId; /** - * @ORM\Column(type="integer", name="work") + * @ORM\Column(type="string", name="work", nullable=true) */ private $work; /** - * @ORM\Column(type="string", name="work_on") + * @ORM\Column(type="string", name="work_on", nullable=true) */ private $workOn; + /** + * @ORM\Column(type="string", name="commit_id", nullable=true) + */ + private $commitId; + + /** + * @ORM\Column(type="string", name="asset_id", nullable=true) + */ + private $assetId; + /** * @Gedmo\Timestampable(on="create") * @ORM\Column(type="datetime") @@ -98,7 +105,6 @@ class WorkerRunningJob return $this->databoxId; } - /** * @param $recordId * @return $this @@ -119,7 +125,6 @@ class WorkerRunningJob } - /** * @param $work * @return $this @@ -139,7 +144,6 @@ class WorkerRunningJob return $this->work; } - /** * @param $workOn * @return $this @@ -159,6 +163,44 @@ class WorkerRunningJob return $this->workOn; } + /** + * @param $commitId + * @return $this + */ + public function setCommitId($commitId) + { + $this->commitId = $commitId; + + return $this; + } + + /** + * @return mixed + */ + public function getCommitId() + { + return $this->commitId; + } + + /** + * @param $assetId + * @return $this + */ + public function setAssetId($assetId) + { + $this->assetId = $assetId; + + return $this; + } + + /** + * @return mixed + */ + public function getAssetId() + { + return $this->assetId; + } + /** * @return \DateTime */ @@ -223,19 +265,4 @@ class WorkerRunningJob { return $this->status; } - - public function getWorkName() - { - switch ($this->work) { - case PhraseaTokens::MAKE_SUBDEF: - return 'MAKE_SUBDEF'; - case PhraseaTokens::WRITE_META_DOC: - return 'WRITE_META_DOC'; - case PhraseaTokens::WRITE_META_SUBDEF: - return 'WRITE_META_SUBDEF'; - default: - return $this->work; - - } - } } diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningPopulate.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningPopulate.php deleted file mode 100644 index 08250e31e7..0000000000 --- a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningPopulate.php +++ /dev/null @@ -1,219 +0,0 @@ -id; - } - - /** - * @param $host - * @return $this - */ - public function setHost($host) - { - $this->host = $host; - - return $this; - } - - /** - * @return mixed - */ - public function getHost() - { - return $this->host; - } - - /** - * @param $port - * @return $this - */ - public function setPort($port) - { - $this->port = $port; - - return $this; - } - - /** - * @return mixed - */ - public function getPort() - { - return $this->port; - } - - /** - * @param $indexName - * @return $this - */ - public function setIndexName($indexName) - { - $this->indexName = $indexName; - - return $this; - } - - /** - * @return mixed - */ - public function getIndexName() - { - return $this->indexName; - } - - /** - * @param $databoxId - * @return $this - */ - public function setDataboxId($databoxId) - { - $this->databoxId = $databoxId; - - return $this; - } - - /** - * @return mixed - */ - public function getDataboxId() - { - return $this->databoxId; - } - - /** - * @return \DateTime - */ - public function getCreated() - { - return $this->created; - } - - /** - * @param \DateTime $published - * @return $this - */ - public function setPublished(\DateTime $published) - { - $this->published = $published; - - return $this; - } - - /** - * @return mixed - */ - public function getPublished() - { - return $this->published; - } - - /** - * @param \DateTime $finished - * @return $this - */ - public function setFinished(\DateTime $finished) - { - $this->finished = $finished; - - return $this; - } - - /** - * @return mixed - */ - public function getFinished() - { - return $this->finished; - } - - /** - * @param $status - * @return $this - */ - public function setStatus($status) - { - $this->status = $status; - - return $this; - } - - /** - * @return mixed - */ - public function getStatus() - { - return $this->status; - } -} diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningUploader.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningUploader.php deleted file mode 100644 index df8b3ec0f8..0000000000 --- a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningUploader.php +++ /dev/null @@ -1,198 +0,0 @@ -id; - } - - /** - * @param $commitId - * @return $this - */ - public function setCommitId($commitId) - { - $this->commitId = $commitId; - - return $this; - } - - /** - * @return mixed - */ - public function getCommitId() - { - return $this->commitId; - } - - /** - * @param $assetId - * @return $this - */ - public function setAssetId($assetId) - { - $this->assetId = $assetId; - - return $this; - } - - /** - * @return mixed - */ - public function getAssetId() - { - return $this->assetId; - } - - /** - * @return \DateTime - */ - public function getCreated() - { - return $this->created; - } - - /** - * @param \DateTime $published - * @return $this - */ - public function setPublished(\DateTime $published) - { - $this->published = $published; - - return $this; - } - - /** - * @return mixed - */ - public function getPublished() - { - return $this->published; - } - - /** - * @param \DateTime $finished - * @return $this - */ - public function setFinished(\DateTime $finished) - { - $this->finished = $finished; - - return $this; - } - - /** - * @return mixed - */ - public function getFinished() - { - return $this->finished; - } - - /** - * @param $status - * @return $this - */ - public function setStatus($status) - { - $this->status = $status; - - return $this; - } - - /** - * @return mixed - */ - public function getStatus() - { - return $this->status; - } - - /** - * @param $type - * @return $this - */ - public function setType($type) - { - $this->type = $type; - - return $this; - } - - /** - * @return mixed - */ - public function getType() - { - return $this->type; - } -} diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php index 8d8655ec6a..eabfea97fa 100644 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -4,6 +4,7 @@ namespace Alchemy\Phrasea\Model\Repositories; use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use Doctrine\ORM\EntityRepository; class WorkerRunningJobRepository extends EntityRepository @@ -22,15 +23,15 @@ class WorkerRunningJobRepository extends EntityRepository $sql = 'SELECT work_on FROM WorkerRunningJob - WHERE ((work & :write_meta) > 0 OR ((work & :make_subdef) > 0 AND work_on = :work_on) ) + WHERE ((work = :write_meta) OR ((work = :make_subdef) AND work_on = :work_on) ) AND record_id = :record_id AND databox_id = :databox_id AND status = :status'; $query = $this->_em->createNativeQuery($sql, $rsm); $query->setParameters([ - 'write_meta' => PhraseaTokens::WRITE_META, - 'make_subdef'=> PhraseaTokens::MAKE_SUBDEF, + 'write_meta' => MessagePublisher::WRITE_METADATAS_TYPE, + 'make_subdef'=> MessagePublisher::SUBDEF_CREATION_TYPE, 'work_on' => $subdefName, 'record_id' => $recordId, 'databox_id' => $databoxId, @@ -56,15 +57,15 @@ class WorkerRunningJobRepository extends EntityRepository $sql = 'SELECT work_on FROM WorkerRunningJob - WHERE ((work & :make_subdef) > 0 OR ((work & :write_meta) > 0 AND work_on = :work_on) ) + WHERE ((work = :make_subdef) OR ((work = :write_meta) AND work_on = :work_on) ) AND record_id = :record_id AND databox_id = :databox_id AND status = :status'; $query = $this->_em->createNativeQuery($sql, $rsm); $query->setParameters([ - 'make_subdef'=> PhraseaTokens::MAKE_SUBDEF, - 'write_meta' => PhraseaTokens::WRITE_META, + 'make_subdef'=> MessagePublisher::SUBDEF_CREATION_TYPE, + 'write_meta' => MessagePublisher::WRITE_METADATAS_TYPE, 'work_on' => $subdefName, 'record_id' => $recordId, 'databox_id' => $databoxId, @@ -75,6 +76,45 @@ class WorkerRunningJobRepository extends EntityRepository return count($query->getResult()) == 0; } + /** + * @param array $databoxIds + * @return int + */ + public function checkPopulateStatusByDataboxIds(array $databoxIds) + { + $qb = $this->createQueryBuilder('w'); + $qb->where($qb->expr()->in('w.databoxId', $databoxIds)) + ->andWhere('w.work = :work') + ->andWhere('w.status != :status') + ->setParameters([ 'work' => MessagePublisher::POPULATE_INDEX_TYPE, 'status' => WorkerRunningJob::FINISHED]) + ; + + return count($qb->getQuery()->getResult()); + } + + /** + * @param $commitId + * @return bool + */ + public function canAckUploader($commitId) + { + $qb = $this->createQueryBuilder('w'); + $res = $qb + ->where('w.commitId = :commitId') + ->andWhere('w.work = :work') + ->andWhere('w.status != :status') + ->setParameters([ + 'commitId' => $commitId, + 'work' => MessagePublisher::ASSETS_INGEST_TYPE, + 'status' => WorkerRunningJob::FINISHED + ]) + ->getQuery() + ->getResult() + ; + + return count($res) == 0; + } + public function truncateWorkerTable() { $connection = $this->_em->getConnection(); diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningPopulateRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningPopulateRepository.php deleted file mode 100644 index 6e7063a20d..0000000000 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningPopulateRepository.php +++ /dev/null @@ -1,29 +0,0 @@ -createQueryBuilder('w'); - $qb->where($qb->expr()->in('w.databoxId', $databoxIds)) - ->andWhere('w.status = :status') - ->setParameter('status', WorkerRunningPopulate::RUNNING) - ; - - return count($qb->getQuery()->getResult()); - } -} diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningUploaderRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningUploaderRepository.php deleted file mode 100644 index 6af3580d0d..0000000000 --- a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningUploaderRepository.php +++ /dev/null @@ -1,35 +0,0 @@ -createQueryBuilder('w'); - $res = $qb - ->where('w.commitId = :commitId') - ->andWhere('w.status != :status') - ->setParameters([ - 'commitId' => $commitId, - 'status' => WorkerRunningUploader::DOWNLOADED - ]) - ->getQuery() - ->getResult() - ; - - return count($res) == 0; - } -} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index 37b7546af5..7ddbd258aa 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -6,7 +6,6 @@ use Alchemy\Phrasea\Application as PhraseaApplication; use Alchemy\Phrasea\Controller\Controller; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; -use Alchemy\Phrasea\Model\Repositories\WorkerRunningPopulateRepository; use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions; use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; @@ -21,7 +20,7 @@ use Symfony\Component\HttpFoundation\Request; class AdminConfigurationController extends Controller { - public function indexAction(PhraseaApplication $app, Request $request) + public function indexAction(PhraseaApplication $app) { /** @var AMQPConnection $serverConnection */ $serverConnection = $this->app['alchemy_worker.amqp.connection']; @@ -108,7 +107,7 @@ class AdminConfigurationController extends Controller ]); } - public function truncateTableAction(PhraseaApplication $app, Request $request) + public function truncateTableAction(PhraseaApplication $app) { /** @var WorkerRunningJobRepository $repoWorker */ $repoWorker = $app['repo.worker-running-job']; @@ -117,7 +116,7 @@ class AdminConfigurationController extends Controller return $app->redirectPath('worker_admin'); } - public function deleteFinishedAction(PhraseaApplication $app, Request $request) + public function deleteFinishedAction(PhraseaApplication $app) { /** @var WorkerRunningJobRepository $repoWorker */ $repoWorker = $app['repo.worker-running-job']; @@ -147,13 +146,13 @@ class AdminConfigurationController extends Controller ]); } - public function subviewAction(PhraseaApplication $app) + public function subviewAction() { return $this->render('admin/worker-manager/worker_subview.html.twig', [ ]); } - public function metadataAction(PhraseaApplication $app) + public function metadataAction() { return $this->render('admin/worker-manager/worker_metadata.html.twig', [ ]); @@ -163,10 +162,10 @@ class AdminConfigurationController extends Controller { $databoxIds = $request->get('sbasIds'); - /** @var WorkerRunningPopulateRepository $repoWorkerPopulate */ - $repoWorkerPopulate = $app['repo.worker-running-populate']; + /** @var WorkerRunningJobRepository $repoWorkerJob */ + $repoWorkerJob = $app['repo.worker-running-job']; - return $repoWorkerPopulate->checkPopulateStatusByDataboxIds($databoxIds); + return $repoWorkerJob->checkPopulateStatusByDataboxIds($databoxIds); } public function pullAssetsAction(PhraseaApplication $app, Request $request) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php index 2011ef9921..716af6ebbd 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php @@ -117,17 +117,17 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::POPULATE_INDEX_TYPE, new CallableWorkerFactory(function () use ($app) { - return (new PopulateIndexWorker($app['alchemy_worker.message.publisher'], $app['elasticsearch.indexer'], $app['repo.worker-running-populate'])) + return (new PopulateIndexWorker($app['alchemy_worker.message.publisher'], $app['elasticsearch.indexer'], $app['repo.worker-running-job'])) ->setApplicationBox($app['phraseanet.appbox']) ->setDispatcher($app['dispatcher']); })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::PULL_ASSETS_TYPE, new CallableWorkerFactory(function () use ($app) { - return new PullAssetsWorker($app['alchemy_worker.message.publisher'], $app['conf'], $app['repo.worker-running-uploader']); + return new PullAssetsWorker($app['alchemy_worker.message.publisher'], $app['conf'], $app['repo.worker-running-job']); })); $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::DELETE_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) { - return (new DeleteRecordWorker()) + return (new DeleteRecordWorker($app['repo.worker-running-job'])) ->setApplicationBox($app['phraseanet.appbox']); })); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php index 2cc75abbe5..079ac535f3 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php @@ -2,7 +2,7 @@ namespace Alchemy\Phrasea\WorkerManager\Subscriber; -use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent; @@ -25,7 +25,7 @@ class AssetsIngestSubscriber implements EventSubscriberInterface // this is an uploader PUSH mode $payload = [ 'message_type' => MessagePublisher::ASSETS_INGEST_TYPE, - 'payload' => array_merge($event->getData(), ['type' => WorkerRunningUploader::TYPE_PUSH]) + 'payload' => array_merge($event->getData(), ['type' => WorkerRunningJob::TYPE_PUSH]) ]; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php index fe8c191ebe..e0d09c4121 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php @@ -5,9 +5,9 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; use Alchemy\Phrasea\Application\Helper\EntityManagerAware; use Alchemy\Phrasea\Application as PhraseaApplication; use Alchemy\Phrasea\Model\Entities\StoryWZ; -use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\UserRepository; -use Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; @@ -22,8 +22,8 @@ class AssetsIngestWorker implements WorkerInterface /** @var MessagePublisher $messagePublisher */ private $messagePublisher; - /** @var WorkerRunningUploaderRepository $repoWorkerUploader */ - private $repoWorkerUploader; + /** @var WorkerRunningJobRepository $repoWorkerJob */ + private $repoWorkerJob; public function __construct(PhraseaApplication $app) { @@ -34,7 +34,7 @@ class AssetsIngestWorker implements WorkerInterface public function process(array $payload) { $assets = $payload['assets']; - $this->repoWorkerUploader = $this->getWorkerRunningUploaderRepository(); + $this->repoWorkerJob = $this->getWorkerRunningJobRepository(); $this->saveAssetsList($payload['commit_id'], $assets, $payload['published'], $payload['type']); @@ -127,33 +127,34 @@ class AssetsIngestWorker implements WorkerInterface } /** - * @return WorkerRunningUploaderRepository + * @return WorkerRunningJobRepository */ - private function getWorkerRunningUploaderRepository() + private function getWorkerRunningJobRepository() { - return $this->app['repo.worker-running-uploader']; + return $this->app['repo.worker-running-job']; } private function saveAssetsList($commitId, $assetsId, $published, $type) { - $em = $this->repoWorkerUploader->getEntityManager(); + $em = $this->repoWorkerJob->getEntityManager(); $em->beginTransaction(); $date = new \DateTime(); try { foreach ($assetsId as $assetId) { - $workerRunningUploader = new WorkerRunningUploader(); - $workerRunningUploader + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob ->setCommitId($commitId) ->setAssetId($assetId) ->setPublished($date->setTimestamp($published)) - ->setStatus(WorkerRunningUploader::RUNNING) - ->setType($type) + ->setWork(MessagePublisher::ASSETS_INGEST_TYPE) + ->setWorkOn($type) + ->setStatus(WorkerRunningJob::RUNNING) ; - $em->persist($workerRunningUploader); + $em->persist($workerRunningJob); - unset($workerRunningUploader); + unset($workerRunningJob); } $em->flush(); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php index 80850bf9ec..635577fc5b 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php @@ -19,9 +19,9 @@ use Alchemy\Phrasea\Media\SubdefSubstituer; use Alchemy\Phrasea\Model\Entities\LazaretFile; use Alchemy\Phrasea\Model\Entities\LazaretSession; use Alchemy\Phrasea\Model\Entities\User; -use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\UserRepository; -use Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; @@ -42,8 +42,8 @@ class CreateRecordWorker implements WorkerInterface /** @var MessagePublisher $messagePublisher */ private $messagePublisher; - /** @var WorkerRunningUploaderRepository $repoWorkerUploader */ - private $repoWorkerUploader; + /** @var WorkerRunningJobRepository $repoWorkerJob */ + private $repoWorkerJob; public function __construct(PhraseaApplication $app) { @@ -54,8 +54,8 @@ class CreateRecordWorker implements WorkerInterface public function process(array $payload) { - $this->repoWorkerUploader = $this->getWorkerRunningUploaderRepository(); - $em = $this->repoWorkerUploader->getEntityManager(); + $this->repoWorkerJob = $this->getWorkerRunningJobRepository(); + $em = $this->repoWorkerJob->getEntityManager(); $uploaderClient = new Client(['base_uri' => $payload['base_url']]); @@ -70,11 +70,11 @@ class CreateRecordWorker implements WorkerInterface $tempfile = $this->getTemporaryFilesystem()->createTemporaryFile('download_', null, pathinfo($body['originalName'], PATHINFO_EXTENSION)); - - /** @var WorkerRunningUploader $workerRunningUploader */ - $workerRunningUploader = $this->repoWorkerUploader->findOneBy([ + /** @var WorkerRunningJob $workerRunningJob */ + $workerRunningJob = $this->repoWorkerJob->findOneBy([ 'commitId' => $payload['commit_id'], - 'assetId' => $payload['asset'] + 'assetId' => $payload['asset'], + 'status' => WorkerRunningJob::RUNNING ]); //download the asset @@ -95,8 +95,11 @@ class CreateRecordWorker implements WorkerInterface $count )); - $em->remove($workerRunningUploader); - $em->flush(); + if ($workerRunningJob != null) { + $em->remove($workerRunningJob); + + $em->flush(); + } return; } @@ -115,18 +118,24 @@ class CreateRecordWorker implements WorkerInterface $count )); - $em->remove($workerRunningUploader); - $em->flush(); + if ($workerRunningJob != null) { + $em->remove($workerRunningJob); + + $em->flush(); + } return; } - if ($workerRunningUploader != null) { + if ($workerRunningJob != null) { $em->beginTransaction(); try { - $workerRunningUploader->setStatus(WorkerRunningUploader::DOWNLOADED); + $workerRunningJob + ->setStatus(WorkerRunningJob::FINISHED) + ->setFinished(new \DateTime('now')) + ; - $em->persist($workerRunningUploader); + $em->persist($workerRunningJob); $em->flush(); $em->commit(); @@ -136,7 +145,7 @@ class CreateRecordWorker implements WorkerInterface } - $canAck = $this->repoWorkerUploader->canAck($payload['commit_id']); + $canAck = $this->repoWorkerJob->canAckUploader($payload['commit_id']); // if all assets in the commit are downloaded , send ack to the uploader if ($canAck) { @@ -228,8 +237,10 @@ class CreateRecordWorker implements WorkerInterface $this->getBorderManager()->process($lazaretSession, $packageFile, $callback); + $recordId = null; if ($elementCreated instanceof \record_adapter) { $this->dispatch(PhraseaEvents::RECORD_UPLOAD, new RecordEdit($elementCreated)); + $recordId = $elementCreated->getRecordId(); } else { $this->messagePublisher->pushLog(sprintf('The file was moved to the quarantine: %s', json_encode($reasons))); /** @var LazaretFile $elementCreated */ @@ -241,7 +252,6 @@ class CreateRecordWorker implements WorkerInterface if (is_int($payload['storyId']) && $elementCreated instanceof \record_adapter) { $this->addRecordInStory($user, $elementCreated, $sbasId, $payload['storyId'], $body['formData']); } - } /** @@ -337,10 +347,10 @@ class CreateRecordWorker implements WorkerInterface } /** - * @return WorkerRunningUploaderRepository + * @return WorkerRunningJobRepository */ - private function getWorkerRunningUploaderRepository() + private function getWorkerRunningJobRepository() { - return $this->app['repo.worker-running-uploader']; + return $this->app['repo.worker-running-job']; } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php index 859054710a..c0c254c253 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php @@ -3,15 +3,63 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; +use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; class DeleteRecordWorker implements WorkerInterface { use ApplicationBoxAware; + /** @var WorkerRunningJobRepository $repoWorker*/ + private $repoWorker; + + public function __construct( WorkerRunningJobRepository $repoWorker) + { + $this->repoWorker = $repoWorker; + } + public function process(array $payload) { + $em = $this->repoWorker->getEntityManager(); + $em->beginTransaction(); + $date = new \DateTime(); + + try { + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setWork(MessagePublisher::DELETE_RECORD_TYPE) + ->setDataboxId($payload['databoxId']) + ->setRecordId($payload['recordId']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + + $record = $this->findDataboxById($payload['databoxId'])->get_record($payload['recordId']); $record->delete(); + + + // tell that the delete is finished + if ($workerRunningJob != null) { + $workerRunningJob + ->setStatus(WorkerRunningJob::FINISHED) + ->setFinished(new \DateTime('now')) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + } } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php index 80a56f3147..b25190a8cf 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php @@ -7,13 +7,16 @@ use Alchemy\Phrasea\Core\Event\ExportFailureEvent; use Alchemy\Phrasea\Core\PhraseaEvents; use Alchemy\Phrasea\Exception\InvalidArgumentException; use Alchemy\Phrasea\Model\Entities\Token; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\TokenRepository; use Alchemy\Phrasea\Model\Repositories\UserRepository; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Notification\Emitter; use Alchemy\Phrasea\Notification\Mail\MailRecordsExport; use Alchemy\Phrasea\Notification\Receiver; use Alchemy\Phrasea\WorkerManager\Event\ExportMailFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; +use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; class ExportMailWorker implements WorkerInterface { @@ -21,6 +24,9 @@ class ExportMailWorker implements WorkerInterface private $app; + /** @var WorkerRunningJobRepository $repoWorkerJob */ + private $repoWorkerJob; + public function __construct(Application $app) { $this->app = $app; @@ -28,6 +34,28 @@ class ExportMailWorker implements WorkerInterface public function process(array $payload) { + $this->repoWorkerJob = $this->getWorkerRunningJobRepository(); + $em = $this->repoWorkerJob->getEntityManager(); + $em->beginTransaction(); + $date = new \DateTime(); + + try { + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setWork(MessagePublisher::EXPORT_MAIL_TYPE) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + $destMails = unserialize($payload['destinationMails']); $params = unserialize($payload['params']); @@ -54,6 +82,7 @@ class ExportMailWorker implements WorkerInterface ); $remaingEmails = $destMails; + $deliverEmails = []; $emitter = new Emitter($user->getDisplayName(), $user->getEmail()); @@ -69,6 +98,7 @@ class ExportMailWorker implements WorkerInterface $mail->setExpiration($token->getExpiration()); $this->deliver($mail, $params['reading_confirm']); + $deliverEmails[] = $mail; unset($remaingEmails[$key]); } @@ -98,5 +128,25 @@ class ExportMailWorker implements WorkerInterface } } + if ($workerRunningJob != null) { + $workerRunningJob + ->setWorkOn(implode(',', $deliverEmails)) + ->setStatus(WorkerRunningJob::FINISHED) + ->setFinished(new \DateTime('now')) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + } + + } + + /** + * @return WorkerRunningJobRepository + */ + private function getWorkerRunningJobRepository() + { + return $this->app['repo.worker-running-job']; } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php index 0bda1aed37..6d54a7dc3d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/PopulateIndexWorker.php @@ -4,8 +4,8 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware; use Alchemy\Phrasea\Application\Helper\DispatcherAware; -use Alchemy\Phrasea\Model\Entities\WorkerRunningPopulate; -use Alchemy\Phrasea\Model\Repositories\WorkerRunningPopulateRepository; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexFailureEvent; @@ -23,34 +23,33 @@ class PopulateIndexWorker implements WorkerInterface /** @var Indexer $indexer */ private $indexer; - /** @var WorkerRunningPopulateRepository $repoWorkerPopulate*/ - private $repoWorkerPopulate; + /** @var WorkerRunningJobRepository $repoWorker*/ + private $repoWorker; - public function __construct(MessagePublisher $messagePublisher, Indexer $indexer, WorkerRunningPopulateRepository $repoWorkerPopulate) + public function __construct(MessagePublisher $messagePublisher, Indexer $indexer, WorkerRunningJobRepository $repoWorker) { $this->indexer = $indexer; $this->messagePublisher = $messagePublisher; - $this->repoWorkerPopulate = $repoWorkerPopulate; + $this->repoWorker = $repoWorker; } public function process(array $payload) { - $em = $this->repoWorkerPopulate->getEntityManager(); + $em = $this->repoWorker->getEntityManager(); $em->beginTransaction(); $date = new \DateTime(); try { - $workerRunningPopulate = new WorkerRunningPopulate(); - $workerRunningPopulate - ->setHost($payload['host']) - ->setPort($payload['port']) - ->setIndexName($payload['indexName']) + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setWork(MessagePublisher::POPULATE_INDEX_TYPE) + ->setWorkOn($payload['indexName']) ->setDataboxId($payload['databoxId']) ->setPublished($date->setTimestamp($payload['published'])) - ->setStatus(WorkerRunningPopulate::RUNNING) + ->setStatus(WorkerRunningJob::RUNNING) ; - $em->persist($workerRunningPopulate); + $em->persist($workerRunningJob); $em->flush(); @@ -98,9 +97,9 @@ class PopulateIndexWorker implements WorkerInterface $r['memory']/1048576 )); } catch(\Exception $e) { - if ($workerRunningPopulate != null) { + if ($workerRunningJob != null) { - $em->remove($workerRunningPopulate); + $em->remove($workerRunningJob); $em->flush(); } @@ -123,13 +122,13 @@ class PopulateIndexWorker implements WorkerInterface } // tell that the populate is finished - if ($workerRunningPopulate != null) { - $workerRunningPopulate - ->setStatus(WorkerRunningPopulate::FINISHED) + if ($workerRunningJob != null) { + $workerRunningJob + ->setStatus(WorkerRunningJob::FINISHED) ->setFinished(new \DateTime('now')) ; - $em->persist($workerRunningPopulate); + $em->persist($workerRunningJob); $em->flush(); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php index 6090ff733d..1d4ab8f6ae 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php @@ -3,8 +3,8 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; use Alchemy\Phrasea\Core\Configuration\PropertyAccess; -use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader; -use Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use GuzzleHttp\Client; @@ -13,14 +13,14 @@ class PullAssetsWorker implements WorkerInterface private $messagePublisher; private $conf; - /** @var WorkerRunningUploaderRepository $repoWorkerUploader */ - private $repoWorkerUploader; + /** @var WorkerRunningJobRepository $repoWorkerJob */ + private $repoWorkerJob; - public function __construct(MessagePublisher $messagePublisher, PropertyAccess $conf, WorkerRunningUploaderRepository $repoWorkerUploader) + public function __construct(MessagePublisher $messagePublisher, PropertyAccess $conf, WorkerRunningJobRepository $repoWorkerJob) { $this->messagePublisher = $messagePublisher; $this->conf = $conf; - $this->repoWorkerUploader = $repoWorkerUploader; + $this->repoWorkerJob = $repoWorkerJob; } public function process(array $payload) @@ -81,7 +81,7 @@ class PullAssetsWorker implements WorkerInterface 'commit_id' => $commit['id'], 'token' => $commit['token'], 'base_url' => $baseUrl, - 'type' => WorkerRunningUploader::TYPE_PULL + 'type' => WorkerRunningJob::TYPE_PULL ] ]; @@ -148,7 +148,7 @@ class PullAssetsWorker implements WorkerInterface */ private function isCommitToBeCreating($commitId) { - $res = $this->repoWorkerUploader->findBy(['commitId' => $commitId]); + $res = $this->repoWorkerJob->findBy(['commitId' => $commitId, 'status' => WorkerRunningJob::RUNNING]); return count($res) != 0; } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index 1bf324943f..9a6b89e9f5 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -54,7 +54,7 @@ class SubdefCreationWorker implements WorkerInterface public function process(array $payload) { - if(isset($payload['recordId']) && isset($payload['databoxId'])) { + if (isset($payload['recordId']) && isset($payload['databoxId'])) { $recordId = $payload['recordId']; $databoxId = $payload['databoxId']; $wantedSubdef = [$payload['subdefName']]; @@ -90,7 +90,7 @@ class SubdefCreationWorker implements WorkerInterface $workerRunningJob ->setDataboxId($databoxId) ->setRecordId($recordId) - ->setWork(PhraseaTokens::MAKE_SUBDEF) + ->setWork(MessagePublisher::SUBDEF_CREATION_TYPE) ->setWorkOn($payload['subdefName']) ->setPublished($date->setTimestamp($payload['published'])) ->setStatus(WorkerRunningJob::RUNNING) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php index 13700e77fb..4ddc20480b 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php @@ -8,6 +8,8 @@ use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\Model\Entities\ApiApplication; use Alchemy\Phrasea\Model\Entities\WebhookEvent; use Alchemy\Phrasea\Model\Entities\WebhookEventDelivery; +use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; +use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Webhook\Processor\ProcessorInterface; use Alchemy\Phrasea\WorkerManager\Event\WebhookDeliverFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; @@ -31,6 +33,9 @@ class WebhookWorker implements WorkerInterface /** @var MessagePublisher $messagePublisher */ private $messagePublisher; + /** @var WorkerRunningJobRepository $repoWorkerJob */ + private $repoWorkerJob; + public function __construct(Application $app) { $this->app = $app; @@ -43,6 +48,29 @@ class WebhookWorker implements WorkerInterface public function process(array $payload) { if (isset($payload['id'])) { + $this->repoWorkerJob = $this->getWorkerRunningJobRepository(); + $em = $this->repoWorkerJob->getEntityManager(); + $em->beginTransaction(); + $date = new \DateTime(); + + try { + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setWork(MessagePublisher::WEBHOOK_TYPE) + ->setWorkOn('WebhookEventId: '. $payload['id']) + ->setPublished($date->setTimestamp($payload['published'])) + ->setStatus(WorkerRunningJob::RUNNING) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + $webhookEventId = $payload['id']; $app = $this->app; @@ -58,7 +86,6 @@ class WebhookWorker implements WorkerInterface // Set callback which logs success or failure $subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) use ($app, $webhookEventId, $payload) { - $retry = true; if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) { /** @var WebhookEventDelivery $delivery */ $delivery = $app['repo.webhook-delivery']->find($deliverId); @@ -74,7 +101,6 @@ class WebhookWorker implements WorkerInterface $delivery->getThirdPartyApplication()->getName() ); - $retry = false; } else { $app['manipulator.webhook-delivery']->deliveryFailure($delivery); @@ -95,8 +121,6 @@ class WebhookWorker implements WorkerInterface } $app['alchemy_worker.message.publisher']->pushLog($logEntry, $logType, $logContext); - - return $retry; } }, true, new CurlBackoffStrategy()); @@ -119,6 +143,17 @@ class WebhookWorker implements WorkerInterface // send requests $this->deliverEvent($httpClient, $thirdPartyApplications, $webhookevent, $payload); } + + if ($workerRunningJob != null) { + $workerRunningJob + ->setStatus(WorkerRunningJob::FINISHED) + ->setFinished(new \DateTime('now')) + ; + + $em->persist($workerRunningJob); + + $em->flush(); + } } } @@ -203,4 +238,12 @@ class WebhookWorker implements WorkerInterface { return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId()); } + + /** + * @return WorkerRunningJobRepository + */ + private function getWorkerRunningJobRepository() + { + return $this->app['repo.worker-running-job']; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index 8ec819cb77..b18be17100 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -63,9 +63,6 @@ class WriteMetadatasWorker implements WorkerInterface $clearDoc = isset($payload['clearDoc']) ? $payload['clearDoc'] : false; $databox = $this->findDataboxById($databoxId); - - $param = ($payload['subdefName'] == "document") ? PhraseaTokens::WRITE_META_DOC : PhraseaTokens::WRITE_META_SUBDEF; - // check if there is a make subdef running for the record or the same task running $canWriteMeta = $this->repoWorker->canWriteMetadata($payload['subdefName'], $recordId, $databoxId); @@ -91,7 +88,7 @@ class WriteMetadatasWorker implements WorkerInterface $workerRunningJob ->setDataboxId($databoxId) ->setRecordId($recordId) - ->setWork($param) + ->setWork(MessagePublisher::WRITE_METADATAS_TYPE) ->setWorkOn($payload['subdefName']) ->setPublished($date->setTimestamp($payload['published'])) ->setStatus(WorkerRunningJob::RUNNING) diff --git a/templates/web/admin/worker-manager/worker_info.html.twig b/templates/web/admin/worker-manager/worker_info.html.twig index d516f6bacb..5905124782 100644 --- a/templates/web/admin/worker-manager/worker_info.html.twig +++ b/templates/web/admin/worker-manager/worker_info.html.twig @@ -57,9 +57,13 @@ {% for workerRow in workerRunningJob | sort | reverse %} - {{ workerRow.databoxId | sbas_labels(app) }} + + {% if workerRow.databoxId %} + {{ workerRow.databoxId | sbas_labels(app) }} + {% endif %} + {{ workerRow.recordId }} - {{ workerRow.getWorkName }} + {{ workerRow.work }} {{ workerRow.workOn }} {{ workerRow.published|date('Y-m-d H:i:s') }} {{ workerRow.created|date('Y-m-d H:i:s') }}