Refacto queues & workers

- define queues in one unique place
- fix some admin ux
- patch conf
This commit is contained in:
jygaulier
2020-12-21 17:42:22 +01:00
parent 03690c0556
commit f2f5db3df6
38 changed files with 1183 additions and 552 deletions

View File

@@ -323,14 +323,23 @@ geocoding-providers:
provincefields: Province provincefields: Province
countryfields: Country, Pays countryfields: Country, Pays
workers: workers:
queue: queue:
worker-queue: worker-queue:
registry: alchemy_worker.queue_registry registry: alchemy_worker.queue_registry
host: localhost host: localhost
port: 5672 port: 5672
user: guest user: guest
password: guest password: guest
vhost: / vhost: /
queues:
writeMetadatas: # this Q is "delayable" in case of record is locked
ttl_retry: 1500 # overwrite 1000 ms default delay
ttl_delayed: 10000 # overwrite 5000 ms default delay
subdefCreation: # this Q is "delayable" in case of record is locked
ttl_delayed: 10000 # overwrite 5000 ms default delay
pullAssets:
ttl_retry: 5000
max_retry : 5
externalservice: externalservice:
ginger: ginger:

View File

@@ -13,19 +13,22 @@ use Alchemy\Phrasea\WorkerManager\Form\WorkerConfigurationType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerFtpType; use Alchemy\Phrasea\WorkerManager\Form\WorkerFtpType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerPullAssetsType; use Alchemy\Phrasea\WorkerManager\Form\WorkerPullAssetsType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerSearchengineType; use Alchemy\Phrasea\WorkerManager\Form\WorkerSearchengineType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerValidationReminderType;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection; use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Doctrine\ORM\OptimisticLockException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Form\Form;
use Symfony\Component\Form\FormInterface; use Symfony\Component\Form\FormInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;
class AdminConfigurationController extends Controller class AdminConfigurationController extends Controller
{ {
public function indexAction(PhraseaApplication $app) public function indexAction(PhraseaApplication $app, Request $request)
{ {
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
/** @var WorkerRunningJobRepository $repoWorker */ /** @var WorkerRunningJobRepository $repoWorker */
$repoWorker = $app['repo.worker-running-job']; $repoWorker = $app['repo.worker-running-job'];
@@ -39,9 +42,10 @@ class AdminConfigurationController extends Controller
$workerRunningJob = $repoWorker->findByStatus($filterStatus); $workerRunningJob = $repoWorker->findByStatus($filterStatus);
return $this->render('admin/worker-manager/index.html.twig', [ return $this->render('admin/worker-manager/index.html.twig', [
'isConnected' => ($serverConnection->getChannel() != null) ? true : false, 'isConnected' => $this->getAMQPConnection()->getChannel() != null,
'workerRunningJob' => $workerRunningJob, 'workerRunningJob' => $workerRunningJob,
'reload' => false 'reload' => false,
'_fragment' => $request->get('_fragment') ?? 'worker-configuration',
]); ]);
} }
@@ -52,26 +56,44 @@ class AdminConfigurationController extends Controller
*/ */
public function configurationAction(PhraseaApplication $app, Request $request) public function configurationAction(PhraseaApplication $app, Request $request)
{ {
$retryQueueConfig = $this->getRetryQueueConfiguration(); $AMQPConnection = $this->getAMQPConnection();
$form = $app->form(new WorkerConfigurationType(), $retryQueueConfig); $conf = $this->getConf()->get(['workers', 'queues'], []);
$form = $app->form(new WorkerConfigurationType($AMQPConnection), $conf);
$form->handleRequest($request); $form->handleRequest($request);
if ($form->isValid()) { if ($form->isValid()) {
// save config in file // save config
$app['conf']->set(['workers', 'retry_queue'], $form->getData()); // too bad we must remove null entries from data to not save in conf
$data = $form->getData();
array_walk(
$data,
function(&$qSettings, $qName) {
$qSettings = array_filter(
$qSettings,
function($setting) {
return $setting !== null;
}
);
}
);
$app['conf']->set(['workers', 'queues'], $data);
/*
* todo : reinitialize q can't depend on form content :
* e.g. if a ttl_retry is blank in form, the value should go back to default, so the q should be reinit.
*
$queues = array_intersect_key(AMQPConnection::$defaultQueues, $retryQueueConfig); $queues = array_intersect_key(AMQPConnection::$defaultQueues, $retryQueueConfig);
$retryQueuesToReset = array_intersect_key(AMQPConnection::$defaultRetryQueues, array_flip($queues)); $retryQueuesToReset = array_intersect_key(AMQPConnection::$defaultRetryQueues, array_flip($queues));
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
// change the queue TTL // change the queue TTL
$serverConnection->reinitializeQueue($retryQueuesToReset); $AMQPConnection->reinitializeQueue($retryQueuesToReset);
$serverConnection->reinitializeQueue(AMQPConnection::$defaultDelayedQueues); $AMQPConnection->reinitializeQueue(AMQPConnection::$defaultDelayedQueues);
*/
return $app->redirectPath('worker_admin'); // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-configuration']);
} }
return $this->render('admin/worker-manager/worker_configuration.html.twig', [ return $this->render('admin/worker-manager/worker_configuration.html.twig', [
@@ -84,7 +106,7 @@ class AdminConfigurationController extends Controller
/** @var WorkerRunningJobRepository $repoWorker */ /** @var WorkerRunningJobRepository $repoWorker */
$repoWorker = $app['repo.worker-running-job']; $repoWorker = $app['repo.worker-running-job'];
$reload = ($request->query->get('reload')) == 1 ? true : false ; $reload = ($request->query->get('reload') == 1);
$workerRunningJob = []; $workerRunningJob = [];
$filterStatus = []; $filterStatus = [];
@@ -114,8 +136,8 @@ class AdminConfigurationController extends Controller
/** /**
* @param Request $request * @param Request $request
* @param $workerId * @param $workerId
* @return \Symfony\Component\HttpFoundation\JsonResponse * @return JsonResponse
* @throws \Doctrine\ORM\OptimisticLockException * @throws OptimisticLockException
*/ */
public function changeStatusAction(Request $request, $workerId) public function changeStatusAction(Request $request, $workerId)
{ {
@@ -140,13 +162,11 @@ class AdminConfigurationController extends Controller
public function queueMonitorAction(PhraseaApplication $app, Request $request) public function queueMonitorAction(PhraseaApplication $app, Request $request)
{ {
$reload = ($request->query->get('reload')) == 1 ? true : false ; $reload = ($request->query->get('reload') == 1);
/** @var AMQPConnection $serverConnection */ $this->getAMQPConnection()->getChannel();
$serverConnection = $app['alchemy_worker.amqp.connection']; $this->getAMQPConnection()->declareExchange();
$serverConnection->getChannel(); $queuesStatus = $this->getAMQPConnection()->getQueuesStatus();
$serverConnection->declareExchange();
$queuesStatus = $serverConnection->getQueuesStatus();
return $this->render('admin/worker-manager/worker_queue_monitor.html.twig', [ return $this->render('admin/worker-manager/worker_queue_monitor.html.twig', [
'queuesStatus' => $queuesStatus, 'queuesStatus' => $queuesStatus,
@@ -162,10 +182,20 @@ class AdminConfigurationController extends Controller
return $this->app->json(['success' => false]); return $this->app->json(['success' => false]);
} }
/** @var AMQPConnection $serverConnection */ $this->getAMQPConnection()->reinitializeQueue([$queueName]);
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
$serverConnection->reinitializeQueue([$queueName]); return $this->app->json(['success' => true]);
}
public function deleteQueueAction(PhraseaApplication $app, Request $request)
{
$queueName = $request->request->get('queueName');
if (empty($queueName)) {
return $this->app->json(['success' => false]);
}
$this->getAMQPConnection()->deleteQueue($queueName);
return $this->app->json(['success' => true]); return $this->app->json(['success' => true]);
} }
@@ -176,7 +206,8 @@ class AdminConfigurationController extends Controller
$repoWorker = $app['repo.worker-running-job']; $repoWorker = $app['repo.worker-running-job'];
$repoWorker->truncateWorkerTable(); $repoWorker->truncateWorkerTable();
return $app->redirectPath('worker_admin'); // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-info']);
} }
public function deleteFinishedAction(PhraseaApplication $app) public function deleteFinishedAction(PhraseaApplication $app)
@@ -185,7 +216,8 @@ class AdminConfigurationController extends Controller
$repoWorker = $app['repo.worker-running-job']; $repoWorker = $app['repo.worker-running-job'];
$repoWorker->deleteFinishedWorks(); $repoWorker->deleteFinishedWorks();
return $app->redirectPath('worker_admin'); // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-info']);
} }
public function searchengineAction(PhraseaApplication $app, Request $request) public function searchengineAction(PhraseaApplication $app, Request $request)
@@ -201,7 +233,8 @@ class AdminConfigurationController extends Controller
$this->getDispatcher()->dispatch(WorkerEvents::POPULATE_INDEX, new PopulateIndexEvent($populateInfo)); $this->getDispatcher()->dispatch(WorkerEvents::POPULATE_INDEX, new PopulateIndexEvent($populateInfo));
return $app->redirectPath('worker_admin'); // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-searchengine']);
} }
return $this->render('admin/worker-manager/worker_searchengine.html.twig', [ return $this->render('admin/worker-manager/worker_searchengine.html.twig', [
@@ -211,14 +244,12 @@ class AdminConfigurationController extends Controller
public function subviewAction() public function subviewAction()
{ {
return $this->render('admin/worker-manager/worker_subview.html.twig', [ return $this->render('admin/worker-manager/worker_subview.html.twig', [ ]);
]);
} }
public function metadataAction() public function metadataAction()
{ {
return $this->render('admin/worker-manager/worker_metadata.html.twig', [ return $this->render('admin/worker-manager/worker_metadata.html.twig', [ ]);
]);
} }
public function ftpAction(PhraseaApplication $app, Request $request) public function ftpAction(PhraseaApplication $app, Request $request)
@@ -231,7 +262,8 @@ class AdminConfigurationController extends Controller
// save new ftp config // save new ftp config
$app['conf']->set(['workers', 'ftp'], array_merge($ftpConfig, $form->getData())); $app['conf']->set(['workers', 'ftp'], array_merge($ftpConfig, $form->getData()));
return $app->redirectPath('worker_admin'); // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-ftp']);
} }
return $this->render('admin/worker-manager/worker_ftp.html.twig', [ return $this->render('admin/worker-manager/worker_ftp.html.twig', [
@@ -241,26 +273,59 @@ class AdminConfigurationController extends Controller
public function validationReminderAction(PhraseaApplication $app, Request $request) public function validationReminderAction(PhraseaApplication $app, Request $request)
{ {
$interval = $app['conf']->get(['workers', 'validationReminder', 'interval'], 7200); // nb : the "interval" for a loop-q is the ttl.
// so the setting is stored into the "queues" settings in conf.
// here only the "ttl_retry" can be set/changed in conf
$config = $this->getConf()->get(['workers', 'queues', MessagePublisher::VALIDATION_REMINDER_TYPE], []);
if(isset($config['ttl_retry'])) {
// all settings are in msec, but into the form we want large numbers in sec.
$config['ttl_retry'] /= 1000;
}
/** @var Form $form */
$form = $app->form(new WorkerValidationReminderType($this->getAMQPConnection()), $config);
if ($request->getMethod() == 'POST') { $form->handleRequest($request);
$reminderInterval = (int)$request->request->get('worker_reminder_interval'); if ($form->isSubmitted() && $form->isValid()) {
$data = $form->getData();
switch($data['act']) {
case 'save' : // save the form content (settings)
unset($data['act']); // don't save this
// the interval was displayed in sec. in form, convert back to msec
if(isset($data['ttl_retry'])) {
$data['ttl_retry'] *= 1000;
}
$app['conf']->set(['workers', 'queues', MessagePublisher::VALIDATION_REMINDER_TYPE], $data);
$this->getAMQPConnection()->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]);
break;
case 'start':
// reinitialize the validation reminder queues
$this->getAMQPConnection()->setQueue(MessagePublisher::VALIDATION_REMINDER_TYPE);
$this->getAMQPConnection()->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]);
$this->getMessagePublisher()->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE);
break;
case 'stop':
$this->getAMQPConnection()->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]);
break;
}
/** @var AMQPConnection $serverConnection */ // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
$serverConnection = $this->app['alchemy_worker.amqp.connection']; return $app->redirectPath('worker_admin', ['_fragment'=>'worker-reminder']);
$serverConnection->setQueue(MessagePublisher::VALIDATION_REMINDER_QUEUE);
// save the period interval in second
$app['conf']->set(['workers', 'validationReminder', 'interval'], $reminderInterval);
// reinitialize the validation reminder queues
$serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_QUEUE]);
$this->app['alchemy_worker.message.publisher']->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE);
return $app->redirectPath('worker_admin');
} }
// guess if the q is "running" = check if there are pending message on Q or loop-Q
$running = false;
$qStatuses = $this->getAMQPConnection()->getQueuesStatus();
foreach([
MessagePublisher::VALIDATION_REMINDER_TYPE,
$this->getAMQPConnection()->getLoopQueueName(MessagePublisher::VALIDATION_REMINDER_TYPE)
] as $qName) {
if(isset($qStatuses[$qName]) && $qStatuses[$qName]['messageCount'] > 0) {
$running = true;
}
}
return $this->render('admin/worker-manager/worker_validation_reminder.html.twig', [ return $this->render('admin/worker-manager/worker_validation_reminder.html.twig', [
'interval' => $interval 'form' => $form->createView(),
'running' => $running
]); ]);
} }
@@ -276,23 +341,35 @@ class AdminConfigurationController extends Controller
public function pullAssetsAction(PhraseaApplication $app, Request $request) public function pullAssetsAction(PhraseaApplication $app, Request $request)
{ {
$pullAssetsConfig = $this->getPullAssetsConfiguration(); $pullAssetsConfig = $this->getConf()->get(['workers', 'pull_assets'], []);
// the "pullInterval" comes from the ttl_retry
$ttl_retry = $this->getConf()->get(['workers','queues', 'pullAssets', 'ttl_retry'], null);
if(!is_null($ttl_retry)) {
$ttl_retry /= 1000; // form is in sec
}
$pullAssetsConfig['pullInterval'] = $ttl_retry;
$form = $app->form(new WorkerPullAssetsType(), $pullAssetsConfig); $form = $app->form(new WorkerPullAssetsType(), $pullAssetsConfig);
$form->handleRequest($request); $form->handleRequest($request);
if ($form->isValid()) { if ($form->isValid()) {
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
$serverConnection->setQueue(MessagePublisher::PULL_QUEUE);
// save new pull config // save new pull config in 2 places
$app['conf']->set(['workers', 'pull_assets'], array_merge($pullAssetsConfig, $form->getData())); $form_data = $form->getData();
$ttl_retry = $form_data['pullInterval'];
unset($form_data['pullInterval'], $pullAssetsConfig['pullInterval']);
$app['conf']->set(['workers', 'pull_assets'], array_merge($pullAssetsConfig, $form_data));
if(!is_null($ttl_retry)) {
$this->getConf()->set(['workers','queues', 'pullAssets', 'ttl_retry'], 1000 * (int)$ttl_retry);
}
// reinitialize the pull queues // reinitialize the pull queues
$serverConnection->reinitializeQueue([MessagePublisher::PULL_QUEUE]); $this->getAMQPConnection()->setQueue(MessagePublisher::PULL_ASSETS_TYPE);
$this->app['alchemy_worker.message.publisher']->initializeLoopQueue(MessagePublisher::PULL_ASSETS_TYPE); $this->getAMQPConnection()->reinitializeQueue([MessagePublisher::PULL_ASSETS_TYPE]);
$this->getMessagePublisher()->initializeLoopQueue(MessagePublisher::PULL_ASSETS_TYPE);
return $app->redirectPath('worker_admin'); // too bad : _fragment does not work with our old url generator... it will be passed as plain url parameter
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-pull-assets']);
} }
return $this->render('admin/worker-manager/worker_pull_assets.html.twig', [ return $this->render('admin/worker-manager/worker_pull_assets.html.twig', [
@@ -300,6 +377,16 @@ class AdminConfigurationController extends Controller
]); ]);
} }
/**
* @return MessagePublisher
*/
private function getMessagePublisher()
{
return $this->app['alchemy_worker.message.publisher'];
}
/** /**
* @return EventDispatcherInterface * @return EventDispatcherInterface
*/ */
@@ -333,18 +420,25 @@ class AdminConfigurationController extends Controller
return $data; return $data;
} }
private function getPullAssetsConfiguration()
{
return $this->app['conf']->get(['workers', 'pull_assets'], []);
}
private function getFtpConfiguration() private function getFtpConfiguration()
{ {
return $this->app['conf']->get(['workers', 'ftp'], []); return $this->getConf()->get(['workers', 'ftp'], []);
} }
private function getRetryQueueConfiguration() /**
* @return AMQPConnection
*/
private function getAMQPConnection()
{ {
return $this->app['conf']->get(['workers', 'retry_queue'], []); return $this->app['alchemy_worker.amqp.connection'];
} }
/**
* @return UrlGeneratorInterface
*/
private function getUrlGenerator()
{
return $this->app['url_generator'];
}
} }

View File

@@ -10,7 +10,7 @@ class AssetsCreationFailureEvent extends SfEvent
private $workerMessage; private $workerMessage;
private $count; private $count;
public function __construct($payload, $workerMessage, $count = 2) public function __construct($payload, $workerMessage, $count)
{ {
$this->payload = $payload; $this->payload = $payload;
$this->workerMessage = $workerMessage; $this->workerMessage = $workerMessage;

View File

@@ -12,7 +12,7 @@ class AssetsCreationRecordFailureEvent extends SfEvent
private $count; private $count;
private $workerJobId; private $workerJobId;
public function __construct($payload, $workerMessage = '', $count = 2, $workerJobId = 0) public function __construct($payload, $workerMessage, $count, $workerJobId )
{ {
$this->payload = $payload; $this->payload = $payload;
$this->workerMessage = $workerMessage; $this->workerMessage = $workerMessage;

View File

@@ -13,7 +13,7 @@ class ExportMailFailureEvent extends SfEvent
private $workerMessage; private $workerMessage;
private $count; private $count;
public function __construct($emitterUserId, $tokenValue, $destinationMails, $params, $workerMessage = '', $count = 2) public function __construct($emitterUserId, $tokenValue, $destinationMails, $params, $workerMessage, $count)
{ {
$this->emitterUserId = $emitterUserId; $this->emitterUserId = $emitterUserId;
$this->tokenValue = $tokenValue; $this->tokenValue = $tokenValue;

View File

@@ -14,7 +14,7 @@ class PopulateIndexFailureEvent extends SfEvent
private $count; private $count;
private $workerJobId; private $workerJobId;
public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2, $workerJobId = 0) public function __construct($host, $port, $indexName, $databoxId, $workerMessage, $count, $workerJobId)
{ {
$this->host = $host; $this->host = $host;
$this->port = $port; $this->port = $port;

View File

@@ -12,7 +12,7 @@ class SubdefinitionCreationFailureEvent extends RecordEvent
private $count; private $count;
private $workerJobId; private $workerJobId;
public function __construct(RecordInterface $record, $subdefName, $workerMessage = '', $count = 2, $workerJobId = 0) public function __construct(RecordInterface $record, $subdefName, $workerMessage, $count, $workerJobId)
{ {
parent::__construct($record); parent::__construct($record);

View File

@@ -11,7 +11,7 @@ class WebhookDeliverFailureEvent extends SfEvent
private $count; private $count;
private $deleveryId; private $deleveryId;
public function __construct($webhookEventId, $workerMessage, $count = 2, $deleveryId = null) public function __construct($webhookEventId, $workerMessage, $count, $deleveryId = null)
{ {
$this->webhookEventId = $webhookEventId; $this->webhookEventId = $webhookEventId;
$this->workerMessage = $workerMessage; $this->workerMessage = $workerMessage;

View File

@@ -0,0 +1,68 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Form;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\Extension\Core\Type\HiddenType;
use Symfony\Component\Form\Extension\Core\Type\IntegerType;
use Symfony\Component\Form\FormBuilderInterface;
class QueueSettingsType extends AbstractType
{
private $AMQPConnection;
private $baseQueueName;
public function __construct(AMQPConnection $AMQPConnection, string $baseQueueName)
{
$this->AMQPConnection = $AMQPConnection;
$this->baseQueueName = $baseQueueName;
}
public function buildForm(FormBuilderInterface $builder, array $options)
{
parent::buildForm($builder, $options);
$builder->add('n_workers', HiddenType::class, [
'label' => 'admin::workermanager:tab:workerconfig:n_workers',
'required' => false,
'attr' => [
'placeholder' => 1
]
]);
if($this->AMQPConnection->hasRetryQueue($this->baseQueueName) || $this->AMQPConnection->hasLoopQueue($this->baseQueueName)) {
$builder
->add('max_retry', IntegerType::class, [
'label' => 'admin::workermanager:tab:workerconfig:max retry',
'required' => false,
'attr' => [
'placeholder' => $this->AMQPConnection->getMaxRetry($this->baseQueueName),
//'class'=>'col'
]
])
->add('ttl_retry', IntegerType::class, [
'label' => 'admin::workermanager:tab:workerconfig:retry delay in ms',
'required' => false,
'attr' => [
'placeholder' => $this->AMQPConnection->getTTLRetry($this->baseQueueName),
//'class'=>'col'
]
]);
}
if($this->AMQPConnection->hasDelayedQueue($this->baseQueueName)) {
$builder->add('ttl_delayed', IntegerType::class, [
'label' => 'admin::workermanager:tab:workerconfig:delayed delay in ms',
'required' => false,
'attr' => [
'placeholder' => $this->AMQPConnection->getTTLDelayed($this->baseQueueName),
//'class'=>'col'
]
]);
}
}
public function getName()
{
return 'queue_settings';
}
}

View File

@@ -2,50 +2,80 @@
namespace Alchemy\Phrasea\WorkerManager\Form; namespace Alchemy\Phrasea\WorkerManager\Form;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher; use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Symfony\Component\Form\AbstractType; use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\Extension\Core\Type\SubmitType;
use Symfony\Component\Form\FormBuilderInterface; use Symfony\Component\Form\FormBuilderInterface;
class WorkerConfigurationType extends AbstractType class WorkerConfigurationType extends AbstractType
{ {
private $AMQPConnection;
public function __construct(AMQPConnection $AMQPConnection)
{
$this->AMQPConnection = $AMQPConnection;
}
public function buildForm(FormBuilderInterface $builder, array $options) public function buildForm(FormBuilderInterface $builder, array $options)
{ {
parent::buildForm($builder, $options); parent::buildForm($builder, $options);
$builder foreach($this->AMQPConnection->getBaseQueueNames() as $baseQueueName) {
->add(MessagePublisher::ASSETS_INGEST_TYPE, 'text', [ /*
'label' => 'admin::workermanager:tab:workerconfig: Ingest retry delay in ms' $g = null;
]) if($this->AMQPConnection->hasRetryQueue($baseQueueName) || $this->AMQPConnection->hasLoopQueue($baseQueueName)) {
->add(MessagePublisher::CREATE_RECORD_TYPE, 'text', [ $g = $g ?? $this->createFormGroup($builder, $baseQueueName);
'label' => 'admin::workermanager:tab:workerconfig: Create record retry delay in ms' $g->add('max_retry', TextType::class, [
]) 'label' => 'admin::workermanager:tab:workerconfig:max retry',
->add(MessagePublisher::SUBDEF_CREATION_TYPE, 'text', [ 'required' => false,
'label' => 'admin::workermanager:tab:workerconfig: Subdefinition retry delay in ms' 'attr' => [
]) 'placeholder' => $this->AMQPConnection->getMaxRetry($baseQueueName)
->add(MessagePublisher::WRITE_METADATAS_TYPE, 'text', [ ]
'label' => 'admin::workermanager:tab:workerconfig: Metadatas retry delay in ms' ]);
]) $g->add('ttl_retry', TextType::class, [
->add(MessagePublisher::WEBHOOK_TYPE, 'text', [ 'label' => 'admin::workermanager:tab:workerconfig:retry delay in ms',
'label' => 'admin::workermanager:tab:workerconfig: Webhook retry delay in ms' 'required' => false,
]) 'attr' => [
->add(MessagePublisher::EXPORT_MAIL_TYPE, 'text', [ 'placeholder' => $this->AMQPConnection->getTTLRetry($baseQueueName)
'label' => 'admin::workermanager:tab:workerconfig: Export mail retry delay in ms' ]
]) ]);
->add(MessagePublisher::POPULATE_INDEX_TYPE, 'text', [ }
'label' => 'admin::workermanager:tab:workerconfig: Populate Index retry delay in ms' if($this->AMQPConnection->hasDelayedQueue($baseQueueName)) {
]) $g = $g ?? $this->createFormGroup($builder, $baseQueueName);
->add(MessagePublisher::FTP_TYPE, 'text', [ $g->add('ttl_delayed', TextType::class, [
'label' => 'admin::workermanager:tab:workerconfig: Ftp retry delay in ms (default 3 min)' 'label' => 'admin::workermanager:tab:workerconfig:delayed delay in ms',
]) 'required' => false,
->add('delayedSubdef', 'text', [ 'attr' => [
'label' => 'admin::workermanager:tab:workerconfig: Subdef delay in ms' 'placeholder' => $this->AMQPConnection->getTTLDelayed($baseQueueName)
]) ]
->add('delayedWriteMeta', 'text', [ ]);
'label' => 'admin::workermanager:tab:workerconfig: Write meta delay in ms' }
]) if($g) {
; // $builder->add($g);
}
*/
if($this->AMQPConnection->hasRetryQueue($baseQueueName)
|| $this->AMQPConnection->hasLoopQueue($baseQueueName)
|| $this->AMQPConnection->hasDelayedQueue($baseQueueName)
) {
$f = new QueueSettingsType($this->AMQPConnection, $baseQueueName);
$builder->add($baseQueueName, $f, ['attr' => ['class' => 'norow'], 'block_name' => 'queue']);
}
}
$builder->add("boutton::appliquer", SubmitType::class,
[
'label' => "boutton::appliquer"
]);
} }
/*
private function createFormGroup(FormBuilderInterface $builder, string $name)
{
// todo : fix form render : one horizontal block per queue
return $builder->create($name, FormType::class, ['attr'=>['class'=>'form-row']]);
}
*/
public function getName() public function getName()
{ {
return 'worker_configuration'; return 'worker_configuration';

View File

@@ -3,6 +3,7 @@
namespace Alchemy\Phrasea\WorkerManager\Form; namespace Alchemy\Phrasea\WorkerManager\Form;
use Symfony\Component\Form\AbstractType; use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\Extension\Core\Type\TextType;
use Symfony\Component\Form\FormBuilderInterface; use Symfony\Component\Form\FormBuilderInterface;
class WorkerFtpType extends AbstractType class WorkerFtpType extends AbstractType
@@ -12,19 +13,19 @@ class WorkerFtpType extends AbstractType
parent::buildForm($builder, $options); parent::buildForm($builder, $options);
$builder $builder
->add('proxy', 'text', [ ->add('proxy', TextType::class, [
'label' => 'admin::workermanager:tab:ftp: Proxy', 'label' => 'admin::workermanager:tab:ftp: Proxy',
'required' => false 'required' => false
]) ])
->add('proxyPort', 'text', [ ->add('proxyPort', TextType::class, [
'label' => 'admin::workermanager:tab:ftp: Proxy port', 'label' => 'admin::workermanager:tab:ftp: Proxy port',
'required' => false 'required' => false
]) ])
->add('proxyUser', 'text', [ ->add('proxyUser', TextType::class, [
'label' => 'admin::workermanager:tab:ftp: Proxy user', 'label' => 'admin::workermanager:tab:ftp: Proxy user',
'required' => false 'required' => false
]) ])
->add('proxyPassword', 'text', [ ->add('proxyPassword', TextType::class, [
'label' => 'admin::workermanager:tab:ftp: Proxy password', 'label' => 'admin::workermanager:tab:ftp: Proxy password',
'required' => false 'required' => false
]) ])

View File

@@ -3,6 +3,7 @@
namespace Alchemy\Phrasea\WorkerManager\Form; namespace Alchemy\Phrasea\WorkerManager\Form;
use Symfony\Component\Form\AbstractType; use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\Extension\Core\Type\TextType;
use Symfony\Component\Form\FormBuilderInterface; use Symfony\Component\Form\FormBuilderInterface;
class WorkerPullAssetsType extends AbstractType class WorkerPullAssetsType extends AbstractType
@@ -12,16 +13,16 @@ class WorkerPullAssetsType extends AbstractType
parent::buildForm($builder, $options); parent::buildForm($builder, $options);
$builder $builder
->add('UploaderApiBaseUri', 'text', [ ->add('UploaderApiBaseUri', TextType::class, [
'label' => 'admin::workermanager:tab:pullassets: Uploader api base uri' 'label' => 'admin::workermanager:tab:pullassets: Uploader api base uri'
]) ])
->add('clientSecret', 'text', [ ->add('clientSecret', TextType::class, [
'label' => 'admin::workermanager:tab:pullassets: Client secret' 'label' => 'admin::workermanager:tab:pullassets: Client secret'
]) ])
->add('clientId', 'text', [ ->add('clientId', TextType::class, [
'label' => 'admin::workermanager:tab:pullassets: Client ID' 'label' => 'admin::workermanager:tab:pullassets: Client ID'
]) ])
->add('pullInterval', 'text', [ ->add('pullInterval', TextType::class, [
'label' => 'admin::workermanager:tab:pullassets: Fetching interval in second' 'label' => 'admin::workermanager:tab:pullassets: Fetching interval in second'
]) ])
; ;

View File

@@ -3,6 +3,8 @@
namespace Alchemy\Phrasea\WorkerManager\Form; namespace Alchemy\Phrasea\WorkerManager\Form;
use Symfony\Component\Form\AbstractType; use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\Extension\Core\Type\IntegerType;
use Symfony\Component\Form\Extension\Core\Type\TextType;
use Symfony\Component\Form\FormBuilderInterface; use Symfony\Component\Form\FormBuilderInterface;
use Symfony\Component\OptionsResolver\OptionsResolverInterface; use Symfony\Component\OptionsResolver\OptionsResolverInterface;
use Symfony\Component\Validator\Constraints\NotBlank; use Symfony\Component\Validator\Constraints\NotBlank;
@@ -15,18 +17,18 @@ class WorkerSearchengineType extends AbstractType
parent::buildForm($builder, $options); parent::buildForm($builder, $options);
$builder $builder
->add('host', 'text', [ ->add('host', TextType::class, [
'label' => 'admin::workermanager:tab:searchengine: Elasticsearch server host', 'label' => 'admin::workermanager:tab:searchengine: Elasticsearch server host',
'constraints' => new NotBlank(), 'constraints' => new NotBlank(),
]) ])
->add('port', 'integer', [ ->add('port', IntegerType::class, [
'label' => 'admin::workermanager:tab:searchengine: Elasticsearch service port', 'label' => 'admin::workermanager:tab:searchengine: Elasticsearch service port',
'constraints' => [ 'constraints' => [
new Range(['min' => 1, 'max' => 65535]), new Range(['min' => 1, 'max' => 65535]),
new NotBlank() new NotBlank()
] ]
]) ])
->add('indexName', 'text', [ ->add('indexName', TextType::class, [
'label' => 'admin::workermanager:tab:searchengine: Elasticsearch index name', 'label' => 'admin::workermanager:tab:searchengine: Elasticsearch index name',
'constraints' => new NotBlank(), 'constraints' => new NotBlank(),
'attr' =>['data-class'=>'inline'] 'attr' =>['data-class'=>'inline']

View File

@@ -0,0 +1,68 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Form;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\Extension\Core\Type\HiddenType;
use Symfony\Component\Form\Extension\Core\Type\SubmitType;
use Symfony\Component\Form\Extension\Core\Type\TextType;
use Symfony\Component\Form\FormBuilderInterface;
class WorkerValidationReminderType extends AbstractType
{
private $AMQPConnection;
public function __construct(AMQPConnection $AMQPConnection)
{
$this->AMQPConnection = $AMQPConnection;
}
public function buildForm(FormBuilderInterface $builder, array $options)
{
parent::buildForm($builder, $options);
// because this form will have 3 submit buttons - to use the same route -, this "act" field
// will reflect the value of the clicked button (js)
// !!! tried: using symfony "getClickedButton()" does to NOT work (submit button values seems not sent in request ?)
$builder
->add('act', HiddenType::class, [
'attr' => [
'class' => 'act'
]
]);
// every ttl is in msec, we display this large one (loop q) in sec in form.
$interval = $this->AMQPConnection->getTTLRetry(MessagePublisher::VALIDATION_REMINDER_TYPE) / 1000;
$builder
->add('ttl_retry', TextType::class, [
'label' => 'admin::workermanager:tab:Reminder: Interval in second',
'attr' => [
'placeholder' => $interval
]
]);
$builder
->add("boutton::appliquer", SubmitType::class, [
'label' => "boutton::appliquer",
'attr' => ['value' => 'save']
]);
/*
$builder
->add("submit", ButtonType::class, [
'label' => "start",
// 'data' => 'truc',
// 'empty_data' => 'machin',
'attr'=>[
'value' => 'start',
'name' => 'zobi'
]
]);
*/
}
// public function getName()
// {
// return 'worker_pullAssets';
// }
}

View File

@@ -45,61 +45,81 @@ class ControllerServiceProvider implements ControllerProviderInterface, ServiceP
$firewall->requireRight(\ACL::TASKMANAGER); $firewall->requireRight(\ACL::TASKMANAGER);
}); });
/** @uses AdminConfigurationController::indexAction */
$controllers->match('/', 'controller.worker.admin.configuration:indexAction') $controllers->match('/', 'controller.worker.admin.configuration:indexAction')
->method('GET') ->method('GET')
->bind('worker_admin'); ->bind('worker_admin');
/** @uses AdminConfigurationController::configurationAction */
$controllers->match('/configuration', 'controller.worker.admin.configuration:configurationAction') $controllers->match('/configuration', 'controller.worker.admin.configuration:configurationAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_configuration'); ->bind('worker_admin_configuration');
/** @uses AdminConfigurationController::infoAction */
$controllers->match('/info', 'controller.worker.admin.configuration:infoAction') $controllers->match('/info', 'controller.worker.admin.configuration:infoAction')
->method('GET') ->method('GET')
->bind('worker_admin_info'); ->bind('worker_admin_info');
/** @uses AdminConfigurationController::truncateTableAction */
$controllers->match('/truncate', 'controller.worker.admin.configuration:truncateTableAction') $controllers->match('/truncate', 'controller.worker.admin.configuration:truncateTableAction')
->method('POST') ->method('POST')
->bind('worker_admin_truncate'); ->bind('worker_admin_truncate');
/** @uses AdminConfigurationController::deleteFinishedAction */
$controllers->match('/delete-finished', 'controller.worker.admin.configuration:deleteFinishedAction') $controllers->match('/delete-finished', 'controller.worker.admin.configuration:deleteFinishedAction')
->method('POST') ->method('POST')
->bind('worker_admin_delete_finished'); ->bind('worker_admin_delete_finished');
/** @uses AdminConfigurationController::searchengineAction */
$controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction') $controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_searchengine'); ->bind('worker_admin_searchengine');
/** @uses AdminConfigurationController::subviewAction */
$controllers->match('/subview', 'controller.worker.admin.configuration:subviewAction') $controllers->match('/subview', 'controller.worker.admin.configuration:subviewAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_subview'); ->bind('worker_admin_subview');
/** @uses AdminConfigurationController::metadataAction */
$controllers->match('/metadata', 'controller.worker.admin.configuration:metadataAction') $controllers->match('/metadata', 'controller.worker.admin.configuration:metadataAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_metadata'); ->bind('worker_admin_metadata');
/** @uses AdminConfigurationController::ftpAction */
$controllers->match('/ftp', 'controller.worker.admin.configuration:ftpAction') $controllers->match('/ftp', 'controller.worker.admin.configuration:ftpAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_ftp'); ->bind('worker_admin_ftp');
/** @uses AdminConfigurationController::populateStatusAction */
$controllers->get('/populate-status', 'controller.worker.admin.configuration:populateStatusAction') $controllers->get('/populate-status', 'controller.worker.admin.configuration:populateStatusAction')
->bind('worker_admin_populate_status'); ->bind('worker_admin_populate_status');
/** @uses AdminConfigurationController::pullAssetsAction */
$controllers->match('/pull-assets', 'controller.worker.admin.configuration:pullAssetsAction') $controllers->match('/pull-assets', 'controller.worker.admin.configuration:pullAssetsAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_pullAssets'); ->bind('worker_admin_pullAssets');
/** @uses AdminConfigurationController::validationReminderAction */
$controllers->match('/validation-reminder', 'controller.worker.admin.configuration:validationReminderAction') $controllers->match('/validation-reminder', 'controller.worker.admin.configuration:validationReminderAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_validationReminder'); ->bind('worker_admin_validationReminder');
/** @uses AdminConfigurationController::queueMonitorAction */
$controllers->match('/queue-monitor', 'controller.worker.admin.configuration:queueMonitorAction') $controllers->match('/queue-monitor', 'controller.worker.admin.configuration:queueMonitorAction')
->method('GET') ->method('GET')
->bind('worker_admin_queue_monitor'); ->bind('worker_admin_queue_monitor');
/** @uses AdminConfigurationController::purgeQueueAction */
$controllers->match('/purge-queue', 'controller.worker.admin.configuration:purgeQueueAction') $controllers->match('/purge-queue', 'controller.worker.admin.configuration:purgeQueueAction')
->method('POST') ->method('POST')
->bind('worker_admin_purge_queue'); ->bind('worker_admin_purge_queue');
/** @uses AdminConfigurationController::deleteQueueAction */
$controllers->match('/delete-queue', 'controller.worker.admin.configuration:deleteQueueAction')
->method('POST')
->bind('worker_admin_delete_queue');
/** @uses AdminConfigurationController::changeStatusAction */
$controllers->match('/{workerId}/change-status', 'controller.worker.admin.configuration:changeStatusAction') $controllers->match('/{workerId}/change-status', 'controller.worker.admin.configuration:changeStatusAction')
->method('POST') ->method('POST')
->assert('workerId', '\d+') ->assert('workerId', '\d+')

View File

@@ -3,6 +3,7 @@
namespace Alchemy\Phrasea\WorkerManager\Queue; namespace Alchemy\Phrasea\WorkerManager\Queue;
use Alchemy\Phrasea\Core\Configuration\PropertyAccess; use Alchemy\Phrasea\Core\Configuration\PropertyAccess;
use Exception;
use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Wire\AMQPTable;
@@ -20,59 +21,6 @@ class AMQPConnection
private $hostConfig; private $hostConfig;
private $conf; private $conf;
public static $defaultQueues = [
MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::METADATAS_QUEUE,
MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::SUBDEF_QUEUE,
MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::EXPORT_QUEUE,
MessagePublisher::WEBHOOK_TYPE => MessagePublisher::WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::CREATE_RECORD_QUEUE,
MessagePublisher::PULL_ASSETS_TYPE => MessagePublisher::PULL_QUEUE,
MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::POPULATE_INDEX_QUEUE,
MessagePublisher::DELETE_RECORD_TYPE => MessagePublisher::DELETE_RECORD_QUEUE,
MessagePublisher::MAIN_QUEUE_TYPE => MessagePublisher::MAIN_QUEUE,
MessagePublisher::SUBTITLE_TYPE => MessagePublisher::SUBTITLE_QUEUE,
MessagePublisher::FTP_TYPE => MessagePublisher::FTP_QUEUE,
MessagePublisher::VALIDATION_REMINDER_TYPE => MessagePublisher::VALIDATION_REMINDER_QUEUE,
MessagePublisher::EXPOSE_UPLOAD_TYPE => MessagePublisher::EXPOSE_UPLOAD_QUEUE,
MessagePublisher::RECORD_EDIT_TYPE => MessagePublisher::RECORD_EDIT_QUEUE
];
// the corresponding worker queues and retry queues, loop queue
public static $defaultRetryQueues = [
MessagePublisher::METADATAS_QUEUE => MessagePublisher::RETRY_METADATAS_QUEUE,
MessagePublisher::SUBDEF_QUEUE => MessagePublisher::RETRY_SUBDEF_QUEUE,
MessagePublisher::EXPORT_QUEUE => MessagePublisher::RETRY_EXPORT_QUEUE,
MessagePublisher::WEBHOOK_QUEUE => MessagePublisher::RETRY_WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_QUEUE => MessagePublisher::RETRY_ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_QUEUE => MessagePublisher::RETRY_CREATE_RECORD_QUEUE,
MessagePublisher::POPULATE_INDEX_QUEUE => MessagePublisher::RETRY_POPULATE_INDEX_QUEUE,
MessagePublisher::PULL_QUEUE => MessagePublisher::LOOP_PULL_QUEUE,
MessagePublisher::FTP_QUEUE => MessagePublisher::RETRY_FTP_QUEUE,
MessagePublisher::VALIDATION_REMINDER_QUEUE => MessagePublisher::LOOP_VALIDATION_REMINDER_QUEUE
];
public static $defaultFailedQueues = [
MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::FAILED_METADATAS_QUEUE,
MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::FAILED_SUBDEF_QUEUE,
MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::FAILED_EXPORT_QUEUE,
MessagePublisher::WEBHOOK_TYPE => MessagePublisher::FAILED_WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::FAILED_ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::FAILED_CREATE_RECORD_QUEUE,
MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::FAILED_POPULATE_INDEX_QUEUE,
MessagePublisher::FTP_TYPE => MessagePublisher::FAILED_FTP_QUEUE
];
public static $defaultDelayedQueues = [
MessagePublisher::METADATAS_QUEUE => MessagePublisher::DELAYED_METADATAS_QUEUE,
MessagePublisher::SUBDEF_QUEUE => MessagePublisher::DELAYED_SUBDEF_QUEUE
];
public static $defaultLoopTypes = [
MessagePublisher::PULL_ASSETS_TYPE,
MessagePublisher::VALIDATION_REMINDER_TYPE
];
// default message TTL in retry queue in millisecond // default message TTL in retry queue in millisecond
const RETRY_DELAY = 10000; const RETRY_DELAY = 10000;
@@ -82,6 +30,91 @@ class AMQPConnection
// default message TTL in delayed queue in millisecond // default message TTL in delayed queue in millisecond
const DELAY = 5000; const DELAY = 5000;
// max number of retry before a message goes in failed
const MAX_RETRY = 3;
const WITH_NOTHING = 0;
const WITH_RETRY = 1;
const WITH_DELAYED = 2;
const WITH_LOOP = 4;
const BASE_QUEUE = 'base_q';
const BASE_QUEUE_WITH_RETRY = 'base_q_with_retry';
const BASE_QUEUE_WITH_LOOP = 'base_q_with_loop';
const RETRY_QUEUE = 'retry_q';
const LOOP_QUEUE = 'loop_q';
const FAILED_QUEUE = 'failed_q';
const DELAYED_QUEUE = 'delayed_q';
const MESSAGES = [
MessagePublisher::WRITE_METADATAS_TYPE => [
'with' => self::WITH_RETRY | self::WITH_DELAYED,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
'ttl_delayed' => self::DELAY
],
MessagePublisher::SUBDEF_CREATION_TYPE => [
'with' => self::WITH_RETRY | self::WITH_DELAYED,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
'ttl_delayed' => self::DELAY
],
MessagePublisher::EXPORT_MAIL_TYPE => [
'with' => self::WITH_RETRY,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
],
MessagePublisher::WEBHOOK_TYPE => [
'with' => self::WITH_RETRY,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
],
MessagePublisher::ASSETS_INGEST_TYPE => [
'with' => self::WITH_RETRY,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
],
MessagePublisher::CREATE_RECORD_TYPE => [
'with' => self::WITH_RETRY,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
],
MessagePublisher::PULL_ASSETS_TYPE => [
'with' => self::WITH_LOOP,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
],
MessagePublisher::POPULATE_INDEX_TYPE => [
'with' => self::WITH_RETRY,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => self::RETRY_DELAY,
],
MessagePublisher::DELETE_RECORD_TYPE => [
'with' => self::WITH_NOTHING,
],
MessagePublisher::MAIN_QUEUE_TYPE => [
'with' => self::WITH_NOTHING,
],
MessagePublisher::SUBTITLE_TYPE => [
'with' => self::WITH_NOTHING,
],
MessagePublisher::EXPOSE_UPLOAD_TYPE => [
'with' => self::WITH_NOTHING,
],
MessagePublisher::FTP_TYPE => [
'with' => self::WITH_RETRY,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => 180 * 1000,
],
MessagePublisher::VALIDATION_REMINDER_TYPE => [
'with' => self::WITH_LOOP,
'max_retry' => self::MAX_RETRY,
'ttl_retry' => 7200 * 1000,
],
];
private $queues = []; // filled during construct, from msg list, default values and conf
public function __construct(PropertyAccess $conf) public function __construct(PropertyAccess $conf)
{ {
$defaultConfiguration = [ $defaultConfiguration = [
@@ -94,6 +127,175 @@ class AMQPConnection
$this->hostConfig = $conf->get(['workers', 'queue', 'worker-queue'], $defaultConfiguration); $this->hostConfig = $conf->get(['workers', 'queue', 'worker-queue'], $defaultConfiguration);
$this->conf = $conf; $this->conf = $conf;
// fill list of type attributes
foreach (self::MESSAGES as $name => $attr) {
$this->queues[$name] = $attr; // default q
$this->queues[$name]['Name'] = $name;
$this->queues[$name]['QType'] = self::BASE_QUEUE;
$this->queues[$name]['Exchange'] = self::ALCHEMY_EXCHANGE;
// a q with retry can fail (after n retry) or loop
if($attr['with'] & self::WITH_RETRY) {
$this->queues[$name]['QType'] = self::BASE_QUEUE_WITH_RETRY; // todo : avoid changing qtype ?
// declare the retry q, cross-link with base q
$retry_name = 'retry_' . $name;
$this->queues[$name]['RetryQ'] = $retry_name; // link baseq to retryq
$this->queues[$retry_name] = $attr;
$this->queues[$retry_name]['Name'] = $retry_name;
$this->queues[$retry_name]['QType'] = self::RETRY_QUEUE;
$this->queues[$retry_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE;
$this->queues[$retry_name]['BaseQ'] = $name; // link retryq back to baseq
// declare the failed q, cross-link with base q
$failed_name = 'failed_' . $name;
$this->queues[$name]['FailedQ'] = $failed_name; // link baseq to failedq
$this->queues[$failed_name] = $attr;
$this->queues[$failed_name]['Name'] = $failed_name;
$this->queues[$failed_name]['QType'] = self::FAILED_QUEUE;
$this->queues[$failed_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE;
$this->queues[$failed_name]['BaseQ'] = $name; // link failedq back to baseq
}
// a q can be "delayed" to solve "work in progress" lock on records
if($attr['with'] & self::WITH_DELAYED) {
// declare the delayed q, cross-link with base q
$delayed_name = 'delayed_' . $name;
$this->queues[$name]['DelayedQ'] = $delayed_name; // link baseq to delayedq
$this->queues[$delayed_name] = $attr;
$this->queues[$delayed_name]['Name'] = $delayed_name;
$this->queues[$delayed_name]['QType'] = self::DELAYED_QUEUE;
$this->queues[$delayed_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE;
$this->queues[$delayed_name]['BaseQ'] = $name; // link delayedq back to baseq
}
if($attr['with'] & self::WITH_LOOP) {
$this->queues[$name]['QType'] = self::BASE_QUEUE_WITH_LOOP; // todo : avoid changing qtype ?
// declare the loop q, cross-link with base q
$loop_name = 'loop_' . $name;
$this->queues[$name]['LoopQ'] = $loop_name; // link baseq to loopq
$this->queues[$loop_name] = $attr;
$this->queues[$loop_name]['Name'] = $loop_name;
$this->queues[$loop_name]['QType'] = self::LOOP_QUEUE;
$this->queues[$loop_name]['Exchange'] = self::RETRY_ALCHEMY_EXCHANGE;
$this->queues[$loop_name]['BaseQ'] = $name; // link loopq back to baseq
}
}
// inject conf values
foreach($conf->get(['workers', 'queues'], []) as $name => $settings) {
if(!isset($this->queues[$name])) {
throw new Exception(sprintf('undefined queue "%s" in conf', $name));
}
$this->queues[$name] = array_merge($this->queues[$name], $settings);
}
}
// private function addQueue(string $name, string $Qtype, string $exchange, string $baseq=null)
// {
// $this->queues[$name]['Name'] = $name;
// $this->queues[$name]['QType'] = $Qtype;
// $this->queues[$name]['Exchange'] = $exchange;
// $this->queues[$name]['BaseQ'] = $baseq; // link back to baseq
// }
// public function getQueueNames()
// {
// return array_keys($this->queues);
// }
public function getBaseQueueNames()
{
$keys = array_keys(self::MESSAGES);
asort($keys);
return $keys;
}
public function isBaseQueue(string $queueName)
{
return array_key_exists($queueName, self::MESSAGES);
}
public function getBaseQueueName(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return $q['Name'];
}
public function hasRetryQueue(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return array_key_exists('RetryQ', $q);
}
public function getRetryQueueName(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName, 'RetryQ');
return $q['Name'];
}
public function getMaxRetry(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return $q['max_retry'];
}
public function getTTLRetry(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return $q['ttl_retry'];
}
public function hasDelayedQueue(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return array_key_exists('DelayedQ', $q);
}
public function getDelayedQueueName(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName, 'DelayedQ');
return $q['Name'];
}
public function getTTLDelayed(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return $q['ttl_delayed'];
}
public function getFailedQueueName(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName, 'FailedQ');
return $q['Name'];
}
public function hasLoopQueue(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName);
return array_key_exists('LoopQ', $q);
}
public function getLoopQueueName(string $baseQueueName)
{
$q = $this->getQueue($baseQueueName, 'LoopQ');
return $q['Name'];
}
public function getExchange(string $queueName)
{
$q = $this->getQueue($queueName);
return $q['Exchange'];
}
private function getQueue(string $queueName, string $subQueueKey = null)
{
if(!array_key_exists($queueName, $this->queues)) {
throw new Exception(sprintf('undefined queue "%s"', $queueName));
}
if($subQueueKey && !array_key_exists($subQueueKey, $this->queues[$queueName])) {
throw new Exception(sprintf('base queue "%s" has no "%s"', $queueName, $subQueueKey));
}
return $subQueueKey ? $this->queues[$this->queues[$queueName][$subQueueKey]] : $this->queues[$queueName];
} }
public function getConnection() public function getConnection()
@@ -108,8 +310,9 @@ class AMQPConnection
$this->hostConfig['vhost'] $this->hostConfig['vhost']
); );
} catch (\Exception $e) { }
catch (Exception $e) {
// no-op
} }
} }
@@ -127,7 +330,8 @@ class AMQPConnection
} }
return null; return null;
} else { }
else {
return $this->channel; return $this->channel;
} }
} }
@@ -156,107 +360,163 @@ class AMQPConnection
$this->declareExchange(); $this->declareExchange();
} }
if (isset(self::$defaultRetryQueues[$queueName])) { $queue = $this->queues[$queueName];
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ switch($queue['QType']) {
'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message case self::BASE_QUEUE_WITH_RETRY:
'x-dead-letter-routing-key' => self::$defaultRetryQueues[$queueName] // the routing key to apply to this 'dead' message $this->queue_declare_and_bind($queueName, self::ALCHEMY_EXCHANGE, [
])); 'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message
'x-dead-letter-routing-key' => $queue['RetryQ'] // the routing key to apply to this 'dead' message
$this->channel->queue_bind($queueName, self::ALCHEMY_EXCHANGE, $queueName); ]);
$this->setQueue($queue['RetryQ']);
// declare also the corresponding retry queue break;
// use this to delay the delivery of a message to the alchemy-exchange case self::BASE_QUEUE_WITH_LOOP:
$this->channel->queue_declare(self::$defaultRetryQueues[$queueName], false, true, false, false, false, new AMQPTable([ $this->queue_declare_and_bind($queueName, self::ALCHEMY_EXCHANGE, [
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, 'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message
'x-dead-letter-routing-key' => $queueName, 'x-dead-letter-routing-key' => $queue['LoopQ'] // the routing key to apply to this 'dead' message
'x-message-ttl' => $this->getTtlRetryPerRouting($queueName) ]);
])); $this->setQueue($queue['LoopQ']);
break;
$this->channel->queue_bind(self::$defaultRetryQueues[$queueName], AMQPConnection::RETRY_ALCHEMY_EXCHANGE, self::$defaultRetryQueues[$queueName]); case self::LOOP_QUEUE:
case self::RETRY_QUEUE:
} elseif (in_array($queueName, self::$defaultRetryQueues)) { $this->queue_declare_and_bind($queueName, self::RETRY_ALCHEMY_EXCHANGE, [
// if it's a retry queue 'x-dead-letter-exchange' => self::ALCHEMY_EXCHANGE,
$routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); 'x-dead-letter-routing-key' => $queue['BaseQ'],
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ 'x-message-ttl' => $this->queues[$queue['BaseQ']]['ttl_retry']
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, ]);
'x-dead-letter-routing-key' => $routing, break;
'x-message-ttl' => $this->getTtlRetryPerRouting($routing) case self::DELAYED_QUEUE:
])); $this->queue_declare_and_bind($queueName, self::RETRY_ALCHEMY_EXCHANGE, [
'x-dead-letter-exchange' => self::ALCHEMY_EXCHANGE,
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); 'x-dead-letter-routing-key' => $queue['BaseQ'],
} elseif (in_array($queueName, self::$defaultFailedQueues)) { 'x-message-ttl' => $this->queues[$queue['BaseQ']]['ttl_delayed']
// if it's a failed queue ]);
$this->channel->queue_declare($queueName, false, true, false, false, false); break;
case self::FAILED_QUEUE:
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); $this->queue_declare_and_bind($queueName, self::RETRY_ALCHEMY_EXCHANGE);
} elseif (in_array($queueName, self::$defaultDelayedQueues)) { break;
// if it's a delayed queue case self::BASE_QUEUE:
$routing = array_search($queueName, AMQPConnection::$defaultDelayedQueues); $this->queue_declare_and_bind($queueName, self::ALCHEMY_EXCHANGE);
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ break;
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, default:
'x-dead-letter-routing-key' => $routing, throw new \Exception(sprintf('undefined q type "%s', $queueName));
'x-message-ttl' => $this->getTtlDelayedPerRouting($routing) break;
]));
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
} else {
$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_bind($queueName, AMQPConnection::ALCHEMY_EXCHANGE, $queueName);
} }
return $this->channel; return $this->channel;
} }
public function reinitializeQueue(array $queuNames) private function queue_declare_and_bind(string $name, string $exchange, array $arguments = null)
{
$this->channel->queue_declare(
$name,
false, true, false, false, false,
$arguments ? new AMQPTable($arguments) : null
);
$this->channel->queue_bind($name, $exchange, $name);
}
/**
* purge some queues, delete related retry-q
* nb: called by admin/purgeQueuAction, so a q may be __any kind__ - not only base-types !
*
* @param array $queueNames
* @throws Exception
*/
public function reinitializeQueue(array $queueNames)
{ {
if (!isset($this->channel)) { if (!isset($this->channel)) {
$this->getChannel(); $this->getChannel();
$this->declareExchange(); $this->declareExchange();
} }
foreach ($queuNames as $queuName) {
if (in_array($queuName, self::$defaultQueues)) { foreach ($queueNames as $queueName) {
$this->channel->queue_purge($queuName); // re-inject conf values (some may have changed)
} else { $settings = $this->conf->get(['workers', 'queues', $queueName], []);
$this->channel->queue_delete($queuName); if(array_key_exists($queueName, $this->queues)) {
$this->queues[$queueName] = array_merge($this->queues[$queueName], $settings);
} }
if (isset(self::$defaultRetryQueues[$queuName])) { if(array_key_exists($queueName, self::MESSAGES)) {
$this->channel->queue_delete(self::$defaultRetryQueues[$queuName]); // base-q
$this->purgeQueue($queueName);
if($this->hasRetryQueue($queueName)) {
$this->deleteQueue($this->getRetryQueueName($queueName));
}
if($this->hasLoopQueue($queueName)) {
$this->deleteQueue($this->getLoopQueueName($queueName));
}
}
else {
// retry, delayed, loop, ... q
$this->deleteQueue($queueName);
} }
$this->setQueue($queuName); $this->setQueue($queueName);
}
}
/**
* delete a queue, fails silently if the q does not exists
*
* @param $queueName
*/
public function deleteQueue($queueName)
{
if (!isset($this->channel)) {
$this->getChannel();
$this->declareExchange();
}
try {
$this->channel->queue_delete($queueName);
}
catch(\Exception $e) {
// no-op
}
}
/**
* purge a queue, fails silently if the q does not exists
*
* @param $queueName
*/
public function purgeQueue($queueName)
{
if (!isset($this->channel)) {
$this->getChannel();
$this->declareExchange();
}
try {
$this->channel->queue_purge($queueName);
}
catch(\Exception $e) {
// no-op
} }
} }
/** /**
* Get queueName, messageCount, consumerCount of queues * Get queueName, messageCount, consumerCount of queues
* @return array * @return array
* @throws Exception
*/ */
public function getQueuesStatus() public function getQueuesStatus()
{ {
$queuesList = array_merge(
array_values(self::$defaultQueues),
array_values(self::$defaultDelayedQueues),
array_values(self::$defaultRetryQueues),
array_values(self::$defaultFailedQueues)
);
$this->getChannel(); $this->getChannel();
$queuesStatus = []; $queuesStatus = [];
foreach ($queuesList as $queue) { foreach($this->queues as $name => $queue) {
$this->setQueue($queue); $this->setQueue($name); // todo : BASE_QUEUE_WITH_RETRY will set both BASE and RETRY Q, so we should skip one of 2
list($queueName, $messageCount, $consumerCount) = $this->channel->queue_declare($queue, true);
$status['queueName'] = $queueName; list($queueName, $messageCount, $consumerCount) = $this->channel->queue_declare($name, true);
$status['messageCount'] = $messageCount; $queuesStatus[$queueName] = [
$status['consumerCount'] = $consumerCount; 'queueName' => $queueName,
'messageCount' => $messageCount,
$queuesStatus[] = $status; 'consumerCount' => $consumerCount
unset($status); ];
} }
ksort($queuesStatus);
return $queuesStatus; return $queuesStatus;
} }
@@ -265,58 +525,4 @@ class AMQPConnection
$this->channel->close(); $this->channel->close();
$this->connection->close(); $this->connection->close();
} }
/**
* @param $routing
* @return int
*/
private function getTtlRetryPerRouting($routing)
{
$config = $this->conf->get(['workers']);
if ($routing == MessagePublisher::PULL_QUEUE &&
isset($config['pull_assets']) &&
isset($config['pull_assets']['pullInterval']) ) {
// convert in milli second
return (int)($config['pull_assets']['pullInterval']) * 1000;
} elseif ($routing == MessagePublisher::VALIDATION_REMINDER_QUEUE) {
if (isset($config['validationReminder']) &&
isset($config['validationReminder']['interval'])) {
// convert in milli second
return (int)($config['validationReminder']['interval']) * 1000;
}
// default value to 2 hour if not set
return (int) 7200 * 1000;
} elseif (isset($config['retry_queue']) &&
isset($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)])) {
return (int)($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)]);
}
if ($routing == MessagePublisher::FTP_QUEUE) {
return self::RETRY_LARGE_DELAY;
} else {
return self::RETRY_DELAY;
}
}
private function getTtlDelayedPerRouting($routing)
{
$delayed = [
MessagePublisher::METADATAS_QUEUE => 'delayedWriteMeta',
MessagePublisher::SUBDEF_QUEUE => 'delayedSubdef'
];
$config = $this->conf->get(['workers']);
if (isset($config['retry_queue']) && isset($config['retry_queue'][$delayed[$routing]])) {
return (int)$config['retry_queue'][$delayed[$routing]];
}
return self::DELAY;
}
} }

View File

@@ -4,6 +4,7 @@ namespace Alchemy\Phrasea\WorkerManager\Queue;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool; use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker; use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use Exception;
use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Wire\AMQPTable;
@@ -11,8 +12,6 @@ use Ramsey\Uuid\Uuid;
class MessageHandler class MessageHandler
{ {
const MAX_OF_TRY = 3;
private $messagePublisher; private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher) public function __construct(MessagePublisher $messagePublisher)
@@ -20,27 +19,37 @@ class MessageHandler
$this->messagePublisher = $messagePublisher; $this->messagePublisher = $messagePublisher;
} }
public function consume(AMQPConnection $serverConnection, WorkerInvoker $workerInvoker, $argQueueName, $maxProcesses) /**
* called by WorkerExecuteCommand cli
*
* @param AMQPConnection $AMQPConnection
* @param WorkerInvoker $workerInvoker
* @param array|null $argQueueNames
* @param $maxProcesses
*/
public function consume(AMQPConnection $AMQPConnection, WorkerInvoker $workerInvoker, $argQueueNames, $maxProcesses)
{ {
$publisher = $this->messagePublisher;
$channel = $serverConnection->getChannel(); $channel = $AMQPConnection->getChannel();
if ($channel == null) { if ($channel == null) {
// todo : if there is no channel, can we push ?
$this->messagePublisher->pushLog("Can't connect to rabbit, check configuration!", "error"); $this->messagePublisher->pushLog("Can't connect to rabbit, check configuration!", "error");
return ; return ;
} }
$serverConnection->declareExchange(); $AMQPConnection->declareExchange();
// define consume callbacks // define consume callbacks
$callback = function (AMQPMessage $message) use ($channel, $workerInvoker, $publisher) { $publisher = $this->messagePublisher;
$callback = function (AMQPMessage $message) use ($AMQPConnection, $channel, $workerInvoker, $publisher) {
$data = json_decode($message->getBody(), true); $data = json_decode($message->getBody(), true);
$count = 0; $count = 0;
$headers = null;
if ($message->has('application_headers')) { if ($message->has('application_headers')) {
/** @var AMQPTable $headers */ /** @var AMQPTable $headers */
$headers = $message->get('application_headers'); $headers = $message->get('application_headers');
@@ -49,9 +58,10 @@ class MessageHandler
if (isset($headerData['x-death'])) { if (isset($headerData['x-death'])) {
$xDeathHeader = $headerData['x-death']; $xDeathHeader = $headerData['x-death'];
// todo : if there are more than 1 xdeath ? what is $count ?
foreach ($xDeathHeader as $xdeath) { foreach ($xDeathHeader as $xdeath) {
$queue = $xdeath['queue']; $queue = $xdeath['queue'];
if (!in_array($queue, AMQPConnection::$defaultQueues)) { if (!$AMQPConnection->isBaseQueue($queue)) {
continue; continue;
} }
@@ -61,51 +71,45 @@ class MessageHandler
} }
} }
// if message is yet executed 3 times, save the unprocessed message in the corresponding failed queues $msgType = $data['message_type'];
if ($count > self::MAX_OF_TRY && !in_array($data['message_type'], AMQPConnection::$defaultLoopTypes)) {
$this->messagePublisher->publishFailedMessage($data['payload'], $headers, AMQPConnection::$defaultFailedQueues[$data['message_type']]);
$logMessage = sprintf("Rabbit message executed 3 times, it's to be saved in %s , payload >>> %s", if($count > $AMQPConnection->getMaxRetry($msgType) && !$AMQPConnection->hasLoopQueue($msgType)) {
AMQPConnection::$defaultFailedQueues[$data['message_type']], $publisher->publishFailedMessage($data['payload'], $headers, $data['message_type']);
$logMessage = sprintf("Rabbit message executed %s times, it's to be saved in %s , payload >>> %s",
$count,
$msgType,
json_encode($data['payload']) json_encode($data['payload'])
); );
$this->messagePublisher->pushLog($logMessage); $publisher->pushLog($logMessage);
$channel->basic_ack($message->delivery_info['delivery_tag']); $channel->basic_ack($message->delivery_info['delivery_tag']);
} else { }
else {
try { try {
$workerInvoker->invokeWorker($data['message_type'], json_encode($data['payload'])); $workerInvoker->invokeWorker($msgType, json_encode($data['payload']));
if (in_array($data['message_type'], AMQPConnection::$defaultLoopTypes)) { if ($AMQPConnection->hasLoopQueue($msgType)) {
// make a loop for the loop type // make a loop for the loop type
$channel->basic_nack($message->delivery_info['delivery_tag']); $channel->basic_nack($message->delivery_info['delivery_tag']);
} else { } else {
$channel->basic_ack($message->delivery_info['delivery_tag']); $channel->basic_ack($message->delivery_info['delivery_tag']);
} }
$oldPayload = $data['payload']; $publisher->pushLog(
$message = $data['message_type'].' to be consumed! >> Payload ::'. json_encode($oldPayload); sprintf('"%s" to be consumed! >> Payload :: %s', $msgType, json_encode($data['payload']))
);
$publisher->pushLog($message); }
} catch (\Exception $e) { catch (Exception $e) {
$channel->basic_nack($message->delivery_info['delivery_tag']); $channel->basic_nack($message->delivery_info['delivery_tag']);
} }
} }
}; };
$prefetchCount = ProcessPool::MAX_PROCESSES; $prefetchCount = $maxProcesses ? $maxProcesses : ProcessPool::MAX_PROCESSES;
foreach($AMQPConnection->getBaseQueueNames() as $queueName) {
if ($maxProcesses) { if (!$argQueueNames || in_array($queueName, $argQueueNames)) {
$prefetchCount = $maxProcesses; $this->runConsumer($queueName, $AMQPConnection, $channel, $prefetchCount, $callback);
}
foreach (AMQPConnection::$defaultQueues as $queueName) {
if ($argQueueName ) {
if (in_array($queueName, $argQueueName)) {
$this->runConsumer($queueName, $serverConnection, $channel, $prefetchCount, $callback);
}
} else {
$this->runConsumer($queueName, $serverConnection, $channel, $prefetchCount, $callback);
} }
} }
} }
@@ -114,9 +118,10 @@ class MessageHandler
{ {
$serverConnection->setQueue($queueName); $serverConnection->setQueue($queueName);
// todo : remove this if !!! move code to a generic place
// initialize validation reminder when starting consumer // initialize validation reminder when starting consumer
if ($queueName == MessagePublisher::VALIDATION_REMINDER_QUEUE) { if ($queueName == MessagePublisher::VALIDATION_REMINDER_TYPE) {
$serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_QUEUE]); $serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_TYPE]);
$this->messagePublisher->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE); $this->messagePublisher->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE);
} }

View File

@@ -2,6 +2,8 @@
namespace Alchemy\Phrasea\WorkerManager\Queue; namespace Alchemy\Phrasea\WorkerManager\Queue;
use DateTime;
use DateTimeZone;
use Monolog\Logger; use Monolog\Logger;
use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Wire\AMQPTable;
@@ -28,107 +30,96 @@ class MessagePublisher
const MAIN_QUEUE_TYPE = 'mainQueue'; const MAIN_QUEUE_TYPE = 'mainQueue';
const MAIN_QUEUE = 'main-queue';
const SUBTITLE_QUEUE = 'subtitle-queue';
// ** ** \\
// worker queue to be consumed, when no ack , it is requeued to the retry queue
const ASSETS_INGEST_QUEUE = 'ingest-queue';
const CREATE_RECORD_QUEUE = 'createrecord-queue';
const DELETE_RECORD_QUEUE = 'deleterecord-queue';
const EXPORT_QUEUE = 'export-queue';
const EXPOSE_UPLOAD_QUEUE = 'exposeupload-queue';
const FTP_QUEUE = 'ftp-queue';
const METADATAS_QUEUE = 'metadatas-queue';
const POPULATE_INDEX_QUEUE = 'populateindex-queue';
const PULL_QUEUE = 'pull-queue';
const RECORD_EDIT_QUEUE = 'recordedit-queue';
const SUBDEF_QUEUE = 'subdef-queue';
const VALIDATION_REMINDER_QUEUE = 'validationReminder-queue';
const WEBHOOK_QUEUE = 'webhook-queue';
// retry queue
// we can use these retry queue with TTL, so when message expires it is requeued to the corresponding worker queue
const RETRY_ASSETS_INGEST_QUEUE = 'retry-ingest-queue';
const RETRY_CREATE_RECORD_QUEUE = 'retry-createrecord-queue';
const RETRY_EXPORT_QUEUE = 'retry-export-queue';
const RETRY_FTP_QUEUE = 'retry-ftp-queue';
const RETRY_METADATAS_QUEUE = 'retry-metadatas-queue';
const RETRY_POPULATE_INDEX_QUEUE = 'retry-populateindex-queue';
const RETRY_SUBDEF_QUEUE = 'retry-subdef-queue';
const RETRY_WEBHOOK_QUEUE = 'retry-webhook-queue';
// use those queue to make a loop on a consumer
const LOOP_PULL_QUEUE = 'loop-pull-queue';
const LOOP_VALIDATION_REMINDER_QUEUE = 'loop-validationReminder-queue';
// all failed queue, if message is treated over 3 times it goes to the failed queue
const FAILED_ASSETS_INGEST_QUEUE = 'failed-ingest-queue';
const FAILED_CREATE_RECORD_QUEUE = 'failed-createrecord-queue';
const FAILED_EXPORT_QUEUE = 'failed-export-queue';
const FAILED_FTP_QUEUE = 'failed-ftp-queue';
const FAILED_METADATAS_QUEUE = 'failed-metadatas-queue';
const FAILED_POPULATE_INDEX_QUEUE = 'failed-populateindex-queue';
const FAILED_SUBDEF_QUEUE = 'failed-subdef-queue';
const FAILED_WEBHOOK_QUEUE = 'failed-webhook-queue';
// delayed queue when record is locked
const DELAYED_SUBDEF_QUEUE = 'delayed-subdef-queue';
const DELAYED_METADATAS_QUEUE = 'delayed-metadatas-queue';
const NEW_RECORD_MESSAGE = 'newrecord'; const NEW_RECORD_MESSAGE = 'newrecord';
/** @var AMQPConnection $serverConnection */ /** @var AMQPConnection $AMQPConnection */
private $serverConnection; private $AMQPConnection;
/** @var Logger */ /** @var Logger */
private $logger; private $logger;
public function __construct(AMQPConnection $serverConnection, LoggerInterface $logger) public function __construct(AMQPConnection $AMQPConnection, LoggerInterface $logger)
{ {
$this->serverConnection = $serverConnection; $this->AMQPConnection = $AMQPConnection;
$this->logger = $logger; $this->logger = $logger;
} }
public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '') public function publishMessage(array $payload, $queueName)
{ {
// add published timestamp to all message payload $this->AMQPConnection->getBaseQueueName($queueName); // just to throw an exception if q is undefined
$payload['payload']['published'] = time();
$msg = new AMQPMessage(json_encode($payload));
$routing = array_search($queueName, AMQPConnection::$defaultRetryQueues);
if (count($retryCount) && $routing != false) { $this->_publishMessage($payload, $queueName);
}
public function publishRetryMessage(array $payload, string $baseQueueName, $retryCount, $workerMessage)
{
$retryQ = $this->AMQPConnection->getRetryQueueName($baseQueueName);
$headers = null;
if(!is_null($retryCount)) {
// add a message header information // add a message header information
$headers = new AMQPTable([ $headers = new AMQPTable([
'x-death' => [ 'x-death' => [
[ [
'count' => $retryCount, 'count' => $retryCount,
'exchange' => AMQPConnection::ALCHEMY_EXCHANGE, 'exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
'queue' => $routing, 'queue' => $baseQueueName,
'routing-keys' => $routing, 'routing-keys' => $baseQueueName,
'reason' => 'rejected', // rejected is sended like nack 'reason' => 'rejected', // rejected is sended like nack
'time' => new \DateTime('now', new \DateTimeZone('UTC')) 'time' => new DateTime('now', new DateTimeZone('UTC'))
] ]
], ],
'worker-message' => $workerMessage 'worker-message' => $workerMessage
]); ]);
}
$this->_publishMessage($payload, $retryQ, $headers);
}
public function publishDelayedMessage(array $payload, string $baseQueueName)
{
$delayedQ = $this->AMQPConnection->getDelayedQueueName($baseQueueName);
$this->_publishMessage($payload, $delayedQ);
}
public function publishFailedMessage(array $payload, AMQPTable $headers, $baseQueueName)
{
$FailedQ = $this->AMQPConnection->getFailedQueueName($baseQueueName);
$msg = new AMQPMessage(json_encode($payload));
$msg->set('application_headers', $headers);
$channel = $this->AMQPConnection->setQueue($FailedQ);
if ($channel == null) {
$this->pushLog("Can't connect to rabbit, check configuration!", "error");
return ;
}
$channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $FailedQ);
$this->_publishMessage($payload, $FailedQ, $headers);
}
private function _publishMessage(array $payload, $queueName, $headers = null)
{
// add published timestamp to all message payload
$payload['payload']['published'] = time();
$msg = new AMQPMessage(json_encode($payload));
if (!is_null($headers)) {
// add a message header information
$msg->set('application_headers', $headers); $msg->set('application_headers', $headers);
} }
$channel = $this->serverConnection->setQueue($queueName); if (is_null( ($channel = $this->AMQPConnection->setQueue($queueName)) )) {
if ($channel == null) {
$this->pushLog("Can't connect to rabbit, check configuration!", "error"); $this->pushLog("Can't connect to rabbit, check configuration!", "error");
return true; return true;
} }
$exchange = in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE; $exchange = $this->AMQPConnection->getExchange($queueName); // in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE;
$channel->basic_publish($msg, $exchange, $queueName); $channel->basic_publish($msg, $exchange, $queueName);
return true; return true;
@@ -139,16 +130,16 @@ class MessagePublisher
$payload = [ $payload = [
'message_type' => $type, 'message_type' => $type,
'payload' => [ 'payload' => [
'initTimestamp' => new \DateTime('now', new \DateTimeZone('UTC')) 'initTimestamp' => new DateTime('now', new DateTimeZone('UTC'))
] ]
]; ];
$this->publishMessage($payload, AMQPConnection::$defaultQueues[$type]); $this->publishMessage($payload, $type);
} }
public function connectionClose() public function connectionClose()
{ {
$this->serverConnection->connectionClose(); $this->AMQPConnection->connectionClose();
} }
/** /**
@@ -163,18 +154,4 @@ class MessagePublisher
call_user_func(array($this->logger, $method), $message, $context); call_user_func(array($this->logger, $method), $message, $context);
} }
public function publishFailedMessage(array $payload, AMQPTable $headers, $queueName)
{
$msg = new AMQPMessage(json_encode($payload));
$msg->set('application_headers', $headers);
$channel = $this->serverConnection->setQueue($queueName);
if ($channel == null) {
$this->pushLog("Can't connect to rabbit, check configuration!", "error");
return ;
}
$channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
}
} }

View File

@@ -24,6 +24,6 @@ class WebhookPublisher implements WebhookPublisherInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::WEBHOOK_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::WEBHOOK_TYPE);
} }
} }

View File

@@ -33,7 +33,7 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
'payload' => array_merge($event->getData(), ['type' => WorkerRunningJob::TYPE_PUSH]) 'payload' => array_merge($event->getData(), ['type' => WorkerRunningJob::TYPE_PUSH])
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_TYPE);
} }
public function onAssetsCreationFailure(AssetsCreationFailureEvent $event) public function onAssetsCreationFailure(AssetsCreationFailureEvent $event)
@@ -43,9 +43,9 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
'payload' => $event->getPayload() 'payload' => $event->getPayload()
]; ];
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_ASSETS_INGEST_QUEUE, MessagePublisher::ASSETS_INGEST_TYPE,
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );
@@ -84,9 +84,9 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
} }
} }
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_CREATE_RECORD_QUEUE, MessagePublisher::CREATE_RECORD_TYPE, // todo
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );

View File

@@ -32,7 +32,7 @@ class ExportSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::EXPORT_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPORT_MAIL_TYPE);
} }
public function onExportMailFailure(ExportMailFailureEvent $event) public function onExportMailFailure(ExportMailFailureEvent $event)
@@ -47,9 +47,9 @@ class ExportSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_EXPORT_QUEUE, MessagePublisher::EXPORT_MAIL_TYPE,
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );
@@ -66,7 +66,7 @@ class ExportSubscriber implements EventSubscriberInterface
$this->messagePublisher->publishMessage( $this->messagePublisher->publishMessage(
$payload, $payload,
MessagePublisher::FTP_QUEUE MessagePublisher::FTP_TYPE
); );
} }

View File

@@ -36,7 +36,7 @@ class ExposeSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::EXPOSE_UPLOAD_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::EXPOSE_UPLOAD_TYPE);
} }
} }

View File

@@ -5,7 +5,6 @@ namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Application; use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Core\Event\Record\DeletedEvent; use Alchemy\Phrasea\Core\Event\Record\DeletedEvent;
use Alchemy\Phrasea\Core\Event\Record\DeleteEvent; use Alchemy\Phrasea\Core\Event\Record\DeleteEvent;
use Alchemy\Phrasea\Core\Event\Record\MetadataChangedEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvent; use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvents; use Alchemy\Phrasea\Core\Event\Record\RecordEvents;
use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent; use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent;
@@ -67,7 +66,7 @@ class RecordSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::SUBDEF_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::SUBDEF_CREATION_TYPE);
} }
} }
} }
@@ -87,7 +86,7 @@ class RecordSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELETE_RECORD_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::DELETE_RECORD_TYPE);
} }
public function onSubdefinitionCreationFailure(SubdefinitionCreationFailureEvent $event) public function onSubdefinitionCreationFailure(SubdefinitionCreationFailureEvent $event)
@@ -128,9 +127,9 @@ class RecordSubscriber implements EventSubscriberInterface
} }
} }
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_SUBDEF_QUEUE, MessagePublisher::SUBDEF_CREATION_TYPE,
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );
@@ -170,18 +169,19 @@ class RecordSubscriber implements EventSubscriberInterface
]; ];
if ($subdef->is_physically_present()) { if ($subdef->is_physically_present()) {
$this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::WRITE_METADATAS_TYPE);
} else { }
$logMessage = sprintf("Subdef %s is not physically present! to be passed in the %s ! payload >>> %s", else {
$logMessage = sprintf('Subdef "%s" is not physically present! to be passed in the retry q of "%s" ! payload >>> %s',
$subdef->get_name(), $subdef->get_name(),
MessagePublisher::RETRY_METADATAS_QUEUE, MessagePublisher::WRITE_METADATAS_TYPE,
json_encode($payload) json_encode($payload)
); );
$this->messagePublisher->pushLog($logMessage); $this->messagePublisher->pushLog($logMessage);
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_METADATAS_QUEUE, MessagePublisher::WRITE_METADATAS_TYPE,
2, 2,
'Subdef is not physically present!' 'Subdef is not physically present!'
); );
@@ -215,10 +215,10 @@ class RecordSubscriber implements EventSubscriberInterface
] ]
]; ];
$logMessage = sprintf("Subdef %s write meta failed, error : %s ! to be passed in the %s ! payload >>> %s", $logMessage = sprintf('Subdef "%s" write meta failed, error : "%s" ! to be passed in the retry q of "%s" ! payload >>> %s',
$event->getSubdefName(), $event->getSubdefName(),
$event->getWorkerMessage(), $event->getWorkerMessage(),
MessagePublisher::RETRY_METADATAS_QUEUE, MessagePublisher::WRITE_METADATAS_TYPE,
json_encode($payload) json_encode($payload)
); );
$this->messagePublisher->pushLog($logMessage); $this->messagePublisher->pushLog($logMessage);
@@ -248,9 +248,9 @@ class RecordSubscriber implements EventSubscriberInterface
} }
} }
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_METADATAS_QUEUE, MessagePublisher::WRITE_METADATAS_TYPE,
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );
@@ -276,7 +276,7 @@ class RecordSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::WRITE_METADATAS_TYPE);
} }
} }

View File

@@ -40,7 +40,7 @@ class SearchengineSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::POPULATE_INDEX_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::POPULATE_INDEX_TYPE);
} }
} }
@@ -83,9 +83,9 @@ class SearchengineSubscriber implements EventSubscriberInterface
} }
} }
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_POPULATE_INDEX_QUEUE, MessagePublisher::POPULATE_INDEX_TYPE,
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );

View File

@@ -65,7 +65,7 @@ class SubtitleSubscriber implements EventSubscriberInterface
'payload' => $data 'payload' => $data
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::MAIN_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::MAIN_QUEUE_TYPE);
} catch (\Exception $e) { } catch (\Exception $e) {
$em->rollback(); $em->rollback();
} }

View File

@@ -29,9 +29,9 @@ class WebhookSubscriber implements EventSubscriberInterface
] ]
]; ];
$this->messagePublisher->publishMessage( $this->messagePublisher->publishRetryMessage(
$payload, $payload,
MessagePublisher::RETRY_WEBHOOK_QUEUE, MessagePublisher::WEBHOOK_TYPE,
$event->getCount(), $event->getCount(),
$event->getWorkerMessage() $event->getWorkerMessage()
); );

View File

@@ -2,8 +2,8 @@
namespace Alchemy\Phrasea\WorkerManager\Worker; namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Application as PhraseaApplication; use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Model\Entities\StoryWZ; use Alchemy\Phrasea\Model\Entities\StoryWZ;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\UserRepository; use Alchemy\Phrasea\Model\Repositories\UserRepository;
@@ -80,7 +80,7 @@ class AssetsIngestWorker implements WorkerInterface
'commit_id' => $payload['commit_id'] 'commit_id' => $payload['commit_id']
]; ];
$this->messagePublisher->publishMessage($createRecordMessage, MessagePublisher::CREATE_RECORD_QUEUE); $this->messagePublisher->publishMessage($createRecordMessage, MessagePublisher::CREATE_RECORD_TYPE);
/** @var WorkerRunningJob $workerRunningJob */ /** @var WorkerRunningJob $workerRunningJob */
$workerRunningJob = $this->repoWorkerJob->findOneBy([ $workerRunningJob = $this->repoWorkerJob->findOneBy([

View File

@@ -5,7 +5,6 @@ namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application; use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Application\Helper\NotifierAware; use Alchemy\Phrasea\Application\Helper\NotifierAware;
use Alchemy\Phrasea\Core\LazyLocator; use Alchemy\Phrasea\Core\LazyLocator;
use Alchemy\Phrasea\Exception\InvalidArgumentException;
use Alchemy\Phrasea\Model\Entities\FtpExport; use Alchemy\Phrasea\Model\Entities\FtpExport;
use Alchemy\Phrasea\Model\Entities\FtpExportElement; use Alchemy\Phrasea\Model\Entities\FtpExportElement;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
@@ -371,9 +370,9 @@ class FtpWorker implements WorkerInterface
'payload' => $payload 'payload' => $payload
]; ];
$this->app['alchemy_worker.message.publisher']->publishMessage( $this->getMessagePublisher()->publishRetryMessage(
$fullPayload, $fullPayload,
MessagePublisher::RETRY_FTP_QUEUE, MessagePublisher::FTP_TYPE,
$count, $count,
$workerMessage $workerMessage
); );
@@ -503,4 +502,13 @@ class FtpWorker implements WorkerInterface
{ {
return $this->app['repo.ftp-exports']; return $this->app['repo.ftp-exports'];
} }
/**
* @return MessagePublisher
*/
private function getMessagePublisher()
{
return $this->app['alchemy_worker.message.publisher'];
}
} }

View File

@@ -29,7 +29,7 @@ class MainQueueWorker implements WorkerInterface
switch ($payload['type']) { switch ($payload['type']) {
case MessagePublisher::SUBTITLE_TYPE: case MessagePublisher::SUBTITLE_TYPE:
$queue = MessagePublisher::SUBTITLE_QUEUE; $queue = MessagePublisher::SUBTITLE_TYPE;
$messageType = $payload['type']; $messageType = $payload['type'];
unset($payload['type']); unset($payload['type']);

View File

@@ -82,7 +82,7 @@ class PullAssetsWorker implements WorkerInterface
] ]
]; ];
$this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE); $this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_TYPE);
} }
} }

View File

@@ -75,7 +75,7 @@ class SubdefCreationWorker implements WorkerInterface
if (!$canCreateSubdef) { if (!$canCreateSubdef) {
// the file is in used to write meta // the file is in used to write meta
$this->messagePublisher->publishMessage($message, MessagePublisher::DELAYED_SUBDEF_QUEUE); $this->messagePublisher->publishDelayedMessage($message, MessagePublisher::SUBDEF_CREATION_TYPE);
return ; return ;
} }
@@ -178,7 +178,10 @@ class SubdefCreationWorker implements WorkerInterface
// checking ended // checking ended
// order to write meta for the subdef if needed // order to write meta for the subdef if needed
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent($record, $payload['subdefName'])); $this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record,
$payload['subdefName'])
);
$this->subdefGenerator->setLogger($oldLogger); $this->subdefGenerator->setLogger($oldLogger);

View File

@@ -170,7 +170,11 @@ class WebhookWorker implements WorkerInterface
$this->messagePublisher->pushLog($workerMessage); $this->messagePublisher->pushLog($workerMessage);
// count = 0 mean do not retry because no api application defined // count = 0 mean do not retry because no api application defined
$this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent($webhookevent->getId(), $workerMessage, 0)); $this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent(
$webhookevent->getId(),
$workerMessage,
0)
);
return; return;
} }
@@ -235,7 +239,7 @@ class WebhookWorker implements WorkerInterface
$this->messagePublisher->publishFailedMessage( $this->messagePublisher->publishFailedMessage(
$payload, $payload,
new AMQPTable(['worker-message' => $e->getMessage()]), new AMQPTable(['worker-message' => $e->getMessage()]),
MessagePublisher::FAILED_WEBHOOK_QUEUE MessagePublisher::WEBHOOK_TYPE
); );
} }
} }

View File

@@ -73,7 +73,7 @@ class WriteMetadatasWorker implements WorkerInterface
if (!$canWriteMeta) { if (!$canWriteMeta) {
// the file is in used to generate subdef // the file is in used to generate subdef
$this->messagePublisher->publishMessage($message, MessagePublisher::DELAYED_METADATAS_QUEUE); $this->messagePublisher->publishDeleyedMessage($message, MessagePublisher::WRITE_METADATAS_TYPE);
return ; return ;
} }

View File

@@ -0,0 +1,119 @@
<?php
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Core\Configuration\PropertyAccess;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
class patch_413_PHRAS_3282 implements patchInterface
{
const OLDQ2NEWQ_ttl_retry = [
'assetsIngest' => MessagePublisher::ASSETS_INGEST_TYPE,
'createRecord' => MessagePublisher::CREATE_RECORD_TYPE,
'deleteRecord' => MessagePublisher::DELETE_RECORD_TYPE,
'exportMail' => MessagePublisher::EXPORT_MAIL_TYPE,
'exposeUpload' => MessagePublisher::EXPOSE_UPLOAD_TYPE,
'ftp' => MessagePublisher::FTP_TYPE,
'populateIndex' => MessagePublisher::POPULATE_INDEX_TYPE,
'pullAssets' => MessagePublisher::PULL_ASSETS_TYPE,
'recordEdit' => MessagePublisher::RECORD_EDIT_TYPE,
'subdefCreation' => MessagePublisher::SUBDEF_CREATION_TYPE,
'validationReminder' => MessagePublisher::VALIDATION_REMINDER_TYPE,
'writeMetadatas' => MessagePublisher::WRITE_METADATAS_TYPE,
'webhook' => MessagePublisher::WEBHOOK_TYPE,
];
const OLDQ2NEWQ_ttl_delayed = [
'delayedSubdef' => MessagePublisher::SUBDEF_CREATION_TYPE,
'delayedWriteMeta' => MessagePublisher::WRITE_METADATAS_TYPE,
];
/** @var string */
private $release = '4.1.3';
/** @var array */
private $concern = [base::APPLICATION_BOX, base::DATA_BOX];
/**
* {@inheritdoc}
*/
public function get_release()
{
return $this->release;
}
/**
* {@inheritdoc}
*/
public function getDoctrineMigrations()
{
return [];
}
/**
* {@inheritdoc}
*/
public function require_all_upgrades()
{
return false;
}
/**
* {@inheritdoc}
*/
public function concern()
{
return $this->concern;
}
/**
* {@inheritdoc}
*/
public function apply(base $base, Application $app)
{
if ($base->get_base_type() === base::DATA_BOX) {
$this->patch_databox($base, $app);
}
elseif ($base->get_base_type() === base::APPLICATION_BOX) {
$this->patch_appbox($base, $app);
}
return true;
}
private function patch_databox(base $databox, Application $app)
{
}
private function patch_appbox(base $databox, Application $app)
{
/** @var PropertyAccess $conf */
$conf = $app['conf'];
// --------------------------------------------
// PHRAS-3282_refacto-some-code-on-workers_MASTER
// patch workers settings
// --------------------------------------------
foreach(self::OLDQ2NEWQ_ttl_retry as $old=>$new) {
if(($v = $conf->get(['workers', 'retry_queue', $old], null)) !== null) {
$conf->set(['workers', 'queues', $new, 'ttl_retry'], $v);
}
}
foreach(self::OLDQ2NEWQ_ttl_delayed as $old=>$new) {
if(($v = $conf->get(['workers', 'retry_queue', $old], null)) !== null) {
$conf->set(['workers', 'queues', $new, 'ttl_delayed'], $v);
}
}
if(($v = $conf->get(['workers', 'pull_assets', 'pullInterval'], null)) !== null) {
$conf->set(['workers', 'queues', MessagePublisher::PULL_ASSETS_TYPE, 'ttl_retry'], $v * 1000);
}
if(($v = $conf->get(['workers', 'validationReminder', 'interval'], null)) !== null) {
$conf->set(['workers', 'queues', MessagePublisher::VALIDATION_REMINDER_TYPE, 'ttl_retry'], $v * 1000);
}
$conf->remove(['workers', 'retry_queue']);
$conf->remove(['workers', 'pull_assets']);
$conf->remove(['workers', 'validationReminder']);
}
}

View File

@@ -10,7 +10,7 @@
{{ "admin::workermanager:tab:configuration: title" | trans }} {{ "admin::workermanager:tab:configuration: title" | trans }}
</a> </a>
</li> </li>
<li class="worker-info active" role="presentation"> <li class="worker-info" role="presentation">
<a href="#worker-info" aria-controls="worker-info" role="tab" data-toggle="tab" data-url="/admin/worker-manager/info"> <a href="#worker-info" aria-controls="worker-info" role="tab" data-toggle="tab" data-url="/admin/worker-manager/info">
{{ 'admin::workermanager:tab:workerinfo: title' |trans }} {{ 'admin::workermanager:tab:workerinfo: title' |trans }}
</a> </a>
@@ -56,9 +56,7 @@
<!-- Tab panes --> <!-- Tab panes -->
<div class="tab-content"> <div class="tab-content">
<div role="tabpanel" class="tab-pane fade" id="worker-configuration"></div> <div role="tabpanel" class="tab-pane fade" id="worker-configuration"></div>
<div role="tabpanel" class="tab-pane fade in active" id="worker-info"> <div role="tabpanel" class="tab-pane fade" id="worker-info"></div>
{% include "admin/worker-manager/worker_info.html.twig" %}
</div>
<div role="tabpanel" class="tab-pane fade" id="worker-searchengine"></div> <div role="tabpanel" class="tab-pane fade" id="worker-searchengine"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-pull-assets"></div> <div role="tabpanel" class="tab-pane fade" id="worker-pull-assets"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-ftp"></div> <div role="tabpanel" class="tab-pane fade" id="worker-ftp"></div>
@@ -101,6 +99,10 @@
}); });
} }
}); });
var sel = '#configurationTabs li.{{ _fragment }} a';
$(sel).click();
</script> </script>
{% else %} {% else %}
<h1 class="alert alert-danger"> <h1 class="alert alert-danger">

View File

@@ -6,53 +6,7 @@
<p>{{ 'admin::workermanager:tab:workerconfig: Set up the delay between two attempts per queue! (if not set, default 10000 millisecond)' |trans }}</p> <p>{{ 'admin::workermanager:tab:workerconfig: Set up the delay between two attempts per queue! (if not set, default 10000 millisecond)' |trans }}</p>
{{ form_start(form, {'action': path('worker_admin_configuration')}) }} {{ form_start(form, {'action': path('worker_admin_configuration')}) }}
<div class="control-group"> {{ form_widget(form) }}
{{ form_row(form.assetsIngest) }}
</div>
<div class="control-group">
{{ form_row(form.createRecord) }}
</div>
<div class="control-group">
{{ form_row(form.subdefCreation) }}
</div>
<div class="control-group">
{{ form_row(form.writeMetadatas) }}
</div>
<div class="control-group">
{{ form_row(form.webhook) }}
</div>
<div class="control-group">
{{ form_row(form.exportMail) }}
</div>
<div class="control-group">
{{ form_row(form.populateIndex) }}
</div>
<div class="control-group">
{{ form_row(form.ftp) }}
</div>
<h3> {{ 'admin::workermanager:tab:workerconfig: Config Worker queue delayed' |trans }}</h3>
<p> {{ 'admin::workermanager:tab:workerconfig: if not set ,default 5000 millisecond' |trans }}</p>
<div class="control-group">
{{ form_row(form.delayedSubdef) }}
</div>
<div class="control-group">
{{ form_row(form.delayedWriteMeta) }}
</div>
<div class="control-group">
<input type="submit" class="btn btn-primary" value={{ "admin::workermanager:tab:workerconfig:Apply in queue"|trans }} />
</div>
{{ form_end(form) }} {{ form_end(form) }}

View File

@@ -12,6 +12,7 @@
<th>{{ 'admin::workermanager:tab:queueMonitor: Message count' |trans }}</th> <th>{{ 'admin::workermanager:tab:queueMonitor: Message count' |trans }}</th>
<th>{{ 'admin::workermanager:tab:queueMonitor: Consumer count' |trans }}</th> <th>{{ 'admin::workermanager:tab:queueMonitor: Consumer count' |trans }}</th>
<th></th> <th></th>
<th></th>
</tr> </tr>
</thead> </thead>
<tbody class="queue-list"> <tbody class="queue-list">
@@ -25,6 +26,9 @@
<td> <td>
<button class="btn btn-danger btn-mini purge-queue" data-queue-name="{{ queueStatus.queueName }}">{{ 'admin::workermanager:tab:queueMonitor: Purge Queue' | trans }}</button> <button class="btn btn-danger btn-mini purge-queue" data-queue-name="{{ queueStatus.queueName }}">{{ 'admin::workermanager:tab:queueMonitor: Purge Queue' | trans }}</button>
</td> </td>
<td>
<button class="btn btn-danger btn-mini delete-queue" data-queue-name="{{ queueStatus.queueName }}">{{ 'admin::workermanager:tab:queueMonitor: Delete Queue' | trans }}</button>
</td>
</tr> </tr>
{% endfor %} {% endfor %}
@@ -60,5 +64,21 @@
} }
}); });
$("#worker-queue-monitor").on('click', '.delete-queue', function() {
if (confirm("Warning! Are you sure? Messages cannot be recovered after deleting queue.")) {
$.ajax({
type: "POST",
url: "/admin/worker-manager/delete-queue",
dataType: 'json',
data : {
queueName : $(this).attr("data-queue-name")
},
success: function (data) {
$("#refresh-monitor").trigger("click");
}
});
}
});
</script> </script>
{% endif %} {% endif %}

View File

@@ -1,5 +1,19 @@
<h1>{{ 'admin::workermanager:tab:Reminder: description' |trans }}</h1> <h1>{{ 'admin::workermanager:tab:Reminder: description' |trans }}</h1>
{{ form_start(form, {'action': path('worker_admin_validationReminder')}) }}
{{ form_widget(form) }}
<br/>
<br/>
<br/>
<div class="control-group">
<button type="submit" value="start" class="btn btn-primary start-validation-reminder">{{ "Start"|trans }}</button>
<button type="submit" value="stop" class="btn btn-danger stop-validation-reminder">{{ 'Stop' | trans }}</button>
</div>
{{ form_end(form) }}
<!--
<form name="worker_validation_reminder" method="post" action="/admin/worker-manager/validation-reminder"> <form name="worker_validation_reminder" method="post" action="/admin/worker-manager/validation-reminder">
<div class="control-group"> <div class="control-group">
<div> <div>
@@ -8,38 +22,64 @@
</div> </div>
</div> </div>
<div class="control-group">
<button class="btn btn-primary start-validation-reminder">{{ "admin::workermanager:tab:Reminder: Start"|trans }}</button>
<button class="btn btn-danger stop-validation-reminder">{{ 'admin::workermanager:tab:Reminder: Stop' | trans }}</button>
</div>
</form> </form>
-->
<script type="text/javascript"> <script type="text/javascript">
$("#worker-reminder").on('click', '.stop-validation-reminder', function(e) { $("#worker-reminder").on('click', 'BUTTON[type=submit]', function(e, z) {
e.preventDefault(); e.preventDefault();
if (confirm("Warning! You are about to stop validation Reminder!")) { var button = $(e.target);
$.ajax({ var form = button.closest("FORM");
type: "POST", $("INPUT.act", form).val(button.val()); // "save", "start" or "stop"
url: "/admin/worker-manager/purge-queue",
dataType: 'json',
data : {
queueName : "validationReminder-queue"
},
success: function (data) {
$('#tree li.selected a').trigger('click');
console.log(data);
return false; var ok = true;
} {% if(running) %}
}); if(button.val() === 'save') {
// saving will empty the queue, "stopping"
ok = confirm("Warning! Changing the ttl will stop validation Reminder!")
} }
{% endif %}
if(button.val() === 'stop') {
// saving will empty the queue, "stopping"
ok = confirm("Warning! You are about to stop validation Reminder!")
}
if(ok) {
form.submit();
}
// $('form[name="worker_validation_reminder"]').submit();
}); });
$("#worker-reminder").on('click', '.start-validation-reminder', function(e) { {% if(running) %}
e.preventDefault(); $("#worker-reminder .start-validation-reminder").hide();
$('form[name="worker_validation_reminder"]').submit(); $("#worker-reminder .stop-validation-reminder").show();
}); {% else %}
$("#worker-reminder .start-validation-reminder").show();
$("#worker-reminder .stop-validation-reminder").hide();
{% endif %}
// $("#worker-reminder").on('click', '.stop-validation-reminder', function(e) {
// e.preventDefault();
// if (confirm("Warning! You are about to stop validation Reminder!")) {
// $.ajax({
// type: "POST",
// url: "/admin/worker-manager/purge-queue",
// dataType: 'json',
// data : {
// queueName : "validationReminder-queue"
// },
// success: function (data) {
// $('#tree li.selected a').trigger('click');
// console.log(data);
//
// return false;
// }
// });
// }
// });
//
// $("#worker-reminder").on('click', '.start-validation-reminder', function(e) {
// e.preventDefault();
// $('form[name="worker_validation_reminder"]').submit();
// });
</script> </script>