diff --git a/config/configuration.sample.yml b/config/configuration.sample.yml index 2ee204596b..21fef21576 100644 --- a/config/configuration.sample.yml +++ b/config/configuration.sample.yml @@ -323,14 +323,23 @@ geocoding-providers: provincefields: Province countryfields: Country, Pays workers: - queue: - worker-queue: - registry: alchemy_worker.queue_registry - host: localhost - port: 5672 - user: guest - password: guest - vhost: / + queue: + worker-queue: + registry: alchemy_worker.queue_registry + host: localhost + port: 5672 + user: guest + password: guest + vhost: / + queues: + writeMetadatas: # this Q is "delayable" in case of record is locked + ttl_retry: 1500 # overwrite 1000 ms default delay + ttl_delayed: 10000 # overwrite 5000 ms default delay + subdefCreation: # this Q is "delayable" in case of record is locked + ttl_delayed: 10000 # overwrite 5000 ms default delay + pullAssets: + ttl_retry: 5000 + max_retry : 5 externalservice: ginger: diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index df009c125c..a2f0a919dc 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -13,19 +13,22 @@ use Alchemy\Phrasea\WorkerManager\Form\WorkerConfigurationType; use Alchemy\Phrasea\WorkerManager\Form\WorkerFtpType; use Alchemy\Phrasea\WorkerManager\Form\WorkerPullAssetsType; use Alchemy\Phrasea\WorkerManager\Form\WorkerSearchengineType; +use Alchemy\Phrasea\WorkerManager\Form\WorkerValidationReminderType; use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; +use Doctrine\ORM\OptimisticLockException; use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Form\Form; use Symfony\Component\Form\FormInterface; +use Symfony\Component\HttpFoundation\JsonResponse; use Symfony\Component\HttpFoundation\Request; +use Symfony\Component\Routing\Generator\UrlGeneratorInterface; + class AdminConfigurationController extends Controller { - public function indexAction(PhraseaApplication $app) + public function indexAction(PhraseaApplication $app, Request $request) { - /** @var AMQPConnection $serverConnection */ - $serverConnection = $this->app['alchemy_worker.amqp.connection']; - /** @var WorkerRunningJobRepository $repoWorker */ $repoWorker = $app['repo.worker-running-job']; @@ -39,9 +42,10 @@ class AdminConfigurationController extends Controller $workerRunningJob = $repoWorker->findByStatus($filterStatus); return $this->render('admin/worker-manager/index.html.twig', [ - 'isConnected' => ($serverConnection->getChannel() != null) ? true : false, + 'isConnected' => $this->getAMQPConnection()->getChannel() != null, 'workerRunningJob' => $workerRunningJob, - 'reload' => false + 'reload' => false, + '_fragment' => $request->get('_fragment') ?? 'worker-configuration', ]); } @@ -52,26 +56,44 @@ class AdminConfigurationController extends Controller */ public function configurationAction(PhraseaApplication $app, Request $request) { - $retryQueueConfig = $this->getRetryQueueConfiguration(); + $AMQPConnection = $this->getAMQPConnection(); - $form = $app->form(new WorkerConfigurationType(), $retryQueueConfig); + $conf = $this->getConf()->get(['workers', 'queues'], []); + $form = $app->form(new WorkerConfigurationType($AMQPConnection), $conf); $form->handleRequest($request); if ($form->isValid()) { - // save config in file - $app['conf']->set(['workers', 'retry_queue'], $form->getData()); + // save config + // too bad we must remove null entries from data to not save in conf + $data = $form->getData(); + array_walk( + $data, + function(&$qSettings, $qName) { + $qSettings = array_filter( + $qSettings, + function($setting) { + return $setting !== null; + } + ); + } + ); + $app['conf']->set(['workers', 'queues'], $data); + /* + * todo : reinitialize q can't depend on form content : + * e.g. if a ttl_retry is blank in form, the value should go back to default, so the q should be reinit. + * $queues = array_intersect_key(AMQPConnection::$defaultQueues, $retryQueueConfig); $retryQueuesToReset = array_intersect_key(AMQPConnection::$defaultRetryQueues, array_flip($queues)); - /** @var AMQPConnection $serverConnection */ - $serverConnection = $this->app['alchemy_worker.amqp.connection']; // change the queue TTL - $serverConnection->reinitializeQueue($retryQueuesToReset); - $serverConnection->reinitializeQueue(AMQPConnection::$defaultDelayedQueues); + $AMQPConnection->reinitializeQueue($retryQueuesToReset); + $AMQPConnection->reinitializeQueue(AMQPConnection::$defaultDelayedQueues); + */ - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-configuration']); } return $this->render('admin/worker-manager/worker_configuration.html.twig', [ @@ -84,7 +106,7 @@ class AdminConfigurationController extends Controller /** @var WorkerRunningJobRepository $repoWorker */ $repoWorker = $app['repo.worker-running-job']; - $reload = ($request->query->get('reload')) == 1 ? true : false ; + $reload = ($request->query->get('reload') == 1); $workerRunningJob = []; $filterStatus = []; @@ -114,8 +136,8 @@ class AdminConfigurationController extends Controller /** * @param Request $request * @param $workerId - * @return \Symfony\Component\HttpFoundation\JsonResponse - * @throws \Doctrine\ORM\OptimisticLockException + * @return JsonResponse + * @throws OptimisticLockException */ public function changeStatusAction(Request $request, $workerId) { @@ -140,13 +162,11 @@ class AdminConfigurationController extends Controller public function queueMonitorAction(PhraseaApplication $app, Request $request) { - $reload = ($request->query->get('reload')) == 1 ? true : false ; + $reload = ($request->query->get('reload') == 1); - /** @var AMQPConnection $serverConnection */ - $serverConnection = $app['alchemy_worker.amqp.connection']; - $serverConnection->getChannel(); - $serverConnection->declareExchange(); - $queuesStatus = $serverConnection->getQueuesStatus(); + $this->getAMQPConnection()->getChannel(); + $this->getAMQPConnection()->declareExchange(); + $queuesStatus = $this->getAMQPConnection()->getQueuesStatus(); return $this->render('admin/worker-manager/worker_queue_monitor.html.twig', [ 'queuesStatus' => $queuesStatus, @@ -162,10 +182,20 @@ class AdminConfigurationController extends Controller return $this->app->json(['success' => false]); } - /** @var AMQPConnection $serverConnection */ - $serverConnection = $this->app['alchemy_worker.amqp.connection']; + $this->getAMQPConnection()->reinitializeQueue([$queueName]); - $serverConnection->reinitializeQueue([$queueName]); + return $this->app->json(['success' => true]); + } + + public function deleteQueueAction(PhraseaApplication $app, Request $request) + { + $queueName = $request->request->get('queueName'); + + if (empty($queueName)) { + return $this->app->json(['success' => false]); + } + + $this->getAMQPConnection()->deleteQueue($queueName); return $this->app->json(['success' => true]); } @@ -176,7 +206,8 @@ class AdminConfigurationController extends Controller $repoWorker = $app['repo.worker-running-job']; $repoWorker->truncateWorkerTable(); - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-info']); } public function deleteFinishedAction(PhraseaApplication $app) @@ -185,7 +216,8 @@ class AdminConfigurationController extends Controller $repoWorker = $app['repo.worker-running-job']; $repoWorker->deleteFinishedWorks(); - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-info']); } public function searchengineAction(PhraseaApplication $app, Request $request) @@ -201,7 +233,8 @@ class AdminConfigurationController extends Controller $this->getDispatcher()->dispatch(WorkerEvents::POPULATE_INDEX, new PopulateIndexEvent($populateInfo)); - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-searchengine']); } return $this->render('admin/worker-manager/worker_searchengine.html.twig', [ @@ -211,14 +244,12 @@ class AdminConfigurationController extends Controller public function subviewAction() { - return $this->render('admin/worker-manager/worker_subview.html.twig', [ - ]); + return $this->render('admin/worker-manager/worker_subview.html.twig', [ ]); } public function metadataAction() { - return $this->render('admin/worker-manager/worker_metadata.html.twig', [ - ]); + return $this->render('admin/worker-manager/worker_metadata.html.twig', [ ]); } public function ftpAction(PhraseaApplication $app, Request $request) @@ -231,7 +262,8 @@ class AdminConfigurationController extends Controller // save new ftp config $app['conf']->set(['workers', 'ftp'], array_merge($ftpConfig, $form->getData())); - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-ftp']); } return $this->render('admin/worker-manager/worker_ftp.html.twig', [ @@ -241,26 +273,59 @@ class AdminConfigurationController extends Controller public function validationReminderAction(PhraseaApplication $app, Request $request) { - $interval = $app['conf']->get(['workers', 'validationReminder', 'interval'], 7200); + // nb : the "interval" for a loop-q is the ttl. + // so the setting is stored into the "queues" settings in conf. + // here only the "ttl_retry" can be set/changed in conf + $config = $this->getConf()->get(['workers', 'queues', MessagePublisher::VALIDATION_REMINDER_TYPE], []); + if(isset($config['ttl_retry'])) { + // all settings are in msec, but into the form we want large numbers in sec. + $config['ttl_retry'] /= 1000; + } + /** @var Form $form */ + $form = $app->form(new WorkerValidationReminderType($this->getAMQPConnection()), $config); - if ($request->getMethod() == 'POST') { - $reminderInterval = (int)$request->request->get('worker_reminder_interval'); + $form->handleRequest($request); + if ($form->isSubmitted() && $form->isValid()) { + $data = $form->getData(); + switch($data['act']) { + case 'save' : // save the form content (settings) + unset($data['act']); // don't save this + // the interval was displayed in sec. in form, convert back to msec + if(isset($data['ttl_retry'])) { + $data['ttl_retry'] *= 1000; + } + $app['conf']->set(['workers', 'queues', MessagePublisher::VALIDATION_REMINDER_TYPE], $data); + $this->getAMQPConnection()->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]); + break; + case 'start': + // reinitialize the validation reminder queues + $this->getAMQPConnection()->setQueue(MessagePublisher::VALIDATION_REMINDER_TYPE); + $this->getAMQPConnection()->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]); + $this->getMessagePublisher()->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE); + break; + case 'stop': + $this->getAMQPConnection()->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]); + break; + } - /** @var AMQPConnection $serverConnection */ - $serverConnection = $this->app['alchemy_worker.amqp.connection']; - $serverConnection->setQueue(MessagePublisher::VALIDATION_REMINDER_QUEUE); - - // save the period interval in second - $app['conf']->set(['workers', 'validationReminder', 'interval'], $reminderInterval); - // reinitialize the validation reminder queues - $serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_QUEUE]); - $this->app['alchemy_worker.message.publisher']->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE); - - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-reminder']); } + // guess if the q is "running" = check if there are pending message on Q or loop-Q + $running = false; + $qStatuses = $this->getAMQPConnection()->getQueuesStatus(); + foreach([ + MessagePublisher::VALIDATION_REMINDER_TYPE, + $this->getAMQPConnection()->getLoopQueueName(MessagePublisher::VALIDATION_REMINDER_TYPE) + ] as $qName) { + if(isset($qStatuses[$qName]) && $qStatuses[$qName]['messageCount'] > 0) { + $running = true; + } + } return $this->render('admin/worker-manager/worker_validation_reminder.html.twig', [ - 'interval' => $interval + 'form' => $form->createView(), + 'running' => $running ]); } @@ -276,23 +341,35 @@ class AdminConfigurationController extends Controller public function pullAssetsAction(PhraseaApplication $app, Request $request) { - $pullAssetsConfig = $this->getPullAssetsConfiguration(); + $pullAssetsConfig = $this->getConf()->get(['workers', 'pull_assets'], []); + // the "pullInterval" comes from the ttl_retry + $ttl_retry = $this->getConf()->get(['workers','queues', 'pullAssets', 'ttl_retry'], null); + if(!is_null($ttl_retry)) { + $ttl_retry /= 1000; // form is in sec + } + $pullAssetsConfig['pullInterval'] = $ttl_retry; + $form = $app->form(new WorkerPullAssetsType(), $pullAssetsConfig); $form->handleRequest($request); if ($form->isValid()) { - /** @var AMQPConnection $serverConnection */ - $serverConnection = $this->app['alchemy_worker.amqp.connection']; - $serverConnection->setQueue(MessagePublisher::PULL_QUEUE); - // save new pull config - $app['conf']->set(['workers', 'pull_assets'], array_merge($pullAssetsConfig, $form->getData())); + // save new pull config in 2 places + $form_data = $form->getData(); + $ttl_retry = $form_data['pullInterval']; + unset($form_data['pullInterval'], $pullAssetsConfig['pullInterval']); + $app['conf']->set(['workers', 'pull_assets'], array_merge($pullAssetsConfig, $form_data)); + if(!is_null($ttl_retry)) { + $this->getConf()->set(['workers','queues', 'pullAssets', 'ttl_retry'], 1000 * (int)$ttl_retry); + } // reinitialize the pull queues - $serverConnection->reinitializeQueue([MessagePublisher::PULL_QUEUE]); - $this->app['alchemy_worker.message.publisher']->initializeLoopQueue(MessagePublisher::PULL_ASSETS_TYPE); + $this->getAMQPConnection()->setQueue(MessagePublisher::PULL_ASSETS_TYPE); + $this->getAMQPConnection()->reinitializeQueue([MessagePublisher::PULL_ASSETS_TYPE]); + $this->getMessagePublisher()->initializeLoopQueue(MessagePublisher::PULL_ASSETS_TYPE); - return $app->redirectPath('worker_admin'); + // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter + return $app->redirectPath('worker_admin', ['_fragment'=>'worker-pull-assets']); } return $this->render('admin/worker-manager/worker_pull_assets.html.twig', [ @@ -300,6 +377,16 @@ class AdminConfigurationController extends Controller ]); } + + + /** + * @return MessagePublisher + */ + private function getMessagePublisher() + { + return $this->app['alchemy_worker.message.publisher']; + } + /** * @return EventDispatcherInterface */ @@ -333,18 +420,25 @@ class AdminConfigurationController extends Controller return $data; } - private function getPullAssetsConfiguration() - { - return $this->app['conf']->get(['workers', 'pull_assets'], []); - } - private function getFtpConfiguration() { - return $this->app['conf']->get(['workers', 'ftp'], []); + return $this->getConf()->get(['workers', 'ftp'], []); } - private function getRetryQueueConfiguration() + /** + * @return AMQPConnection + */ + private function getAMQPConnection() { - return $this->app['conf']->get(['workers', 'retry_queue'], []); + return $this->app['alchemy_worker.amqp.connection']; } + + /** + * @return UrlGeneratorInterface + */ + private function getUrlGenerator() + { + return $this->app['url_generator']; + } + } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php index 2e26d250e3..50fd34290a 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationFailureEvent.php @@ -10,7 +10,7 @@ class AssetsCreationFailureEvent extends SfEvent private $workerMessage; private $count; - public function __construct($payload, $workerMessage, $count = 2) + public function __construct($payload, $workerMessage, $count) { $this->payload = $payload; $this->workerMessage = $workerMessage; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php index bccf447317..c8a12aff32 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/AssetsCreationRecordFailureEvent.php @@ -12,7 +12,7 @@ class AssetsCreationRecordFailureEvent extends SfEvent private $count; private $workerJobId; - public function __construct($payload, $workerMessage = '', $count = 2, $workerJobId = 0) + public function __construct($payload, $workerMessage, $count, $workerJobId ) { $this->payload = $payload; $this->workerMessage = $workerMessage; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php index 30b03a011a..31b0f66e6d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/ExportMailFailureEvent.php @@ -13,7 +13,7 @@ class ExportMailFailureEvent extends SfEvent private $workerMessage; private $count; - public function __construct($emitterUserId, $tokenValue, $destinationMails, $params, $workerMessage = '', $count = 2) + public function __construct($emitterUserId, $tokenValue, $destinationMails, $params, $workerMessage, $count) { $this->emitterUserId = $emitterUserId; $this->tokenValue = $tokenValue; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php index 7f9305773a..832c9d2ead 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/PopulateIndexFailureEvent.php @@ -14,7 +14,7 @@ class PopulateIndexFailureEvent extends SfEvent private $count; private $workerJobId; - public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2, $workerJobId = 0) + public function __construct($host, $port, $indexName, $databoxId, $workerMessage, $count, $workerJobId) { $this->host = $host; $this->port = $port; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php index 16401d24c3..c1a12cf3f9 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/SubdefinitionCreationFailureEvent.php @@ -12,7 +12,7 @@ class SubdefinitionCreationFailureEvent extends RecordEvent private $count; private $workerJobId; - public function __construct(RecordInterface $record, $subdefName, $workerMessage = '', $count = 2, $workerJobId = 0) + public function __construct(RecordInterface $record, $subdefName, $workerMessage, $count, $workerJobId) { parent::__construct($record); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php b/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php index f31790cad3..4bcc4050ed 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Event/WebhookDeliverFailureEvent.php @@ -11,7 +11,7 @@ class WebhookDeliverFailureEvent extends SfEvent private $count; private $deleveryId; - public function __construct($webhookEventId, $workerMessage, $count = 2, $deleveryId = null) + public function __construct($webhookEventId, $workerMessage, $count, $deleveryId = null) { $this->webhookEventId = $webhookEventId; $this->workerMessage = $workerMessage; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/QueueSettingsType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/QueueSettingsType.php new file mode 100644 index 0000000000..d9d4299a8b --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/QueueSettingsType.php @@ -0,0 +1,68 @@ +AMQPConnection = $AMQPConnection; + $this->baseQueueName = $baseQueueName; + } + + public function buildForm(FormBuilderInterface $builder, array $options) + { + parent::buildForm($builder, $options); + + $builder->add('n_workers', HiddenType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:n_workers', + 'required' => false, + 'attr' => [ + 'placeholder' => 1 + ] + ]); + if($this->AMQPConnection->hasRetryQueue($this->baseQueueName) || $this->AMQPConnection->hasLoopQueue($this->baseQueueName)) { + $builder + ->add('max_retry', IntegerType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:max retry', + 'required' => false, + 'attr' => [ + 'placeholder' => $this->AMQPConnection->getMaxRetry($this->baseQueueName), + //'class'=>'col' + ] + ]) + ->add('ttl_retry', IntegerType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:retry delay in ms', + 'required' => false, + 'attr' => [ + 'placeholder' => $this->AMQPConnection->getTTLRetry($this->baseQueueName), + //'class'=>'col' + ] + ]); + } + if($this->AMQPConnection->hasDelayedQueue($this->baseQueueName)) { + $builder->add('ttl_delayed', IntegerType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:delayed delay in ms', + 'required' => false, + 'attr' => [ + 'placeholder' => $this->AMQPConnection->getTTLDelayed($this->baseQueueName), + //'class'=>'col' + ] + ]); + } + } + + public function getName() + { + return 'queue_settings'; + } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php index 170cccd6fb..3828875fe9 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php @@ -2,50 +2,80 @@ namespace Alchemy\Phrasea\WorkerManager\Form; -use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; +use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection; use Symfony\Component\Form\AbstractType; +use Symfony\Component\Form\Extension\Core\Type\SubmitType; use Symfony\Component\Form\FormBuilderInterface; class WorkerConfigurationType extends AbstractType { + private $AMQPConnection; + + public function __construct(AMQPConnection $AMQPConnection) + { + $this->AMQPConnection = $AMQPConnection; + } + public function buildForm(FormBuilderInterface $builder, array $options) { parent::buildForm($builder, $options); - $builder - ->add(MessagePublisher::ASSETS_INGEST_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Ingest retry delay in ms' - ]) - ->add(MessagePublisher::CREATE_RECORD_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Create record retry delay in ms' - ]) - ->add(MessagePublisher::SUBDEF_CREATION_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Subdefinition retry delay in ms' - ]) - ->add(MessagePublisher::WRITE_METADATAS_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Metadatas retry delay in ms' - ]) - ->add(MessagePublisher::WEBHOOK_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Webhook retry delay in ms' - ]) - ->add(MessagePublisher::EXPORT_MAIL_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Export mail retry delay in ms' - ]) - ->add(MessagePublisher::POPULATE_INDEX_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Populate Index retry delay in ms' - ]) - ->add(MessagePublisher::FTP_TYPE, 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Ftp retry delay in ms (default 3 min)' - ]) - ->add('delayedSubdef', 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Subdef delay in ms' - ]) - ->add('delayedWriteMeta', 'text', [ - 'label' => 'admin::workermanager:tab:workerconfig: Write meta delay in ms' - ]) - ; + foreach($this->AMQPConnection->getBaseQueueNames() as $baseQueueName) { + /* + $g = null; + if($this->AMQPConnection->hasRetryQueue($baseQueueName) || $this->AMQPConnection->hasLoopQueue($baseQueueName)) { + $g = $g ?? $this->createFormGroup($builder, $baseQueueName); + $g->add('max_retry', TextType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:max retry', + 'required' => false, + 'attr' => [ + 'placeholder' => $this->AMQPConnection->getMaxRetry($baseQueueName) + ] + ]); + $g->add('ttl_retry', TextType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:retry delay in ms', + 'required' => false, + 'attr' => [ + 'placeholder' => $this->AMQPConnection->getTTLRetry($baseQueueName) + ] + ]); + } + if($this->AMQPConnection->hasDelayedQueue($baseQueueName)) { + $g = $g ?? $this->createFormGroup($builder, $baseQueueName); + $g->add('ttl_delayed', TextType::class, [ + 'label' => 'admin::workermanager:tab:workerconfig:delayed delay in ms', + 'required' => false, + 'attr' => [ + 'placeholder' => $this->AMQPConnection->getTTLDelayed($baseQueueName) + ] + ]); + } + if($g) { +// $builder->add($g); + } + */ + if($this->AMQPConnection->hasRetryQueue($baseQueueName) + || $this->AMQPConnection->hasLoopQueue($baseQueueName) + || $this->AMQPConnection->hasDelayedQueue($baseQueueName) + ) { + $f = new QueueSettingsType($this->AMQPConnection, $baseQueueName); + $builder->add($baseQueueName, $f, ['attr' => ['class' => 'norow'], 'block_name' => 'queue']); + } + } + $builder->add("boutton::appliquer", SubmitType::class, + [ + 'label' => "boutton::appliquer" + ]); } + /* + private function createFormGroup(FormBuilderInterface $builder, string $name) + { + // todo : fix form render : one horizontal block per queue + return $builder->create($name, FormType::class, ['attr'=>['class'=>'form-row']]); + } + */ + public function getName() { return 'worker_configuration'; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerFtpType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerFtpType.php index a0998a7494..e50d30307c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerFtpType.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerFtpType.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\WorkerManager\Form; use Symfony\Component\Form\AbstractType; +use Symfony\Component\Form\Extension\Core\Type\TextType; use Symfony\Component\Form\FormBuilderInterface; class WorkerFtpType extends AbstractType @@ -12,19 +13,19 @@ class WorkerFtpType extends AbstractType parent::buildForm($builder, $options); $builder - ->add('proxy', 'text', [ + ->add('proxy', TextType::class, [ 'label' => 'admin::workermanager:tab:ftp: Proxy', 'required' => false ]) - ->add('proxyPort', 'text', [ + ->add('proxyPort', TextType::class, [ 'label' => 'admin::workermanager:tab:ftp: Proxy port', 'required' => false ]) - ->add('proxyUser', 'text', [ + ->add('proxyUser', TextType::class, [ 'label' => 'admin::workermanager:tab:ftp: Proxy user', 'required' => false ]) - ->add('proxyPassword', 'text', [ + ->add('proxyPassword', TextType::class, [ 'label' => 'admin::workermanager:tab:ftp: Proxy password', 'required' => false ]) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php index 2054e24288..b8698d4fea 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerPullAssetsType.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\WorkerManager\Form; use Symfony\Component\Form\AbstractType; +use Symfony\Component\Form\Extension\Core\Type\TextType; use Symfony\Component\Form\FormBuilderInterface; class WorkerPullAssetsType extends AbstractType @@ -12,16 +13,16 @@ class WorkerPullAssetsType extends AbstractType parent::buildForm($builder, $options); $builder - ->add('UploaderApiBaseUri', 'text', [ + ->add('UploaderApiBaseUri', TextType::class, [ 'label' => 'admin::workermanager:tab:pullassets: Uploader api base uri' ]) - ->add('clientSecret', 'text', [ + ->add('clientSecret', TextType::class, [ 'label' => 'admin::workermanager:tab:pullassets: Client secret' ]) - ->add('clientId', 'text', [ + ->add('clientId', TextType::class, [ 'label' => 'admin::workermanager:tab:pullassets: Client ID' ]) - ->add('pullInterval', 'text', [ + ->add('pullInterval', TextType::class, [ 'label' => 'admin::workermanager:tab:pullassets: Fetching interval in second' ]) ; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php index 2f86bdda7a..66d1178bba 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerSearchengineType.php @@ -3,6 +3,8 @@ namespace Alchemy\Phrasea\WorkerManager\Form; use Symfony\Component\Form\AbstractType; +use Symfony\Component\Form\Extension\Core\Type\IntegerType; +use Symfony\Component\Form\Extension\Core\Type\TextType; use Symfony\Component\Form\FormBuilderInterface; use Symfony\Component\OptionsResolver\OptionsResolverInterface; use Symfony\Component\Validator\Constraints\NotBlank; @@ -15,18 +17,18 @@ class WorkerSearchengineType extends AbstractType parent::buildForm($builder, $options); $builder - ->add('host', 'text', [ + ->add('host', TextType::class, [ 'label' => 'admin::workermanager:tab:searchengine: Elasticsearch server host', 'constraints' => new NotBlank(), ]) - ->add('port', 'integer', [ + ->add('port', IntegerType::class, [ 'label' => 'admin::workermanager:tab:searchengine: Elasticsearch service port', 'constraints' => [ new Range(['min' => 1, 'max' => 65535]), new NotBlank() ] ]) - ->add('indexName', 'text', [ + ->add('indexName', TextType::class, [ 'label' => 'admin::workermanager:tab:searchengine: Elasticsearch index name', 'constraints' => new NotBlank(), 'attr' =>['data-class'=>'inline'] diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerValidationReminderType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerValidationReminderType.php new file mode 100644 index 0000000000..b88bfff2ba --- /dev/null +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerValidationReminderType.php @@ -0,0 +1,68 @@ +AMQPConnection = $AMQPConnection; + } + + public function buildForm(FormBuilderInterface $builder, array $options) + { + parent::buildForm($builder, $options); + + // because this form will have 3 submit buttons - to use the same route -, this "act" field + // will reflect the value of the clicked button (js) + // !!! tried: using symfony "getClickedButton()" does to NOT work (submit button values seems not sent in request ?) + $builder + ->add('act', HiddenType::class, [ + 'attr' => [ + 'class' => 'act' + ] + ]); + + // every ttl is in msec, we display this large one (loop q) in sec in form. + $interval = $this->AMQPConnection->getTTLRetry(MessagePublisher::VALIDATION_REMINDER_TYPE) / 1000; + $builder + ->add('ttl_retry', TextType::class, [ + 'label' => 'admin::workermanager:tab:Reminder: Interval in second', + 'attr' => [ + 'placeholder' => $interval + ] + ]); + $builder + ->add("boutton::appliquer", SubmitType::class, [ + 'label' => "boutton::appliquer", + 'attr' => ['value' => 'save'] + ]); +/* + $builder + ->add("submit", ButtonType::class, [ + 'label' => "start", +// 'data' => 'truc', +// 'empty_data' => 'machin', + 'attr'=>[ + 'value' => 'start', + 'name' => 'zobi' + ] + ]); +*/ + } + +// public function getName() +// { +// return 'worker_pullAssets'; +// } +} diff --git a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php index a53aa5e91d..8d51a0368d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Provider/ControllerServiceProvider.php @@ -45,61 +45,81 @@ class ControllerServiceProvider implements ControllerProviderInterface, ServiceP $firewall->requireRight(\ACL::TASKMANAGER); }); + /** @uses AdminConfigurationController::indexAction */ $controllers->match('/', 'controller.worker.admin.configuration:indexAction') ->method('GET') ->bind('worker_admin'); + /** @uses AdminConfigurationController::configurationAction */ $controllers->match('/configuration', 'controller.worker.admin.configuration:configurationAction') ->method('GET|POST') ->bind('worker_admin_configuration'); + /** @uses AdminConfigurationController::infoAction */ $controllers->match('/info', 'controller.worker.admin.configuration:infoAction') ->method('GET') ->bind('worker_admin_info'); + /** @uses AdminConfigurationController::truncateTableAction */ $controllers->match('/truncate', 'controller.worker.admin.configuration:truncateTableAction') ->method('POST') ->bind('worker_admin_truncate'); + /** @uses AdminConfigurationController::deleteFinishedAction */ $controllers->match('/delete-finished', 'controller.worker.admin.configuration:deleteFinishedAction') ->method('POST') ->bind('worker_admin_delete_finished'); + /** @uses AdminConfigurationController::searchengineAction */ $controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction') ->method('GET|POST') ->bind('worker_admin_searchengine'); + /** @uses AdminConfigurationController::subviewAction */ $controllers->match('/subview', 'controller.worker.admin.configuration:subviewAction') ->method('GET|POST') ->bind('worker_admin_subview'); + /** @uses AdminConfigurationController::metadataAction */ $controllers->match('/metadata', 'controller.worker.admin.configuration:metadataAction') ->method('GET|POST') ->bind('worker_admin_metadata'); + /** @uses AdminConfigurationController::ftpAction */ $controllers->match('/ftp', 'controller.worker.admin.configuration:ftpAction') ->method('GET|POST') ->bind('worker_admin_ftp'); + /** @uses AdminConfigurationController::populateStatusAction */ $controllers->get('/populate-status', 'controller.worker.admin.configuration:populateStatusAction') ->bind('worker_admin_populate_status'); + /** @uses AdminConfigurationController::pullAssetsAction */ $controllers->match('/pull-assets', 'controller.worker.admin.configuration:pullAssetsAction') ->method('GET|POST') ->bind('worker_admin_pullAssets'); + /** @uses AdminConfigurationController::validationReminderAction */ $controllers->match('/validation-reminder', 'controller.worker.admin.configuration:validationReminderAction') ->method('GET|POST') ->bind('worker_admin_validationReminder'); + /** @uses AdminConfigurationController::queueMonitorAction */ $controllers->match('/queue-monitor', 'controller.worker.admin.configuration:queueMonitorAction') ->method('GET') ->bind('worker_admin_queue_monitor'); + /** @uses AdminConfigurationController::purgeQueueAction */ $controllers->match('/purge-queue', 'controller.worker.admin.configuration:purgeQueueAction') ->method('POST') ->bind('worker_admin_purge_queue'); + /** @uses AdminConfigurationController::deleteQueueAction */ + $controllers->match('/delete-queue', 'controller.worker.admin.configuration:deleteQueueAction') + ->method('POST') + ->bind('worker_admin_delete_queue'); + + /** @uses AdminConfigurationController::changeStatusAction */ $controllers->match('/{workerId}/change-status', 'controller.worker.admin.configuration:changeStatusAction') ->method('POST') ->assert('workerId', '\d+') diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php index a56a6e9380..32f05a945c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php @@ -3,6 +3,7 @@ namespace Alchemy\Phrasea\WorkerManager\Queue; use Alchemy\Phrasea\Core\Configuration\PropertyAccess; +use Exception; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; @@ -20,59 +21,6 @@ class AMQPConnection private $hostConfig; private $conf; - public static $defaultQueues = [ - MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::METADATAS_QUEUE, - MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::SUBDEF_QUEUE, - MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::EXPORT_QUEUE, - MessagePublisher::WEBHOOK_TYPE => MessagePublisher::WEBHOOK_QUEUE, - MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::ASSETS_INGEST_QUEUE, - MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::CREATE_RECORD_QUEUE, - MessagePublisher::PULL_ASSETS_TYPE => MessagePublisher::PULL_QUEUE, - MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::POPULATE_INDEX_QUEUE, - MessagePublisher::DELETE_RECORD_TYPE => MessagePublisher::DELETE_RECORD_QUEUE, - MessagePublisher::MAIN_QUEUE_TYPE => MessagePublisher::MAIN_QUEUE, - MessagePublisher::SUBTITLE_TYPE => MessagePublisher::SUBTITLE_QUEUE, - MessagePublisher::FTP_TYPE => MessagePublisher::FTP_QUEUE, - MessagePublisher::VALIDATION_REMINDER_TYPE => MessagePublisher::VALIDATION_REMINDER_QUEUE, - MessagePublisher::EXPOSE_UPLOAD_TYPE => MessagePublisher::EXPOSE_UPLOAD_QUEUE, - MessagePublisher::RECORD_EDIT_TYPE => MessagePublisher::RECORD_EDIT_QUEUE - ]; - - // the corresponding worker queues and retry queues, loop queue - public static $defaultRetryQueues = [ - MessagePublisher::METADATAS_QUEUE => MessagePublisher::RETRY_METADATAS_QUEUE, - MessagePublisher::SUBDEF_QUEUE => MessagePublisher::RETRY_SUBDEF_QUEUE, - MessagePublisher::EXPORT_QUEUE => MessagePublisher::RETRY_EXPORT_QUEUE, - MessagePublisher::WEBHOOK_QUEUE => MessagePublisher::RETRY_WEBHOOK_QUEUE, - MessagePublisher::ASSETS_INGEST_QUEUE => MessagePublisher::RETRY_ASSETS_INGEST_QUEUE, - MessagePublisher::CREATE_RECORD_QUEUE => MessagePublisher::RETRY_CREATE_RECORD_QUEUE, - MessagePublisher::POPULATE_INDEX_QUEUE => MessagePublisher::RETRY_POPULATE_INDEX_QUEUE, - MessagePublisher::PULL_QUEUE => MessagePublisher::LOOP_PULL_QUEUE, - MessagePublisher::FTP_QUEUE => MessagePublisher::RETRY_FTP_QUEUE, - MessagePublisher::VALIDATION_REMINDER_QUEUE => MessagePublisher::LOOP_VALIDATION_REMINDER_QUEUE - ]; - - public static $defaultFailedQueues = [ - MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::FAILED_METADATAS_QUEUE, - MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::FAILED_SUBDEF_QUEUE, - MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::FAILED_EXPORT_QUEUE, - MessagePublisher::WEBHOOK_TYPE => MessagePublisher::FAILED_WEBHOOK_QUEUE, - MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::FAILED_ASSETS_INGEST_QUEUE, - MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::FAILED_CREATE_RECORD_QUEUE, - MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::FAILED_POPULATE_INDEX_QUEUE, - MessagePublisher::FTP_TYPE => MessagePublisher::FAILED_FTP_QUEUE - ]; - - public static $defaultDelayedQueues = [ - MessagePublisher::METADATAS_QUEUE => MessagePublisher::DELAYED_METADATAS_QUEUE, - MessagePublisher::SUBDEF_QUEUE => MessagePublisher::DELAYED_SUBDEF_QUEUE - ]; - - public static $defaultLoopTypes = [ - MessagePublisher::PULL_ASSETS_TYPE, - MessagePublisher::VALIDATION_REMINDER_TYPE - ]; - // default message TTL in retry queue in millisecond const RETRY_DELAY = 10000; @@ -82,6 +30,91 @@ class AMQPConnection // default message TTL in delayed queue in millisecond const DELAY = 5000; + // max number of retry before a message goes in failed + const MAX_RETRY = 3; + + const WITH_NOTHING = 0; + const WITH_RETRY = 1; + const WITH_DELAYED = 2; + const WITH_LOOP = 4; + + const BASE_QUEUE = 'base_q'; + const BASE_QUEUE_WITH_RETRY = 'base_q_with_retry'; + const BASE_QUEUE_WITH_LOOP = 'base_q_with_loop'; + const RETRY_QUEUE = 'retry_q'; + const LOOP_QUEUE = 'loop_q'; + const FAILED_QUEUE = 'failed_q'; + const DELAYED_QUEUE = 'delayed_q'; + + const MESSAGES = [ + MessagePublisher::WRITE_METADATAS_TYPE => [ + 'with' => self::WITH_RETRY | self::WITH_DELAYED, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + 'ttl_delayed' => self::DELAY + ], + MessagePublisher::SUBDEF_CREATION_TYPE => [ + 'with' => self::WITH_RETRY | self::WITH_DELAYED, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + 'ttl_delayed' => self::DELAY + ], + MessagePublisher::EXPORT_MAIL_TYPE => [ + 'with' => self::WITH_RETRY, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + ], + MessagePublisher::WEBHOOK_TYPE => [ + 'with' => self::WITH_RETRY, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + ], + MessagePublisher::ASSETS_INGEST_TYPE => [ + 'with' => self::WITH_RETRY, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + ], + MessagePublisher::CREATE_RECORD_TYPE => [ + 'with' => self::WITH_RETRY, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + ], + MessagePublisher::PULL_ASSETS_TYPE => [ + 'with' => self::WITH_LOOP, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + ], + MessagePublisher::POPULATE_INDEX_TYPE => [ + 'with' => self::WITH_RETRY, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => self::RETRY_DELAY, + ], + MessagePublisher::DELETE_RECORD_TYPE => [ + 'with' => self::WITH_NOTHING, + ], + MessagePublisher::MAIN_QUEUE_TYPE => [ + 'with' => self::WITH_NOTHING, + ], + MessagePublisher::SUBTITLE_TYPE => [ + 'with' => self::WITH_NOTHING, + ], + MessagePublisher::EXPOSE_UPLOAD_TYPE => [ + 'with' => self::WITH_NOTHING, + ], + MessagePublisher::FTP_TYPE => [ + 'with' => self::WITH_RETRY, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => 180 * 1000, + ], + MessagePublisher::VALIDATION_REMINDER_TYPE => [ + 'with' => self::WITH_LOOP, + 'max_retry' => self::MAX_RETRY, + 'ttl_retry' => 7200 * 1000, + ], + ]; + + private $queues = []; // filled during construct, from msg list, default values and conf + public function __construct(PropertyAccess $conf) { $defaultConfiguration = [ @@ -94,6 +127,175 @@ class AMQPConnection $this->hostConfig = $conf->get(['workers', 'queue', 'worker-queue'], $defaultConfiguration); $this->conf = $conf; + + // fill list of type attributes + foreach (self::MESSAGES as $name => $attr) { + $this->queues[$name] = $attr; // default q + $this->queues[$name]['Name'] = $name; + $this->queues[$name]['QType'] = self::BASE_QUEUE; + $this->queues[$name]['Exchange'] = self::ALCHEMY_EXCHANGE; + + // a q with retry can fail (after n retry) or loop + if($attr['with'] & self::WITH_RETRY) { + $this->queues[$name]['QType'] = self::BASE_QUEUE_WITH_RETRY; // todo : avoid changing qtype ? + + // declare the retry q, cross-link with base q + $retry_name = 'retry_' . $name; + $this->queues[$name]['RetryQ'] = $retry_name; // link baseq to retryq + $this->queues[$retry_name] = $attr; + $this->queues[$retry_name]['Name'] = $retry_name; + $this->queues[$retry_name]['QType'] = self::RETRY_QUEUE; + $this->queues[$retry_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE; + $this->queues[$retry_name]['BaseQ'] = $name; // link retryq back to baseq + + // declare the failed q, cross-link with base q + $failed_name = 'failed_' . $name; + $this->queues[$name]['FailedQ'] = $failed_name; // link baseq to failedq + $this->queues[$failed_name] = $attr; + $this->queues[$failed_name]['Name'] = $failed_name; + $this->queues[$failed_name]['QType'] = self::FAILED_QUEUE; + $this->queues[$failed_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE; + $this->queues[$failed_name]['BaseQ'] = $name; // link failedq back to baseq + } + // a q can be "delayed" to solve "work in progress" lock on records + if($attr['with'] & self::WITH_DELAYED) { + // declare the delayed q, cross-link with base q + $delayed_name = 'delayed_' . $name; + $this->queues[$name]['DelayedQ'] = $delayed_name; // link baseq to delayedq + $this->queues[$delayed_name] = $attr; + $this->queues[$delayed_name]['Name'] = $delayed_name; + $this->queues[$delayed_name]['QType'] = self::DELAYED_QUEUE; + $this->queues[$delayed_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE; + $this->queues[$delayed_name]['BaseQ'] = $name; // link delayedq back to baseq + } + if($attr['with'] & self::WITH_LOOP) { + $this->queues[$name]['QType'] = self::BASE_QUEUE_WITH_LOOP; // todo : avoid changing qtype ? + + // declare the loop q, cross-link with base q + $loop_name = 'loop_' . $name; + $this->queues[$name]['LoopQ'] = $loop_name; // link baseq to loopq + $this->queues[$loop_name] = $attr; + $this->queues[$loop_name]['Name'] = $loop_name; + $this->queues[$loop_name]['QType'] = self::LOOP_QUEUE; + $this->queues[$loop_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE; + $this->queues[$loop_name]['BaseQ'] = $name; // link loopq back to baseq + } + } + // inject conf values + foreach($conf->get(['workers', 'queues'], []) as $name => $settings) { + if(!isset($this->queues[$name])) { + throw new Exception(sprintf('undefined queue "%s" in conf', $name)); + } + $this->queues[$name] = array_merge($this->queues[$name], $settings); + } + } + +// private function addQueue(string $name, string $Qtype, string $exchange, string $baseq=null) +// { +// $this->queues[$name]['Name'] = $name; +// $this->queues[$name]['QType'] = $Qtype; +// $this->queues[$name]['Exchange'] = $exchange; +// $this->queues[$name]['BaseQ'] = $baseq; // link back to baseq +// } + +// public function getQueueNames() +// { +// return array_keys($this->queues); +// } + + public function getBaseQueueNames() + { + $keys = array_keys(self::MESSAGES); + asort($keys); + return $keys; + } + + public function isBaseQueue(string $queueName) + { + return array_key_exists($queueName, self::MESSAGES); + } + + public function getBaseQueueName(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return $q['Name']; + } + + public function hasRetryQueue(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return array_key_exists('RetryQ', $q); + } + + public function getRetryQueueName(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName, 'RetryQ'); + return $q['Name']; + } + + public function getMaxRetry(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return $q['max_retry']; + } + + public function getTTLRetry(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return $q['ttl_retry']; + } + + public function hasDelayedQueue(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return array_key_exists('DelayedQ', $q); + } + + public function getDelayedQueueName(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName, 'DelayedQ'); + return $q['Name']; + } + + public function getTTLDelayed(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return $q['ttl_delayed']; + } + + public function getFailedQueueName(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName, 'FailedQ'); + return $q['Name']; + } + + public function hasLoopQueue(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName); + return array_key_exists('LoopQ', $q); + } + + public function getLoopQueueName(string $baseQueueName) + { + $q = $this->getQueue($baseQueueName, 'LoopQ'); + return $q['Name']; + } + + public function getExchange(string $queueName) + { + $q = $this->getQueue($queueName); + return $q['Exchange']; + } + + private function getQueue(string $queueName, string $subQueueKey = null) + { + if(!array_key_exists($queueName, $this->queues)) { + throw new Exception(sprintf('undefined queue "%s"', $queueName)); + } + if($subQueueKey && !array_key_exists($subQueueKey, $this->queues[$queueName])) { + throw new Exception(sprintf('base queue "%s" has no "%s"', $queueName, $subQueueKey)); + } + return $subQueueKey ? $this->queues[$this->queues[$queueName][$subQueueKey]] : $this->queues[$queueName]; } public function getConnection() @@ -108,8 +310,9 @@ class AMQPConnection $this->hostConfig['vhost'] ); - } catch (\Exception $e) { - + } + catch (Exception $e) { + // no-op } } @@ -127,7 +330,8 @@ class AMQPConnection } return null; - } else { + } + else { return $this->channel; } } @@ -156,107 +360,163 @@ class AMQPConnection $this->declareExchange(); } - if (isset(self::$defaultRetryQueues[$queueName])) { - $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ - 'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message - 'x-dead-letter-routing-key' => self::$defaultRetryQueues[$queueName] // the routing key to apply to this 'dead' message - ])); - - $this->channel->queue_bind($queueName, self::ALCHEMY_EXCHANGE, $queueName); - - // declare also the corresponding retry queue - // use this to delay the delivery of a message to the alchemy-exchange - $this->channel->queue_declare(self::$defaultRetryQueues[$queueName], false, true, false, false, false, new AMQPTable([ - 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, - 'x-dead-letter-routing-key' => $queueName, - 'x-message-ttl' => $this->getTtlRetryPerRouting($queueName) - ])); - - $this->channel->queue_bind(self::$defaultRetryQueues[$queueName], AMQPConnection::RETRY_ALCHEMY_EXCHANGE, self::$defaultRetryQueues[$queueName]); - - } elseif (in_array($queueName, self::$defaultRetryQueues)) { - // if it's a retry queue - $routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); - $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ - 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, - 'x-dead-letter-routing-key' => $routing, - 'x-message-ttl' => $this->getTtlRetryPerRouting($routing) - ])); - - $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); - } elseif (in_array($queueName, self::$defaultFailedQueues)) { - // if it's a failed queue - $this->channel->queue_declare($queueName, false, true, false, false, false); - - $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); - } elseif (in_array($queueName, self::$defaultDelayedQueues)) { - // if it's a delayed queue - $routing = array_search($queueName, AMQPConnection::$defaultDelayedQueues); - $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ - 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, - 'x-dead-letter-routing-key' => $routing, - 'x-message-ttl' => $this->getTtlDelayedPerRouting($routing) - ])); - - $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); - } else { - $this->channel->queue_declare($queueName, false, true, false, false, false); - - $this->channel->queue_bind($queueName, AMQPConnection::ALCHEMY_EXCHANGE, $queueName); + $queue = $this->queues[$queueName]; + switch($queue['QType']) { + case self::BASE_QUEUE_WITH_RETRY: + $this->queue_declare_and_bind($queueName, self::ALCHEMY_EXCHANGE, [ + 'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message + 'x-dead-letter-routing-key' => $queue['RetryQ'] // the routing key to apply to this 'dead' message + ]); + $this->setQueue($queue['RetryQ']); + break; + case self::BASE_QUEUE_WITH_LOOP: + $this->queue_declare_and_bind($queueName, self::ALCHEMY_EXCHANGE, [ + 'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message + 'x-dead-letter-routing-key' => $queue['LoopQ'] // the routing key to apply to this 'dead' message + ]); + $this->setQueue($queue['LoopQ']); + break; + case self::LOOP_QUEUE: + case self::RETRY_QUEUE: + $this->queue_declare_and_bind($queueName, self::RETRY_ALCHEMY_EXCHANGE, [ + 'x-dead-letter-exchange' => self::ALCHEMY_EXCHANGE, + 'x-dead-letter-routing-key' => $queue['BaseQ'], + 'x-message-ttl' => $this->queues[$queue['BaseQ']]['ttl_retry'] + ]); + break; + case self::DELAYED_QUEUE: + $this->queue_declare_and_bind($queueName, self::RETRY_ALCHEMY_EXCHANGE, [ + 'x-dead-letter-exchange' => self::ALCHEMY_EXCHANGE, + 'x-dead-letter-routing-key' => $queue['BaseQ'], + 'x-message-ttl' => $this->queues[$queue['BaseQ']]['ttl_delayed'] + ]); + break; + case self::FAILED_QUEUE: + $this->queue_declare_and_bind($queueName, self::RETRY_ALCHEMY_EXCHANGE); + break; + case self::BASE_QUEUE: + $this->queue_declare_and_bind($queueName, self::ALCHEMY_EXCHANGE); + break; + default: + throw new \Exception(sprintf('undefined q type "%s', $queueName)); + break; } return $this->channel; } - public function reinitializeQueue(array $queuNames) + private function queue_declare_and_bind(string $name, string $exchange, array $arguments = null) + { + $this->channel->queue_declare( + $name, + false, true, false, false, false, + $arguments ? new AMQPTable($arguments) : null + ); + $this->channel->queue_bind($name, $exchange, $name); + } + + /** + * purge some queues, delete related retry-q + * nb: called by admin/purgeQueuAction, so a q may be __any kind__ - not only base-types ! + * + * @param array $queueNames + * @throws Exception + */ + public function reinitializeQueue(array $queueNames) { if (!isset($this->channel)) { $this->getChannel(); $this->declareExchange(); } - foreach ($queuNames as $queuName) { - if (in_array($queuName, self::$defaultQueues)) { - $this->channel->queue_purge($queuName); - } else { - $this->channel->queue_delete($queuName); + + foreach ($queueNames as $queueName) { + // re-inject conf values (some may have changed) + $settings = $this->conf->get(['workers', 'queues', $queueName], []); + if(array_key_exists($queueName, $this->queues)) { + $this->queues[$queueName] = array_merge($this->queues[$queueName], $settings); } - if (isset(self::$defaultRetryQueues[$queuName])) { - $this->channel->queue_delete(self::$defaultRetryQueues[$queuName]); + if(array_key_exists($queueName, self::MESSAGES)) { + // base-q + $this->purgeQueue($queueName); + + if($this->hasRetryQueue($queueName)) { + $this->deleteQueue($this->getRetryQueueName($queueName)); + } + if($this->hasLoopQueue($queueName)) { + $this->deleteQueue($this->getLoopQueueName($queueName)); + } + } + else { + // retry, delayed, loop, ... q + $this->deleteQueue($queueName); } - $this->setQueue($queuName); + $this->setQueue($queueName); + } + } + + /** + * delete a queue, fails silently if the q does not exists + * + * @param $queueName + */ + public function deleteQueue($queueName) + { + if (!isset($this->channel)) { + $this->getChannel(); + $this->declareExchange(); + } + try { + $this->channel->queue_delete($queueName); + } + catch(\Exception $e) { + // no-op + } + } + + /** + * purge a queue, fails silently if the q does not exists + * + * @param $queueName + */ + public function purgeQueue($queueName) + { + if (!isset($this->channel)) { + $this->getChannel(); + $this->declareExchange(); + } + try { + $this->channel->queue_purge($queueName); + } + catch(\Exception $e) { + // no-op } } /** * Get queueName, messageCount, consumerCount of queues * @return array + * @throws Exception */ public function getQueuesStatus() { - $queuesList = array_merge( - array_values(self::$defaultQueues), - array_values(self::$defaultDelayedQueues), - array_values(self::$defaultRetryQueues), - array_values(self::$defaultFailedQueues) - ); - $this->getChannel(); $queuesStatus = []; - foreach ($queuesList as $queue) { - $this->setQueue($queue); - list($queueName, $messageCount, $consumerCount) = $this->channel->queue_declare($queue, true); + foreach($this->queues as $name => $queue) { + $this->setQueue($name); // todo : BASE_QUEUE_WITH_RETRY will set both BASE and RETRY Q, so we should skip one of 2 - $status['queueName'] = $queueName; - $status['messageCount'] = $messageCount; - $status['consumerCount'] = $consumerCount; - - $queuesStatus[] = $status; - unset($status); + list($queueName, $messageCount, $consumerCount) = $this->channel->queue_declare($name, true); + $queuesStatus[$queueName] = [ + 'queueName' => $queueName, + 'messageCount' => $messageCount, + 'consumerCount' => $consumerCount + ]; } + ksort($queuesStatus); + return $queuesStatus; } @@ -265,58 +525,4 @@ class AMQPConnection $this->channel->close(); $this->connection->close(); } - - /** - * @param $routing - * @return int - */ - private function getTtlRetryPerRouting($routing) - { - $config = $this->conf->get(['workers']); - - if ($routing == MessagePublisher::PULL_QUEUE && - isset($config['pull_assets']) && - isset($config['pull_assets']['pullInterval']) ) { - // convert in milli second - return (int)($config['pull_assets']['pullInterval']) * 1000; - } elseif ($routing == MessagePublisher::VALIDATION_REMINDER_QUEUE) { - - if (isset($config['validationReminder']) && - isset($config['validationReminder']['interval'])) { - - // convert in milli second - return (int)($config['validationReminder']['interval']) * 1000; - } - - // default value to 2 hour if not set - return (int) 7200 * 1000; - - } elseif (isset($config['retry_queue']) && - isset($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)])) { - - return (int)($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)]); - } - - if ($routing == MessagePublisher::FTP_QUEUE) { - return self::RETRY_LARGE_DELAY; - } else { - return self::RETRY_DELAY; - } - } - - private function getTtlDelayedPerRouting($routing) - { - $delayed = [ - MessagePublisher::METADATAS_QUEUE => 'delayedWriteMeta', - MessagePublisher::SUBDEF_QUEUE => 'delayedSubdef' - ]; - - $config = $this->conf->get(['workers']); - - if (isset($config['retry_queue']) && isset($config['retry_queue'][$delayed[$routing]])) { - return (int)$config['retry_queue'][$delayed[$routing]]; - } - - return self::DELAY; - } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php index 55ee1914b4..49db7da2d0 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php @@ -4,6 +4,7 @@ namespace Alchemy\Phrasea\WorkerManager\Queue; use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool; use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker; +use Exception; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -11,8 +12,6 @@ use Ramsey\Uuid\Uuid; class MessageHandler { - const MAX_OF_TRY = 3; - private $messagePublisher; public function __construct(MessagePublisher $messagePublisher) @@ -20,27 +19,37 @@ class MessageHandler $this->messagePublisher = $messagePublisher; } - public function consume(AMQPConnection $serverConnection, WorkerInvoker $workerInvoker, $argQueueName, $maxProcesses) + /** + * called by WorkerExecuteCommand cli + * + * @param AMQPConnection $AMQPConnection + * @param WorkerInvoker $workerInvoker + * @param array|null $argQueueNames + * @param $maxProcesses + */ + public function consume(AMQPConnection $AMQPConnection, WorkerInvoker $workerInvoker, $argQueueNames, $maxProcesses) { - $publisher = $this->messagePublisher; - $channel = $serverConnection->getChannel(); + $channel = $AMQPConnection->getChannel(); if ($channel == null) { + // todo : if there is no channel, can we push ? $this->messagePublisher->pushLog("Can't connect to rabbit, check configuration!", "error"); return ; } - $serverConnection->declareExchange(); + $AMQPConnection->declareExchange(); // define consume callbacks - $callback = function (AMQPMessage $message) use ($channel, $workerInvoker, $publisher) { + $publisher = $this->messagePublisher; + $callback = function (AMQPMessage $message) use ($AMQPConnection, $channel, $workerInvoker, $publisher) { $data = json_decode($message->getBody(), true); $count = 0; + $headers = null; if ($message->has('application_headers')) { /** @var AMQPTable $headers */ $headers = $message->get('application_headers'); @@ -49,9 +58,10 @@ class MessageHandler if (isset($headerData['x-death'])) { $xDeathHeader = $headerData['x-death']; + // todo : if there are more than 1 xdeath ? what is $count ? foreach ($xDeathHeader as $xdeath) { $queue = $xdeath['queue']; - if (!in_array($queue, AMQPConnection::$defaultQueues)) { + if (!$AMQPConnection->isBaseQueue($queue)) { continue; } @@ -61,51 +71,45 @@ class MessageHandler } } - // if message is yet executed 3 times, save the unprocessed message in the corresponding failed queues - if ($count > self::MAX_OF_TRY && !in_array($data['message_type'], AMQPConnection::$defaultLoopTypes)) { - $this->messagePublisher->publishFailedMessage($data['payload'], $headers, AMQPConnection::$defaultFailedQueues[$data['message_type']]); + $msgType = $data['message_type']; - $logMessage = sprintf("Rabbit message executed 3 times, it's to be saved in %s , payload >>> %s", - AMQPConnection::$defaultFailedQueues[$data['message_type']], + if($count > $AMQPConnection->getMaxRetry($msgType) && !$AMQPConnection->hasLoopQueue($msgType)) { + $publisher->publishFailedMessage($data['payload'], $headers, $data['message_type']); + + $logMessage = sprintf("Rabbit message executed %s times, it's to be saved in %s , payload >>> %s", + $count, + $msgType, json_encode($data['payload']) ); - $this->messagePublisher->pushLog($logMessage); + $publisher->pushLog($logMessage); $channel->basic_ack($message->delivery_info['delivery_tag']); - } else { + } + else { try { - $workerInvoker->invokeWorker($data['message_type'], json_encode($data['payload'])); + $workerInvoker->invokeWorker($msgType, json_encode($data['payload'])); - if (in_array($data['message_type'], AMQPConnection::$defaultLoopTypes)) { + if ($AMQPConnection->hasLoopQueue($msgType)) { // make a loop for the loop type $channel->basic_nack($message->delivery_info['delivery_tag']); } else { $channel->basic_ack($message->delivery_info['delivery_tag']); } - $oldPayload = $data['payload']; - $message = $data['message_type'].' to be consumed! >> Payload ::'. json_encode($oldPayload); - - $publisher->pushLog($message); - } catch (\Exception $e) { + $publisher->pushLog( + sprintf('"%s" to be consumed! >> Payload :: %s', $msgType, json_encode($data['payload'])) + ); + } + catch (Exception $e) { $channel->basic_nack($message->delivery_info['delivery_tag']); } } }; - $prefetchCount = ProcessPool::MAX_PROCESSES; - - if ($maxProcesses) { - $prefetchCount = $maxProcesses; - } - - foreach (AMQPConnection::$defaultQueues as $queueName) { - if ($argQueueName ) { - if (in_array($queueName, $argQueueName)) { - $this->runConsumer($queueName, $serverConnection, $channel, $prefetchCount, $callback); - } - } else { - $this->runConsumer($queueName, $serverConnection, $channel, $prefetchCount, $callback); + $prefetchCount = $maxProcesses ? $maxProcesses : ProcessPool::MAX_PROCESSES; + foreach($AMQPConnection->getBaseQueueNames() as $queueName) { + if (!$argQueueNames || in_array($queueName, $argQueueNames)) { + $this->runConsumer($queueName, $AMQPConnection, $channel, $prefetchCount, $callback); } } } @@ -114,9 +118,10 @@ class MessageHandler { $serverConnection->setQueue($queueName); + // todo : remove this if !!! move code to a generic place // initialize validation reminder when starting consumer - if ($queueName == MessagePublisher::VALIDATION_REMINDER_QUEUE) { - $serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_QUEUE]); + if ($queueName == MessagePublisher::VALIDATION_REMINDER_TYPE) { + $serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]); $this->messagePublisher->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php index 7da797f630..d58a5a7711 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php @@ -2,6 +2,8 @@ namespace Alchemy\Phrasea\WorkerManager\Queue; +use DateTime; +use DateTimeZone; use Monolog\Logger; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -28,107 +30,96 @@ class MessagePublisher const MAIN_QUEUE_TYPE = 'mainQueue'; - const MAIN_QUEUE = 'main-queue'; - const SUBTITLE_QUEUE = 'subtitle-queue'; - // ** ** \\ - - // worker queue to be consumed, when no ack , it is requeued to the retry queue - const ASSETS_INGEST_QUEUE = 'ingest-queue'; - const CREATE_RECORD_QUEUE = 'createrecord-queue'; - const DELETE_RECORD_QUEUE = 'deleterecord-queue'; - const EXPORT_QUEUE = 'export-queue'; - const EXPOSE_UPLOAD_QUEUE = 'exposeupload-queue'; - const FTP_QUEUE = 'ftp-queue'; - const METADATAS_QUEUE = 'metadatas-queue'; - const POPULATE_INDEX_QUEUE = 'populateindex-queue'; - const PULL_QUEUE = 'pull-queue'; - const RECORD_EDIT_QUEUE = 'recordedit-queue'; - const SUBDEF_QUEUE = 'subdef-queue'; - const VALIDATION_REMINDER_QUEUE = 'validationReminder-queue'; - const WEBHOOK_QUEUE = 'webhook-queue'; - - - // retry queue - // we can use these retry queue with TTL, so when message expires it is requeued to the corresponding worker queue - const RETRY_ASSETS_INGEST_QUEUE = 'retry-ingest-queue'; - const RETRY_CREATE_RECORD_QUEUE = 'retry-createrecord-queue'; - const RETRY_EXPORT_QUEUE = 'retry-export-queue'; - const RETRY_FTP_QUEUE = 'retry-ftp-queue'; - const RETRY_METADATAS_QUEUE = 'retry-metadatas-queue'; - const RETRY_POPULATE_INDEX_QUEUE = 'retry-populateindex-queue'; - const RETRY_SUBDEF_QUEUE = 'retry-subdef-queue'; - const RETRY_WEBHOOK_QUEUE = 'retry-webhook-queue'; - - // use those queue to make a loop on a consumer - const LOOP_PULL_QUEUE = 'loop-pull-queue'; - const LOOP_VALIDATION_REMINDER_QUEUE = 'loop-validationReminder-queue'; - - - // all failed queue, if message is treated over 3 times it goes to the failed queue - const FAILED_ASSETS_INGEST_QUEUE = 'failed-ingest-queue'; - const FAILED_CREATE_RECORD_QUEUE = 'failed-createrecord-queue'; - const FAILED_EXPORT_QUEUE = 'failed-export-queue'; - const FAILED_FTP_QUEUE = 'failed-ftp-queue'; - const FAILED_METADATAS_QUEUE = 'failed-metadatas-queue'; - const FAILED_POPULATE_INDEX_QUEUE = 'failed-populateindex-queue'; - const FAILED_SUBDEF_QUEUE = 'failed-subdef-queue'; - const FAILED_WEBHOOK_QUEUE = 'failed-webhook-queue'; - - - // delayed queue when record is locked - const DELAYED_SUBDEF_QUEUE = 'delayed-subdef-queue'; - const DELAYED_METADATAS_QUEUE = 'delayed-metadatas-queue'; - const NEW_RECORD_MESSAGE = 'newrecord'; - /** @var AMQPConnection $serverConnection */ - private $serverConnection; + /** @var AMQPConnection $AMQPConnection */ + private $AMQPConnection; /** @var Logger */ private $logger; - public function __construct(AMQPConnection $serverConnection, LoggerInterface $logger) + public function __construct(AMQPConnection $AMQPConnection, LoggerInterface $logger) { - $this->serverConnection = $serverConnection; - $this->logger = $logger; + $this->AMQPConnection = $AMQPConnection; + $this->logger = $logger; } - public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '') + public function publishMessage(array $payload, $queueName) { - // add published timestamp to all message payload - $payload['payload']['published'] = time(); - $msg = new AMQPMessage(json_encode($payload)); - $routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); + $this->AMQPConnection->getBaseQueueName($queueName); // just to throw an exception if q is undefined - if (count($retryCount) && $routing != false) { + $this->_publishMessage($payload, $queueName); + } + + public function publishRetryMessage(array $payload, string $baseQueueName, $retryCount, $workerMessage) + { + $retryQ = $this->AMQPConnection->getRetryQueueName($baseQueueName); + + $headers = null; + if(!is_null($retryCount)) { // add a message header information $headers = new AMQPTable([ 'x-death' => [ [ 'count' => $retryCount, 'exchange' => AMQPConnection::ALCHEMY_EXCHANGE, - 'queue' => $routing, - 'routing-keys' => $routing, + 'queue' => $baseQueueName, + 'routing-keys' => $baseQueueName, 'reason' => 'rejected', // rejected is sended like nack - 'time' => new \DateTime('now', new \DateTimeZone('UTC')) + 'time' => new DateTime('now', new DateTimeZone('UTC')) ] ], 'worker-message' => $workerMessage ]); + } + $this->_publishMessage($payload, $retryQ, $headers); + } + public function publishDelayedMessage(array $payload, string $baseQueueName) + { + $delayedQ = $this->AMQPConnection->getDelayedQueueName($baseQueueName); + + $this->_publishMessage($payload, $delayedQ); + } + + public function publishFailedMessage(array $payload, AMQPTable $headers, $baseQueueName) + { + $FailedQ = $this->AMQPConnection->getFailedQueueName($baseQueueName); + + $msg = new AMQPMessage(json_encode($payload)); + $msg->set('application_headers', $headers); + + $channel = $this->AMQPConnection->setQueue($FailedQ); + if ($channel == null) { + $this->pushLog("Can't connect to rabbit, check configuration!", "error"); + + return ; + } + + $channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $FailedQ); + + $this->_publishMessage($payload, $FailedQ, $headers); + } + + private function _publishMessage(array $payload, $queueName, $headers = null) + { + // add published timestamp to all message payload + $payload['payload']['published'] = time(); + $msg = new AMQPMessage(json_encode($payload)); + + if (!is_null($headers)) { + // add a message header information $msg->set('application_headers', $headers); } - $channel = $this->serverConnection->setQueue($queueName); - - if ($channel == null) { + if (is_null( ($channel = $this->AMQPConnection->setQueue($queueName)) )) { $this->pushLog("Can't connect to rabbit, check configuration!", "error"); return true; } - $exchange = in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE; + $exchange = $this->AMQPConnection->getExchange($queueName); // in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE; $channel->basic_publish($msg, $exchange, $queueName); return true; @@ -139,16 +130,16 @@ class MessagePublisher $payload = [ 'message_type' => $type, 'payload' => [ - 'initTimestamp' => new \DateTime('now', new \DateTimeZone('UTC')) + 'initTimestamp' => new DateTime('now', new DateTimeZone('UTC')) ] ]; - $this->publishMessage($payload, AMQPConnection::$defaultQueues[$type]); + $this->publishMessage($payload, $type); } public function connectionClose() { - $this->serverConnection->connectionClose(); + $this->AMQPConnection->connectionClose(); } /** @@ -163,18 +154,4 @@ class MessagePublisher call_user_func(array($this->logger, $method), $message, $context); } - public function publishFailedMessage(array $payload, AMQPTable $headers, $queueName) - { - $msg = new AMQPMessage(json_encode($payload)); - $msg->set('application_headers', $headers); - - $channel = $this->serverConnection->setQueue($queueName); - if ($channel == null) { - $this->pushLog("Can't connect to rabbit, check configuration!", "error"); - - return ; - } - - $channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); - } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php index a1bdcc888a..f58ec84c75 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/WebhookPublisher.php @@ -24,6 +24,6 @@ class WebhookPublisher implements WebhookPublisherInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::WEBHOOK_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::WEBHOOK_TYPE); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php index 07c37aa5fc..e6a211c564 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/AssetsIngestSubscriber.php @@ -33,7 +33,7 @@ class AssetsIngestSubscriber implements EventSubscriberInterface 'payload' => array_merge($event->getData(), ['type' => WorkerRunningJob::TYPE_PUSH]) ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_TYPE); } public function onAssetsCreationFailure(AssetsCreationFailureEvent $event) @@ -43,9 +43,9 @@ class AssetsIngestSubscriber implements EventSubscriberInterface 'payload' => $event->getPayload() ]; - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_ASSETS_INGEST_QUEUE, + MessagePublisher::ASSETS_INGEST_TYPE, $event->getCount(), $event->getWorkerMessage() ); @@ -84,9 +84,9 @@ class AssetsIngestSubscriber implements EventSubscriberInterface } } - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_CREATE_RECORD_QUEUE, + MessagePublisher::CREATE_RECORD_TYPE, // todo $event->getCount(), $event->getWorkerMessage() ); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php index 0bd7edfb9d..bbfde318bd 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExportSubscriber.php @@ -32,7 +32,7 @@ class ExportSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPORT_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPORT_MAIL_TYPE); } public function onExportMailFailure(ExportMailFailureEvent $event) @@ -47,9 +47,9 @@ class ExportSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_EXPORT_QUEUE, + MessagePublisher::EXPORT_MAIL_TYPE, $event->getCount(), $event->getWorkerMessage() ); @@ -66,7 +66,7 @@ class ExportSubscriber implements EventSubscriberInterface $this->messagePublisher->publishMessage( $payload, - MessagePublisher::FTP_QUEUE + MessagePublisher::FTP_TYPE ); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExposeSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExposeSubscriber.php index f4fed4d6be..44acd302ae 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExposeSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/ExposeSubscriber.php @@ -36,7 +36,7 @@ class ExposeSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPOSE_UPLOAD_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPOSE_UPLOAD_TYPE); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php index 5d57699fa1..4d6e52cf2a 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/RecordSubscriber.php @@ -5,7 +5,6 @@ namespace Alchemy\Phrasea\WorkerManager\Subscriber; use Alchemy\Phrasea\Application; use Alchemy\Phrasea\Core\Event\Record\DeletedEvent; use Alchemy\Phrasea\Core\Event\Record\DeleteEvent; -use Alchemy\Phrasea\Core\Event\Record\MetadataChangedEvent; use Alchemy\Phrasea\Core\Event\Record\RecordEvent; use Alchemy\Phrasea\Core\Event\Record\RecordEvents; use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent; @@ -67,7 +66,7 @@ class RecordSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::SUBDEF_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::SUBDEF_CREATION_TYPE); } } } @@ -87,7 +86,7 @@ class RecordSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::DELETE_RECORD_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::DELETE_RECORD_TYPE); } public function onSubdefinitionCreationFailure(SubdefinitionCreationFailureEvent $event) @@ -128,9 +127,9 @@ class RecordSubscriber implements EventSubscriberInterface } } - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_SUBDEF_QUEUE, + MessagePublisher::SUBDEF_CREATION_TYPE, $event->getCount(), $event->getWorkerMessage() ); @@ -170,18 +169,19 @@ class RecordSubscriber implements EventSubscriberInterface ]; if ($subdef->is_physically_present()) { - $this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE); - } else { - $logMessage = sprintf("Subdef %s is not physically present! to be passed in the %s ! payload >>> %s", + $this->messagePublisher->publishMessage($payload, MessagePublisher::WRITE_METADATAS_TYPE); + } + else { + $logMessage = sprintf('Subdef "%s" is not physically present! to be passed in the retry q of "%s" ! payload >>> %s', $subdef->get_name(), - MessagePublisher::RETRY_METADATAS_QUEUE, + MessagePublisher::WRITE_METADATAS_TYPE, json_encode($payload) ); $this->messagePublisher->pushLog($logMessage); - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_METADATAS_QUEUE, + MessagePublisher::WRITE_METADATAS_TYPE, 2, 'Subdef is not physically present!' ); @@ -215,10 +215,10 @@ class RecordSubscriber implements EventSubscriberInterface ] ]; - $logMessage = sprintf("Subdef %s write meta failed, error : %s ! to be passed in the %s ! payload >>> %s", + $logMessage = sprintf('Subdef "%s" write meta failed, error : "%s" ! to be passed in the retry q of "%s" ! payload >>> %s', $event->getSubdefName(), $event->getWorkerMessage(), - MessagePublisher::RETRY_METADATAS_QUEUE, + MessagePublisher::WRITE_METADATAS_TYPE, json_encode($payload) ); $this->messagePublisher->pushLog($logMessage); @@ -248,9 +248,9 @@ class RecordSubscriber implements EventSubscriberInterface } } - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_METADATAS_QUEUE, + MessagePublisher::WRITE_METADATAS_TYPE, $event->getCount(), $event->getWorkerMessage() ); @@ -276,7 +276,7 @@ class RecordSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::WRITE_METADATAS_TYPE); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php index b5bd748f2c..d6e92163ef 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SearchengineSubscriber.php @@ -40,7 +40,7 @@ class SearchengineSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::POPULATE_INDEX_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::POPULATE_INDEX_TYPE); } } @@ -83,9 +83,9 @@ class SearchengineSubscriber implements EventSubscriberInterface } } - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_POPULATE_INDEX_QUEUE, + MessagePublisher::POPULATE_INDEX_TYPE, $event->getCount(), $event->getWorkerMessage() ); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SubtitleSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SubtitleSubscriber.php index e67958165c..aab98ccebb 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SubtitleSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/SubtitleSubscriber.php @@ -65,7 +65,7 @@ class SubtitleSubscriber implements EventSubscriberInterface 'payload' => $data ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::MAIN_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::MAIN_QUEUE_TYPE); } catch (\Exception $e) { $em->rollback(); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php index eb45da3a37..414a3c5ac0 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Subscriber/WebhookSubscriber.php @@ -29,9 +29,9 @@ class WebhookSubscriber implements EventSubscriberInterface ] ]; - $this->messagePublisher->publishMessage( + $this->messagePublisher->publishRetryMessage( $payload, - MessagePublisher::RETRY_WEBHOOK_QUEUE, + MessagePublisher::WEBHOOK_TYPE, $event->getCount(), $event->getWorkerMessage() ); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php index 4a46d8461c..e688a70ebe 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/AssetsIngestWorker.php @@ -2,8 +2,8 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; -use Alchemy\Phrasea\Application\Helper\EntityManagerAware; use Alchemy\Phrasea\Application as PhraseaApplication; +use Alchemy\Phrasea\Application\Helper\EntityManagerAware; use Alchemy\Phrasea\Model\Entities\StoryWZ; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Repositories\UserRepository; @@ -80,7 +80,7 @@ class AssetsIngestWorker implements WorkerInterface 'commit_id' => $payload['commit_id'] ]; - $this->messagePublisher->publishMessage($createRecordMessage, MessagePublisher::CREATE_RECORD_QUEUE); + $this->messagePublisher->publishMessage($createRecordMessage, MessagePublisher::CREATE_RECORD_TYPE); /** @var WorkerRunningJob $workerRunningJob */ $workerRunningJob = $this->repoWorkerJob->findOneBy([ diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php index 3814e026ec..e849eadd1d 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/FtpWorker.php @@ -5,7 +5,6 @@ namespace Alchemy\Phrasea\WorkerManager\Worker; use Alchemy\Phrasea\Application; use Alchemy\Phrasea\Application\Helper\NotifierAware; use Alchemy\Phrasea\Core\LazyLocator; -use Alchemy\Phrasea\Exception\InvalidArgumentException; use Alchemy\Phrasea\Model\Entities\FtpExport; use Alchemy\Phrasea\Model\Entities\FtpExportElement; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; @@ -371,9 +370,9 @@ class FtpWorker implements WorkerInterface 'payload' => $payload ]; - $this->app['alchemy_worker.message.publisher']->publishMessage( + $this->getMessagePublisher()->publishRetryMessage( $fullPayload, - MessagePublisher::RETRY_FTP_QUEUE, + MessagePublisher::FTP_TYPE, $count, $workerMessage ); @@ -503,4 +502,13 @@ class FtpWorker implements WorkerInterface { return $this->app['repo.ftp-exports']; } + + /** + * @return MessagePublisher + */ + private function getMessagePublisher() + { + return $this->app['alchemy_worker.message.publisher']; + } + } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php index 82471f2b88..3448758cc7 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/MainQueueWorker.php @@ -29,7 +29,7 @@ class MainQueueWorker implements WorkerInterface switch ($payload['type']) { case MessagePublisher::SUBTITLE_TYPE: - $queue = MessagePublisher::SUBTITLE_QUEUE; + $queue = MessagePublisher::SUBTITLE_TYPE; $messageType = $payload['type']; unset($payload['type']); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php index fe83f5845c..9334c2e86c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/PullAssetsWorker.php @@ -82,7 +82,7 @@ class PullAssetsWorker implements WorkerInterface ] ]; - $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); + $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_TYPE); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index 6de0cb6228..12dc1cfcd9 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -75,7 +75,7 @@ class SubdefCreationWorker implements WorkerInterface if (!$canCreateSubdef) { // the file is in used to write meta - $this->messagePublisher->publishMessage($message, MessagePublisher::DELAYED_SUBDEF_QUEUE); + $this->messagePublisher->publishDelayedMessage($message, MessagePublisher::SUBDEF_CREATION_TYPE); return ; } @@ -178,7 +178,10 @@ class SubdefCreationWorker implements WorkerInterface // checking ended // order to write meta for the subdef if needed - $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent($record, $payload['subdefName'])); + $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent( + $record, + $payload['subdefName']) + ); $this->subdefGenerator->setLogger($oldLogger); diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php index d88d5f3332..c02a7a4cae 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WebhookWorker.php @@ -170,7 +170,11 @@ class WebhookWorker implements WorkerInterface $this->messagePublisher->pushLog($workerMessage); // count = 0 mean do not retry because no api application defined - $this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent($webhookevent->getId(), $workerMessage, 0)); + $this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent( + $webhookevent->getId(), + $workerMessage, + 0) + ); return; } @@ -235,7 +239,7 @@ class WebhookWorker implements WorkerInterface $this->messagePublisher->publishFailedMessage( $payload, new AMQPTable(['worker-message' => $e->getMessage()]), - MessagePublisher::FAILED_WEBHOOK_QUEUE + MessagePublisher::WEBHOOK_TYPE ); } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index e181fe8f7c..ebe2dd6e9b 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -73,7 +73,7 @@ class WriteMetadatasWorker implements WorkerInterface if (!$canWriteMeta) { // the file is in used to generate subdef - $this->messagePublisher->publishMessage($message, MessagePublisher::DELAYED_METADATAS_QUEUE); + $this->messagePublisher->publishDeleyedMessage($message, MessagePublisher::WRITE_METADATAS_TYPE); return ; } diff --git a/lib/classes/patch/413_PHRAS_3282.php b/lib/classes/patch/413_PHRAS_3282.php new file mode 100644 index 0000000000..6ebfab3780 --- /dev/null +++ b/lib/classes/patch/413_PHRAS_3282.php @@ -0,0 +1,119 @@ + MessagePublisher::ASSETS_INGEST_TYPE, + 'createRecord' => MessagePublisher::CREATE_RECORD_TYPE, + 'deleteRecord' => MessagePublisher::DELETE_RECORD_TYPE, + 'exportMail' => MessagePublisher::EXPORT_MAIL_TYPE, + 'exposeUpload' => MessagePublisher::EXPOSE_UPLOAD_TYPE, + 'ftp' => MessagePublisher::FTP_TYPE, + 'populateIndex' => MessagePublisher::POPULATE_INDEX_TYPE, + 'pullAssets' => MessagePublisher::PULL_ASSETS_TYPE, + 'recordEdit' => MessagePublisher::RECORD_EDIT_TYPE, + 'subdefCreation' => MessagePublisher::SUBDEF_CREATION_TYPE, + 'validationReminder' => MessagePublisher::VALIDATION_REMINDER_TYPE, + 'writeMetadatas' => MessagePublisher::WRITE_METADATAS_TYPE, + 'webhook' => MessagePublisher::WEBHOOK_TYPE, + ]; + const OLDQ2NEWQ_ttl_delayed = [ + 'delayedSubdef' => MessagePublisher::SUBDEF_CREATION_TYPE, + 'delayedWriteMeta' => MessagePublisher::WRITE_METADATAS_TYPE, + ]; + /** @var string */ + private $release = '4.1.3'; + /** @var array */ + private $concern = [base::APPLICATION_BOX, base::DATA_BOX]; + + /** + * {@inheritdoc} + */ + public function get_release() + { + return $this->release; + } + + /** + * {@inheritdoc} + */ + public function getDoctrineMigrations() + { + return []; + } + + /** + * {@inheritdoc} + */ + public function require_all_upgrades() + { + return false; + } + + /** + * {@inheritdoc} + */ + public function concern() + { + return $this->concern; + } + + /** + * {@inheritdoc} + */ + public function apply(base $base, Application $app) + { + if ($base->get_base_type() === base::DATA_BOX) { + $this->patch_databox($base, $app); + } + elseif ($base->get_base_type() === base::APPLICATION_BOX) { + $this->patch_appbox($base, $app); + } + + return true; + } + + private function patch_databox(base $databox, Application $app) + { + } + + private function patch_appbox(base $databox, Application $app) + { + /** @var PropertyAccess $conf */ + $conf = $app['conf']; + + // -------------------------------------------- + // PHRAS-3282_refacto-some-code-on-workers_MASTER + // patch workers settings + // -------------------------------------------- + + foreach(self::OLDQ2NEWQ_ttl_retry as $old=>$new) { + if(($v = $conf->get(['workers', 'retry_queue', $old], null)) !== null) { + $conf->set(['workers', 'queues', $new, 'ttl_retry'], $v); + } + } + + foreach(self::OLDQ2NEWQ_ttl_delayed as $old=>$new) { + if(($v = $conf->get(['workers', 'retry_queue', $old], null)) !== null) { + $conf->set(['workers', 'queues', $new, 'ttl_delayed'], $v); + } + } + + if(($v = $conf->get(['workers', 'pull_assets', 'pullInterval'], null)) !== null) { + $conf->set(['workers', 'queues', MessagePublisher::PULL_ASSETS_TYPE, 'ttl_retry'], $v * 1000); + } + + if(($v = $conf->get(['workers', 'validationReminder', 'interval'], null)) !== null) { + $conf->set(['workers', 'queues', MessagePublisher::VALIDATION_REMINDER_TYPE, 'ttl_retry'], $v * 1000); + } + + $conf->remove(['workers', 'retry_queue']); + $conf->remove(['workers', 'pull_assets']); + $conf->remove(['workers', 'validationReminder']); + } +} diff --git a/templates/web/admin/worker-manager/index.html.twig b/templates/web/admin/worker-manager/index.html.twig index 556dbc059f..55e0fb0d4a 100644 --- a/templates/web/admin/worker-manager/index.html.twig +++ b/templates/web/admin/worker-manager/index.html.twig @@ -10,7 +10,7 @@ {{ "admin::workermanager:tab:configuration: title" | trans }} -
{{ 'admin::workermanager:tab:workerconfig: Set up the delay between two attempts per queue! (if not set, default 10000 millisecond)' |trans }}
{{ form_start(form, {'action': path('worker_admin_configuration')}) }} -{{ 'admin::workermanager:tab:workerconfig: if not set ,default 5000 millisecond' |trans }}
- -