From 2bde6b8804fd60b13c244576bd1e171d5d78c389 Mon Sep 17 00:00:00 2001 From: aynsix Date: Wed, 13 May 2020 11:55:26 +0300 Subject: [PATCH] fix connection --- .../Command/WorkerExecuteCommand.php | 6 +++ .../WorkerManager/Queue/AMQPConnection.php | 44 ++++++++++++++----- .../WorkerManager/Queue/MessageHandler.php | 6 +++ .../WorkerManager/Queue/MessagePublisher.php | 12 +++++ 4 files changed, 56 insertions(+), 12 deletions(-) diff --git a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php index f25b6ff9c4..ebc17e1496 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Command/WorkerExecuteCommand.php @@ -45,6 +45,12 @@ class WorkerExecuteCommand extends Command /** @var AMQPChannel $channel */ $channel = $serverConnection->getChannel(); + if ($channel == null) { + $output->writeln("Can't connect to rabbit, check configuration!"); + + return; + } + /** @var WorkerInvoker $workerInvoker */ $workerInvoker = $this->container['alchemy_worker.worker_invoker']; diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php index aa7a1995d4..6d3b9600f6 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/AMQPConnection.php @@ -80,12 +80,18 @@ class AMQPConnection public function getConnection() { if (!isset($this->connection)) { - $this->connection = new AMQPStreamConnection( - $this->hostConfig['host'], - $this->hostConfig['port'], - $this->hostConfig['user'], - $this->hostConfig['password'], - $this->hostConfig['vhost']); + try{ + $this->connection = new AMQPStreamConnection( + $this->hostConfig['host'], + $this->hostConfig['port'], + $this->hostConfig['user'], + $this->hostConfig['password'], + $this->hostConfig['vhost'] + ); + + } catch (\Exception $e) { + + } } return $this->connection; @@ -94,26 +100,40 @@ class AMQPConnection public function getChannel() { if (!isset($this->channel)) { - $this->channel = $this->getConnection()->channel(); - } + $this->getConnection(); + if (isset($this->connection)) { + $this->channel = $this->connection->channel(); - return $this->channel; + return $this->channel; + } + + return null; + } else { + return $this->channel; + } } public function declareExchange() { - $this->channel->exchange_declare(self::ALCHEMY_EXCHANGE, 'direct', false, true, false); - $this->channel->exchange_declare(self::RETRY_ALCHEMY_EXCHANGE, 'direct', false, true, false); + if (!isset($this->channel)) { + $this->channel->exchange_declare(self::ALCHEMY_EXCHANGE, 'direct', false, true, false); + $this->channel->exchange_declare(self::RETRY_ALCHEMY_EXCHANGE, 'direct', false, true, false); + } } /** * @param $queueName - * @return AMQPChannel + * @return AMQPChannel|null */ public function setQueue($queueName) { if (!isset($this->channel)) { $this->getChannel(); + if (!isset($this->channel)) { + // can't connect to rabbit + return null; + } + $this->declareExchange(); } diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php index dcd4f39bc5..b418bc9cbe 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessageHandler.php @@ -25,6 +25,12 @@ class MessageHandler $channel = $serverConnection->getChannel(); + if ($channel == null) { + $this->messagePublisher->pushLog("Can't connect to rabbit, check configuration!", "error"); + + return ; + } + // define consume callbacks $callback = function (AMQPMessage $message) use ($channel, $workerInvoker, $publisher) { diff --git a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php index cc94de8b6f..62cc23ae52 100644 --- a/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php +++ b/lib/Alchemy/Phrasea/WorkerManager/Queue/MessagePublisher.php @@ -96,6 +96,12 @@ class MessagePublisher $channel = $this->serverConnection->setQueue($queueName); + if ($channel == null) { + $this->pushLog("Can't connect to rabbit, check configuration!", "error"); + + return true; + } + $exchange = in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE; $channel->basic_publish($msg, $exchange, $queueName); @@ -137,6 +143,12 @@ class MessagePublisher $msg->set('application_headers', $headers); $channel = $this->serverConnection->setQueue($queueName); + if ($channel == null) { + $this->pushLog("Can't connect to rabbit, check configuration!", "error"); + + return ; + } + $channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName); } }