socket = $socket; }

    /**
     * Notifies the task manager given a message constant, see MESSAGE_* constants.
     *
     * The message is translated to a task manager command, sent over the
     * socket, and the reply is polled without blocking for at most half a
     * second. The reply is expected to be a JSON object holding a "request"
     * key echoing the command that was sent and a "reply" key holding the
     * answer.
     *
     * @param string $message
     *
     * @return mixed|null The return value of the task manager, or null when no
     *                    reply arrived in time, the reply is not a valid JSON
     *                    object, it does not echo the command that was sent,
     *                    or a socket error occurred.
     *
     * @throws InvalidArgumentException When the message is not a known constant.
     */
    public function notify($message)
    {
        try {
            $command = $this->createCommand($message);
            $this->socket->send($command);

            // Initialised before polling so the checks below never read an
            // undefined variable, even if the deadline is already reached
            // before the first receive attempt.
            $result = false;
            $limit = microtime(true) + 0.5;

            while (microtime(true) < $limit) {
                $result = $this->socket->recv(\ZMQ::MODE_NOBLOCK);

                if (false !== $result) {
                    break;
                }

                // Nothing available yet: wait one millisecond before retrying.
                usleep(1000);
            }

            if (false === $result) {
                return null;
            }

            $data = json_decode($result, true);

            if (JSON_ERROR_NONE !== json_last_error() || !is_array($data)) {
                return null;
            }

            // Only accept a reply that answers the command that was just sent.
            if (!isset($data['reply']) || !isset($data['request']) || $command !== $data['request']) {
                return null;
            }

            return $data['reply'];
        } catch (\ZMQSocketException $e) {
            // Notification is best-effort: a socket failure yields null below.
        }

        return null;
    }

    /**
     * Translates a notifier message constant to a task manager command.
     *
     * Create, update and delete messages all map to
     * TaskManager::MESSAGE_PROCESS_UPDATE; the informations message maps to
     * TaskManager::MESSAGE_STATE.
     *
     * @param string $message
     *
     * @return mixed The matching TaskManager::MESSAGE_* constant.
     *
     * @throws InvalidArgumentException When the message is not a known constant.
     */
    private function createCommand($message)
    {
        switch ($message) {
            case static::MESSAGE_CREATE:
            case static::MESSAGE_UPDATE:
            case static::MESSAGE_DELETE:
                return TaskManager::MESSAGE_PROCESS_UPDATE;
            case static::MESSAGE_INFORMATIONS:
                return TaskManager::MESSAGE_STATE;
            default:
                throw new InvalidArgumentException(sprintf('Unable to understand %s message notification', $message));
        }
    }

    /**
     * Creates a Notifier.
     *
     * A request socket is created, its linger option is set and it is
     * connected to the "protocol://host:port" endpoint built from the options.
     *
     * @param array $options Must contain the "linger", "protocol", "host" and
     *                       "port" keys.
     *
     * @return Notifier
     *
     * @throws InvalidArgumentException When a required option is missing.
     */
    public static function create(array $options = array())
    {
        // Fail early with a clear message instead of reading undefined keys
        // and connecting to a malformed endpoint.
        $missing = array_diff(array('linger', 'protocol', 'host', 'port'), array_keys($options));

        if (count($missing) > 0) {
            throw new InvalidArgumentException(sprintf(
                'Missing option(s) to create the notifier: %s',
                implode(', ', $missing)
            ));
        }

        $context = new \ZMQContext();
        $socket = $context->getSocket(\ZMQ::SOCKET_REQ);
        $socket->setSockOpt(\ZMQ::SOCKOPT_LINGER, $options['linger']);
        $socket->connect(sprintf(
            '%s://%s:%s',
            $options['protocol'],
            $options['host'],
            $options['port']
        ));

        return new static($socket);
    }
}