diff --git a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php index 11dee7472c..02e91ffe76 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Controller/AdminConfigurationController.php @@ -21,7 +21,12 @@ class AdminConfigurationController extends Controller { public function indexAction(PhraseaApplication $app, Request $request) { - return $this->render('admin/worker-manager/index.html.twig'); + /** @var AMQPConnection $serverConnection */ + $serverConnection = $this->app['alchemy_worker.amqp.connection']; + + return $this->render('admin/worker-manager/index.html.twig', [ + 'isConnected' => ($serverConnection->getChannel() != null) ? true : false + ]); } /** @@ -48,6 +53,7 @@ class AdminConfigurationController extends Controller $serverConnection = $this->app['alchemy_worker.amqp.connection']; // change the queue TTL $serverConnection->reinitializeQueue($retryQueuesToReset); + $serverConnection->reinitializeQueue(AMQPConnection::$defaultDelayedQueues); return $app->redirectPath('worker_admin'); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php index 3a990e8827..51a7368413 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Form/WorkerConfigurationType.php @@ -34,6 +34,12 @@ class WorkerConfigurationType extends AbstractType ->add(MessagePublisher::POPULATE_INDEX_TYPE, 'text', [ 'label' => 'Populate Index retry delay in ms' ]) + ->add('delayedSubdef', 'text', [ + 'label' => 'Subdef delay in ms' + ]) + ->add('delayedWriteMeta', 'text', [ + 'label' => 'Write meta delay in ms' + ]) ; } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php index 6d3b9600f6..726eecbf1f 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php @@ -63,6 +63,9 @@ class AMQPConnection // default message TTL in retry queue in millisecond const RETRY_DELAY = 10000; + // default message TTL in delayed queue in millisecond + const DELAY = 5000; + public function __construct(PropertyAccess $conf) { $defaultConfiguration = [ @@ -150,7 +153,7 @@ class AMQPConnection $this->channel->queue_declare(self::$defaultRetryQueues[$queueName], false, true, false, false, false, new AMQPTable([ 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, 'x-dead-letter-routing-key' => $queueName, - 'x-message-ttl' => $this->getTtlPerRouting($queueName) + 'x-message-ttl' => $this->getTtlRetryPerRouting($queueName) ])); $this->channel->queue_bind(self::$defaultRetryQueues[$queueName], AMQPConnection::RETRY_ALCHEMY_EXCHANGE, self::$defaultRetryQueues[$queueName]); @@ -161,7 +164,7 @@ class AMQPConnection $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, 'x-dead-letter-routing-key' => $routing, - 'x-message-ttl' => $this->getTtlPerRouting($routing) + 'x-message-ttl' => $this->getTtlRetryPerRouting($routing) ])); $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); @@ -176,7 +179,7 @@ class AMQPConnection $this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([ 'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE, 'x-dead-letter-routing-key' => $routing, - 'x-message-ttl' => 5000 + 'x-message-ttl' => $this->getTtlDelayedPerRouting($routing) ])); $this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); @@ -191,6 +194,10 @@ class AMQPConnection public function reinitializeQueue(array $queuNames) { + if (!isset($this->channel)) { + $this->getChannel(); + $this->declareExchange(); + } foreach ($queuNames as $queuName) { if (in_array($queuName, self::$defaultQueues)) { $this->channel->queue_purge($queuName); @@ -216,7 +223,7 @@ class AMQPConnection * @param $routing * @return int */ - private function getTtlPerRouting($routing) + private function getTtlRetryPerRouting($routing) { $config = $this->conf->get(['workers']); @@ -233,4 +240,20 @@ class AMQPConnection return self::RETRY_DELAY; } + + private function getTtlDelayedPerRouting($routing) + { + $delayed = [ + MessagePublisher::METADATAS_QUEUE => 'delayedWriteMeta', + MessagePublisher::SUBDEF_QUEUE => 'delayedSubdef' + ]; + + $config = $this->conf->get(['workers']); + + if (isset($config['retry_queue']) && isset($config['retry_queue'][$delayed[$routing]])) { + return (int)$config['retry_queue'][$delayed[$routing]]; + } + + return self::DELAY; + } } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php index afadf00539..635ff7e3cc 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/SubdefCreationWorker.php @@ -72,9 +72,9 @@ class SubdefCreationWorker implements WorkerInterface 'payload' => $payload ]; $this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_SUBDEF_QUEUE); - - $message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload); - $this->messagePublisher->pushLog($message); +// +// $message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload); +// $this->messagePublisher->pushLog($message); return ; } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php index 2816b8e757..ccaacdef0c 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Worker/WriteMetadatasWorker.php @@ -77,8 +77,8 @@ class WriteMetadatasWorker implements WorkerInterface ]; $this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_METADATAS_QUEUE); - $message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload); - $this->messagePublisher->pushLog($message); +// $message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload); +// $this->messagePublisher->pushLog($message); return ; } diff --git a/templates/web/admin/worker-manager/index.html.twig b/templates/web/admin/worker-manager/index.html.twig index 7185cd55a5..ff10e205dc 100644 --- a/templates/web/admin/worker-manager/index.html.twig +++ b/templates/web/admin/worker-manager/index.html.twig @@ -1,77 +1,85 @@