mirror of
https://github.com/alchemy-fr/Phraseanet.git
synced 2025-10-23 09:53:15 +00:00
make delayed queue connfigurable
This commit is contained in:
@@ -21,7 +21,12 @@ class AdminConfigurationController extends Controller
|
||||
{
|
||||
public function indexAction(PhraseaApplication $app, Request $request)
|
||||
{
|
||||
return $this->render('admin/worker-manager/index.html.twig');
|
||||
/** @var AMQPConnection $serverConnection */
|
||||
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
|
||||
|
||||
return $this->render('admin/worker-manager/index.html.twig', [
|
||||
'isConnected' => ($serverConnection->getChannel() != null) ? true : false
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -48,6 +53,7 @@ class AdminConfigurationController extends Controller
|
||||
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
|
||||
// change the queue TTL
|
||||
$serverConnection->reinitializeQueue($retryQueuesToReset);
|
||||
$serverConnection->reinitializeQueue(AMQPConnection::$defaultDelayedQueues);
|
||||
|
||||
return $app->redirectPath('worker_admin');
|
||||
}
|
||||
|
@@ -34,6 +34,12 @@ class WorkerConfigurationType extends AbstractType
|
||||
->add(MessagePublisher::POPULATE_INDEX_TYPE, 'text', [
|
||||
'label' => 'Populate Index retry delay in ms'
|
||||
])
|
||||
->add('delayedSubdef', 'text', [
|
||||
'label' => 'Subdef delay in ms'
|
||||
])
|
||||
->add('delayedWriteMeta', 'text', [
|
||||
'label' => 'Write meta delay in ms'
|
||||
])
|
||||
;
|
||||
}
|
||||
|
||||
|
@@ -63,6 +63,9 @@ class AMQPConnection
|
||||
// default message TTL in retry queue in millisecond
|
||||
const RETRY_DELAY = 10000;
|
||||
|
||||
// default message TTL in delayed queue in millisecond
|
||||
const DELAY = 5000;
|
||||
|
||||
public function __construct(PropertyAccess $conf)
|
||||
{
|
||||
$defaultConfiguration = [
|
||||
@@ -150,7 +153,7 @@ class AMQPConnection
|
||||
$this->channel->queue_declare(self::$defaultRetryQueues[$queueName], false, true, false, false, false, new AMQPTable([
|
||||
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
|
||||
'x-dead-letter-routing-key' => $queueName,
|
||||
'x-message-ttl' => $this->getTtlPerRouting($queueName)
|
||||
'x-message-ttl' => $this->getTtlRetryPerRouting($queueName)
|
||||
]));
|
||||
|
||||
$this->channel->queue_bind(self::$defaultRetryQueues[$queueName], AMQPConnection::RETRY_ALCHEMY_EXCHANGE, self::$defaultRetryQueues[$queueName]);
|
||||
@@ -161,7 +164,7 @@ class AMQPConnection
|
||||
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([
|
||||
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
|
||||
'x-dead-letter-routing-key' => $routing,
|
||||
'x-message-ttl' => $this->getTtlPerRouting($routing)
|
||||
'x-message-ttl' => $this->getTtlRetryPerRouting($routing)
|
||||
]));
|
||||
|
||||
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
|
||||
@@ -176,7 +179,7 @@ class AMQPConnection
|
||||
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([
|
||||
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
|
||||
'x-dead-letter-routing-key' => $routing,
|
||||
'x-message-ttl' => 5000
|
||||
'x-message-ttl' => $this->getTtlDelayedPerRouting($routing)
|
||||
]));
|
||||
|
||||
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
|
||||
@@ -191,6 +194,10 @@ class AMQPConnection
|
||||
|
||||
public function reinitializeQueue(array $queuNames)
|
||||
{
|
||||
if (!isset($this->channel)) {
|
||||
$this->getChannel();
|
||||
$this->declareExchange();
|
||||
}
|
||||
foreach ($queuNames as $queuName) {
|
||||
if (in_array($queuName, self::$defaultQueues)) {
|
||||
$this->channel->queue_purge($queuName);
|
||||
@@ -216,7 +223,7 @@ class AMQPConnection
|
||||
* @param $routing
|
||||
* @return int
|
||||
*/
|
||||
private function getTtlPerRouting($routing)
|
||||
private function getTtlRetryPerRouting($routing)
|
||||
{
|
||||
$config = $this->conf->get(['workers']);
|
||||
|
||||
@@ -233,4 +240,20 @@ class AMQPConnection
|
||||
|
||||
return self::RETRY_DELAY;
|
||||
}
|
||||
|
||||
private function getTtlDelayedPerRouting($routing)
|
||||
{
|
||||
$delayed = [
|
||||
MessagePublisher::METADATAS_QUEUE => 'delayedWriteMeta',
|
||||
MessagePublisher::SUBDEF_QUEUE => 'delayedSubdef'
|
||||
];
|
||||
|
||||
$config = $this->conf->get(['workers']);
|
||||
|
||||
if (isset($config['retry_queue']) && isset($config['retry_queue'][$delayed[$routing]])) {
|
||||
return (int)$config['retry_queue'][$delayed[$routing]];
|
||||
}
|
||||
|
||||
return self::DELAY;
|
||||
}
|
||||
}
|
||||
|
@@ -72,9 +72,9 @@ class SubdefCreationWorker implements WorkerInterface
|
||||
'payload' => $payload
|
||||
];
|
||||
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_SUBDEF_QUEUE);
|
||||
|
||||
$message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
|
||||
$this->messagePublisher->pushLog($message);
|
||||
//
|
||||
// $message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
|
||||
// $this->messagePublisher->pushLog($message);
|
||||
|
||||
return ;
|
||||
}
|
||||
|
@@ -77,8 +77,8 @@ class WriteMetadatasWorker implements WorkerInterface
|
||||
];
|
||||
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_METADATAS_QUEUE);
|
||||
|
||||
$message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
|
||||
$this->messagePublisher->pushLog($message);
|
||||
// $message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
|
||||
// $this->messagePublisher->pushLog($message);
|
||||
|
||||
return ;
|
||||
}
|
||||
|
Reference in New Issue
Block a user