diff --git a/bin/console b/bin/console index c49fc89052..744c7c7959 100755 --- a/bin/console +++ b/bin/console @@ -58,6 +58,9 @@ use Alchemy\Phrasea\Command\User\UserPasswordCommand; use Alchemy\Phrasea\Command\User\UserListCommand; use Alchemy\Phrasea\Command\UpgradeDBDatas; use Alchemy\Phrasea\Command\ApplyRightsCommand; +use Alchemy\Phrasea\WorkerManager\Command\WorkerExecuteCommand; +use Alchemy\Phrasea\WorkerManager\Command\WorkerRunServiceCommand; +use Alchemy\Phrasea\WorkerManager\Command\WorkerShowConfigCommand; require_once __DIR__ . '/../lib/autoload.php'; @@ -162,9 +165,9 @@ $cli->command(new QueryParseCommand()); $cli->command(new QuerySampleCommand()); $cli->command(new FindConceptsCommand()); -//$cli->command($cli['alchemy_worker.commands.run_dispatcher_command']); -//$cli->command($cli['alchemy_worker.commands.run_worker_command']); -//$cli->command($cli['alchemy_worker.commands.show_configuration']); +$cli->command(new WorkerExecuteCommand()); +$cli->command(new WorkerRunServiceCommand()); +$cli->command(new WorkerShowConfigCommand()); $cli->loadPlugins(); diff --git a/composer.json b/composer.json index 0fd22effb0..6ca0dfb982 100644 --- a/composer.json +++ b/composer.json @@ -133,7 +133,8 @@ "facebook/graph-sdk": "^5.6", "box/spout": "^2.7", "paragonie/random-lib": "^2.0", - "czproject/git-php": "^3.17" + "czproject/git-php": "^3.17", + "php-amqplib/php-amqplib": "2.9" }, "require-dev": { "mikey179/vfsstream": "~1.5", diff --git a/composer.lock b/composer.lock index 112a4ecd0f..eee43380df 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "008ff0b5d3d13b4f0ce5d34348ded83a", + "content-hash": "1fda2bd48bdb1ad3a2cf0bfca723d775", "packages": [ { "name": "alchemy-fr/tcpdf-clone", @@ -5145,6 +5145,80 @@ ], "time": "2019-03-20T17:19:05+00:00" }, + { + "name": "php-amqplib/php-amqplib", + "version": "v2.9.0", + "source": { + "type": "git", + "url": "https://github.com/php-amqplib/php-amqplib.git", + "reference": "08d105d6b69ff4d8fa4fe090c49afdc568a5665b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/08d105d6b69ff4d8fa4fe090c49afdc568a5665b", + "reference": "08d105d6b69ff4d8fa4fe090c49afdc568a5665b", + "shasum": "" + }, + "require": { + "ext-bcmath": "*", + "ext-sockets": "*", + "php": ">=5.4.0" + }, + "replace": { + "videlalvaro/php-amqplib": "self.version" + }, + "require-dev": { + "ext-curl": "*", + "nategood/httpful": "^0.2.20", + "phpdocumentor/phpdocumentor": "^2.9", + "phpunit/phpunit": "^4.8", + "squizlabs/php_codesniffer": "^2.5" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.8-dev" + } + }, + "autoload": { + "psr-4": { + "PhpAmqpLib\\": "PhpAmqpLib/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "LGPL-2.1-or-later" + ], + "authors": [ + { + "name": "Alvaro Videla", + "role": "Original Maintainer" + }, + { + "name": "John Kelly", + "email": "johnmkelly86@gmail.com", + "role": "Maintainer" + }, + { + "name": "Raúl Araya", + "email": "nubeiro@gmail.com", + "role": "Maintainer" + }, + { + "name": "Luke Bakken", + "email": "luke@bakken.io", + "role": "Maintainer" + } + ], + "description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.", + "homepage": "https://github.com/php-amqplib/php-amqplib/", + "keywords": [ + "message", + "queue", + "rabbitmq" + ], + "time": "2019-03-22T16:02:15+00:00" + }, { "name": "php-ffmpeg/php-ffmpeg", "version": "v0.15", diff --git a/lib/Alchemy/Phrasea/Application.php b/lib/Alchemy/Phrasea/Application.php index 13c7728f86..95e4c19605 100644 --- a/lib/Alchemy/Phrasea/Application.php +++ b/lib/Alchemy/Phrasea/Application.php @@ -87,6 +87,8 @@ use Alchemy\Phrasea\Media\MediaAccessorResolver; use Alchemy\Phrasea\Media\PermalinkMediaResolver; use Alchemy\Phrasea\Media\TechnicalDataServiceProvider; use Alchemy\Phrasea\Model\Entities\User; +use Alchemy\Phrasea\WorkerManager\Provider\AlchemyWorkerServiceProvider; +use Alchemy\Phrasea\WorkerManager\Provider\QueueWorkerServiceProvider; use Alchemy\QueueProvider\QueueServiceProvider; use Alchemy\WorkerProvider\WorkerServiceProvider; use Doctrine\DBAL\Event\ConnectionEventArgs; @@ -268,6 +270,9 @@ class Application extends SilexApplication $this->register(new OrderServiceProvider()); $this->register(new WebhookServiceProvider()); + $this->register(new QueueWorkerServiceProvider()); + $this->register(new AlchemyWorkerServiceProvider()); + $this['monolog'] = $this->share( $this->extend('monolog', function (LoggerInterface $logger, Application $app) { diff --git a/lib/Alchemy/Phrasea/Application/RouteLoader.php b/lib/Alchemy/Phrasea/Application/RouteLoader.php index 2542ff25d1..fc1b25e042 100644 --- a/lib/Alchemy/Phrasea/Application/RouteLoader.php +++ b/lib/Alchemy/Phrasea/Application/RouteLoader.php @@ -6,6 +6,7 @@ use Alchemy\EmbedProvider\EmbedServiceProvider; use Alchemy\Phrasea\Application; use Alchemy\Phrasea\ControllerProvider as Providers; use Alchemy\Phrasea\Report\ControllerProvider\ProdReportControllerProvider; +use Alchemy\Phrasea\WorkerManager\Provider\ControllerServiceProvider as WorkerManagerProvider; use Assert\Assertion; use Silex\ControllerProviderInterface; @@ -28,6 +29,7 @@ class RouteLoader '/admin/setup' => Providers\Admin\Setup::class, '/admin/subdefs' => Providers\Admin\Subdefs::class, '/admin/task-manager' => Providers\Admin\TaskManager::class, + '/admin/worker-manager' => WorkerManagerProvider::class, '/admin/users' => Providers\Admin\Users::class, '/client/' => Providers\Client\Root::class, '/datafiles' => Providers\Datafiles::class, diff --git a/lib/Alchemy/Phrasea/Controller/Admin/RootController.php b/lib/Alchemy/Phrasea/Controller/Admin/RootController.php index 1d46c27b15..295633f4dc 100644 --- a/lib/Alchemy/Phrasea/Controller/Admin/RootController.php +++ b/lib/Alchemy/Phrasea/Controller/Admin/RootController.php @@ -368,6 +368,7 @@ class RootController extends Controller 'collection', 'user', 'users', + 'workermanager' ]; $feature = 'connected'; diff --git a/lib/Alchemy/Phrasea/Controller/Api/V1Controller.php b/lib/Alchemy/Phrasea/Controller/Api/V1Controller.php index 42339169d6..302b556223 100644 --- a/lib/Alchemy/Phrasea/Controller/Api/V1Controller.php +++ b/lib/Alchemy/Phrasea/Controller/Api/V1Controller.php @@ -87,6 +87,8 @@ use Alchemy\Phrasea\SearchEngine\SearchEngineResult; use Alchemy\Phrasea\Status\StatusStructure; use Alchemy\Phrasea\TaskManager\LiveInformation; use Alchemy\Phrasea\Utilities\NullableDateTime; +use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent; +use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Doctrine\ORM\EntityManager; use Guzzle\Http\Client as Guzzle; use League\Fractal\Resource\Item; @@ -217,6 +219,30 @@ class V1Controller extends Controller return $this->showTaskAction($request, $task); } + /** + * Use with the uploader service + * @param Request $request + * @return Response + */ + public function sendAssetsInQueue(Request $request) + { + $jsonBodyHelper = $this->getJsonBodyHelper(); + $schema = $this->app['json-schema.ref_resolver']->resolve($this->app['json-schema.base_uri']. 'assets_enqueue.json'); + $data = $request->getContent(); + + $errors = $jsonBodyHelper->validateJson(json_decode($data), $schema); + + if (count($errors) > 0) { + return Result::createError($request, 422, $errors[0])->createResponse(); + } + + $this->dispatch(WorkerEvents::ASSETS_CREATE, new AssetsCreateEvent(json_decode($data))); + + return Result::create($request, [ + "data" => json_decode($data), + ])->createResponse(); + } + private function getCacheInformation() { $caches = [ diff --git a/lib/Alchemy/Phrasea/ControllerProvider/Api/V1.php b/lib/Alchemy/Phrasea/ControllerProvider/Api/V1.php index d64689eca2..1b4d9bd1fe 100644 --- a/lib/Alchemy/Phrasea/ControllerProvider/Api/V1.php +++ b/lib/Alchemy/Phrasea/ControllerProvider/Api/V1.php @@ -284,6 +284,9 @@ class V1 extends Api implements ControllerProviderInterface, ServiceProviderInte $controllers->post('/accounts/unlock/{token}/', 'controller.api.v1:unlockAccount') ->before('controller.api.v1:ensureUserManagementRights'); + // the api route for the uploader service + $controllers->post('/upload/enqueue', 'controller.api.v1:sendAssetsInQueue'); + return $controllers; } } diff --git a/lib/Alchemy/Phrasea/ControllerProvider/ControllerProviderServiceProvider.php b/lib/Alchemy/Phrasea/ControllerProvider/ControllerProviderServiceProvider.php index 4cbec52624..39e59ed7fd 100644 --- a/lib/Alchemy/Phrasea/ControllerProvider/ControllerProviderServiceProvider.php +++ b/lib/Alchemy/Phrasea/ControllerProvider/ControllerProviderServiceProvider.php @@ -54,6 +54,7 @@ class ControllerProviderServiceProvider implements ServiceProviderInterface Admin\Setup::class => [], Admin\Subdefs::class => [], Admin\TaskManager::class => [], + \Alchemy\Phrasea\WorkerManager\Provider\ControllerServiceProvider::class => [], Admin\Users::class => [], Client\Root::class => [], Datafiles::class => [], diff --git a/lib/Alchemy/Phrasea/Core/Event/Subscriber/ExportSubscriber.php b/lib/Alchemy/Phrasea/Core/Event/Subscriber/ExportSubscriber.php index d3c05a64b6..85bc845be2 100644 --- a/lib/Alchemy/Phrasea/Core/Event/Subscriber/ExportSubscriber.php +++ b/lib/Alchemy/Phrasea/Core/Event/Subscriber/ExportSubscriber.php @@ -47,65 +47,10 @@ class ExportSubscriber extends AbstractNotificationSubscriber $this->app['event-manager']->notify($params['usr_id'], 'eventsmanager_notify_downloadmailfail', $datas, $mailed); } - public function onCreateExportMail(ExportMailEvent $event) - { - $destMails = $event->getDestinationMails(); - - $params = $event->getParams(); - - /** @var UserRepository $userRepository */ - $userRepository = $this->app['repo.users']; - - $user = $userRepository->find($event->getEmitterUserId()); - - /** @var TokenRepository $tokenRepository */ - $tokenRepository = $this->app['repo.tokens']; - - /** @var Token $token */ - $token = $tokenRepository->findValidToken($event->getTokenValue()); - - $list = unserialize($token->getData()); - - //zip documents - \set_export::build_zip( - $this->app, - $token, - $list, - $this->app['tmp.download.path'].'/'. $token->getValue() . '.zip' - ); - - $remaingEmails = $destMails; - - $emitter = new Emitter($user->getDisplayName(), $user->getEmail()); - - foreach ($destMails as $key => $mail) { - try { - $receiver = new Receiver(null, trim($mail)); - } catch (InvalidArgumentException $e) { - continue; - } - - $mail = MailRecordsExport::create($this->app, $receiver, $emitter, $params['textmail']); - $mail->setButtonUrl($params['url']); - $mail->setExpiration($token->getExpiration()); - - $this->deliver($mail, $params['reading_confirm']); - unset($remaingEmails[$key]); - } - - //some mails failed - if (count($remaingEmails) > 0) { - foreach ($remaingEmails as $mail) { - $this->app['dispatcher']->dispatch(PhraseaEvents::EXPORT_MAIL_FAILURE, new ExportFailureEvent($user, $params['ssttid'], $params['lst'], \eventsmanager_notify_downloadmailfail::MAIL_FAIL, $mail)); - } - } - } - public static function getSubscribedEvents() { return [ PhraseaEvents::EXPORT_MAIL_FAILURE => 'onMailExportFailure', - PhraseaEvents::EXPORT_MAIL_CREATE => 'onCreateExportMail', ]; } } diff --git a/lib/Alchemy/Phrasea/Core/Event/Subscriber/RecordEditSubscriber.php b/lib/Alchemy/Phrasea/Core/Event/Subscriber/RecordEditSubscriber.php index 8307749eb5..51a6f49af0 100644 --- a/lib/Alchemy/Phrasea/Core/Event/Subscriber/RecordEditSubscriber.php +++ b/lib/Alchemy/Phrasea/Core/Event/Subscriber/RecordEditSubscriber.php @@ -33,7 +33,6 @@ class RecordEditSubscriber implements EventSubscriberInterface RecordEvents::ROTATE => 'onRecordChange', RecordEvents::COLLECTION_CHANGED => 'onCollectionChanged', RecordEvents::SUBDEFINITION_CREATE => 'onSubdefinitionCreate', - RecordEvents::DELETE => 'onDelete', ); } @@ -59,12 +58,6 @@ class RecordEditSubscriber implements EventSubscriberInterface $recordAdapter->rebuild_subdefs(); } - public function onDelete(DeleteEvent $event) - { - $recordAdapter = $this->convertToRecordAdapter($event->getRecord()); - $recordAdapter->delete(); - } - public function onEdit(RecordEdit $event) { static $into = false; diff --git a/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php b/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php index b4d4e42fc7..f497b7fe58 100644 --- a/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/Provider/RepositoriesServiceProvider.php @@ -147,6 +147,9 @@ class RepositoriesServiceProvider implements ServiceProviderInterface $app['repo.webhook-delivery'] = $app->share(function (PhraseaApplication $app) { return $app['orm.em']->getRepository('Phraseanet:WebhookEventDelivery'); }); + $app['repo.worker-running-job'] = $app->share(function (PhraseaApplication $app) { + return $app['orm.em']->getRepository('Phraseanet:WorkerRunningJob'); + }); $app['repo.databoxes'] = $app->share(function (PhraseaApplication $app) { $appbox = $app->getApplicationBox(); diff --git a/lib/Alchemy/Phrasea/Core/Version.php b/lib/Alchemy/Phrasea/Core/Version.php index 82a64c0955..007027470a 100644 --- a/lib/Alchemy/Phrasea/Core/Version.php +++ b/lib/Alchemy/Phrasea/Core/Version.php @@ -17,7 +17,7 @@ class Version * @var string */ - private $number = '4.1.0-alpha.26a'; + private $number = '4.1.0-alpha.27a'; /** * @var string diff --git a/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php new file mode 100644 index 0000000000..226e60142c --- /dev/null +++ b/lib/Alchemy/Phrasea/Model/Entities/WorkerRunningJob.php @@ -0,0 +1,133 @@ +id; + } + + /** + * @param $databoxId + * @return $this + */ + public function setDataboxId($databoxId) + { + $this->databoxId = $databoxId; + + return $this; + } + + /** + * @return mixed + */ + public function getDataboxId() + { + return $this->databoxId; + } + + + /** + * @param $recordId + * @return $this + */ + public function setRecordId($recordId) + { + $this->recordId = $recordId; + + return $this; + } + + /** + * @return mixed + */ + public function getRecordId() + { + return $this->recordId; + + } + + + /** + * @param $work + * @return $this + */ + public function setWork($work) + { + $this->work = $work; + + return $this; + } + + /** + * @return mixed + */ + public function getWork() + { + return $this->work; + } + + + /** + * @param $workOn + * @return $this + */ + public function setWorkOn($workOn) + { + $this->workOn = $workOn; + + return $this; + } + + /** + * @return mixed + */ + public function getWorkOn() + { + return $this->workOn; + } +} diff --git a/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php new file mode 100644 index 0000000000..03e159abfa --- /dev/null +++ b/lib/Alchemy/Phrasea/Model/Repositories/WorkerRunningJobRepository.php @@ -0,0 +1,72 @@ +createResultSetMappingBuilder('w'); + $rsm->addScalarResult('work_on','work_on'); + + $sql = 'SELECT work_on + FROM WorkerRunningJob + WHERE ((work & :write_meta) > 0 OR ((work & :make_subdef) > 0 AND work_on = :work_on) ) + AND record_id = :record_id + AND databox_id = :databox_id'; + + $query = $this->_em->createNativeQuery($sql, $rsm); + $query->setParameters([ + 'write_meta' => PhraseaTokens::WRITE_META, + 'make_subdef'=> PhraseaTokens::MAKE_SUBDEF, + 'work_on' => $subdefName, + 'record_id' => $recordId, + 'databox_id' => $databoxId + ] + ); + + return count($query->getResult()) == 0; + } + + /** + * return true if we can write meta + * + * @param $subdefName + * @param $recordId + * @param $databoxId + * @return bool + */ + public function canWriteMetadata($subdefName, $recordId, $databoxId) + { + $rsm = $this->createResultSetMappingBuilder('w'); + $rsm->addScalarResult('work_on','work_on'); + + $sql = 'SELECT work_on + FROM WorkerRunningJob + WHERE ((work & :make_subdef) > 0 OR ((work & :write_meta) > 0 AND work_on = :work_on) ) + AND record_id = :record_id + AND databox_id = :databox_id'; + + $query = $this->_em->createNativeQuery($sql, $rsm); + $query->setParameters([ + 'make_subdef'=> PhraseaTokens::MAKE_SUBDEF, + 'write_meta' => PhraseaTokens::WRITE_META, + 'work_on' => $subdefName, + 'record_id' => $recordId, + 'databox_id' => $databoxId + ] + ); + + return count($query->getResult()) == 0; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php new file mode 100644 index 0000000000..f25b6ff9c4 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php @@ -0,0 +1,83 @@ +setDescription('Listen queues define on configuration, launch corresponding service for execution') + ->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file') + ->addOption('queue-name', '', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'The name of queues to be consuming') + ->addOption('max-processes', 'm', InputOption::VALUE_REQUIRED, 'The max number of process allow to run (default 4) ') + ->addOption('MWG', '', InputOption::VALUE_NONE, 'Enable MWG metadata compatibility (use only for write metadata service)') + ->addOption('clear-metadatas', '', InputOption::VALUE_NONE, 'Delete metadatas from documents if not compliant with Database structure (use only for write metadata service)') + ->setHelp(''); + + return $this; + } + + protected function doExecute(InputInterface $input, OutputInterface $output) + { + $MWG = false; + $clearMetadatas = false; + + $argQueueName = $input->getOption('queue-name'); + $maxProcesses = intval($input->getOption('max-processes')); + + /** @var AMQPConnection $serverConnection */ + $serverConnection = $this->container['alchemy_worker.amqp.connection']; + + /** @var AMQPChannel $channel */ + $channel = $serverConnection->getChannel(); + + /** @var WorkerInvoker $workerInvoker */ + $workerInvoker = $this->container['alchemy_worker.worker_invoker']; + + if ($input->getOption('max-processes') != null && $maxProcesses == 0) { + $output->writeln('Invalid max-processes option.Need an integer'); + + return; + } elseif($maxProcesses) { + $workerInvoker->setMaxProcessPoolValue($maxProcesses); + } + + if ($input->getOption('MWG')) { + $MWG = true; + } + + if ($input->getOption('clear-metadatas')) { + $clearMetadatas = true; + } + + if ($input->getOption('preserve-payload')) { + $workerInvoker->preservePayloads(); + } + + /** @var MessageHandler $messageHandler */ + $messageHandler = $this->container['alchemy_worker.message.handler']; + $messageHandler->consume($serverConnection, $workerInvoker, $argQueueName, $maxProcesses); + + while (count($channel->callbacks)) { + $output->writeln("[*] Waiting for messages. To exit press CTRL+C"); + $channel->wait(); + } + + $serverConnection->connectionClose(); + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerRunServiceCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerRunServiceCommand.php new file mode 100644 index 0000000000..d757bc47be --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerRunServiceCommand.php @@ -0,0 +1,56 @@ +setDescription('Execute a service') + ->addArgument('type') + ->addArgument('body') + ->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file'); + + return $this; + } + + protected function doExecute(InputInterface $input, OutputInterface $output) + { + /** @var WorkerResolverInterface $workerResolver */ + $workerResolver = $this->container['alchemy_worker.type_based_worker_resolver']; + + $type = $input->getArgument('type'); + $body = file_get_contents($input->getArgument('body')); + + if ($body === false) { + $output->writeln('Unable to read payload file'); + + return; + } + + $body = json_decode($body, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + $output->writeln('Invalid message body'); + + return; + } + + $worker = $workerResolver->getWorker($type, $body); + + $worker->process($body); + + if (! $input->getOption('preserve-payload')) { + unlink($input->getArgument('body')); + } + + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerShowConfigCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerShowConfigCommand.php new file mode 100644 index 0000000000..98f13063de --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerShowConfigCommand.php @@ -0,0 +1,27 @@ +setDescription('Show queues configuration'); + } + + public function doExecute(InputInterface $input, OutputInterface $output) + { + $serverConfiguration = $this->container['conf']->get(['workers', 'queue', 'worker-queue']); + + $output->writeln(['', 'Configured server: ']); + + $output->writeln(['Rabbit Server : ' . Yaml::dump($serverConfiguration, 0), '']); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Configuration/Config.php b/lib/Alchemy/Phrasea/WorkerManager/Configuration/Config.php new file mode 100644 index 0000000000..dfd876ad9c --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Configuration/Config.php @@ -0,0 +1,33 @@ +render('admin/worker-manager/index.html.twig'); + } + + /** + * @param PhraseaApplication $app + * @param Request $request + * @return mixed + */ + public function configurationAction(PhraseaApplication $app, Request $request) + { + $retryQueueConfig = $this->getRetryQueueConfiguration(); + + $form = $app->form(new WorkerConfigurationType(), $retryQueueConfig); + + $form->handleRequest($request); + + if ($form->isValid()) { + // save config in file + $app['conf']->set(['workers', 'retry_queue'], $form->getData()); + + $queues = array_intersect_key(AMQPConnection::$defaultQueues, $retryQueueConfig); + $retryQueuesToReset = array_intersect_key(AMQPConnection::$defaultRetryQueues, array_flip($queues)); + + /** @var AMQPConnection $serverConnection */ + $serverConnection = $this->app['alchemy_worker.amqp.connection']; + // change the queue TTL + $serverConnection->reinitializeQueue($retryQueuesToReset); + + return $app->redirectPath('worker_admin'); + } + + return $this->render('admin/worker-manager/worker_configuration.html.twig', [ + 'form' => $form->createView() + ]); + } + + public function searchengineAction(PhraseaApplication $app, Request $request) + { + $options = $this->getElasticsearchOptions(); + + $form = $app->form(new WorkerSearchengineType(), $options); + + $form->handleRequest($request); + + if ($form->isValid()) { + $populateInfo = $this->getData($form); + + $this->getDispatcher()->dispatch(WorkerEvents::POPULATE_INDEX, new PopulateIndexEvent($populateInfo)); + + return $app->redirectPath('worker_admin'); + } + + return $this->render('admin/worker-manager/worker_searchengine.html.twig', [ + 'form' => $form->createView() + ]); + } + + public function subviewAction(PhraseaApplication $app) + { + return $this->render('admin/worker-manager/worker_subview.html.twig', [ + ]); + } + + public function metadataAction(PhraseaApplication $app) + { + return $this->render('admin/worker-manager/worker_metadata.html.twig', [ + ]); + } + + public function populateStatusAction(PhraseaApplication $app, Request $request) + { + $databoxIds = $request->get('sbasIds'); + + return DBManipulator::checkPopulateIndexStatusByDataboxId($databoxIds); + } + + public function pullAssetsAction(PhraseaApplication $app, Request $request) + { + $pullAssetsConfig = $this->getPullAssetsConfiguration(); + $form = $app->form(new WorkerPullAssetsType(), $pullAssetsConfig); + + $form->handleRequest($request); + if ($form->isValid()) { + /** @var AMQPConnection $serverConnection */ + $serverConnection = $this->app['alchemy_worker.amqp.connection']; + $serverConnection->setQueue(MessagePublisher::PULL_QUEUE); + + // save new pull config + $app['conf']->set(['workers', 'pull_assets'], array_merge($pullAssetsConfig, $form->getData())); + + // reinitialize the pull queues + $serverConnection->reinitializeQueue([MessagePublisher::PULL_QUEUE]); + $this->app['alchemy_worker.message.publisher']->initializePullAssets(); + + return $app->redirectPath('worker_admin'); + } + + return $this->render('admin/worker-manager/worker_pull_assets.html.twig', [ + 'form' => $form->createView() + ]); + } + + /** + * @return EventDispatcherInterface + */ + private function getDispatcher() + { + return $this->app['dispatcher']; + } + + /** + * @return ElasticsearchOptions + */ + private function getElasticsearchOptions() + { + return $this->app['elasticsearch.options']; + } + + /** + * @param FormInterface $form + * @return array + */ + private function getData(FormInterface $form) + { + /** @var ElasticsearchOptions $options */ + $options = $form->getData(); + + $data['host'] = $options->getHost(); + $data['port'] = $options->getPort(); + $data['indexName'] = $options->getIndexName(); + $data['databoxIds'] = $form->getExtraData()['sbas']; + + return $data; + } + + private function getPullAssetsConfiguration() + { + return $this->app['conf']->get(['workers', 'pull_assets'], []); + } + + private function getRetryQueueConfiguration() + { + return $this->app['conf']->get(['workers', 'retry_queue'], []); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Db/.gitkeep b/lib/Alchemy/Phrasea/WorkerManager/Db/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreateEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreateEvent.php new file mode 100644 index 0000000000..7cd5856374 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreateEvent.php @@ -0,0 +1,21 @@ +data = $data; + } + + public function getData() + { + return $this->data; + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php new file mode 100644 index 0000000000..2e26d250e3 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php @@ -0,0 +1,34 @@ +payload = $payload; + $this->workerMessage = $workerMessage; + $this->count = $count; + } + + public function getPayload() + { + return $this->payload; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php new file mode 100644 index 0000000000..8e76fc4394 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php @@ -0,0 +1,35 @@ +payload = $payload; + $this->workerMessage = $workerMessage; + $this->count = $count; + } + + public function getPayload() + { + return $this->payload; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php new file mode 100644 index 0000000000..30b03a011a --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php @@ -0,0 +1,55 @@ +emitterUserId = $emitterUserId; + $this->tokenValue = $tokenValue; + $this->destinationMails = $destinationMails; + $this->params = $params; + $this->workerMessage = $workerMessage; + $this->count = $count; + } + + public function getEmitterUserId() + { + return $this->emitterUserId; + } + + public function getTokenValue() + { + return $this->tokenValue; + } + + public function getDestinationMails() + { + return $this->destinationMails; + } + + public function getParams() + { + return $this->params; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexEvent.php new file mode 100644 index 0000000000..2287c9b5c5 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexEvent.php @@ -0,0 +1,25 @@ +data = $data; + } + + /** + * @return array + */ + public function getData() + { + return $this->data; + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php new file mode 100644 index 0000000000..34c112e227 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php @@ -0,0 +1,55 @@ +host = $host; + $this->port = $port; + $this->indexName = $indexName; + $this->databoxId = $databoxId; + $this->workerMessage = $workerMessage; + $this->count = $count; + } + + public function getHost() + { + return $this->host; + } + + public function getPort() + { + return $this->port; + } + + public function getIndexName() + { + return $this->indexName; + } + + public function getDataboxId() + { + return $this->databoxId; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/StoryCreateCoverEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/StoryCreateCoverEvent.php new file mode 100644 index 0000000000..965f22eec6 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/StoryCreateCoverEvent.php @@ -0,0 +1,21 @@ +data = $data; + } + + public function getData() + { + return $this->data; + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php new file mode 100644 index 0000000000..1628c76bb1 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php @@ -0,0 +1,37 @@ +subdefName = $subdefName; + $this->workerMessage = $workerMessage; + $this->count = $count; + } + + public function getSubdefName() + { + return $this->subdefName; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionWritemetaEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionWritemetaEvent.php new file mode 100644 index 0000000000..6949f2a54b --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionWritemetaEvent.php @@ -0,0 +1,47 @@ +subdefName = $subdefName; + $this->status = $status; + $this->workerMessage = $workerMessage; + $this->count = $count; + } + + public function getSubdefName() + { + return $this->subdefName; + } + + public function getStatus() + { + return $this->status; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php new file mode 100644 index 0000000000..f31790cad3 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php @@ -0,0 +1,41 @@ +webhookEventId = $webhookEventId; + $this->workerMessage = $workerMessage; + $this->count = $count; + $this->deleveryId = $deleveryId; + } + + public function getWebhookEventId() + { + return $this->webhookEventId; + } + + public function getWorkerMessage() + { + return $this->workerMessage; + } + + public function getCount() + { + return $this->count; + } + + public function getDeleveryId() + { + return $this->deleveryId; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/WorkerEvents.php b/lib/Alchemy/Phrasea/WorkerManager/Event/WorkerEvents.php new file mode 100644 index 0000000000..d195369a4a --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/WorkerEvents.php @@ -0,0 +1,22 @@ +add(MessagePublisher::ASSETS_INGEST_TYPE, 'text', [ + 'label' => 'Ingest retry delay in ms' + ]) + ->add(MessagePublisher::CREATE_RECORD_TYPE, 'text', [ + 'label' => 'Create record retry delay in ms' + ]) + ->add(MessagePublisher::SUBDEF_CREATION_TYPE, 'text', [ + 'label' => 'Subdefinition retry delay in ms' + ]) + ->add(MessagePublisher::WRITE_METADATAS_TYPE, 'text', [ + 'label' => 'Metadatas retry delay in ms' + ]) + ->add(MessagePublisher::WEBHOOK_TYPE, 'text', [ + 'label' => 'Webhook retry delay in ms' + ]) + ->add(MessagePublisher::EXPORT_MAIL_TYPE, 'text', [ + 'label' => 'Export mail retry delay in ms' + ]) + ->add(MessagePublisher::POPULATE_INDEX_TYPE, 'text', [ + 'label' => 'Populate Index retry delay in ms' + ]) + ; + } + + public function getName() + { + return 'worker_configuration'; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php new file mode 100644 index 0000000000..1a7ccf69c3 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php @@ -0,0 +1,37 @@ +add('endpointCommit', 'text', [ + 'label' => 'Endpoint get commit' + ]) + ->add('endpointToken', 'text', [ + 'label' => 'Endpoint get token' + ]) + ->add('clientSecret', 'text', [ + 'label' => 'Client secret' + ]) + ->add('clientId', 'text', [ + 'label' => 'Client ID' + ]) + ->add('pullInterval', 'text', [ + 'label' => 'Fetching interval in second' + ]) + ; + } + + public function getName() + { + return 'worker_pullAssets'; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php new file mode 100644 index 0000000000..a3c732c5b1 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php @@ -0,0 +1,48 @@ +add('host', 'text', [ + 'label' => 'Elasticsearch server host', + 'constraints' => new NotBlank(), + ]) + ->add('port', 'integer', [ + 'label' => 'Elasticsearch service port', + 'constraints' => [ + new Range(['min' => 1, 'max' => 65535]), + new NotBlank() + ] + ]) + ->add('indexName', 'text', [ + 'label' => 'Elasticsearch index name', + 'constraints' => new NotBlank(), + 'attr' =>['data-class'=>'inline'] + ]) + ; + } + + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + $resolver->setDefaults([ + 'allow_extra_fields' => true + ]); + } + + public function getName() + { + return 'worker_searchengine'; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Model/DBManipulator.php b/lib/Alchemy/Phrasea/WorkerManager/Model/DBManipulator.php new file mode 100644 index 0000000000..e4c4d62eb2 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Model/DBManipulator.php @@ -0,0 +1,175 @@ +beginTransaction(); + + try { + $pdo->query("CREATE TABLE IF NOT EXISTS populate_running(host TEXT NOT NULL, port TEXT NOT NULL, index_name TEXT NOT NULL, databox_id TEXT NOT NULL);"); + + $stmt = $pdo->prepare("INSERT INTO populate_running(host, port, index_name, databox_id) VALUES(:host, :port, :index_name, :databox_id)"); + + $stmt->execute([ + ':host' => $params['host'], + ':port' => $params['port'], + ':index_name' => $params['indexName'], + ':databox_id' => $params['databoxId'] + ]); + + $pdo->commit(); + } catch (\Exception $e) { + $pdo->rollBack(); + } + } + + /** + * @param array $params = [host, port, indexName, databoxId] + */ + public static function deletePopulateStatus(array $params) + { + $pdo = Config::getWorkerSqliteConnection(); + + $pdo->beginTransaction(); + + try { + $stmt = $pdo->prepare("DELETE FROM populate_running WHERE host = :host AND port= :port AND index_name= :index_name AND databox_id= :databox_id"); + + $stmt->execute([ + ':host' => $params['host'], + ':port' => $params['port'], + ':index_name' => $params['indexName'], + ':databox_id' => $params['databoxId'] + ]); + + $pdo->commit(); + } catch (\Exception $e) { + $pdo->rollBack(); + } + } + + /** + * Update commits table in the temporary sqlite worker.db + * @param $commitId + * @param $assetId + * @return int the number of the remaining assets in the commit + */ + public static function updateRemainingAssetsListByCommit($commitId, $assetId) + { + $row = 1; + $pdo = Config::getWorkerSqliteConnection(); + $pdo->beginTransaction(); + + try { + // remove assetId from the assets list + $stmt = $pdo->prepare("DELETE FROM commits WHERE commit_id = :commit_id AND asset= :assetId"); + $stmt->execute([ + ':commit_id' => $commitId, + ':assetId' => $assetId + ]); + + $stmt = $pdo->prepare("SELECT * FROM commits WHERE commit_id = :commit_id"); + $stmt->execute([ + ':commit_id' => $commitId, + ]); + + $row = $stmt->fetchAll(\PDO::FETCH_ASSOC); + $pdo->commit(); + + } catch (\Exception $e) { + $pdo->rollBack(); + } + + return count($row); + } + + /** + * @param $commitId + * @return bool + */ + public static function isCommitToBeCreating($commitId) + { + $pdo = Config::getWorkerSqliteConnection(); + + $pdo->beginTransaction(); + $row = 0; + try { + $pdo->query("CREATE TABLE IF NOT EXISTS commits(commit_id TEXT NOT NULL, asset TEXT NOT NULL);"); + + $stmt = $pdo->prepare("SELECT * FROM commits WHERE commit_id = :commit_id"); + $stmt->execute([ + ':commit_id' => $commitId, + ]); + + $row = $stmt->fetchAll(\PDO::FETCH_ASSOC); + $pdo->commit(); + } catch (\Exception $e) { + //no-op + } + + return count($row) ? true : false; + } + + public static function saveAssetsList($commitId, $assetIds) + { + $pdo = Config::getWorkerSqliteConnection(); + + $pdo->beginTransaction(); + + try { + $pdo->query("CREATE TABLE IF NOT EXISTS commits(commit_id TEXT NOT NULL, asset TEXT NOT NULL);"); + + // insert all assets ID in the temporary sqlite database + foreach ($assetIds as $assetId) { + $stmt = $pdo->prepare("INSERT INTO commits(commit_id, asset) VALUES(:commit_id, :asset)"); + + $stmt->execute([ + ':commit_id' => $commitId, + ':asset' => $assetId + ]); + } + + $pdo->commit(); + } catch (\Exception $e) { + $pdo->rollBack(); + } + } + + /** + * @param array $databoxIds + * @return int + */ + public static function checkPopulateIndexStatusByDataboxId(array $databoxIds) + { + $pdo = Config::getWorkerSqliteConnection(); + $in = str_repeat("?,", count($databoxIds)-1) . "?"; + + $pdo->beginTransaction(); + + try { + $pdo->query("CREATE TABLE IF NOT EXISTS populate_running(host TEXT NOT NULL, port TEXT NOT NULL, index_name TEXT NOT NULL, databox_id TEXT NOT NULL);"); + + $stmt = $pdo->prepare("SELECT * FROM populate_running WHERE databox_id IN ($in)"); + + $stmt->execute($databoxIds); + + $row = $stmt->fetchAll(\PDO::FETCH_ASSOC); + $pdo->commit(); + + } catch (\Exception $e) { + $pdo->rollBack(); + } + + return count($row); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php new file mode 100644 index 0000000000..fe9e62c5d4 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/AlchemyWorkerServiceProvider.php @@ -0,0 +1,148 @@ +share(function () { + return new TypeBasedWorkerResolver(); + }); + + $app['alchemy_worker.logger'] = $app->share(function (Application $app) { + $logger = new $app['monolog.logger.class']('alchemy-service logger'); + $logger->pushHandler(new RotatingFileHandler( + $app['log.path'] . DIRECTORY_SEPARATOR . 'worker_service.log', + 10, + Logger::INFO + )); + + return $logger; + }); + + // use the console logger + $loggerSetter = function (LoggerAwareInterface $loggerAware) use ($app) { + if (isset($app['logger'])) { + $loggerAware->setLogger($app['logger']); + } + + return $loggerAware; + }; + + $app['alchemy_worker.process_pool'] = $app->share(function (Application $app) use ($loggerSetter) { + return $loggerSetter(new ProcessPool()); + }); + + $app['alchemy_worker.worker_invoker'] = $app->share(function (Application $app) use ($loggerSetter) { + return $loggerSetter(new WorkerInvoker($app['alchemy_worker.process_pool'])); + }); + + + // register workers + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new SubdefCreationWorker( + $app['subdef.generator'], + $app['alchemy_worker.message.publisher'], + $app['alchemy_worker.logger'], + $app['dispatcher'], + $app['phraseanet.filesystem'], + $app['repo.worker-running-job'] + )) + ->setApplicationBox($app['phraseanet.appbox']) + ->setEntityManagerLocator(new LazyLocator($app, 'orm.em')) + ; + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::WRITE_METADATAS_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new WriteMetadatasWorker( + $app['exiftool.writer'], + $app['alchemy_worker.logger'], + $app['alchemy_worker.message.publisher'], + $app['repo.worker-running-job'] + )) + ->setApplicationBox($app['phraseanet.appbox']) + ->setDispatcher($app['dispatcher']) + ->setEntityManagerLocator(new LazyLocator($app, 'orm.em')) + ; + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::EXPORT_MAIL_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new ExportMailWorker($app)) + ->setDelivererLocator(new LazyLocator($app, 'notification.deliverer')); + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::ASSETS_INGEST_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new AssetsIngestWorker($app)) + ->setEntityManagerLocator(new LazyLocator($app, 'orm.em')); + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::WEBHOOK_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new WebhookWorker($app)) + ->setDispatcher($app['dispatcher']); + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::CREATE_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new CreateRecordWorker($app)) + ->setApplicationBox($app['phraseanet.appbox']) + ->setBorderManagerLocator(new LazyLocator($app, 'border-manager')) + ->setEntityManagerLocator(new LazyLocator($app, 'orm.em')) + ->setFileSystemLocator(new LazyLocator($app, 'filesystem')) + ->setTemporaryFileSystemLocator(new LazyLocator($app, 'temporary-filesystem')) + ->setDispatcher($app['dispatcher']); + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::POPULATE_INDEX_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new PopulateIndexWorker($app['alchemy_worker.message.publisher'], $app['elasticsearch.indexer'])) + ->setApplicationBox($app['phraseanet.appbox']) + ->setDispatcher($app['dispatcher']); + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::PULL_ASSETS_TYPE, new CallableWorkerFactory(function () use ($app) { + return new PullAssetsWorker($app['alchemy_worker.message.publisher']); + })); + + $app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::DELETE_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) { + return (new DeleteRecordWorker()) + ->setApplicationBox($app['phraseanet.appbox']); + })); + } + + /** + * {@inheritdoc} + */ + public function boot(Application $app) + { + } + + /** + * {@inheritdoc} + */ + public static function create(PhraseaApplication $app) + { + return new static(); + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php new file mode 100644 index 0000000000..c7b247d73b --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php @@ -0,0 +1,103 @@ +share(function (PhraseaApplication $app) { + return new AdminConfigurationController($app); + }); + + // example of route to check webhook + $app->post('/webhook', array($this, 'getWebhookData')); + } + + /** + * {@inheritdoc} + */ + public function boot(Application $app) + { + } + + public function connect(Application $app) + { + $controllers = $this->createAuthenticatedCollection($app); + $firewall = $this->getFirewall($app); + + $controllers->before(function () use ($firewall) { + $firewall->requireRight(\ACL::TASKMANAGER); + }); + + $controllers->match('/', 'controller.worker.admin.configuration:indexAction') + ->method('GET') + ->bind('worker_admin'); + + $controllers->match('/configuration', 'controller.worker.admin.configuration:configurationAction') + ->method('GET|POST') + ->bind('worker_admin_configuration'); + + $controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction') + ->method('GET|POST') + ->bind('worker_admin_searchengine'); + + $controllers->match('/subview', 'controller.worker.admin.configuration:subviewAction') + ->method('GET|POST') + ->bind('worker_admin_subview'); + + $controllers->match('/metadata', 'controller.worker.admin.configuration:metadataAction') + ->method('GET|POST') + ->bind('worker_admin_metadata'); + + $controllers->get('/populate-status', 'controller.worker.admin.configuration:populateStatusAction') + ->bind('worker_admin_populate_status'); + + $controllers->match('/pull-assets', 'controller.worker.admin.configuration:pullAssetsAction') + ->method('GET|POST') + ->bind('worker_admin_pullAssets'); + + return $controllers; + } + + public function getWebhookData(Application $app, Request $request) + { + $messagePubliser = $this->getMessagePublisher($app); + $messagePubliser->pushLog("RECEIVED ON phraseanet WEBHOOK URL TEST = ". $request->getUri() . " DATA : ". $request->getContent()); + + return 0; + } + + /** + * @param Application $app + * @return Firewall + */ + private function getFirewall(Application $app) + { + return $app['firewall']; + } + + /** + * @param Application $app + * @return MessagePublisher + */ + private function getMessagePublisher(Application $app) + { + return $app['alchemy_worker.message.publisher']; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php new file mode 100644 index 0000000000..bc98560fed --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/QueueWorkerServiceProvider.php @@ -0,0 +1,95 @@ +share(function (Application $app) { + return new AMQPConnection($app['conf']); + }); + + $app['alchemy_worker.message.handler'] = $app->share(function (Application $app) { + return new MessageHandler($app['alchemy_worker.message.publisher']); + }); + + $app['alchemy_worker.message.publisher'] = $app->share(function (Application $app) { + return new MessagePublisher($app['alchemy_worker.amqp.connection'], $app['alchemy_worker.logger']); + }); + + $app['alchemy_worker.webhook.publisher'] = $app->share(function (Application $app) { + return new WebhookPublisher($app['alchemy_worker.message.publisher']); + }); + + $app['manipulator.webhook-event'] = $app->share(function (Application $app) { + return new WebhookEventManipulator( + $app['orm.em'], + $app['repo.webhook-event'], + $app['alchemy_worker.webhook.publisher'] + ); + }); + + $app['dispatcher'] = $app->share( + $app->extend('dispatcher', function (EventDispatcherInterface $dispatcher, Application $app) { + + $dispatcher->addSubscriber( + (new RecordSubscriber($app) + )->setApplicationBox($app['phraseanet.appbox']) + + ); + $dispatcher->addSubscriber(new ExportSubscriber($app['alchemy_worker.message.publisher'])); + $dispatcher->addSubscriber(new AssetsIngestSubscriber($app['alchemy_worker.message.publisher'])); + $dispatcher->addSubscriber(new SearchengineSubscriber($app['alchemy_worker.message.publisher'])); + $dispatcher->addSubscriber(new WebhookSubscriber($app['alchemy_worker.message.publisher'])); + + return $dispatcher; + }) + ); + + } + + /** + * {@inheritdoc} + */ + public function boot(Application $app) + { + + } + + /** + * {@inheritdoc} + */ + public static function create(PhraseaApplication $app) + { + return new static(); + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php new file mode 100644 index 0000000000..0321669363 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php @@ -0,0 +1,219 @@ + MessagePublisher::METADATAS_QUEUE, + MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::SUBDEF_QUEUE, + MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::EXPORT_QUEUE, + MessagePublisher::WEBHOOK_TYPE => MessagePublisher::WEBHOOK_QUEUE, + MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::ASSETS_INGEST_QUEUE, + MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::CREATE_RECORD_QUEUE, + MessagePublisher::PULL_QUEUE => MessagePublisher::PULL_QUEUE, + MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::POPULATE_INDEX_QUEUE, + MessagePublisher::DELETE_RECORD_TYPE => MessagePublisher::DELETE_RECORD_QUEUE + ]; + + // the corresponding worker queues and retry queues, loop queue + public static $defaultRetryQueues = [ + MessagePublisher::METADATAS_QUEUE => MessagePublisher::RETRY_METADATAS_QUEUE, + MessagePublisher::SUBDEF_QUEUE => MessagePublisher::RETRY_SUBDEF_QUEUE, + MessagePublisher::EXPORT_QUEUE => MessagePublisher::RETRY_EXPORT_QUEUE, + MessagePublisher::WEBHOOK_QUEUE => MessagePublisher::RETRY_WEBHOOK_QUEUE, + MessagePublisher::ASSETS_INGEST_QUEUE => MessagePublisher::RETRY_ASSETS_INGEST_QUEUE, + MessagePublisher::CREATE_RECORD_QUEUE => MessagePublisher::RETRY_CREATE_RECORD_QUEUE, + MessagePublisher::POPULATE_INDEX_QUEUE => MessagePublisher::RETRY_POPULATE_INDEX_QUEUE, + MessagePublisher::PULL_QUEUE => MessagePublisher::LOOP_PULL_QUEUE + ]; + + // default message TTL in retry queue in millisecond + public static $defaultFailedQueues = [ + MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::FAILED_METADATAS_QUEUE, + MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::FAILED_SUBDEF_QUEUE, + MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::FAILED_EXPORT_QUEUE, + MessagePublisher::WEBHOOK_TYPE => MessagePublisher::FAILED_WEBHOOK_QUEUE, + MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::FAILED_ASSETS_INGEST_QUEUE, + MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::FAILED_CREATE_RECORD_QUEUE, + MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::FAILED_POPULATE_INDEX_QUEUE + ]; + + public static $defaultDelayedQueues = [ + MessagePublisher::METADATAS_QUEUE => MessagePublisher::DELAYED_METADATAS_QUEUE, + MessagePublisher::SUBDEF_QUEUE => MessagePublisher::DELAYED_SUBDEF_QUEUE + ]; + + // default message TTL in retry queue in millisecond + const RETRY_DELAY = 10000; + + public function __construct(PropertyAccess $conf) + { + $defaultConfiguration = [ + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/' + ]; + + $this->hostConfig = $conf->get(['workers', 'queue', 'worker-queue'], $defaultConfiguration); + $this->conf = $conf; + + $this->getChannel(); + $this->declareExchange(); + } + + public function getConnection() + { + if (!isset($this->connection)) { + $this->connection = new AMQPStreamConnection( + $this->hostConfig['host'], + $this->hostConfig['port'], + $this->hostConfig['user'], + $this->hostConfig['password'], + $this->hostConfig['vhost']); + } + + return $this->connection; + } + + public function getChannel() + { + if (!isset($this->channel)) { + $this->channel = $this->getConnection()->channel(); + } + + return $this->channel; + } + + public function declareExchange() + { + $this->channel->exchange_declare(self::ALCHEMY_EXCHANGE, 'direct', false, true, false); + $this->channel->exchange_declare(self::RETRY_ALCHEMY_EXCHANGE, 'direct', false, true, false); + } + + /** + * @param $queueName + * @return AMQPChannel + */ + public function setQueue($queueName) + { + if (!isset($this->channel)) { + $this->getChannel(); + $this->declareExchange(); + } + + if (isset(self::$defaultRetryQueues[$queueName])) { + $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ + 'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message + 'x-dead-letter-routing-key' => self::$defaultRetryQueues[$queueName] // the routing key to apply to this 'dead' message + ])); + + $this->channel->queue_bind($queueName, self::ALCHEMY_EXCHANGE, $queueName); + + // declare also the corresponding retry queue + // use this to delay the delivery of a message to the alchemy-exchange + $this->channel->queue_declare(self::$defaultRetryQueues[$queueName], false, true, false, false, false, new AMQPTable([ + 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, + 'x-dead-letter-routing-key' => $queueName, + 'x-message-ttl' => $this->getTtlPerRouting($queueName) + ])); + + $this->channel->queue_bind(self::$defaultRetryQueues[$queueName], AMQPConnection::RETRY_ALCHEMY_EXCHANGE, self::$defaultRetryQueues[$queueName]); + + } elseif (in_array($queueName, self::$defaultRetryQueues)) { + // if it's a retry queue + $routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); + $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ + 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, + 'x-dead-letter-routing-key' => $routing, + 'x-message-ttl' => $this->getTtlPerRouting($routing) + ])); + + $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); + } elseif (in_array($queueName, self::$defaultFailedQueues)) { + // if it's a failed queue + $this->channel->queue_declare($queueName, false, true, false, false, false); + + $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); + } elseif (in_array($queueName, self::$defaultDelayedQueues)) { + // if it's a delayed queue + $routing = array_search($queueName, AMQPConnection::$defaultDelayedQueues); + $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ + 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, + 'x-dead-letter-routing-key' => $routing, + 'x-message-ttl' => 5000 + ])); + + $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); + } else { + $this->channel->queue_declare($queueName, false, true, false, false, false); + + $this->channel->queue_bind($queueName, AMQPConnection::ALCHEMY_EXCHANGE, $queueName); + } + + return $this->channel; + } + + public function reinitializeQueue(array $queuNames) + { + foreach ($queuNames as $queuName) { + if (in_array($queuName, self::$defaultQueues)) { + $this->channel->queue_purge($queuName); + } else { + $this->channel->queue_delete($queuName); + } + + if (isset(self::$defaultRetryQueues[$queuName])) { + $this->channel->queue_delete(self::$defaultRetryQueues[$queuName]); + } + + $this->setQueue($queuName); + } + } + + public function connectionClose() + { + $this->channel->close(); + $this->connection->close(); + } + + /** + * @param $routing + * @return int + */ + private function getTtlPerRouting($routing) + { + $config = $this->conf->get(['workers']); + + if ($routing == MessagePublisher::PULL_QUEUE && + isset($config['pull_assets']) && + isset($config['pull_assets']['pullInterval']) ) { + // convert in milli second + return (int)($config['pull_assets']['pullInterval']) * 1000; + } elseif (isset($config['retry_queue']) && + isset($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)])) { + + return (int)($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)]); + } + + return self::RETRY_DELAY; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php new file mode 100644 index 0000000000..dcd4f39bc5 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php @@ -0,0 +1,111 @@ +messagePublisher = $messagePublisher; + } + + public function consume(AMQPConnection $serverConnection, WorkerInvoker $workerInvoker, $argQueueName, $maxProcesses) + { + $publisher = $this->messagePublisher; + + $channel = $serverConnection->getChannel(); + + // define consume callbacks + $callback = function (AMQPMessage $message) use ($channel, $workerInvoker, $publisher) { + + $data = json_decode($message->getBody(), true); + + $count = 0; + + if ($message->has('application_headers')) { + /** @var AMQPTable $headers */ + $headers = $message->get('application_headers'); + + $headerData = $headers->getNativeData(); + if (isset($headerData['x-death'])) { + $xDeathHeader = $headerData['x-death']; + + foreach ($xDeathHeader as $xdeath) { + $queue = $xdeath['queue']; + if (!in_array($queue, AMQPConnection::$defaultQueues)) { + continue; + } + + $count = $xdeath['count']; + $data['payload']['count'] = $count; + } + } + } + + // if message is yet executed 3 times, save the unprocessed message in the corresponding failed queues + if ($count > self::MAX_OF_TRY && $data['message_type'] != MessagePublisher::PULL_ASSETS_TYPE) { + $this->messagePublisher->publishFailedMessage($data['payload'], $headers, AMQPConnection::$defaultFailedQueues[$data['message_type']]); + + $logMessage = sprintf("Rabbit message executed 3 times, it's to be saved in %s , payload >>> %s", + AMQPConnection::$defaultFailedQueues[$data['message_type']], + json_encode($data['payload']) + ); + $this->messagePublisher->pushLog($logMessage); + + $channel->basic_ack($message->delivery_info['delivery_tag']); + } else { + try { + $workerInvoker->invokeWorker($data['message_type'], json_encode($data['payload'])); + + if ($data['message_type'] == MessagePublisher::PULL_ASSETS_TYPE) { + // make a loop for the pull assets + $channel->basic_nack($message->delivery_info['delivery_tag']); + } else { + $channel->basic_ack($message->delivery_info['delivery_tag']); + } + + $oldPayload = $data['payload']; + $message = $data['message_type'].' to be consumed! >> Payload ::'. json_encode($oldPayload); + + $publisher->pushLog($message); + } catch (\Exception $e) { + $channel->basic_nack($message->delivery_info['delivery_tag']); + } + } + }; + + $prefetchCount = ProcessPool::MAX_PROCESSES; + + if ($maxProcesses) { + $prefetchCount = $maxProcesses; + } + + foreach (AMQPConnection::$defaultQueues as $queueName) { + if ($argQueueName ) { + if (in_array($queueName, $argQueueName)) { + $serverConnection->setQueue($queueName); + + // give prefetch message to a worker consumer at a time + $channel->basic_qos(null, $prefetchCount, null); + $channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback); + } + } else { + $serverConnection->setQueue($queueName); + + // give prefetch message to a worker consumer at a time + $channel->basic_qos(null, $prefetchCount, null); + $channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback); + } + } + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php new file mode 100644 index 0000000000..cc94de8b6f --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php @@ -0,0 +1,142 @@ +serverConnection = $serverConnection; + $this->logger = $logger; + } + + public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '') + { + $msg = new AMQPMessage(json_encode($payload)); + $routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); + + if (count($retryCount) && $routing != false) { + // add a message header information + $headers = new AMQPTable([ + 'x-death' => [ + [ + 'count' => $retryCount, + 'exchange' => AMQPConnection::ALCHEMY_EXCHANGE, + 'queue' => $routing, + 'routing-keys' => $routing, + 'reason' => 'rejected', // rejected is sended like nack + 'time' => new \DateTime('now', new \DateTimeZone('UTC')) + ] + ], + 'worker-message' => $workerMessage + ]); + + $msg->set('application_headers', $headers); + } + + $channel = $this->serverConnection->setQueue($queueName); + + $exchange = in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE; + $channel->basic_publish($msg, $exchange, $queueName); + + return true; + } + + public function initializePullAssets() + { + $payload = [ + 'message_type' => self::PULL_ASSETS_TYPE, + 'payload' => [ + 'initTimestamp' => new \DateTime('now', new \DateTimeZone('UTC')) + ] + ]; + + $this->publishMessage($payload, self::PULL_QUEUE); + } + + public function connectionClose() + { + $this->serverConnection->connectionClose(); + } + + /** + * @param $message + * @param string $method + * @param array $context + */ + public function pushLog($message, $method = 'info', $context = []) + { + // write logs directly in file + + call_user_func(array($this->logger, $method), $message, $context); + } + + public function publishFailedMessage(array $payload, AMQPTable $headers, $queueName) + { + $msg = new AMQPMessage(json_encode($payload)); + $msg->set('application_headers', $headers); + + $channel = $this->serverConnection->setQueue($queueName); + $channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php new file mode 100644 index 0000000000..a1bdcc888a --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php @@ -0,0 +1,29 @@ +messagePublisher = $messagePublisher; + } + + public function publishWebhookEvent(WebhookEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::WEBHOOK_TYPE, + 'payload' => [ + 'id' => $event->getId() + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::WEBHOOK_QUEUE); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php new file mode 100644 index 0000000000..fbf49c420d --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php @@ -0,0 +1,70 @@ +messagePublisher = $messagePublisher; + } + + public function onAssetsCreate(AssetsCreateEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::ASSETS_INGEST_TYPE, + 'payload' => $event->getData() + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); + } + + public function onAssetsCreationFailure(AssetsCreationFailureEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::ASSETS_INGEST_TYPE, + 'payload' => $event->getPayload() + ]; + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_ASSETS_INGEST_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + } + + public function onAssetsCreationRecordFailure(AssetsCreationRecordFailureEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::CREATE_RECORD_TYPE, + 'payload' => $event->getPayload() + ]; + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_CREATE_RECORD_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + } + + public static function getSubscribedEvents() + { + return [ + WorkerEvents::ASSETS_CREATE => 'onAssetsCreate', + WorkerEvents::ASSETS_CREATION_FAILURE => 'onAssetsCreationFailure', + WorkerEvents::ASSETS_CREATION_RECORD_FAILURE => 'onAssetsCreationRecordFailure' + ]; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php new file mode 100644 index 0000000000..d75d662db4 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php @@ -0,0 +1,64 @@ +messagePublisher = $messagePublisher; + } + + public function onExportMailCreate(ExportMailEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::EXPORT_MAIL_TYPE, + 'payload' => [ + 'emitterUserId' => $event->getEmitterUserId(), + 'tokenValue' => $event->getTokenValue(), + 'destinationMails' => serialize($event->getDestinationMails()), + 'params' => serialize($event->getParams()) + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPORT_QUEUE); + } + + public function onExportMailFailure(ExportMailFailureEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::EXPORT_MAIL_TYPE, + 'payload' => [ + 'emitterUserId' => $event->getEmitterUserId(), + 'tokenValue' => $event->getTokenValue(), + 'destinationMails' => serialize($event->getDestinationMails()), + 'params' => serialize($event->getParams()) + ] + ]; + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_EXPORT_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + } + + public static function getSubscribedEvents() + { + return [ + PhraseaEvents::EXPORT_MAIL_CREATE => 'onExportMailCreate', + WorkerEvents::EXPORT_MAIL_FAILURE => 'onExportMailFailure' + ]; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php new file mode 100644 index 0000000000..bfd383ad2f --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -0,0 +1,268 @@ +messagePublisher = $app['alchemy_worker.message.publisher']; + $this->workerResolver = $app['alchemy_worker.type_based_worker_resolver']; + $this->app = $app; + } + + public function onSubdefinitionCreate(SubdefinitionCreateEvent $event) + { + $record = $this->findDataboxById($event->getRecord()->getDataboxId())->get_record($event->getRecord()->getRecordId()); + + $subdefs = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType()); + + foreach ($subdefs as $subdef) { + $payload = [ + 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, + 'payload' => [ + 'recordId' => $event->getRecord()->getRecordId(), + 'databoxId' => $event->getRecord()->getDataboxId(), + 'subdefName' => $subdef->get_name(), + 'status' => $event->isNewRecord() ? MessagePublisher::NEW_RECORD_MESSAGE : '' + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::SUBDEF_QUEUE); + } + + } + + public function onDelete(DeleteEvent $event) + { + // first remove record from the grid answer, so first delete the record in the index elastic + $this->app['dispatcher']->dispatch(RecordEvents::DELETED, new DeletedEvent($event->getRecord())); + + // publish payload to queue + $payload = [ + 'message_type' => MessagePublisher::DELETE_RECORD_TYPE, + 'payload' => [ + 'recordId' => $event->getRecord()->getRecordId(), + 'databoxId' => $event->getRecord()->getDataboxId(), + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::DELETE_RECORD_QUEUE); + } + + public function onSubdefinitionCreationFailure(SubdefinitionCreationFailureEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, + 'payload' => [ + 'recordId' => $event->getRecord()->getRecordId(), + 'databoxId' => $event->getRecord()->getDataboxId(), + 'subdefName' => $event->getSubdefName(), + 'status' => '' + ] + ]; + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_SUBDEF_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + } + + public function onRecordCreated(RecordEvent $event) + { + $this->messagePublisher->pushLog(sprintf('The %s= %d was successfully created', + ($event->getRecord()->isStory() ? "story story_id" : "record record_id"), + $event->getRecord()->getRecordId() + )); + } + + public function onMetadataChanged(MetadataChangedEvent $event) + { + $databoxId = $event->getRecord()->getDataboxId(); + $recordId = $event->getRecord()->getRecordId(); + + $mediaSubdefRepository = $this->getMediaSubdefRepository($databoxId); + $mediaSubdefs = $mediaSubdefRepository->findByRecordIdsAndNames([$recordId]); + + $databox = $this->findDataboxById($databoxId); + $record = $databox->get_record($recordId); + $type = $record->getType(); + + foreach ($mediaSubdefs as $subdef) { + // check subdefmetadatarequired from the subview setup in admin + if ( $subdef->get_name() == 'document' || $this->isSubdefMetadataUpdateRequired($databox, $type, $subdef->get_name())) { + if ($subdef->is_physically_present()) { + $payload = [ + 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, + 'payload' => [ + 'recordId' => $recordId, + 'databoxId' => $databoxId, + 'subdefName' => $subdef->get_name() + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE); + } else { + $payload = [ + 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, + 'payload' => [ + 'recordId' => $recordId, + 'databoxId' => $databoxId, + 'subdefName' => $subdef->get_name() + ] + ]; + + $logMessage = sprintf("Subdef %s is not physically present! to be passed in the %s ! payload >>> %s", + $subdef->get_name(), + MessagePublisher::RETRY_METADATAS_QUEUE, + json_encode($payload) + ); + $this->messagePublisher->pushLog($logMessage); + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_METADATAS_QUEUE, + 2, + 'Subdef is not physically present!' + ); + } + } + } + + } + + public function onStoryCreateCover(StoryCreateCoverEvent $event) + { + /** @var WorkerFactoryInterface[] $factories */ + $factories = $this->workerResolver->getFactories(); + + /** @var CreateRecordWorker $createRecordWorker */ + $createRecordWorker = $factories[MessagePublisher::CREATE_RECORD_TYPE]->createWorker(); + + $createRecordWorker->setStoryCover($event->getData()); + } + + public function onSubdefinitionWritemeta(SubdefinitionWritemetaEvent $event) + { + if ($event->getStatus() == SubdefinitionWritemetaEvent::FAILED) { + $payload = [ + 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, + 'payload' => [ + 'recordId' => $event->getRecord()->getRecordId(), + 'databoxId' => $event->getRecord()->getDataboxId(), + 'subdefName' => $event->getSubdefName() + ] + ]; + + $logMessage = sprintf("Subdef %s write meta failed, error : %s ! to be passed in the %s ! payload >>> %s", + $event->getSubdefName(), + $event->getWorkerMessage(), + MessagePublisher::RETRY_METADATAS_QUEUE, + json_encode($payload) + ); + $this->messagePublisher->pushLog($logMessage); + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_METADATAS_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + + } else { + $databoxId = $event->getRecord()->getDataboxId(); + $recordId = $event->getRecord()->getRecordId(); + + $databox = $this->findDataboxById($databoxId); + $record = $databox->get_record($recordId); + $type = $record->getType(); + + $subdef = $record->get_subdef($event->getSubdefName()); + + // only the required writemetadata from admin > subview setup is to be writing + if ($subdef->get_name() == 'document' || $this->isSubdefMetadataUpdateRequired($databox, $type, $subdef->get_name())) { + $payload = [ + 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, + 'payload' => [ + 'recordId' => $recordId, + 'databoxId' => $databoxId, + 'subdefName' => $event->getSubdefName() + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE); + } + } + + } + + public static function getSubscribedEvents() + { + return [ + RecordEvents::CREATED => 'onRecordCreated', + RecordEvents::SUBDEFINITION_CREATE => 'onSubdefinitionCreate', + RecordEvents::DELETE => 'onDelete', + WorkerEvents::SUBDEFINITION_CREATION_FAILURE => 'onSubdefinitionCreationFailure', + RecordEvents::METADATA_CHANGED => 'onMetadataChanged', + WorkerEvents::STORY_CREATE_COVER => 'onStoryCreateCover', + WorkerEvents::SUBDEFINITION_WRITE_META => 'onSubdefinitionWritemeta' + ]; + } + + /** + * @param $databoxId + * + * @return MediaSubdefRepository + */ + private function getMediaSubdefRepository($databoxId) + { + return $this->app['provider.repo.media_subdef']->getRepositoryForDatabox($databoxId); + } + + /** + * @param \databox $databox + * @param string $subdefType + * @param string $subdefName + * @return bool + */ + private function isSubdefMetadataUpdateRequired(\databox $databox, $subdefType, $subdefName) + { + if ($databox->get_subdef_structure()->hasSubdef($subdefType, $subdefName)) { + return $databox->get_subdef_structure()->get_subdef($subdefType, $subdefName)->isMetadataUpdateRequired(); + } + + return false; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php new file mode 100644 index 0000000000..7f73f6ff44 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php @@ -0,0 +1,69 @@ +messagePublisher = $messagePublisher; + } + + public function onPopulateIndex(PopulateIndexEvent $event) + { + $populateInfo = $event->getData(); + + // make payload per databoxId + foreach ($populateInfo['databoxIds'] as $databoxId) { + $payload = [ + 'message_type' => MessagePublisher::POPULATE_INDEX_TYPE, + 'payload' => [ + 'host' => $populateInfo['host'], + 'port' => $populateInfo['port'], + 'indexName' => $populateInfo['indexName'], + 'databoxId' => $databoxId + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::POPULATE_INDEX_QUEUE); + } + } + + public function onPopulateIndexFailure(PopulateIndexFailureEvent $event) + { + $payload = [ + 'message_type' => MessagePublisher::POPULATE_INDEX_TYPE, + 'payload' => [ + 'host' => $event->getHost(), + 'port' => $event->getPort(), + 'indexName' => $event->getIndexName(), + 'databoxId' => $event->getDataboxId(), + ] + ]; + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_POPULATE_INDEX_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + } + + public static function getSubscribedEvents() + { + return [ + WorkerEvents::POPULATE_INDEX => 'onPopulateIndex', + WorkerEvents::POPULATE_INDEX_FAILURE => 'onPopulateIndexFailure' + ]; + } +} + diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php new file mode 100644 index 0000000000..eb45da3a37 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php @@ -0,0 +1,48 @@ +messagePublisher = $messagePublisher; + } + + public function onWebhookDeliverFailure(WebhookDeliverFailureEvent $event) + { + // count = 0 mean do not retry because no api application defined + if ($event->getCount() != 0) { + $payload = [ + 'message_type' => MessagePublisher::WEBHOOK_TYPE, + 'payload' => [ + 'id' => $event->getWebhookEventId(), + 'delivery_id' => $event->getDeleveryId(), + ] + ]; + + $this->messagePublisher->publishMessage( + $payload, + MessagePublisher::RETRY_WEBHOOK_QUEUE, + $event->getCount(), + $event->getWorkerMessage() + ); + } + + } + + public static function getSubscribedEvents() + { + return [ + WorkerEvents::WEBHOOK_DELIVER_FAILURE => 'onWebhookDeliverFailure', + ]; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php new file mode 100644 index 0000000000..9fa3d11b5f --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php @@ -0,0 +1,123 @@ +app = $app; + $this->messagePublisher = $this->app['alchemy_worker.message.publisher']; + } + + public function process(array $payload) + { + $assets = $payload['assets']; + + DBManipulator::saveAssetsList($payload['commit_id'], $assets); + + $uploaderClient = new Client(['base_uri' => $payload['base_url']]); + + //get first asset informations to check if it's a story + try { + $body = $uploaderClient->get('/assets/'.$assets[0], [ + 'headers' => [ + 'Authorization' => 'AssetToken '.$payload['token'] + ] + ])->getBody()->getContents(); + } catch(\Exception $e) { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->app['dispatcher']->dispatch(WorkerEvents::ASSETS_CREATION_FAILURE, new AssetsCreationFailureEvent( + $payload, + 'Error when getting assets information !' . $e->getMessage(), + $count + )); + + return; + } + + $body = json_decode($body,true); + + $storyId = null; + + if (!empty($body['formData']['is_story'])) { + $storyId = $this->createStory($body); + } + + foreach ($assets as $assetId) { + $createRecordMessage['message_type'] = MessagePublisher::CREATE_RECORD_TYPE; + $createRecordMessage['payload'] = [ + 'asset' => $assetId, + 'publisher' => $payload['publisher'], + 'assetToken' => $payload['token'], + 'storyId' => $storyId, + 'base_url' => $payload['base_url'], + 'commit_id' => $payload['commit_id'] + ]; + + $this->messagePublisher->publishMessage($createRecordMessage, MessagePublisher::CREATE_RECORD_QUEUE); + } + } + + private function createStory(array $body) + { + $storyId = null; + + $userRepository = $this->getUserRepository(); + $user = null; + + if (!empty($body['formData']['phraseanet_submiter_email'])) { + $user = $userRepository->findByEmail($body['formData']['phraseanet_submiter_email']); + } + + if ($user === null && !empty($body['formData']['phraseanet_user_submiter_id'])) { + $user = $userRepository->find($body['formData']['phraseanet_user_submiter_id']); + } + + if ($user !== null) { + $base_id = $body['formData']['collection_destination']; + + $collection = \collection::getByBaseId($this->app, $base_id); + + $story = \record_adapter::createStory($this->app, $collection); + $storyId = $story->getRecordId(); + + $storyWZ = new StoryWZ(); + + $storyWZ->setUser($user); + $storyWZ->setRecord($story); + + $entityManager = $this->getEntityManager(); + $entityManager->persist($storyWZ); + $entityManager->flush(); + } + + return $storyId; + } + + /** + * @return UserRepository + */ + private function getUserRepository() + { + return $this->app['repo.users']; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php new file mode 100644 index 0000000000..2779eaefcb --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/CreateRecordWorker.php @@ -0,0 +1,303 @@ +app = $app; + $this->logger = $this->app['alchemy_worker.logger']; + $this->messagePublisher = $this->app['alchemy_worker.message.publisher']; + } + + public function process(array $payload) + { + $uploaderClient = new Client(['base_uri' => $payload['base_url']]); + + //get asset informations + $body = $uploaderClient->get('/assets/'.$payload['asset'], [ + 'headers' => [ + 'Authorization' => 'AssetToken '.$payload['assetToken'] + ] + ])->getBody()->getContents(); + + $body = json_decode($body,true); + + $tempfile = $this->getTemporaryFilesystem()->createTemporaryFile('download_', null, pathinfo($body['originalName'], PATHINFO_EXTENSION)); + + //download the asset + try { + $res = $uploaderClient->get('/assets/'.$payload['asset'].'/download', [ + 'headers' => [ + 'Authorization' => 'AssetToken '.$payload['assetToken'] + ], + 'save_to' => $tempfile + ]); + } catch (\Exception $e) { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + // send to retry queue + $this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent( + $payload, + 'Error when downloading assets!', + $count + )); + + return; + } + + + if ($res->getStatusCode() !== 200) { + $workerMessage = sprintf('Error %s downloading "%s"', $res->getStatusCode(), $payload['base_url'].'/assets/'.$payload['asset'].'/download'); + $this->logger->error($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + // send to retry queue + $this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent( + $payload, + $workerMessage, + $count + )); + + return; + } + + $remainingAssets = DBManipulator::updateRemainingAssetsListByCommit($payload['commit_id'], $payload['asset']); + + // if all assets in the commit are downloaded , send ack to the uploader + if ($remainingAssets == 0) { + // post ack to the uploader + $uploaderClient->post('/commits/' . $payload['commit_id'] . '/ack', [ + 'headers' => [ + 'Authorization' => 'AssetToken '.$payload['assetToken'] + ], + 'json' => [ + 'acknowledged' => true + ] + ] + ); + } + + $lazaretSession = new LazaretSession(); + + $userRepository = $this->getUserRepository(); + $user = null; + + if (!empty($body['formData']['phraseanet_submiter_email'])) { + $user = $userRepository->findByEmail($body['formData']['phraseanet_submiter_email']); + } + + if ($user === null && !empty($body['formData']['phraseanet_user_submiter_id'])) { + $user = $userRepository->find($body['formData']['phraseanet_user_submiter_id']); + } + + if ($user !== null) { + $lazaretSession->setUser($user); + } + + $this->getEntityManager()->persist($lazaretSession); + + + $renamedFilename = $tempfile; + $media = $this->app->getMediaFromUri($renamedFilename); + + if (!isset($body['formData']['collection_destination'])) { + $this->messagePublisher->pushLog("The collection_destination is not defined"); + + return ; + } + + $base_id = $body['formData']['collection_destination']; + $collection = \collection::getByBaseId($this->app, $base_id); + $sbasId = $collection->get_sbas_id(); + + $packageFile = new File($this->app, $media, $collection, $body['originalName']); + + // get metadata and status + $statusbit = null; + foreach ($body['formData'] as $key => $value) { + if (strstr($key, 'metadata')) { + $tMeta = explode('-', $key); + + $metaField = $collection->get_databox()->get_meta_structure()->get_element($tMeta[1]); + + $packageFile->addAttribute(new MetaField($metaField, [$value])); + } + + if (strstr($key, 'statusbit')) { + $tStatus = explode('-', $key); + $statusbit[$tStatus[1]] = $value; + } + } + + if (!is_null($statusbit)) { + $status = ''; + foreach (range(0, 31) as $i) { + $status .= isset($statusbit[$i]) ? ($statusbit[$i] ? '1' : '0') : '0'; + } + $packageFile->addAttribute(new Status($this->app, strrev($status))); + } + + $reasons = []; + $elementCreated = null; + + $callback = function ($element, Visa $visa) use (&$reasons, &$elementCreated) { + foreach ($visa->getResponses() as $response) { + if (!$response->isOk()) { + $reasons[] = $response->getMessage($this->app['translator']); + } + } + + $elementCreated = $element; + }; + + $this->getBorderManager()->process($lazaretSession, $packageFile, $callback); + + + if ($elementCreated instanceof \record_adapter) { + $this->dispatch(PhraseaEvents::RECORD_UPLOAD, new RecordEdit($elementCreated)); + } else { + $this->messagePublisher->pushLog(sprintf('The file was moved to the quarantine: %s', json_encode($reasons))); + /** @var LazaretFile $elementCreated */ + $this->dispatch(PhraseaEvents::LAZARET_CREATE, new LazaretEvent($elementCreated)); + } + + // add record in a story if story is defined + + if (is_int($payload['storyId']) && $elementCreated instanceof \record_adapter) { + $this->addRecordInStory($user, $elementCreated, $sbasId, $payload['storyId'], $body['formData']); + } + + } + + /** + * @param string $data databoxId_storyId_recordId subdefName + */ + public function setStoryCover($data) + { + // get databoxId , storyId , recordId + $tData = explode('_', $data); + + $record = $this->findDataboxById($tData[0])->get_record($tData[2]); + + $story = $this->findDataboxById($tData[0])->get_record($tData[1]); + $subdefName = $tData[3]; + + $subdef = $record->get_subdef($tData[3]); + $media = $this->app->getMediaFromUri($subdef->getRealPath()); + $this->getSubdefSubstituer()->substituteSubdef($story, $subdefName, $media); // subdefName = thumbnail | preview + + $this->messagePublisher->pushLog(sprintf("Cover %s set for story story_id= %d with the record record_id = %d", $subdefName, $story->getRecordId(), $record->getRecordId())); + } + + /** + * @param $user + * @param \record_adapter $elementCreated + * @param $sbasId + * @param $storyId + * @param $formData + */ + private function addRecordInStory($user, $elementCreated, $sbasId, $storyId, $formData) + { + $story = new \record_adapter($this->app, $sbasId, $storyId); + + if (!$this->getAclForUser($user)->has_right_on_base($story->getBaseId(), \ACL::CANMODIFRECORD)) { + $this->messagePublisher->pushLog(sprintf("The user %s can not add document to the story story_id = %d", $user->getLogin(), $story->getRecordId())); + + throw new AccessDeniedHttpException('You can not add document to this Story'); + } + + if (!$story->hasChild($elementCreated)) { + $story->appendChild($elementCreated); + + if (SubdefCreationWorker::checkIfFirstChild($story, $elementCreated)) { + // add metadata to the story + $metadatas = []; + foreach ($formData as $key => $value) { + if (strstr($key, 'metadata')) { + $tMeta = explode('-', $key); + + $metaField = $elementCreated->getDatabox()->get_meta_structure()->get_element($tMeta[1]); + + $metadatas[] = [ + 'meta_struct_id' => $metaField->get_id(), + 'meta_id' => null, + 'value' => $value, + ]; + } + } + + $story->set_metadatas($metadatas)->rebuild_subdefs(); + } + + $this->messagePublisher->pushLog(sprintf('The record record_id= %d was successfully added in the story record_id= %d', $elementCreated->getRecordId(), $story->getRecordId())); + $this->dispatch(PhraseaEvents::RECORD_EDIT, new RecordEdit($story)); + } + } + + /** + * @return UserRepository + */ + private function getUserRepository() + { + return $this->app['repo.users']; + } + + /** + * @param User $user + * @return \ACL + */ + private function getAclForUser(User $user) + { + $aclProvider = $this->app['acl']; + + return $aclProvider->get($user); + } + + /** + * @return SubdefSubstituer + */ + private function getSubdefSubstituer() + { + return $this->app['subdef.substituer']; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php new file mode 100644 index 0000000000..859054710a --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/DeleteRecordWorker.php @@ -0,0 +1,17 @@ +findDataboxById($payload['databoxId'])->get_record($payload['recordId']); + + $record->delete(); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php new file mode 100644 index 0000000000..80a56f3147 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ExportMailWorker.php @@ -0,0 +1,102 @@ +app = $app; + } + + public function process(array $payload) + { + $destMails = unserialize($payload['destinationMails']); + + $params = unserialize($payload['params']); + + /** @var UserRepository $userRepository */ + $userRepository = $this->app['repo.users']; + + $user = $userRepository->find($payload['emitterUserId']); + + /** @var TokenRepository $tokenRepository */ + $tokenRepository = $this->app['repo.tokens']; + + /** @var Token $token */ + $token = $tokenRepository->findValidToken($payload['tokenValue']); + + $list = unserialize($token->getData()); + + //zip documents + \set_export::build_zip( + $this->app, + $token, + $list, + $this->app['tmp.download.path'].'/'. $token->getValue() . '.zip' + ); + + $remaingEmails = $destMails; + + $emitter = new Emitter($user->getDisplayName(), $user->getEmail()); + + foreach ($destMails as $key => $mail) { + try { + $receiver = new Receiver(null, trim($mail)); + } catch (InvalidArgumentException $e) { + continue; + } + + $mail = MailRecordsExport::create($this->app, $receiver, $emitter, $params['textmail']); + $mail->setButtonUrl($params['url']); + $mail->setExpiration($token->getExpiration()); + + $this->deliver($mail, $params['reading_confirm']); + unset($remaingEmails[$key]); + } + + //some mails failed + if (count($remaingEmails) > 0) { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + // notify to send to the retry queue + $this->app['dispatcher']->dispatch(WorkerEvents::EXPORT_MAIL_FAILURE, new ExportMailFailureEvent( + $payload['emitterUserId'], + $payload['tokenValue'], + $remaingEmails, + $payload['params'], + 'some mails failed', + $count + )); + + foreach ($remaingEmails as $mail) { + $this->app['dispatcher']->dispatch(PhraseaEvents::EXPORT_MAIL_FAILURE, new ExportFailureEvent( + $user, + $params['ssttid'], + $params['lst'], + \eventsmanager_notify_downloadmailfail::MAIL_FAIL, + $mail + ) + ); + } + } + + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/Factory/CallableWorkerFactory.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/Factory/CallableWorkerFactory.php new file mode 100644 index 0000000000..d794e000b7 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/Factory/CallableWorkerFactory.php @@ -0,0 +1,33 @@ +factory = $factory; + } + + /** + * @return WorkerInterface + */ + public function createWorker() + { + $factory = $this->factory; + $worker = $factory(); + + if (! $worker instanceof WorkerInterface) { + throw new \RuntimeException('Invalid worker created, expected an instance of \Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface'); + } + + return $worker; + } +} \ No newline at end of file diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/Factory/WorkerFactoryInterface.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/Factory/WorkerFactoryInterface.php new file mode 100644 index 0000000000..c7b221e9bb --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/Factory/WorkerFactoryInterface.php @@ -0,0 +1,13 @@ +indexer = $indexer; + $this->messagePublisher = $messagePublisher; + } + + public function process(array $payload) + { + DBManipulator::savePopulateStatus($payload); + + /** @var ElasticsearchOptions $options */ + $options = $this->indexer->getIndex()->getOptions(); + + $options->setIndexName($payload['indexName']); + $options->setHost($payload['host']); + $options->setPort($payload['port']); + + $databoxId = $payload['databoxId']; + + $indexExists = $this->indexer->indexExists(); + + if (!$indexExists) { + $workerMessage = sprintf("Index %s don't exist!", $payload['indexName']); + $this->messagePublisher->pushLog($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + // send to retry queue + $this->dispatch(WorkerEvents::POPULATE_INDEX_FAILURE, new PopulateIndexFailureEvent( + $payload['host'], + $payload['port'], + $payload['indexName'], + $payload['databoxId'], + $workerMessage, + $count + )); + } else { + $databox = $this->findDataboxById($databoxId); + + try { + $r = $this->indexer->populateIndex(Indexer::THESAURUS | Indexer::RECORDS, $databox); // , $temporary); + + $this->messagePublisher->pushLog(sprintf( + "Indexation of databox \"%s\" finished in %0.2f sec (Mem. %0.2f Mo)", + $databox->get_dbname(), + $r['duration']/1000, + $r['memory']/1048576 + )); + } catch(\Exception $e) { + DBManipulator::deletePopulateStatus($payload); + + $workerMessage = sprintf("Error on indexing : %s ", $e->getMessage()); + $this->messagePublisher->pushLog($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + // notify to send a retry + $this->dispatch(WorkerEvents::POPULATE_INDEX_FAILURE, new PopulateIndexFailureEvent( + $payload['host'], + $payload['port'], + $payload['indexName'], + $payload['databoxId'], + $workerMessage, + $count + )); + } + } + + // delete entry in populate_running + DBManipulator::deletePopulateStatus($payload); + } + +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php new file mode 100644 index 0000000000..81917d3e56 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/ProcessPool.php @@ -0,0 +1,100 @@ +logger = new NullLogger(); + } + + public function setMaxProcesses($maxProcesses) + { + $this->maxProcesses = max(1, $maxProcesses); + } + + /** + * Sets a logger instance on the object + * + * @param LoggerInterface $logger + * @return null + */ + public function setLogger(LoggerInterface $logger) + { + $this->logger = $logger; + } + + /** + * @param array $processArguments + * @param string|null $workingDirectory + * @return Process + */ + public function getWorkerProcess(array $processArguments, $workingDirectory = null) + { + $this->detachFinishedProcesses(); + $this->waitForNextSlot(); + + $builder = new ProcessBuilder($processArguments); + + $builder->setWorkingDirectory($workingDirectory ?: getcwd()); + + return ($this->processes[] = $builder->getProcess()); + } + + private function detachFinishedProcesses() + { + $runningProcesses = []; + + foreach ($this->processes as $process) { + if ($process->isRunning()) { + $runningProcesses[] = $process; + } else { + $process->stop(0); + } + } + + $this->processes = $runningProcesses; + } + + private function waitForNextSlot() + { + $this->logger->debug( + sprintf('Checking for available process slot: %d processes found.', count($this->processes)) + ); + + $interval = 1; + + while (count($this->processes) >= $this->maxProcesses) { + $this->logger->debug(sprintf('%d Max process count reached, will retry in %d second.', $this->maxProcesses, $interval)); + + sleep($interval); + + $this->detachFinishedProcesses(); + $interval = min(10, $interval + 1); + } + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php new file mode 100644 index 0000000000..f8ef7a1310 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php @@ -0,0 +1,138 @@ +messagePublisher = $messagePublisher; + } + + public function process(array $payload) + { + $config = Config::getConfiguration(); + + if (isset($config['worker_plugin']) && isset($config['worker_plugin']['pull_assets'])) { + $config = $config['worker_plugin']['pull_assets']; + } else { + return; + } + + $uploaderClient = new Client(); + + // if a token exist , use it + if (isset($config['assetToken'])) { + $res = $this->getCommits($uploaderClient, $config); + if ($res == null) { + return; + } + + // if Unauthorized get a new token first + if ($res->getStatusCode() == 401) { + if (($config = $this->generateToken($uploaderClient, $config)) === null) { + return; + }; + $res = $this->getCommits($uploaderClient, $config); + } + } else { // if there is not a token , get one from the uploader service + if (($config = $this->generateToken($uploaderClient, $config)) === null) { + return; + }; + if (($res = $this->getCommits($uploaderClient, $config)) === null) { + return; + } + } + + $body = $res->getBody()->getContents(); + $body = json_decode($body,true); + $commits = $body['hydra:member']; + + $urlInfo = parse_url($config['endpointCommit']); + $baseUrl = $urlInfo['scheme'] . '://' . $urlInfo['host'] .':'.$urlInfo['port']; + + foreach ($commits as $commit) { + // send only payload in ingest-queue if the commit is ack false and it is not being creating + if (!$commit['acknowledged'] && !DBManipulator::isCommitToBeCreating($commit['id'])) { + $this->messagePublisher->pushLog("A new commit found in the uploader ! commit_ID : ".$commit['id']); + + $payload = [ + 'message_type' => MessagePublisher::ASSETS_INGEST_TYPE, + 'payload' => [ + 'assets' => array_map(function($asset) { + return str_replace('/assets/', '', $asset); + }, $commit['assets']), + 'publisher' => $commit['userId'], + 'commit_id' => $commit['id'], + 'token' => $commit['token'], + 'base_url' => $baseUrl + ] + ]; + + $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); + } + } + + } + + /** + * @param Client $uploaderClient + * @param array $config + * @return \Psr\Http\Message\ResponseInterface|null + */ + private function getCommits(Client $uploaderClient, array $config) + { + try { + $res = $uploaderClient->get($config['endpointCommit'], [ + 'headers' => [ + 'Authorization' => 'AssetToken '.$config['assetToken'] + ] + ]); + } catch(\Exception $e) { + $this->messagePublisher->pushLog("An error occurred when fetching endpointCommit : " . $e->getMessage()); + + return null; + } + + return $res; + } + + /** + * @param Client $uploaderClient + * @param array $config + * @return array|null + */ + private function generateToken(Client $uploaderClient, array $config) + { + try { + $tokenBody = $uploaderClient->post($config['endpointToken'], [ + 'json' => [ + 'client_id' => $config['clientId'], + 'client_secret' => $config['clientSecret'], + 'grant_type' => 'client_credentials', + 'scope' => 'uploader:commit_list' + ] + ])->getBody()->getContents(); + } catch (\Exception $e) { + $this->messagePublisher->pushLog("An error occurred when fetching endpointToken : " . $e->getMessage()); + + return null; + } + + $tokenBody = json_decode($tokenBody,true); + + $config['assetToken'] = $tokenBody['access_token']; + + Config::setConfiguration(['pull_assets' => $config]); + $config = Config::getConfiguration(); + + return $config['worker_plugin']['pull_assets']; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/Resolver/TypeBasedWorkerResolver.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/Resolver/TypeBasedWorkerResolver.php new file mode 100644 index 0000000000..e03d163bca --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/Resolver/TypeBasedWorkerResolver.php @@ -0,0 +1,46 @@ +factories[$messageType] = $workerFactory; + } + + /** + * @return WorkerFactoryInterface[] + */ + public function getFactories() + { + return $this->factories; + } + + public function getWorker($messageType, array $message) + { + if (isset($this->workers[$messageType])) { + return $this->workers[$messageType]; + } + + if (isset($this->factories[$messageType])) { + return $this->workers[$messageType] = $this->factories[$messageType]->createWorker(); + } + + throw new \RuntimeException('Invalid worker type requested: ' . $messageType); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/Resolver/WorkerResolverInterface.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/Resolver/WorkerResolverInterface.php new file mode 100644 index 0000000000..2c23b15c9e --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/Resolver/WorkerResolverInterface.php @@ -0,0 +1,15 @@ +subdefGenerator = $subdefGenerator; + $this->messagePublisher = $messagePublisher; + $this->logger = $logger; + $this->dispatcher = $dispatcher; + $this->filesystem = $filesystem; + $this->repoWorker = $repoWorker; + } + + public function process(array $payload) + { + if(isset($payload['recordId']) && isset($payload['databoxId'])) { + $recordId = $payload['recordId']; + $databoxId = $payload['databoxId']; + $wantedSubdef = [$payload['subdefName']]; + + $databox = $this->findDataboxById($databoxId); + $record = $databox->get_record($recordId); + + $oldLogger = $this->subdefGenerator->getLogger(); + + if (!$record->isStory()) { + // check if there is a write meta running for the record or the same task running + $canCreateSubdef = $this->repoWorker->canCreateSubdef($payload['subdefName'], $recordId, $databoxId); + + if (!$canCreateSubdef) { + // the file is in used to write meta + $payload = [ + 'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE, + 'payload' => $payload + ]; + $this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_SUBDEF_QUEUE); + + $message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload); + $this->messagePublisher->pushLog($message); + + return ; + } + + // tell that a file is in used to create subdef + $em = $this->getEntityManager(); + $em->beginTransaction(); + + try { + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($databoxId) + ->setRecordId($recordId) + ->setWork(PhraseaTokens::MAKE_SUBDEF) + ->setWorkOn($payload['subdefName']) + ; + + $em->persist($workerRunningJob); + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + + $this->subdefGenerator->setLogger($this->logger); + + $this->subdefGenerator->generateSubdefs($record, $wantedSubdef); + + // begin to check if the subdef is successfully generated + $subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($payload['subdefName']); + $filePathToCheck = null; + + if ($record->has_subdef($payload['subdefName']) ) { + $filePathToCheck = $record->get_subdef($payload['subdefName'])->getRealPath(); + } + + $filePathToCheck = $this->filesystem->generateSubdefPathname($record, $subdef, $filePathToCheck); + + if (!$this->filesystem->exists($filePathToCheck)) { + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent( + $record, + $payload['subdefName'], + 'Subdef generation failed !', + $count + )); + + $this->subdefGenerator->setLogger($oldLogger); + return ; + } + // checking ended + + // order to write meta for the subdef if needed + $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent($record, $payload['subdefName'])); + + $this->subdefGenerator->setLogger($oldLogger); + + // update jeton when subdef is created + $this->updateJeton($record); + + $parents = $record->get_grouping_parents(); + + // create a cover for a story + // used when uploaded via uploader-service and grouped as a story + if (!$parents->is_empty() && isset($payload['status']) && $payload['status'] == MessagePublisher::NEW_RECORD_MESSAGE && in_array($payload['subdefName'], array('thumbnail', 'preview'))) { + foreach ($parents->get_elements() as $story) { + if (self::checkIfFirstChild($story, $record)) { + $data = implode('_', [$databoxId, $story->getRecordId(), $recordId, $payload['subdefName']]); + + $this->dispatcher->dispatch(WorkerEvents::STORY_CREATE_COVER, new StoryCreateCoverEvent($data)); + } + } + } + + // tell that we have finished to work on this file + $em->beginTransaction(); + try { + $em->remove($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + } + } + } + + public static function checkIfFirstChild(\record_adapter $story, \record_adapter $record) + { + $sql = "SELECT * FROM regroup WHERE rid_parent = :parent_record_id AND rid_child = :children_id and ord = :ord"; + + $connection = $record->getDatabox()->get_connection(); + + $stmt = $connection->prepare($sql); + + $stmt->execute([ + ':parent_record_id' => $story->getRecordId(), + ':children_id' => $record->getRecordId(), + ':ord' => 0, + ]); + + $row = $stmt->fetch(\PDO::FETCH_ASSOC); + + $stmt->closeCursor(); + + if ($row) { + return true; + } + + return false; + } + + private function updateJeton(\record_adapter $record) + { + $connection = $record->getDatabox()->get_connection(); + $connection->beginTransaction(); + + // mark subdef created + $sql = 'UPDATE record' + . ' SET jeton=(jeton & ~(:token)), moddate=NOW()' + . ' WHERE record_id=:record_id'; + + $stmt = $connection->prepare($sql); + + $stmt->execute([ + ':record_id' => $record->getRecordId(), + ':token' => PhraseaTokens::MAKE_SUBDEF, + ]); + + $connection->commit(); + $stmt->closeCursor(); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php new file mode 100644 index 0000000000..13700e77fb --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php @@ -0,0 +1,206 @@ +app = $app; + $this->messagePublisher = $app['alchemy_worker.message.publisher']; + } + + /** + * @param array $payload + */ + public function process(array $payload) + { + if (isset($payload['id'])) { + $webhookEventId = $payload['id']; + $app = $this->app; + + $httpClient = new GuzzleClient(); + $version = new Version(); + $httpClient->setUserAgent(sprintf('Phraseanet/%s (%s)', $version->getNumber(), $version->getName())); + + $httpClient->getEventDispatcher()->addListener('request.error', function (Event $event) { + // override guzzle default behavior of throwing exceptions + // when 4xx & 5xx responses are encountered + $event->stopPropagation(); + }, -254); + + // Set callback which logs success or failure + $subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) use ($app, $webhookEventId, $payload) { + $retry = true; + if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) { + /** @var WebhookEventDelivery $delivery */ + $delivery = $app['repo.webhook-delivery']->find($deliverId); + + $logContext = [ 'host' => $request->getHost() ]; + + if ($response->isSuccessful()) { + $app['manipulator.webhook-delivery']->deliverySuccess($delivery); + + $logType = 'info'; + $logEntry = sprintf('Deliver success event "%d:%s" for app "%s"', + $delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(), + $delivery->getThirdPartyApplication()->getName() + ); + + $retry = false; + } else { + $app['manipulator.webhook-delivery']->deliveryFailure($delivery); + + $logType = 'error'; + $logEntry = sprintf('Deliver failure event "%d:%s" for app "%s"', + $delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(), + $delivery->getThirdPartyApplication()->getName() + ); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent( + $webhookEventId, + $logEntry, + $count, + $deliverId + )); + } + + $app['alchemy_worker.message.publisher']->pushLog($logEntry, $logType, $logContext); + + return $retry; + } + }, true, new CurlBackoffStrategy()); + + // set max retries + $subscriber = new TruncatedBackoffStrategy(1, $subscriber); + $subscriber = new BackoffPlugin($subscriber); + + $httpClient->addSubscriber($subscriber); + + + $thirdPartyApplications = $this->app['repo.api-applications']->findWithDefinedWebhookCallback(); + + /** @var WebhookEvent|null $webhookevent */ + $webhookevent = $this->app['repo.webhook-event']->find($webhookEventId); + + if ($webhookevent !== null) { + $app['manipulator.webhook-event']->processed($webhookevent); + + $this->messagePublisher->pushLog(sprintf('Processing event "%s" with id %d', $webhookevent->getName(), $webhookevent->getId())); + // send requests + $this->deliverEvent($httpClient, $thirdPartyApplications, $webhookevent, $payload); + } + } + } + + private function deliverEvent(GuzzleClient $httpClient, array $thirdPartyApplications, WebhookEvent $webhookevent, $payload) + { + if (count($thirdPartyApplications) === 0) { + $workerMessage = 'No applications defined to listen for webhook events'; + $this->messagePublisher->pushLog($workerMessage); + + // count = 0 mean do not retry because no api application defined + $this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent($webhookevent->getId(), $workerMessage, 0)); + + return; + } + + // format event data + if (!isset($payload['delivery_id'])) { + $webhookData = $webhookevent->getData(); + $webhookData['time'] = $webhookevent->getCreated(); + $webhookevent->setData($webhookData); + } + + /** @var ProcessorInterface $eventProcessor */ + $eventProcessor = $this->app['webhook.processor_factory']->get($webhookevent); + $data = $eventProcessor->process($webhookevent); + + // batch requests + $batch = BatchBuilder::factory() + ->transferRequests(10) + ->build(); + + /** @var ApiApplication $thirdPartyApplication */ + foreach ($thirdPartyApplications as $thirdPartyApplication) { + $creator = $thirdPartyApplication->getCreator(); + + if ($creator == null) { + continue; + } + + $creatorGrantedBaseIds = array_keys($this->app['acl']->get($creator)->get_granted_base()); + + $concernedBaseIds = array_intersect($webhookevent->getCollectionBaseIds(), $creatorGrantedBaseIds); + + if (count($webhookevent->getCollectionBaseIds()) != 0 && count($concernedBaseIds) == 0) { + continue; + } + + if (isset($payload['delivery_id']) && $payload['delivery_id'] != null) { + /** @var WebhookEventDelivery $delivery */ + $delivery = $this->app['repo.webhook-delivery']->find($payload['delivery_id']); + + // only the app url to retry + if ($delivery->getThirdPartyApplication()->getId() != $thirdPartyApplication->getId()) { + continue; + } + } else { + $delivery = $this->app['manipulator.webhook-delivery']->create($thirdPartyApplication, $webhookevent); + } + + // append delivery id as url anchor + $uniqueUrl = $this->getUrl($thirdPartyApplication, $delivery); + + // create http request with data as request body + $batch->add($httpClient->createRequest('POST', $uniqueUrl, [ + 'Content-Type' => 'application/vnd.phraseanet.event+json' + ], json_encode($data))); + } + + try { + $batch->flush(); + } catch (\Exception $e) { + $this->messagePublisher->pushLog($e->getMessage()); + $this->messagePublisher->publishFailedMessage( + $payload, + new AMQPTable(['worker-message' => $e->getMessage()]), + MessagePublisher::FAILED_WEBHOOK_QUEUE + ); + } + } + + private function getUrl(ApiApplication $application, WebhookEventDelivery $delivery) + { + return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId()); + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WorkerInterface.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WorkerInterface.php new file mode 100644 index 0000000000..73b8dde300 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WorkerInterface.php @@ -0,0 +1,12 @@ +binaryPath = $_SERVER['SCRIPT_NAME']; + $this->environment = $environment; + $this->processPool = $processPool; + $this->logger = new NullLogger(); + } + + public function preservePayloads() + { + $this->preservePayloads = true; + } + + /** + * Sets a logger instance on the object + * + * @param LoggerInterface $logger + * @return null + */ + public function setLogger(LoggerInterface $logger) + { + $this->logger = $logger; + } + + public function setPrefix($prefix) + { + $this->prefix = $prefix; + } + + /** + * @param string $messageType + * @param string $payload + */ + public function invokeWorker($messageType, $payload) + { + $args = [ + $this->binaryPath, + $this->command, + '-vv', + $messageType, + $this->createPayloadFile($payload) + ]; + + if ($this->environment) { + $args[] = sprintf('-e=%s', $this->environment); + } + + if ($this->preservePayloads) { + $args[] = '--preserve-payload'; + } + + $process = $this->processPool->getWorkerProcess($args, getcwd()); + + $this->logger->debug('Invoking shell command: ' . $process->getCommandLine()); + + try { + $process->start([$this, 'logWorkerOutput']); + } catch (ProcessRuntimeException $e) { + $process->stop(); + + throw new \RuntimeException(sprintf('Command "%s" failed: %s', $process->getCommandLine(), + $e->getMessage()), 0, $e); + } + } + + public function logWorkerOutput($stream, $output) + { + if ($stream == 'err') { + $this->logger->error($output); + } else { + $this->logger->info($output); + } + } + + public function setMaxProcessPoolValue($maxProcesses) + { + $this->processPool->setMaxProcesses($maxProcesses); + } + + private function createPayloadFile($payload) + { + $path = tempnam(sys_get_temp_dir(), $this->prefix); + + if (file_put_contents($path, $payload) === false) { + throw new \RuntimeException('Cannot write payload file to path: ' . $path); + } + + return $path; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php new file mode 100644 index 0000000000..2816b8e757 --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -0,0 +1,310 @@ +writer = $writer; + $this->logger = $logger; + $this->messagePublisher = $messagePublisher; + $this->repoWorker = $repoWorker; + } + + public function process(array $payload) + { + if (isset($payload['recordId']) && isset($payload['databoxId'])) { + $recordId = $payload['recordId']; + $databoxId = $payload['databoxId']; + + $MWG = isset($payload['MWG']) ? $payload['MWG'] : false; + $clearDoc = isset($payload['clearDoc']) ? $payload['clearDoc'] : false; + $databox = $this->findDataboxById($databoxId); + + + $param = ($payload['subdefName'] == "document") ? PhraseaTokens::WRITE_META_DOC : PhraseaTokens::WRITE_META_SUBDEF; + + // check if there is a make subdef running for the record or the same task running + $canWriteMeta = $this->repoWorker->canWriteMetadata($payload['subdefName'], $recordId, $databoxId); + + if (!$canWriteMeta) { + // the file is in used to generate subdef + $payload = [ + 'message_type' => MessagePublisher::WRITE_METADATAS_TYPE, + 'payload' => $payload + ]; + $this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_METADATAS_QUEUE); + + $message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload); + $this->messagePublisher->pushLog($message); + + return ; + } + + // tell that a file is in used to create subdef + $em = $this->getEntityManager(); + $em->beginTransaction(); + + try { + $workerRunningJob = new WorkerRunningJob(); + $workerRunningJob + ->setDataboxId($databoxId) + ->setRecordId($recordId) + ->setWork($param) + ->setWorkOn($payload['subdefName']) + ; + + $em->persist($workerRunningJob); + $em->flush(); + + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + + $record = $databox->get_record($recordId); + + $subdef = $record->get_subdef($payload['subdefName']); + + if ($subdef->is_physically_present()) { + $metadata = new MetadataBag(); + + // add Uuid in metadatabag + if ($record->getUuid()) { + $metadata->add( + new Metadata( + new Tag\XMPExif\ImageUniqueID(), + new Mono($record->getUuid()) + ) + ); + $metadata->add( + new Metadata( + new Tag\ExifIFD\ImageUniqueID(), + new Mono($record->getUuid()) + ) + ); + $metadata->add( + new Metadata( + new Tag\IPTC\UniqueDocumentID(), + new Mono($record->getUuid()) + ) + ); + } + + // read document fields and add to metadatabag + $caption = $record->get_caption(); + foreach ($databox->get_meta_structure() as $fieldStructure) { + + $tagName = $fieldStructure->get_tag()->getTagname(); + $fieldName = $fieldStructure->get_name(); + + // skip fields with no src + if ($tagName == '' || $tagName == 'Phraseanet:no-source') { + continue; + } + + // check exiftool known tags to skip Phraseanet:tf-* + try { + $tag = TagFactory::getFromRDFTagname($tagName); + if(!$tag->isWritable()) { + continue; + } + } catch (TagUnknown $e) { + continue; + } + + try { + $field = $caption->get_field($fieldName); + $fieldValues = $field->get_values(); + + if ($fieldStructure->is_multi()) { + $values = array(); + foreach ($fieldValues as $value) { + $values[] = $this->removeNulChar($value->getValue()); + } + + $value = new Multi($values); + } else { + $fieldValue = array_pop($fieldValues); + $value = $this->removeNulChar($fieldValue->getValue()); + + // fix the dates edited into phraseanet + if($fieldStructure->get_type() === $fieldStructure::TYPE_DATE) { + try { + $value = self::fixDate($value); // will return NULL if the date is not valid + } + catch (\Exception $e) { + $value = null; // do NOT write back to iptc + } + } + + if($value !== null) { // do not write invalid dates + $value = new Mono($value); + } + } + } catch(\Exception $e) { + // the field is not set in the record, erase it + if ($fieldStructure->is_multi()) { + $value = new Multi(array('')); + } + else { + $value = new Mono(''); + } + } + + if($value !== null) { // do not write invalid data + $metadata->add( + new Metadata($fieldStructure->get_tag(), $value) + ); + } + } + + $this->writer->reset(); + + if ($MWG) { + $this->writer->setModule(Writer::MODULE_MWG, true); + } + + $this->writer->erase($subdef->get_name() != 'document' || $clearDoc, true); + + // write meta in file + try { + $this->writer->write($subdef->getRealPath(), $metadata); + + $this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() )); + } catch (PHPExiftoolException $e) { + $workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage()); + $this->logger->error($workerMessage); + + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $payload['subdefName'], + SubdefinitionWritemetaEvent::FAILED, + $workerMessage, + $count + )); + } + + // mark write metas finished + $this->updateJeton($record); + } else { + $count = isset($payload['count']) ? $payload['count'] + 1 : 2 ; + + $this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $payload['subdefName'], + SubdefinitionWritemetaEvent::FAILED, + 'Subdef is not physically present!', + $count + )); + } + + // tell that we have finished to work on this file + $em->beginTransaction(); + try { + $em->remove($workerRunningJob); + $em->flush(); + $em->commit(); + } catch (\Exception $e) { + $em->rollback(); + } + + } + + } + + private function removeNulChar($value) + { + return str_replace("\0", "", $value); + } + + private function updateJeton(\record_adapter $record) + { + $connection = $record->getDatabox()->get_connection(); + + $connection->beginTransaction(); + $stmt = $connection->prepare('UPDATE record SET jeton=(jeton & ~(:token)), moddate=NOW() WHERE record_id = :record_id'); + + $stmt->execute([ + ':record_id' => $record->getRecordId(), + ':token' => PhraseaTokens::WRITE_META, + ]); + + $connection->commit(); + $stmt->closeCursor(); + } + + /** + * re-format a phraseanet date for iptc writing + * return NULL if the date is not valid + * + * @param string $value + * @return string|null + */ + private static function fixDate($value) + { + $date = null; + try { + $a = explode(';', preg_replace('/\D+/', ';', trim($value))); + switch (count($a)) { + case 3: // yyyy;mm;dd + $date = new \DateTime($a[0] . '-' . $a[1] . '-' . $a[2]); + $date = $date->format('Y-m-d H:i:s'); + break; + case 6: // yyyy;mm;dd;hh;mm;ss + $date = new \DateTime($a[0] . '-' . $a[1] . '-' . $a[2] . ' ' . $a[3] . ':' . $a[4] . ':' . $a[5]); + $date = $date->format('Y-m-d H:i:s'); + break; + } + } + catch (\Exception $e) { + $date = null; + } + + return $date; + } +} diff --git a/lib/conf.d/json_schema/assets_enqueue.json b/lib/conf.d/json_schema/assets_enqueue.json new file mode 100644 index 0000000000..416f321fb6 --- /dev/null +++ b/lib/conf.d/json_schema/assets_enqueue.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "New assets", + "description": "List of assets to enqueue on Phraseanet", + "type": "object", + "properties": { + "assets": { + "type": "array" + }, + "publisher": { + "type": "string" + }, + "token": { + "type": "string" + }, + "base_url": { + "type": "string" + }, + "commit_id": { + "type": "string" + } + }, + "required": [ + "assets", + "publisher", + "token", + "base_url", + "commit_id" + ] +} diff --git a/templates/web/admin/tree.html.twig b/templates/web/admin/tree.html.twig index ecad1411c4..08da90f9b7 100644 --- a/templates/web/admin/tree.html.twig +++ b/templates/web/admin/tree.html.twig @@ -76,6 +76,15 @@ + {% if app.getAclForUser(app.getAuthenticatedUser()).has_right(constant('\\ACL::TASKMANAGER')) %} +
  • + + + {{ 'Worker Manager' | trans }} + +
  • + {% endif %} +
  • diff --git a/templates/web/admin/worker-manager/index.html.twig b/templates/web/admin/worker-manager/index.html.twig new file mode 100644 index 0000000000..7185cd55a5 --- /dev/null +++ b/templates/web/admin/worker-manager/index.html.twig @@ -0,0 +1,77 @@ +

    Worker

    + +
    + + + + +
    +
    +
    +
    +
    +
    +
    +
    + + diff --git a/templates/web/admin/worker-manager/worker_configuration.html.twig b/templates/web/admin/worker-manager/worker_configuration.html.twig new file mode 100644 index 0000000000..8ce8d649ab --- /dev/null +++ b/templates/web/admin/worker-manager/worker_configuration.html.twig @@ -0,0 +1,39 @@ +

    Config Worker queue retry

    + +

    Set up the delay between two attempts per queue!

    +{{ form_start(form, {'action': path('worker_admin_configuration')}) }} + +
    + {{ form_row(form.assetsIngest) }} +
    + +
    + {{ form_row(form.createRecord) }} +
    + +
    + {{ form_row(form.subdefCreation) }} +
    + +
    + {{ form_row(form.writeMetadatas) }} +
    + +
    + {{ form_row(form.webhook) }} +
    + +
    + {{ form_row(form.exportMail) }} +
    + +
    + {{ form_row(form.populateIndex) }} +
    + +
    + +
    + + +{{ form_end(form) }} diff --git a/templates/web/admin/worker-manager/worker_metadata.html.twig b/templates/web/admin/worker-manager/worker_metadata.html.twig new file mode 100644 index 0000000000..ec122fe8e8 --- /dev/null +++ b/templates/web/admin/worker-manager/worker_metadata.html.twig @@ -0,0 +1 @@ +

    Write metadata setting

    diff --git a/templates/web/admin/worker-manager/worker_pull_assets.html.twig b/templates/web/admin/worker-manager/worker_pull_assets.html.twig new file mode 100644 index 0000000000..11a8dfdb77 --- /dev/null +++ b/templates/web/admin/worker-manager/worker_pull_assets.html.twig @@ -0,0 +1,29 @@ +

    Pull Assets from uploader setting

    + +{{ form_start(form, {'action': path('worker_admin_pullAssets')}) }} + +
    + {{ form_row(form.endpointCommit) }} +
    + +
    + {{ form_row(form.endpointToken) }} +
    + +
    + {{ form_row(form.clientSecret) }} +
    + +
    + {{ form_row(form.clientId) }} +
    + +
    + {{ form_row(form.pullInterval) }} +
    + +
    + +
    + +{{ form_end(form) }} diff --git a/templates/web/admin/worker-manager/worker_searchengine.html.twig b/templates/web/admin/worker-manager/worker_searchengine.html.twig new file mode 100644 index 0000000000..f2c770aa68 --- /dev/null +++ b/templates/web/admin/worker-manager/worker_searchengine.html.twig @@ -0,0 +1,98 @@ +

    Populate elasticsearch index

    + +{{ form_start(form, {'action': path('worker_admin_searchengine')}) }} + +
    + {{ form_row(form.host) }} +
    + +
    + {{ form_row(form.port) }} +
    + +
    + {{ form_row(form.indexName) }} +
    + +
    + +
    + + + {% for databox in app.getApplicationBox().get_databoxes() %} + + {% endfor %} +
    +
    + +
    + +
    + +{{ form_end(form) }} + + diff --git a/templates/web/admin/worker-manager/worker_subview.html.twig b/templates/web/admin/worker-manager/worker_subview.html.twig new file mode 100644 index 0000000000..6b89604e63 --- /dev/null +++ b/templates/web/admin/worker-manager/worker_subview.html.twig @@ -0,0 +1 @@ +

    Subview setting

    diff --git a/tests/Alchemy/Tests/Phrasea/WorkerManager/Subscriber/SubscriberTest.php b/tests/Alchemy/Tests/Phrasea/WorkerManager/Subscriber/SubscriberTest.php new file mode 100644 index 0000000000..574699ee3c --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/WorkerManager/Subscriber/SubscriberTest.php @@ -0,0 +1,45 @@ +prophesize('Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher'); + + $sexportSubscriber = new ExportSubscriber($app['alchemy_worker.message.publisher']->reveal()); + $this->assertInstanceOf('Symfony\\Component\\EventDispatcher\\EventSubscriberInterface', $sexportSubscriber); + + $recordSubscriber = new RecordSubscriber($app); + $this->assertInstanceOf('Symfony\\Component\\EventDispatcher\\EventSubscriberInterface', $recordSubscriber); + } + + public function testIfPublisheMessageOnSubscribeEvent() + { + $app = new Application(Application::ENV_TEST); + $app['alchemy_worker.message.publisher'] = $this->getMockBuilder('Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher') + ->disableOriginalConstructor() + ->getMock(); + + $app['alchemy_worker.type_based_worker_resolver'] = $this->getMockBuilder('Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver') + ->disableOriginalConstructor() + ->getMock(); + + $app['alchemy_worker.message.publisher']->expects($this->atLeastOnce())->method('publishMessage'); + + $event = $this->prophesize('Alchemy\Phrasea\Core\Event\ExportMailEvent'); + $sut = new ExportSubscriber($app['alchemy_worker.message.publisher']); + $sut->onExportMailCreate($event->reveal()); + + } +} diff --git a/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/Factory/CallableWorkerFactoryTest.php b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/Factory/CallableWorkerFactoryTest.php new file mode 100644 index 0000000000..c6f0db7e5c --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/Factory/CallableWorkerFactoryTest.php @@ -0,0 +1,15 @@ +assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\Factory\\WorkerFactoryInterface', $sut); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/Resolver/TypeBasedWorkerResolverTest.php b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/Resolver/TypeBasedWorkerResolverTest.php new file mode 100644 index 0000000000..dcc531bead --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/Resolver/TypeBasedWorkerResolverTest.php @@ -0,0 +1,77 @@ +assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\Resolver\\WorkerResolverInterface', $sut); + } + + public function testGetFactories() + { + $workerFactory = $this->getMockBuilder(WorkerFactoryInterface::class) + ->disableOriginalConstructor() + ->getMock() + ; + + $sut = new TypeBasedWorkerResolver(); + + $sut->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, $workerFactory); + + $this->assertContainsOnlyInstancesOf(WorkerFactoryInterface::class, $sut->getFactories()); + } + + public function testGetWorkerSuccess() + { + $worker = $this->getMockBuilder(WorkerInterface::class) + ->disableOriginalConstructor() + ->getMock(); + + $workerFactory = $this->getMockBuilder(CallableWorkerFactory::class) + ->disableOriginalConstructor() + ->getMock(); + + $workerFactory->method('createWorker')->will($this->returnValue($worker)); + + $sut = new TypeBasedWorkerResolver(); + + $sut->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, $workerFactory); + + + $this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', + $sut->getWorker(MessagePublisher::SUBDEF_CREATION_TYPE, ['mock-message'])); + + } + + public function testGetWorkerWrongTypeThrowException() + { + $worker = $this->getMockBuilder(WorkerInterface::class) + ->disableOriginalConstructor() + ->getMock(); + + $workerFactory = $this->getMockBuilder(CallableWorkerFactory::class) + ->disableOriginalConstructor() + ->getMock(); + + $workerFactory->method('createWorker')->will($this->returnValue($worker)); + + $sut = new TypeBasedWorkerResolver(); + + $sut->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, $workerFactory); + + $this->expectException(\RuntimeException::class); + + $sut->getWorker(MessagePublisher::WRITE_METADATAS_TYPE, ['mock-message']); + + } +} diff --git a/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/WorkerInvokerTest.php b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/WorkerInvokerTest.php new file mode 100644 index 0000000000..7c3666c241 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/WorkerInvokerTest.php @@ -0,0 +1,73 @@ +prophesize(ProcessPool::class); + + $sut = new WorkerInvoker($processPool->reveal()); + + $this->assertInstanceOf('Psr\\Log\\LoggerAwareInterface', $sut); + } + + public function testInvokeWorkerSuccess() + { + $process = $this->getMockBuilder(Process::class) + ->disableOriginalConstructor() + ->getMock() + ; + + $process ->expects($this->exactly(1)) + ->method('start') + ; + + $processPool = $this->getMockBuilder(ProcessPool::class) + ->disableOriginalConstructor() + ->getMock(); + + $processPool->method('getWorkerProcess')->will($this->returnValue($process)); + + + $sut = new WorkerInvoker($processPool); + + $sut->invokeWorker(MessagePublisher::SUBDEF_CREATION_TYPE, json_encode(['mock-payload'])); + } + + public function testInvokeWorkerWhenThrowException() + { + $process = $this->getMockBuilder(Process::class) + ->disableOriginalConstructor() + ->getMock() + ; + + $process ->expects($this->exactly(1)) + ->method('start') + ->will($this->throwException(new ProcessRuntimeException())) + ; + + $processPool = $this->getMockBuilder(ProcessPool::class) + ->disableOriginalConstructor() + ->getMock(); + + $processPool->method('getWorkerProcess')->will($this->returnValue($process)); + + + $sut = new WorkerInvoker($processPool); + + try { + $sut->invokeWorker(MessagePublisher::SUBDEF_CREATION_TYPE, json_encode(['mock-payload'])); + $this->fail('Should have raised an exception'); + } catch (\Exception $e) { + + } + } +} diff --git a/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/WorkerServiceTest.php b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/WorkerServiceTest.php new file mode 100644 index 0000000000..691696607d --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/WorkerManager/Worker/WorkerServiceTest.php @@ -0,0 +1,59 @@ +assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $exportMailWorker); + + $app['subdef.generator'] = $this->prophesize('Alchemy\Phrasea\Media\SubdefGenerator')->reveal(); + $app['alchemy_worker.message.publisher'] = $this->prophesize('Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher')->reveal(); + $app['alchemy_worker.logger'] = $this->prophesize("Monolog\Logger")->reveal(); + $app['dispatcher'] = $this->prophesize('Symfony\Component\EventDispatcher\EventDispatcherInterface')->reveal(); + $app['phraseanet.filesystem'] = $this->prophesize('Alchemy\Phrasea\Filesystem\FilesystemService')->reveal(); + $app['repo.worker-running-job'] = $this->prophesize('Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository')->reveal(); + + $writer = $this->prophesize('PHPExiftool\Writer')->reveal(); + + $subdefCreationWorker = new SubdefCreationWorker( + $app['subdef.generator'], + $app['alchemy_worker.message.publisher'], + $app['alchemy_worker.logger'], + $app['dispatcher'], + $app['phraseanet.filesystem'], + $app['repo.worker-running-job'] + ); + $this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $subdefCreationWorker); + + $writemetadatasWorker = new WriteMetadatasWorker( + $writer, + $app['alchemy_service.logger'], + $app['alchemy_service.message.publisher'], + $app['repo.worker-running-job'] + ); + $this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $writemetadatasWorker); + + $assetsWorker = new AssetsIngestWorker($app); + $this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $assetsWorker); + + $createRecordWorker = new CreateRecordWorker($app); + $this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $createRecordWorker); + } +}