fix connection

This commit is contained in:
aynsix
2020-05-13 11:55:26 +03:00
parent 4f8c850fc5
commit 2bde6b8804
4 changed files with 56 additions and 12 deletions

View File

@@ -45,6 +45,12 @@ class WorkerExecuteCommand extends Command
/** @var AMQPChannel $channel */
$channel = $serverConnection->getChannel();
if ($channel == null) {
$output->writeln("Can't connect to rabbit, check configuration!");
return;
}
/** @var WorkerInvoker $workerInvoker */
$workerInvoker = $this->container['alchemy_worker.worker_invoker'];

View File

@@ -80,12 +80,18 @@ class AMQPConnection
public function getConnection()
{
if (!isset($this->connection)) {
$this->connection = new AMQPStreamConnection(
$this->hostConfig['host'],
$this->hostConfig['port'],
$this->hostConfig['user'],
$this->hostConfig['password'],
$this->hostConfig['vhost']);
try{
$this->connection = new AMQPStreamConnection(
$this->hostConfig['host'],
$this->hostConfig['port'],
$this->hostConfig['user'],
$this->hostConfig['password'],
$this->hostConfig['vhost']
);
} catch (\Exception $e) {
}
}
return $this->connection;
@@ -94,26 +100,40 @@ class AMQPConnection
public function getChannel()
{
if (!isset($this->channel)) {
$this->channel = $this->getConnection()->channel();
}
$this->getConnection();
if (isset($this->connection)) {
$this->channel = $this->connection->channel();
return $this->channel;
return $this->channel;
}
return null;
} else {
return $this->channel;
}
}
public function declareExchange()
{
$this->channel->exchange_declare(self::ALCHEMY_EXCHANGE, 'direct', false, true, false);
$this->channel->exchange_declare(self::RETRY_ALCHEMY_EXCHANGE, 'direct', false, true, false);
if (!isset($this->channel)) {
$this->channel->exchange_declare(self::ALCHEMY_EXCHANGE, 'direct', false, true, false);
$this->channel->exchange_declare(self::RETRY_ALCHEMY_EXCHANGE, 'direct', false, true, false);
}
}
/**
* @param $queueName
* @return AMQPChannel
* @return AMQPChannel|null
*/
public function setQueue($queueName)
{
if (!isset($this->channel)) {
$this->getChannel();
if (!isset($this->channel)) {
// can't connect to rabbit
return null;
}
$this->declareExchange();
}

View File

@@ -25,6 +25,12 @@ class MessageHandler
$channel = $serverConnection->getChannel();
if ($channel == null) {
$this->messagePublisher->pushLog("Can't connect to rabbit, check configuration!", "error");
return ;
}
// define consume callbacks
$callback = function (AMQPMessage $message) use ($channel, $workerInvoker, $publisher) {

View File

@@ -96,6 +96,12 @@ class MessagePublisher
$channel = $this->serverConnection->setQueue($queueName);
if ($channel == null) {
$this->pushLog("Can't connect to rabbit, check configuration!", "error");
return true;
}
$exchange = in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE;
$channel->basic_publish($msg, $exchange, $queueName);
@@ -137,6 +143,12 @@ class MessagePublisher
$msg->set('application_headers', $headers);
$channel = $this->serverConnection->setQueue($queueName);
if ($channel == null) {
$this->pushLog("Can't connect to rabbit, check configuration!", "error");
return ;
}
$channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
}
}