From ff5a3a6c16dedabfd0a80d8fd23d4bda4e515a14 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 15:33:46 +0100 Subject: [PATCH 01/12] Add dependency on Ratchet and react ZMQ --- composer.json | 2 + composer.lock | 226 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 227 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 4384e0b7e6..631bea274d 100644 --- a/composer.json +++ b/composer.json @@ -10,6 +10,7 @@ "alchemy/oauth2php" : "1.0.0", "alchemy/phlickr" : "0.2.7", "alchemy/task-manager" : "~1.0", + "cboden/ratchet" : "~0.3", "dailymotion/sdk" : "~1.5", "data-uri/data-uri" : "~0.1.0", "doctrine/orm" : "~2.4.0", @@ -37,6 +38,7 @@ "php-ffmpeg/php-ffmpeg" : "~0.4, >=0.4.3", "php-xpdf/php-xpdf" : "~0.2.1", "phpexiftool/phpexiftool" : "~0.3", + "react/zmq" : "~0.2", "silex/silex" : "1.1.x-dev@dev", "silex/web-profiler" : "~1.0.0@dev", "swiftmailer/swiftmailer" : "~4.3.0", diff --git a/composer.lock b/composer.lock index 18614b3c61..df484c5331 100644 --- a/composer.lock +++ b/composer.lock @@ -3,7 +3,7 @@ "This file locks the dependencies of your project to a known state", "Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file" ], - "hash": "b824f6f34aa5494f3edbedb0b97ef6c6", + "hash": "6a801036563ebb5473657cb717881f31", "packages": [ { "name": "alchemy-fr/tcpdf-clone", @@ -355,6 +355,55 @@ ], "time": "2013-12-03 18:53:49" }, + { + "name": "cboden/ratchet", + "version": "v0.3.0", + "source": { + "type": "git", + "url": "https://github.com/cboden/Ratchet.git", + "reference": "d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/cboden/Ratchet/zipball/d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58", + "reference": "d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58", + "shasum": "" + }, + "require": { + "guzzle/http": ">=3.6.0,<3.8.0-dev", + "php": ">=5.3.9", + "react/socket": "0.3.*", + "symfony/http-foundation": "~2.2", + "symfony/routing": "~2.2" + }, + "type": "library", + "autoload": { + "psr-0": { + "Ratchet": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Chris Boden", + "email": "cboden@gmail.com", + "homepage": "http://res.im", + "role": "Developer" + } + ], + "description": "PHP WebSocket library", + "homepage": "http://socketo.me", + "keywords": [ + "Ratchet", + "WebSockets", + "server", + "sockets" + ], + "time": "2013-10-14 14:38:12" + }, { "name": "dailymotion/sdk", "version": "1.5.1", @@ -2835,6 +2884,181 @@ ], "time": "2012-12-21 11:40:51" }, + { + "name": "react/event-loop", + "version": "v0.3.3", + "target-dir": "React/EventLoop", + "source": { + "type": "git", + "url": "https://github.com/reactphp/event-loop.git", + "reference": "49208fa3a15c9eae4adf8351ee9caa4064fe550d" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/event-loop/zipball/49208fa3a15c9eae4adf8351ee9caa4064fe550d", + "reference": "49208fa3a15c9eae4adf8351ee9caa4064fe550d", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "suggest": { + "ext-libev": "*", + "ext-libevent": ">=0.0.5" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.3-dev" + } + }, + "autoload": { + "psr-0": { + "React\\EventLoop": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Event loop abstraction layer that libraries can use for evented I/O.", + "keywords": [ + "event-loop" + ], + "time": "2013-07-08 22:38:22" + }, + { + "name": "react/socket", + "version": "v0.3.2", + "target-dir": "React/Socket", + "source": { + "type": "git", + "url": "https://github.com/reactphp/socket.git", + "reference": "9cc32838d25a934ce950d2662cce2ff1b9e9f8aa" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/socket/zipball/9cc32838d25a934ce950d2662cce2ff1b9e9f8aa", + "reference": "9cc32838d25a934ce950d2662cce2ff1b9e9f8aa", + "shasum": "" + }, + "require": { + "evenement/evenement": "1.0.*", + "php": ">=5.3.3", + "react/event-loop": "0.3.*", + "react/stream": "0.3.*" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.3-dev" + } + }, + "autoload": { + "psr-0": { + "React\\Socket": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Library for building an evented socket server.", + "keywords": [ + "Socket" + ], + "time": "2013-04-26 20:23:10" + }, + { + "name": "react/stream", + "version": "v0.3.3", + "target-dir": "React/Stream", + "source": { + "type": "git", + "url": "https://github.com/reactphp/stream.git", + "reference": "b72900ab00513b591fb101faecad38f9c8eab8da" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/stream/zipball/b72900ab00513b591fb101faecad38f9c8eab8da", + "reference": "b72900ab00513b591fb101faecad38f9c8eab8da", + "shasum": "" + }, + "require": { + "evenement/evenement": "1.0.*", + "php": ">=5.3.3" + }, + "suggest": { + "react/event-loop": "0.3.*", + "react/promise": "~1.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.3-dev" + } + }, + "autoload": { + "psr-0": { + "React\\Stream": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Basic readable and writable stream interfaces that support piping.", + "keywords": [ + "pipe", + "stream" + ], + "time": "2013-07-09 00:44:12" + }, + { + "name": "react/zmq", + "version": "v0.2.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/zmq.git", + "reference": "b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/zmq/zipball/b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e", + "reference": "b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e", + "shasum": "" + }, + "require": { + "evenement/evenement": "~1.0", + "ext-zmq": "*", + "php": ">=5.3.2", + "react/event-loop": "0.3.*" + }, + "require-dev": { + "ext-pcntl": "*" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.2-dev" + } + }, + "autoload": { + "psr-0": { + "React\\ZMQ": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "ZeroMQ bindings for React.", + "keywords": [ + "zeromq", + "zmq" + ], + "time": "2013-09-18 11:02:06" + }, { "name": "silex/silex", "version": "1.1.x-dev", From cec863ff7bd8ec96aa9cc75b3b1bdea5ac9ac52d Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 17:41:25 +0100 Subject: [PATCH 02/12] Add check on libevent for production --- .../Setup/Requirements/SystemRequirements.php | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php b/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php index 7007d787b1..667b632018 100644 --- a/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php +++ b/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php @@ -237,9 +237,15 @@ class SystemRequirements extends RequirementCollection implements RequirementInt ); $this->addRecommendation( - extension_loaded('twig'), - 'Twig extension is strongly recommended in production', - 'Install and enable the twig extension.' + extension_loaded('twig'), + 'Twig extension is strongly recommended in production', + 'Install and enable the twig extension.' + ); + + $this->addRecommendation( + function_exists('event_base_new'), + 'LibEvent extension is strongly recommended in production', + 'Install and enable the LibEvent extension.' ); $this->addRequirement( From d0360a02b2265bf57f281ae6552a267b489118bc Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 16:57:40 +0100 Subject: [PATCH 03/12] Remove signal listener once a signal has been caught --- lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php b/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php index e782a0d9c3..5c4adce37a 100644 --- a/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php +++ b/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php @@ -32,6 +32,7 @@ class SchedulerRun extends Command switch ($signal) { case SIGTERM: case SIGINT: + $this->container['signal-handler']->unregisterAll(); $this->container['task-manager']->stop(); break; } From f181135cdeb75240c8a2f1686eb33bd03e5640cf Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 17:40:18 +0100 Subject: [PATCH 04/12] Add task manager broadcaster subscriber --- .../TaskManagerServiceProvider.php | 6 +- .../TaskManagerBroadcasterSubscriber.php | 88 ++++++++++++ .../TaskManagerBroadcasterSubscriberTest.php | 126 ++++++++++++++++++ 3 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php index ea85adabcc..db74014b05 100644 --- a/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php @@ -33,7 +33,7 @@ class TaskManagerServiceProvider implements ServiceProviderInterface $app['task-manager'] = $app->share(function (Application $app) { $options = $app['task-manager.listener.options']; - return TaskManager::create( + $manager = TaskManager::create( $app['dispatcher'], $app['task-manager.logger'], $app['task-manager.task-list'], @@ -41,8 +41,12 @@ class TaskManagerServiceProvider implements ServiceProviderInterface 'listener_protocol' => $options['protocol'], 'listener_host' => $options['host'], 'listener_port' => $options['port'], + 'tick_period' => 1, ] ); + $manager->addSubscriber($app['ws.task-manager.broadcaster']); + + return $manager; }); $app['task-manager.logger.configuration'] = $app->share(function (Application $app) { diff --git a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php new file mode 100644 index 0000000000..30140e801e --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php @@ -0,0 +1,88 @@ +formater = new StateFormater(); + + $this->broadcaster = $broadcaster; + $this->broadcaster->bind(); + + usleep(300000); + } + + public function onManagerStart(TaskManagerEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_START, + ])); + } + + public function onManagerStop(TaskManagerEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_STOP, + ])); + } + + public function onManagerRequest(TaskManagerRequestEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_REQUEST, + 'request' => $event->getRequest(), + 'response' => $event->getResponse(), + ])); + } + + public function onManagerTick(TaskManagerEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_TICK, + 'message' => $this->formater->toArray( + $event->getManager()->getProcessManager()->getManagedProcesses() + ), + ])); + } + + public static function getSubscribedEvents() + { + return [ + TaskManagerEvents::MANAGER_START => 'onManagerStart', + TaskManagerEvents::MANAGER_STOP => 'onManagerStop', + TaskManagerEvents::MANAGER_REQUEST => 'onManagerRequest', + TaskManagerEvents::MANAGER_TICK => 'onManagerTick', + ]; + } + + public static function create(array $options) + { + return new static(ZMQSocket::create(new \ZMQContext(), \ZMQ::SOCKET_PUB, $options['protocol'], $options['host'], $options['port'])); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php new file mode 100644 index 0000000000..bf7bfa84f6 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php @@ -0,0 +1,126 @@ +createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerStart($this->createTaskManagerEvent()); + + $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START); + } + + public function testOnManagerStop() + { + $socket = $this->createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerStop($this->createTaskManagerEvent()); + + $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP); + } + + public function testOnManagerRequest() + { + $socket = $this->createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerRequest(new TaskManagerRequestEvent($this->createTaskManagerMock(), 'PING', 'PONG')); + + $data = $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST); + + $this->assertEquals('PING', $data['request']); + $this->assertEquals('PONG', $data['response']); + } + + public function testOnManagerTick() + { + $socket = $this->createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerTick($this->createTaskManagerEvent()); + + $data = $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK); + + $this->assertArrayHasKey('message', $data); + $this->assertInternalType('array', $data['message']); + } + + private function assertValidJson($json, $topic, $event) + { + $data = json_decode($json, true); + + $this->assertTrue(json_last_error() === JSON_ERROR_NONE); + $this->assertArrayHasKey('event', $data); + $this->assertArrayHasKey('topic', $data); + + $this->assertEquals($event, $data['event']); + $this->assertEquals($topic, $data['topic']); + + return $data; + } + + private function jsonCapture(&$json) + { + return $this->returnCallback(function ($arg) use (&$json) { $json = $arg; return 'lala'; }); + } + + private function createZMQSocketMock() + { + $socket = $this->getMockBuilder('Alchemy\TaskManager\ZMQSocket') + ->setMethods(['send', 'bind']) + ->disableOriginalConstructor() + ->getMock(); + $socket->expects($this->once()) + ->method('bind'); + + return $socket; + } + + private function createTaskManagerMock() + { + $manager = $this->getMockBuilder('Alchemy\TaskManager\TaskManager') + ->disableOriginalConstructor() + ->getMock(); + + $processManager = $this->getMockBuilder('Neutron\ProcessManager\ProcessManager') + ->disableOriginalConstructor() + ->getMock(); + + $processManager->expects($this->any()) + ->method('getManagedProcesses') + ->will($this->returnValue([])); + + $manager->expects($this->any()) + ->method('getProcessManager') + ->will($this->returnValue($processManager)); + + return $manager; + } + + private function createTaskManagerEvent() + { + return new TaskManagerEvent($this->createTaskManagerMock()); + } +} From 9238a939b303c11d74e71b0efec0dfeb5f95c784 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 17:40:59 +0100 Subject: [PATCH 05/12] Minimal fix for callback declaration --- .../Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php b/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php index 8e3d966419..47e15871cb 100644 --- a/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php +++ b/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php @@ -32,7 +32,7 @@ class FinishedJobRemoverSubscriber implements EventSubscriberInterface public static function getSubscribedEvents() { return [ - JobEvents::FINISHED => [$this, 'onJobFinish'], + JobEvents::FINISHED => 'onJobFinish', ]; } From 0fe18a24942bbe5327b1eda14d5dccbd0c915ccd Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 18:09:30 +0100 Subject: [PATCH 06/12] Add Websocket server --- .../WebsocketServerServiceProvider.php | 95 ++++++++++++++ .../Websocket/PhraseanetWampServer.php | 119 ++++++++++++++++++ .../TaskManagerBroadcasterSubscriber.php | 2 +- .../WebsocketServerServiceProviderTest.php | 47 +++++++ .../Websocket/PhraseanetWampServerTest.php | 56 +++++++++ .../TaskManagerBroadcasterSubscriberTest.php | 1 + 6 files changed, 319 insertions(+), 1 deletion(-) create mode 100644 lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php create mode 100644 lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php create mode 100644 tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php new file mode 100644 index 0000000000..5488072054 --- /dev/null +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php @@ -0,0 +1,95 @@ +share(function (Application $app) { + return array_replace([ + 'protocol' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 13598, + ], $app['conf']->get(['main', 'task-manager', 'publisher'], [])); + }); + + $app['ws.task-manager.broadcaster'] = $app->share(function (Application $app) { + return TaskManagerBroadcasterSubscriber::create($app['ws.publisher.options']); + }); + + $app['ws.event-loop'] = $app->share(function () { + return EventLoopFactory::create(); + }); + + $app['ws.server.subscriber'] = $app->share(function (Application $app) { + $options = $app['ws.publisher.options']; + $context = new Context($app['ws.event-loop']); + + $pull = $context->getSocket(\ZMQ::SOCKET_SUB); + $pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, ""); + $pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port'])); + + $logger = $app['ws.server.logger']; + $pull->on('error', function ($e) use ($logger) { + $logger->error('TaskManager Subscriber received an error.', ['exception' => $e]); + }); + + return $pull; + }); + + $app['ws.server.application'] = $app->share(function (Application $app) { + return new SessionProvider( + new WampServer($app['ws.server.phraseanet-server']), $app['session.storage.handler'] + ); + }); + + $app['ws.server.phraseanet-server'] = $app->share(function (Application $app) { + return new PhraseanetWampServer($app['ws.server.subscriber'], $app['ws.server.logger']); + }); + + $app['ws.server.logger'] = $app->share(function (Application $app) { + return $app['task-manager.logger']; + }); + + $app['ws.server.options'] = $app->share(function (Application $app) { + return array_replace([ + 'host' => 'localhost', + 'port' => 9090, + 'ip' => '127.0.0.1', + ], $app['conf']->get(['main', 'websocket-server'], [])); + }); + + $app['ws.server'] = $app->share(function (Application $app) { + $options = $app['ws.server.options']; + + $server = new App($options['host'], $options['port'], $options['ip'], $app['ws.event-loop']); + $server->route('/websockets', $app['ws.server.application']); + + return $server; + }); + } + + public function boot(Application $app) + { + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php new file mode 100644 index 0000000000..c65ce22d0b --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php @@ -0,0 +1,119 @@ +pull = $pull; + $this->logger = $logger; + + $pull->on('message', function ($msg) { + $data = @json_decode($msg, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + $this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg)); + + return; + } + + if (!isset($data['topic'])) { + $this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg)); + + return; + } + + $this->logger->debug(sprintf('[WS] Received message %s', $msg)); + + if (isset($this->topics[$data['topic']])) { + $this->topics[$data['topic']]->broadcast(json_encode($msg)); + } + }); + } + + public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible) + { + $this->logger->error(sprintf('Publishing on topic %s', $topic->getId()), array('event' => $event, 'topic' => $topic)); + $topic->broadcast($event); + } + + public function onCall(Conn $conn, $id, $topic, array $params) + { + $this->logger->error(sprintf('Received RPC call on topic %s', $topic->getId()), array('topic' => $topic)); + $conn->callError($id, $topic, 'RPC not supported on this demo'); + } + + public function onSubscribe(Conn $conn, $topic) + { + $this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic)); + $this->topics[$topic->getId()] = $topic; + } + + public function onUnSubscribe(Conn $conn, $topic) + { + $this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic)); + $this->cleanupReferences($conn, $topic->getId()); + } + + public function onOpen(Conn $conn) + { + if (!$conn->Session->has('usr_id')) { + $this->logger->error('[WS] Connection request aborted, no usr_id in session.'); + $conn->close(); + } + $this->logger->error('[WS] Connection request accepted'); + } + + public function onClose(Conn $conn) + { + $this->cleanupReferences($conn); + $this->logger->error('[WS] Connection closed'); + } + + public function onError(Conn $conn, \Exception $e) + { + $this->logger->error('[WS] Connection error', ['exception' => $e]); + } + + private function cleanupReferences(Conn $conn, $topicId = null) + { + $storage = $this->topics; + $ret = array(); + + foreach ($storage as $id => $topic) { + if (null !== $topicId && $id !== $topicId) { + continue; + } + if ($topic->has($conn)) { + $topic->remove($conn); + } + if (count($topic) > 0) { + $ret[] = $topic; + } + $this->logger->debug(sprintf('%d subscribers remaining on topic %s', count($topic), $topic->getId()), array('topic' => $topic)); + } + + $this->topics = $ret; + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php index 30140e801e..705a2f32da 100644 --- a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php +++ b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php @@ -11,7 +11,7 @@ namespace Alchemy\Phrasea\Websocket\Subscriber; -use Alchemy\Phrasea\TaskManager\PhraseanetWampServer; +use Alchemy\Phrasea\Websocket\PhraseanetWampServer; use Alchemy\TaskManager\Event\StateFormater; use Alchemy\TaskManager\Event\TaskManagerEvent; use Alchemy\TaskManager\Event\TaskManagerRequestEvent; diff --git a/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php new file mode 100644 index 0000000000..033269df6d --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php @@ -0,0 +1,47 @@ +getMock('Ratchet\ConnectionInterface'); + $conn->Session = $this->getMockBuilder('Symfony\Component\HttpFoundation\Session\Session') + ->disableOriginalConstructor() + ->getMock(); + $conn->Session->expects($this->once()) + ->method('has') + ->with('usr_id') + ->will($this->returnValue(false)); + $conn->expects($this->once()) + ->method('close'); + + $server = new PhraseanetWampServer($this->createSocketWrapperMock(), $this->createLoggerMock()); + $server->onOpen($conn); + } + + public function testOpenConnectionConnected() + { + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->Session = $this->getMockBuilder('Symfony\Component\HttpFoundation\Session\Session') + ->disableOriginalConstructor() + ->getMock(); + $conn->Session->expects($this->once()) + ->method('has') + ->with('usr_id') + ->will($this->returnValue(true)); + $conn->expects($this->never()) + ->method('close'); + + $server = new PhraseanetWampServer($this->createSocketWrapperMock(), $this->createLoggerMock()); + $server->onOpen($conn); + } + + private function createSocketWrapperMock() + { + return $this->getMockBuilder('React\ZMQ\SocketWrapper') + ->disableOriginalConstructor() + ->getMock(); + } + + private function createLoggerMock() + { + return $this->getMockBuilder('Psr\Log\LoggerInterface') + ->disableOriginalConstructor() + ->getMock(); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php index bf7bfa84f6..6558ea4fcb 100644 --- a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php @@ -3,6 +3,7 @@ namespace Alchemy\Test\Phrasea\Websocket\Subscriber; use Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber; +use Alchemy\Phrasea\Websocket\PhraseanetWampServer; use Alchemy\TaskManager\Event\TaskManagerEvent; use Alchemy\TaskManager\Event\TaskManagerEvents; use Alchemy\TaskManager\Event\TaskManagerRequestEvent; From 811be51b3d64a305b7b92a71c98c81cd7bac3390 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 18:17:42 +0100 Subject: [PATCH 07/12] Add WebsocketServer command --- bin/console | 5 ++- lib/Alchemy/Phrasea/CLI.php | 2 + .../Phrasea/Command/WebsocketServer.php | 38 +++++++++++++++++++ .../Phrasea/Command/WebsocketServerTest.php | 29 ++++++++++++++ 4 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 lib/Alchemy/Phrasea/Command/WebsocketServer.php create mode 100644 tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php diff --git a/bin/console b/bin/console index 2ee888d69c..65b3a681fd 100755 --- a/bin/console +++ b/bin/console @@ -13,6 +13,8 @@ namespace KonsoleKommander; use Alchemy\Phrasea\Command\Plugin\ListPlugin; use Alchemy\Phrasea\Command\SearchEngine\IndexFull; +use Alchemy\Phrasea\Command\WebsocketServer; +use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\Command\BuildMissingSubdefs; use Alchemy\Phrasea\Command\CreateCollection; use Alchemy\Phrasea\Command\MailTest; @@ -20,7 +22,6 @@ use Alchemy\Phrasea\Command\Compile\Configuration; use Alchemy\Phrasea\Command\RecordAdd; use Alchemy\Phrasea\Command\RescanTechnicalDatas; use Alchemy\Phrasea\Command\UpgradeDBDatas; -use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\CLI; use Alchemy\Phrasea\Command\Plugin\AddPlugin; use Alchemy\Phrasea\Command\Plugin\RemovePlugin; @@ -114,6 +115,8 @@ if ($cli['phraseanet.SE']->getName() === 'ElasticSearch') { $cli->command(new IndexFull('searchengine:index')); } +$cli->command(new WebsocketServer('ws-server:run')); + $cli->loadPlugins(); exit(is_int($cli->run()) ? : 1); diff --git a/lib/Alchemy/Phrasea/CLI.php b/lib/Alchemy/Phrasea/CLI.php index 27c826a6c7..317c36b892 100644 --- a/lib/Alchemy/Phrasea/CLI.php +++ b/lib/Alchemy/Phrasea/CLI.php @@ -13,6 +13,7 @@ namespace Alchemy\Phrasea; use Alchemy\Phrasea\Command\CommandInterface; use Alchemy\Phrasea\Core\CLIProvider\TranslationExtractorServiceProvider; +use Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider; use Alchemy\Phrasea\Exception\RuntimeException; use Symfony\Component\Console; use Alchemy\Phrasea\Core\CLIProvider\CLIDriversServiceProvider; @@ -55,6 +56,7 @@ class CLI extends Application }); $this->register(new PluginServiceProvider()); + $this->register(new WebsocketServerServiceProvider()); $this->register(new ComposerSetupServiceProvider()); $this->register(new CLIDriversServiceProvider()); $this->register(new LessBuilderServiceProvider()); diff --git a/lib/Alchemy/Phrasea/Command/WebsocketServer.php b/lib/Alchemy/Phrasea/Command/WebsocketServer.php new file mode 100644 index 0000000000..2a82239ce1 --- /dev/null +++ b/lib/Alchemy/Phrasea/Command/WebsocketServer.php @@ -0,0 +1,38 @@ +setDescription("Runs the websocket server"); + } + + public function doExecute(InputInterface $input, OutputInterface $output) + { + $sessionConf = $this->container['conf']->get(['main', 'session', 'type'], 'file'); + + if (!in_array($sessionConf, ['memcached', 'memcache', 'redis'])) { + throw new RuntimeException(sprintf('Running the websocket server requires a server session storage, type `%s` provided', $sessionConf)); + } + + $this->container['ws.server']->run(); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php b/tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php new file mode 100644 index 0000000000..1f16cdd90a --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php @@ -0,0 +1,29 @@ +getMock('Symfony\Component\Console\Input\InputInterface'); + $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); + + $sessionType = self::$DI['cli']['conf']->get(['main', 'session', 'type'], 'file'); + self::$DI['cli']['conf']->set(['main', 'session', 'type'], 'memcached'); + + self::$DI['cli']['ws.server'] = $this->getMockBuilder('Ratchet\App') + ->disableOriginalConstructor() + ->getMock(); + self::$DI['cli']['ws.server']->expects($this->once()) + ->method('run'); + + $command = new WebsocketServer('websocketserver'); + $command->setContainer(self::$DI['cli']); + $command->execute($input, $output); + + self::$DI['cli']['conf']->set(['main', 'session', 'type'], $sessionType); + } +} From 2c642f594fc9178a77e139e25c9c5a08cc6e8332 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 18:23:59 +0100 Subject: [PATCH 08/12] Add patch to version 3.9.0-alpha.11 --- config/configuration.sample.yml | 14 ++++++ lib/classes/patch/390alpha11a.php | 78 +++++++++++++++++++++++++++++++ lib/conf.d/configuration.yml | 14 ++++++ 3 files changed, 106 insertions(+) create mode 100644 lib/classes/patch/390alpha11a.php diff --git a/config/configuration.sample.yml b/config/configuration.sample.yml index bc00f45118..2676a56c24 100644 --- a/config/configuration.sample.yml +++ b/config/configuration.sample.yml @@ -29,10 +29,24 @@ main: type: Alchemy\Phrasea\SearchEngine\Phrasea\PhraseaEngine options: [] task-manager: + status: started logger: max-files: 10 enabled: true level: INFO + listener: + protocol: tcp + host: 127.0.0.1 + port: 6660 + linger: 500 + publisher: + protocol: tcp + host: 127.0.0.1 + port: 13598 + websocket-server: + host: local.phrasea + port: 9090 + ip: 0.0.0.0 session: type: 'file' options: [] diff --git a/lib/classes/patch/390alpha11a.php b/lib/classes/patch/390alpha11a.php new file mode 100644 index 0000000000..b4ddddc1b9 --- /dev/null +++ b/lib/classes/patch/390alpha11a.php @@ -0,0 +1,78 @@ +release; + } + + /** + * {@inheritdoc} + */ + public function require_all_upgrades() + { + return false; + } + + /** + * {@inheritdoc} + */ + public function concern() + { + return $this->concern; + } + + /** + * {@inheritdoc} + */ + public function getDoctrineMigrations() + { + return []; + } + + /** + * {@inheritdoc} + */ + public function apply(base $appbox, Application $app) + { + $app['conf']->set(['main', 'task-manager', 'status'], 'started'); + $app['conf']->set(['main', 'websocket-server'], [ + 'host' => parse_url($app['conf']->get('servername'), PHP_URL_HOST), + 'port' => 9090, + 'ip' => '0.0.0.0', + ]); + $app['conf']->set(['main', 'task-manager', 'listener'], [ + 'protocol' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 6660, + 'linger' => 500, + ]); + $app['conf']->set(['main', 'task-manager', 'publisher'], [ + 'protocol' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 13598, + ]); + } +} diff --git a/lib/conf.d/configuration.yml b/lib/conf.d/configuration.yml index 481d633047..7d6c9f0739 100644 --- a/lib/conf.d/configuration.yml +++ b/lib/conf.d/configuration.yml @@ -29,10 +29,24 @@ main: type: Alchemy\Phrasea\SearchEngine\Phrasea\PhraseaEngine options: [] task-manager: + status: started logger: max-files: 10 enabled: true level: INFO + listener: + protocol: tcp + host: 127.0.0.1 + port: 6660 + linger: 500 + publisher: + protocol: tcp + host: 127.0.0.1 + port: 13598 + websocket-server: + host: local.phrasea + port: 9090 + ip: 0.0.0.0 session: type: 'file' options: [] From 8385493ae1e0a51cc4bb4d20d0a2dc957201bddd Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Tue, 11 Feb 2014 18:24:30 +0100 Subject: [PATCH 09/12] Bump to version 3.9.0-alpha.11 --- lib/Alchemy/Phrasea/Core/Version.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Alchemy/Phrasea/Core/Version.php b/lib/Alchemy/Phrasea/Core/Version.php index cf692fe6da..dc4a318741 100644 --- a/lib/Alchemy/Phrasea/Core/Version.php +++ b/lib/Alchemy/Phrasea/Core/Version.php @@ -13,7 +13,7 @@ namespace Alchemy\Phrasea\Core; class Version { - protected static $number = '3.9.0-alpha.10'; + protected static $number = '3.9.0-alpha.11'; protected static $name = 'Epanterias'; public static function getNumber() From a834b3a4df3ef8cfc0e18d7fc857041dc4b51657 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Thu, 13 Feb 2014 16:06:30 +0100 Subject: [PATCH 10/12] Add useful infos in session storage --- .../Phrasea/Authentication/Authenticator.php | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/lib/Alchemy/Phrasea/Authentication/Authenticator.php b/lib/Alchemy/Phrasea/Authentication/Authenticator.php index 71ec4afbc4..2d8c636886 100644 --- a/lib/Alchemy/Phrasea/Authentication/Authenticator.php +++ b/lib/Alchemy/Phrasea/Authentication/Authenticator.php @@ -64,8 +64,6 @@ class Authenticator $this->session->remove('usr_id'); $this->session->remove('session_id'); - $this->session->set('usr_id', $user->get_id()); - $session = new Session(); $session->setBrowserName($this->browser->getBrowser()) ->setBrowserVersion($this->browser->getVersion()) @@ -76,7 +74,7 @@ class Authenticator $this->em->persist($session); $this->em->flush(); - $this->session->set('session_id', $session->getId()); + $this->populateSession($session); foreach ($this->app['acl']->get($user)->get_granted_sbas() as $databox) { \cache_databox::insertClient($this->app, $databox); @@ -86,6 +84,20 @@ class Authenticator return $session; } + private function populateSession(Session $session) + { + $user = $session->getUser($this->app); + + $rights = []; + if ($this->app['acl']->get($user)->has_right('taskmanager')) { + $rights[] = 'task-manager'; + } + + $this->session->set('usr_id', $user->get_id()); + $this->session->set('websockets_rights', $rights); + $this->session->set('session_id', $session->getId()); + } + public function refreshAccount(Session $session) { if (!$this->em->getRepository('Alchemy\Phrasea\Model\Entities\Session')->findOneBy(['id' => $session->getId()])) { @@ -99,8 +111,7 @@ class Authenticator } $this->session->clear(); - $this->session->set('usr_id', $session->getUsrId()); - $this->session->set('session_id', $session->getId()); + $this->populateSession($session); foreach ($this->app['acl']->get($user)->get_granted_sbas() as $databox) { \cache_databox::insertClient($this->app, $databox); From c0b152d73af940e84ec09caef8b55211f7240744 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Thu, 13 Feb 2014 16:07:04 +0100 Subject: [PATCH 11/12] Add authorization strategy for WAMP topics --- composer.lock | 12 +- .../WebsocketServerServiceProvider.php | 43 ++-- .../Phrasea/Websocket/Consumer/Consumer.php | 43 ++++ .../Websocket/Consumer/ConsumerInterface.php | 31 +++ .../Websocket/Consumer/ConsumerManager.php | 32 +++ .../Websocket/PhraseanetWampServer.php | 74 ++----- .../TaskManagerBroadcasterSubscriber.php | 10 +- .../Phrasea/Websocket/Topics/Directive.php | 75 +++++++ .../Websocket/Topics/DirectivesManager.php | 61 ++++++ .../Topics/Plugin/PluginInterface.php | 24 +++ .../Plugin/TaskManagerSubscriberPlugin.php | 63 ++++++ .../Websocket/Topics/TopicsManager.php | 163 +++++++++++++++ .../WebsocketServerServiceProviderTest.php | 12 +- .../Consumer/ConsumerManagerTest.php | 58 ++++++ .../Websocket/Consumer/ConsumerTest.php | 32 +++ .../Websocket/PhraseanetWampServerTest.php | 36 +--- .../TaskManagerBroadcasterSubscriberTest.php | 12 +- .../Websocket/Topics/DirectiveTest.php | 50 +++++ .../Topics/DirectivesManagerTest.php | 46 +++++ .../Websocket/Topics/TopicsManagerTest.php | 187 ++++++++++++++++++ 20 files changed, 942 insertions(+), 122 deletions(-) create mode 100644 lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerManager.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/Directive.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/Plugin/TaskManagerSubscriberPlugin.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php diff --git a/composer.lock b/composer.lock index df484c5331..5d037ecfcc 100644 --- a/composer.lock +++ b/composer.lock @@ -294,16 +294,16 @@ }, { "name": "alchemy/task-manager", - "version": "1.0.1", + "version": "1.0.2", "source": { "type": "git", "url": "https://github.com/alchemy-fr/task-manager.git", - "reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df" + "reference": "795b9d9781c01cfd82651f66cf3306f53661540c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/58cc74d41e89cabf1f76c81fdc4e477569e709df", - "reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df", + "url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/795b9d9781c01cfd82651f66cf3306f53661540c", + "reference": "795b9d9781c01cfd82651f66cf3306f53661540c", "shasum": "" }, "require": { @@ -323,7 +323,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "0.1-dev" + "dev-master": "1.0-dev" } }, "autoload": { @@ -353,7 +353,7 @@ "parallel", "process" ], - "time": "2013-12-03 18:53:49" + "time": "2014-02-12 11:21:06" }, { "name": "cboden/ratchet", diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php index 5488072054..dbc88ddf64 100644 --- a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php @@ -11,12 +11,16 @@ namespace Alchemy\Phrasea\Core\CLIProvider; +use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager; +use Alchemy\Phrasea\Websocket\Topics\Directive; +use Alchemy\Phrasea\Websocket\Topics\DirectivesManager; use Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber; use Alchemy\Phrasea\Websocket\PhraseanetWampServer; +use Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin; +use Alchemy\Phrasea\Websocket\Topics\TopicsManager; use Ratchet\App; use Ratchet\Session\SessionProvider; use Ratchet\Wamp\WampServer; -use React\ZMQ\Context; use Silex\Application; use Silex\ServiceProviderInterface; use React\EventLoop\Factory as EventLoopFactory; @@ -42,19 +46,7 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface }); $app['ws.server.subscriber'] = $app->share(function (Application $app) { - $options = $app['ws.publisher.options']; - $context = new Context($app['ws.event-loop']); - - $pull = $context->getSocket(\ZMQ::SOCKET_SUB); - $pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, ""); - $pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port'])); - - $logger = $app['ws.server.logger']; - $pull->on('error', function ($e) use ($logger) { - $logger->error('TaskManager Subscriber received an error.', ['exception' => $e]); - }); - - return $pull; + return new TaskManagerSubscriberPlugin($app['ws.publisher.options'], $app['ws.event-loop'], $app['ws.server.logger']); }); $app['ws.server.application'] = $app->share(function (Application $app) { @@ -64,13 +56,34 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface }); $app['ws.server.phraseanet-server'] = $app->share(function (Application $app) { - return new PhraseanetWampServer($app['ws.server.subscriber'], $app['ws.server.logger']); + return new PhraseanetWampServer($app['ws.server.topics-manager'], $app['ws.server.logger']); }); $app['ws.server.logger'] = $app->share(function (Application $app) { return $app['task-manager.logger']; }); + $app['ws.server.topics-manager.directives.conf'] = $app->share(function (Application $app) { + return [ + new Directive(TopicsManager::TOPIC_TASK_MANAGER, true, ['task-manager']), + ]; + }); + + $app['ws.server.topics-manager.directives'] = $app->share(function (Application $app) { + return new DirectivesManager($app['ws.server.topics-manager.directives.conf']); + }); + + $app['ws.server.consumer-manager'] = $app->share(function (Application $app) { + return new ConsumerManager(); + }); + + $app['ws.server.topics-manager'] = $app->share(function (Application $app) { + $manager = new TopicsManager($app['ws.server.topics-manager.directives'], $app['ws.server.consumer-manager']); + $manager->attach($app['ws.server.subscriber']); + + return $manager; + }); + $app['ws.server.options'] = $app->share(function (Application $app) { return array_replace([ 'host' => 'localhost', diff --git a/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php b/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php new file mode 100644 index 0000000000..685d311e20 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php @@ -0,0 +1,43 @@ +usrId = $usrId; + $this->rights = $rights; + } + + /** + * {@inheritdoc} + */ + public function isAuthenticated() + { + return $this->usrId !== null; + } + + /** + * {@inheritdoc} + */ + public function hasRights($rights) + { + return count(array_intersect($this->rights, (array) $rights)) === count($rights); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php b/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php new file mode 100644 index 0000000000..998f62547e --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php @@ -0,0 +1,31 @@ +has('usr_id') ? $session->get('usr_id') : null; + $rights = $session->has('websockets_rights') ? $session->get('websockets_rights') : []; + + return new Consumer($usrId, $rights);; + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php index c65ce22d0b..1b7fe32b76 100644 --- a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php +++ b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php @@ -11,45 +11,20 @@ namespace Alchemy\Phrasea\Websocket; +use Alchemy\Phrasea\Websocket\Topics\TopicsManager; use Psr\Log\LoggerInterface; -use React\ZMQ\SocketWrapper; use Ratchet\ConnectionInterface as Conn; use Ratchet\Wamp\WampServerInterface; class PhraseanetWampServer implements WampServerInterface { - const TOPIC_TASK_MANAGER = 'http://phraseanet.com/topics/admin/task-manager'; - - private $pull; private $logger; - private $topics = []; + private $manager; - public function __construct(SocketWrapper $pull, LoggerInterface $logger) + public function __construct(TopicsManager $manager, LoggerInterface $logger) { - $this->pull = $pull; $this->logger = $logger; - - $pull->on('message', function ($msg) { - $data = @json_decode($msg, true); - - if (json_last_error() !== JSON_ERROR_NONE) { - $this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg)); - - return; - } - - if (!isset($data['topic'])) { - $this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg)); - - return; - } - - $this->logger->debug(sprintf('[WS] Received message %s', $msg)); - - if (isset($this->topics[$data['topic']])) { - $this->topics[$data['topic']]->broadcast(json_encode($msg)); - } - }); + $this->manager = $manager; } public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible) @@ -66,54 +41,33 @@ class PhraseanetWampServer implements WampServerInterface public function onSubscribe(Conn $conn, $topic) { - $this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic)); - $this->topics[$topic->getId()] = $topic; + if ($this->manager->subscribe($conn, $topic)) { + $this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic)); + } else { + $this->logger->error(sprintf('Subscription received on topic %s, user is not allowed', $topic->getId()), array('topic' => $topic)); + } } public function onUnSubscribe(Conn $conn, $topic) { $this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic)); - $this->cleanupReferences($conn, $topic->getId()); + $this->manager->unsubscribe($conn, $topic); } public function onOpen(Conn $conn) { - if (!$conn->Session->has('usr_id')) { - $this->logger->error('[WS] Connection request aborted, no usr_id in session.'); - $conn->close(); - } - $this->logger->error('[WS] Connection request accepted'); + $this->logger->debug('[WS] Connection request accepted'); + $this->manager->openConnection($conn); } public function onClose(Conn $conn) { - $this->cleanupReferences($conn); - $this->logger->error('[WS] Connection closed'); + $this->logger->debug('[WS] Connection closed'); + $this->manager->closeConnection($conn); } public function onError(Conn $conn, \Exception $e) { $this->logger->error('[WS] Connection error', ['exception' => $e]); } - - private function cleanupReferences(Conn $conn, $topicId = null) - { - $storage = $this->topics; - $ret = array(); - - foreach ($storage as $id => $topic) { - if (null !== $topicId && $id !== $topicId) { - continue; - } - if ($topic->has($conn)) { - $topic->remove($conn); - } - if (count($topic) > 0) { - $ret[] = $topic; - } - $this->logger->debug(sprintf('%d subscribers remaining on topic %s', count($topic), $topic->getId()), array('topic' => $topic)); - } - - $this->topics = $ret; - } } diff --git a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php index 705a2f32da..f8939ef60a 100644 --- a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php +++ b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php @@ -11,7 +11,7 @@ namespace Alchemy\Phrasea\Websocket\Subscriber; -use Alchemy\Phrasea\Websocket\PhraseanetWampServer; +use Alchemy\Phrasea\Websocket\Topics\TopicsManager; use Alchemy\TaskManager\Event\StateFormater; use Alchemy\TaskManager\Event\TaskManagerEvent; use Alchemy\TaskManager\Event\TaskManagerRequestEvent; @@ -37,7 +37,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerStart(TaskManagerEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_START, ])); } @@ -45,7 +45,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerStop(TaskManagerEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_STOP, ])); } @@ -53,7 +53,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerRequest(TaskManagerRequestEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_REQUEST, 'request' => $event->getRequest(), 'response' => $event->getResponse(), @@ -63,7 +63,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerTick(TaskManagerEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_TICK, 'message' => $this->formater->toArray( $event->getManager()->getProcessManager()->getManagedProcesses() diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php b/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php new file mode 100644 index 0000000000..8d3e4a2877 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php @@ -0,0 +1,75 @@ +topic = $topic; + $this->requireAuthentication = (Boolean) $requireAuthentication; + $this->requiredRights = $requiredRights; + } + + /** + * @return string + */ + public function getTopic() + { + return $this->topic; + } + + /** + * Returns true if the topic requires an authenticated consumer + * + * @return Boolean + */ + public function requireAuthentication() + { + return $this->requireAuthentication; + } + + /** + * Returns an array of required rights for the authenticated consumer + * + * @return array + */ + public function getRequiredRights() + { + return $this->requiredRights; + } + + /** + * Returns true if the consumer satisfies the directive + * + * @param ConsumerInterface $consumer + * + * @return Boolean + */ + public function isStatisfiedBy(ConsumerInterface $consumer) + { + if ($this->requireAuthentication() && !$consumer->isAuthenticated()) { + return false; + } + + return $consumer->hasRights($this->getRequiredRights()); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php b/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php new file mode 100644 index 0000000000..5ab6fdca9c --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php @@ -0,0 +1,61 @@ +directives = $directives; + } + + /** + * Returns true if the consumer has access to the given topic + * + * @param ConsumerInterface $consumer + * @param Topic $topic + * + * @return Boolean + */ + public function hasAccess(ConsumerInterface $consumer, Topic $topic) + { + foreach ($this->getDirectives($topic) as $directive) { + if (!$directive->isStatisfiedBy($consumer)) { + return false; + } + } + + return true; + } + + /** + * @param Topic $topic + * + * @return Directive[] + */ + private function getDirectives(Topic $topic) + { + return array_filter($this->directives, function (Directive $directive) use ($topic) { + return $directive->getTopic() === $topic->getId(); + }); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php new file mode 100644 index 0000000000..36ef65e4d7 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php @@ -0,0 +1,24 @@ +logger = $logger; + $context = new Context($loop); + + $this->pull = $context->getSocket(\ZMQ::SOCKET_SUB); + $this->pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, ""); + $this->pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port'])); + + $this->pull->on('error', function ($e) use ($logger) { + $logger->error('TaskManager Subscriber received an error.', ['exception' => $e]); + }); + } + + /** + * {@inheritdoc} + */ + public function attach(TopicsManager $manager) + { + $this->pull->on('message', function ($msg) use ($manager) { + $data = @json_decode($msg, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + $this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg)); + + return; + } + + if (!isset($data['topic'])) { + $this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg)); + + return; + } + + $this->logger->debug(sprintf('[WS] Received message %s', $msg)); + + $manager->broadcast($data['topic'], json_encode($msg)); + }); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php b/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php new file mode 100644 index 0000000000..74a95171de --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php @@ -0,0 +1,163 @@ +directives = $directives; + $this->consumerManager = $consumerManagaer; + } + + /** + * Attaches a plugin to the TopicsManager + * + * @param PluginInterface $plugin + * + * @return TopicsManager + */ + public function attach(PluginInterface $plugin) + { + $plugin->attach($this); + + return $this; + } + + /** + * Checks if the consumer related to the connection has access to the topic, + * removes the connection from topic if the consumer is not granted. + * + * @param Conn $conn + * @param Topic $topic + * + * @return Boolean Return true if the consumer is granted, false otherwise + */ + public function subscribe(Conn $conn, Topic $topic) + { + if (!$this->directives->hasAccess($conn->User, $topic)) { + $topic->remove($conn); + + return false; + } + + $this->topics[$topic->getId()] = $topic; + + return true; + } + + /** + * Triggered on unsubscription. + * + * Removes internal references to the topic if no more consumers are listening. + * + * @param Conn $conn + * @param Topic $topic + * + * @return TopicsManager + */ + public function unsubscribe(Conn $conn, Topic $topic) + { + $this->cleanupReferences($conn, $topic); + + return $this; + } + + /** + * Triggered on connection, populates the connection with a consumer. + * + * @param Conn $conn + * + * @return TopicsManager + */ + public function openConnection(Conn $conn) + { + try { + $conn->User = $this->consumerManager->create($conn->Session); + } catch (\RuntimeException $e) { + $conn->close(); + } + + return $this; + } + + /** + * Triggered on deconnexion. + * + * Removes internal references to topics if no more consumers are listening. + * + * @param Conn $conn + * + * @return TopicsManager + */ + public function closeConnection(Conn $conn) + { + $this->cleanupReferences($conn); + + return $this; + } + + /** + * Brodcasts a message to a topic, if it exists + * + * @param $topicId string + * @param $message string + * + * @return TopicsManager + */ + public function broadcast($topicId, $message) + { + if (isset($this->topics[$topicId])) { + $this->topics[$topicId]->broadcast($message); + } + + return $this; + } + + /** + * Removes internal references to topics if they do not contains any reference to an active connection. + * + * @param Conn $conn + * @param null|Topic $topic Restrict to this topic, if provided + */ + private function cleanupReferences(Conn $conn, Topic $topic = null) + { + $storage = $this->topics; + $updated = array(); + + foreach ($storage as $id => $storedTopic) { + if (null !== $topic && $id !== $topic->getId()) { + continue; + } + if ($storedTopic->has($conn)) { + $storedTopic->remove($conn); + } + if (count($storedTopic) > 0) { + $updated[] = $storedTopic; + } + } + + $this->topics = $updated; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php index 033269df6d..faa437783f 100644 --- a/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php +++ b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php @@ -20,7 +20,7 @@ class WebsocketServerServiceProviderTest extends ServiceProviderTestCase [ 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', 'ws.server.subscriber', - 'React\ZMQ\SocketWrapper', + 'Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin', ], [ 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', @@ -42,6 +42,16 @@ class WebsocketServerServiceProviderTest extends ServiceProviderTestCase 'ws.server.logger', 'Psr\Log\LoggerInterface', ], + [ + 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', + 'ws.server.topics-manager.directives', + 'Alchemy\Phrasea\Websocket\Topics\DirectivesManager', + ], + [ + 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', + 'ws.server.consumer-manager', + 'Alchemy\Phrasea\Websocket\Consumer\ConsumerManager', + ], ]; } } diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php new file mode 100644 index 0000000000..15bc64e980 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php @@ -0,0 +1,58 @@ +create($this->createSessionMock($usrId, $rights)); + $this->assertSame($authenticated, $consumer->isAuthenticated()); + $this->assertSame($hasRights, $consumer->hasRights($checkedRights)); + } + + public function provideConsumerManagerData() + { + return [ + [25, ['task-manager'], true, [], true], + [25, ['task-manager'], true, ['task-manager'], true], + [null, ['task-manager'], false, ['task-manager', 'neutron'], false], + [null, ['neutron', 'task-manager'], false, ['task-manager', 'neutron'], true], + [42, ['neutron', 'task-manager', 'romain'], true, ['task-manager', 'neutron'], true], + ]; + } + + private function createSessionMock($usrId, $rights) + { + $session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface'); + $session->expects($this->any()) + ->method('has') + ->will($this->returnCallback(function ($prop) use ($usrId, $rights) { + switch ($prop) { + case 'usr_id': + return $usrId !== null; + case 'websockets_rights': + return $rights !== null; + } + })); + + $session->expects($this->any()) + ->method('get') + ->will($this->returnCallback(function ($prop) use ($usrId, $rights) { + switch ($prop) { + case 'usr_id': + return $usrId; + case 'websockets_rights': + return $rights; + } + })); + + return $session; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php new file mode 100644 index 0000000000..49d0a8df7e --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php @@ -0,0 +1,32 @@ +assertTrue($consumer->isAuthenticated()); + $consumer = new Consumer(null, []); + $this->assertFalse($consumer->isAuthenticated()); + } + + public function testHasRights() + { + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($consumer->hasRights('neutron')); + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($consumer->hasRights(['neutron'])); + $consumer = new Consumer(42, ['romainneutron']); + $this->assertFalse($consumer->hasRights('neutron')); + $consumer = new Consumer(42, ['romainneutron']); + $this->assertFalse($consumer->hasRights(['neutron'])); + $consumer = new Consumer(42, ['neutron']); + $this->assertFalse($consumer->hasRights(['neutron', 'romain'])); + $consumer = new Consumer(42, ['romain', 'neutron', 'bouteille']); + $this->assertTrue($consumer->hasRights(['neutron', 'romain'])); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php index 2200bfbc59..b12278dbfb 100644 --- a/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php +++ b/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php @@ -6,43 +6,21 @@ use Alchemy\Phrasea\Websocket\PhraseanetWampServer; class PhraseanetWampServerTest extends \PhraseanetTestCase { - public function testOpenConnectionNotConnected() - { - $conn = $this->getMock('Ratchet\ConnectionInterface'); - $conn->Session = $this->getMockBuilder('Symfony\Component\HttpFoundation\Session\Session') - ->disableOriginalConstructor() - ->getMock(); - $conn->Session->expects($this->once()) - ->method('has') - ->with('usr_id') - ->will($this->returnValue(false)); - $conn->expects($this->once()) - ->method('close'); - - $server = new PhraseanetWampServer($this->createSocketWrapperMock(), $this->createLoggerMock()); - $server->onOpen($conn); - } - public function testOpenConnectionConnected() { + $topicsManager = $this->createTopicsManagerMock(); $conn = $this->getMock('Ratchet\ConnectionInterface'); - $conn->Session = $this->getMockBuilder('Symfony\Component\HttpFoundation\Session\Session') - ->disableOriginalConstructor() - ->getMock(); - $conn->Session->expects($this->once()) - ->method('has') - ->with('usr_id') - ->will($this->returnValue(true)); - $conn->expects($this->never()) - ->method('close'); + $topicsManager->expects($this->once()) + ->method('openConnection') + ->with($conn); - $server = new PhraseanetWampServer($this->createSocketWrapperMock(), $this->createLoggerMock()); + $server = new PhraseanetWampServer($topicsManager, $this->createLoggerMock()); $server->onOpen($conn); } - private function createSocketWrapperMock() + private function createTopicsManagerMock() { - return $this->getMockBuilder('React\ZMQ\SocketWrapper') + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\TopicsManager') ->disableOriginalConstructor() ->getMock(); } diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php index 6558ea4fcb..f9e0b8089c 100644 --- a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php @@ -1,9 +1,9 @@ onManagerStart($this->createTaskManagerEvent()); - $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START); + $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START); } public function testOnManagerStop() @@ -33,7 +33,7 @@ class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase $subscriber = new TaskManagerBroadcasterSubscriber($socket); $subscriber->onManagerStop($this->createTaskManagerEvent()); - $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP); + $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP); } public function testOnManagerRequest() @@ -46,7 +46,7 @@ class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase $subscriber = new TaskManagerBroadcasterSubscriber($socket); $subscriber->onManagerRequest(new TaskManagerRequestEvent($this->createTaskManagerMock(), 'PING', 'PONG')); - $data = $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST); + $data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST); $this->assertEquals('PING', $data['request']); $this->assertEquals('PONG', $data['response']); @@ -62,7 +62,7 @@ class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase $subscriber = new TaskManagerBroadcasterSubscriber($socket); $subscriber->onManagerTick($this->createTaskManagerEvent()); - $data = $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK); + $data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK); $this->assertArrayHasKey('message', $data); $this->assertInternalType('array', $data['message']); diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php new file mode 100644 index 0000000000..5e7f4d4862 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php @@ -0,0 +1,50 @@ +assertSame('http://topic', $directive->getTopic()); + $this->assertTrue($directive->requireAuthentication()); + $this->assertSame(['neutron'], $directive->getRequiredRights()); + } + + /** + * @dataProvider provideStatisfiedByCombinaisons + */ + public function testIsSatisfiedBy($authenticationRequired, $requiredRights, $authenticated, $hasRights, $satisfied) + { + $consumer = $this->createConsumerMock($authenticated, $hasRights, $requiredRights); + $directive = new Directive('http://topic', $authenticationRequired, $requiredRights); + $this->assertEquals($satisfied, $directive->isStatisfiedBy($consumer)); + } + + public function provideStatisfiedByCombinaisons() + { + return [ + [true, ['neutron'], true, true, true], + [true, [], false, true, false], + [false, ['neutron'], true, false, false], + [false, ['neutron'], false, false, false], + ]; + } + + private function createConsumerMock($authenticated, $hasRights, $requiredRights) + { + $consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + $consumer->expects($this->any()) + ->method('isAuthenticated') + ->will($this->returnValue($authenticated)); + $consumer->expects($this->any()) + ->method('hasRights') + ->with($requiredRights) + ->will($this->returnValue($hasRights)); + + return $consumer; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php new file mode 100644 index 0000000000..315f6aa054 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php @@ -0,0 +1,46 @@ +assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(null, []); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(42, ['neutron', 'bouteille', 'romain']); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic4'))); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php new file mode 100644 index 0000000000..cd8ef1215d --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php @@ -0,0 +1,187 @@ +createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $plugin = $this->getMock('Alchemy\Phrasea\Websocket\Topics\Plugin\PluginInterface'); + $plugin->expects($this->once()) + ->method('attach') + ->with($manager); + + $this->assertSame($manager, $manager->attach($plugin)); + } + + public function testSubscribeWithAccess() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + + $topic->expects($this->never()) + ->method('remove'); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager->subscribe($conn, $topic); + } + + public function testSubscribeWithoutAccess() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + + $topic->expects($this->once()) + ->method('remove') + ->with($conn); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(false)); + + $manager->subscribe($conn, $topic); + } + + public function testUnsubscribe() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = new Topic('http://topic'); + $topic->add($conn); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->unsubscribe($conn, $topic); + + $this->assertFalse($topic->has($conn)); + } + + public function testOpenConnection() + { + $consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + $session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface'); + + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + $consumerManager->expects($this->once()) + ->method('create') + ->with($session) + ->will($this->returnValue($consumer)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->Session = $session; + + $manager->openConnection($conn); + + $this->assertSame($consumer, $conn->User); + } + + public function testCloseConnection() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = new Topic('http://topic'); + $topic->add($conn); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->closeConnection($conn); + + $this->assertFalse($topic->has($conn)); + } + + public function testBroadcast() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + $topic->expects($this->any()) + ->method('getId') + ->will($this->returnValue('http://topic')); + $topic->expects($this->once()) + ->method('broadcast') + ->with('hello world !'); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->broadcast('http://topic', 'hello world !'); + $manager->broadcast('http://topic2', 'nothing'); + } + + private function createDirectivesManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\DirectivesManager') + ->disableOriginalConstructor() + ->getMock(); + } + + private function createConsumerManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Consumer\ConsumerManager') + ->disableOriginalConstructor() + ->getMock(); + } +} From ffab51235a03246ef26a2cc010c0555f0dda0f6c Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Fri, 14 Feb 2014 16:19:32 +0100 Subject: [PATCH 12/12] Address PR comments --- config/configuration.sample.yml | 8 +++---- .../Phrasea/Command/WebsocketServer.php | 3 +-- .../WebsocketServerServiceProvider.php | 2 +- .../Websocket/PhraseanetWampServer.php | 21 +++++++++++++++++++ .../Topics/Plugin/PluginInterface.php | 2 +- lib/classes/patch/390alpha11a.php | 2 +- lib/conf.d/configuration.yml | 8 +++---- .../Tests/Phrasea/Controller/SetupTest.php | 2 +- 8 files changed, 34 insertions(+), 14 deletions(-) diff --git a/config/configuration.sample.yml b/config/configuration.sample.yml index 2676a56c24..56aee3c5be 100644 --- a/config/configuration.sample.yml +++ b/config/configuration.sample.yml @@ -39,14 +39,14 @@ main: host: 127.0.0.1 port: 6660 linger: 500 - publisher: - protocol: tcp - host: 127.0.0.1 - port: 13598 websocket-server: host: local.phrasea port: 9090 ip: 0.0.0.0 + subscriber: + protocol: tcp + host: 127.0.0.1 + port: 13598 session: type: 'file' options: [] diff --git a/lib/Alchemy/Phrasea/Command/WebsocketServer.php b/lib/Alchemy/Phrasea/Command/WebsocketServer.php index 2a82239ce1..af506ada8e 100644 --- a/lib/Alchemy/Phrasea/Command/WebsocketServer.php +++ b/lib/Alchemy/Phrasea/Command/WebsocketServer.php @@ -21,8 +21,7 @@ class WebsocketServer extends Command { parent::__construct($name); - $this - ->setDescription("Runs the websocket server"); + $this->setDescription("Runs the websocket server"); } public function doExecute(InputInterface $input, OutputInterface $output) diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php index dbc88ddf64..94e5dcc75f 100644 --- a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php @@ -34,7 +34,7 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface 'protocol' => 'tcp', 'host' => '127.0.0.1', 'port' => 13598, - ], $app['conf']->get(['main', 'task-manager', 'publisher'], [])); + ], $app['conf']->get(['main', 'websocket-server', 'subscriber'], [])); }); $app['ws.task-manager.broadcaster'] = $app->share(function (Application $app) { diff --git a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php index 1b7fe32b76..e0e6538e78 100644 --- a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php +++ b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php @@ -27,18 +27,27 @@ class PhraseanetWampServer implements WampServerInterface $this->manager = $manager; } + /** + * {@inheritdoc} + */ public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible) { $this->logger->error(sprintf('Publishing on topic %s', $topic->getId()), array('event' => $event, 'topic' => $topic)); $topic->broadcast($event); } + /** + * {@inheritdoc} + */ public function onCall(Conn $conn, $id, $topic, array $params) { $this->logger->error(sprintf('Received RPC call on topic %s', $topic->getId()), array('topic' => $topic)); $conn->callError($id, $topic, 'RPC not supported on this demo'); } + /** + * {@inheritdoc} + */ public function onSubscribe(Conn $conn, $topic) { if ($this->manager->subscribe($conn, $topic)) { @@ -48,24 +57,36 @@ class PhraseanetWampServer implements WampServerInterface } } + /** + * {@inheritdoc} + */ public function onUnSubscribe(Conn $conn, $topic) { $this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic)); $this->manager->unsubscribe($conn, $topic); } + /** + * {@inheritdoc} + */ public function onOpen(Conn $conn) { $this->logger->debug('[WS] Connection request accepted'); $this->manager->openConnection($conn); } + /** + * {@inheritdoc} + */ public function onClose(Conn $conn) { $this->logger->debug('[WS] Connection closed'); $this->manager->closeConnection($conn); } + /** + * {@inheritdoc} + */ public function onError(Conn $conn, \Exception $e) { $this->logger->error('[WS] Connection error', ['exception' => $e]); diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php index 36ef65e4d7..2edd0eb74c 100644 --- a/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php @@ -16,7 +16,7 @@ use Alchemy\Phrasea\Websocket\Topics\TopicsManager; interface PluginInterface { /** - * Attaches a Plugn to the TopicsManager + * Attaches a Plugin to the TopicsManager * * @param TopicsManager $manager */ diff --git a/lib/classes/patch/390alpha11a.php b/lib/classes/patch/390alpha11a.php index b4ddddc1b9..643d7a79c3 100644 --- a/lib/classes/patch/390alpha11a.php +++ b/lib/classes/patch/390alpha11a.php @@ -69,7 +69,7 @@ class patch_390alpha11a implements patchInterface 'port' => 6660, 'linger' => 500, ]); - $app['conf']->set(['main', 'task-manager', 'publisher'], [ + $app['conf']->set(['main', 'websocket-server', 'subscriber'], [ 'protocol' => 'tcp', 'host' => '127.0.0.1', 'port' => 13598, diff --git a/lib/conf.d/configuration.yml b/lib/conf.d/configuration.yml index 7d6c9f0739..56273b61a5 100644 --- a/lib/conf.d/configuration.yml +++ b/lib/conf.d/configuration.yml @@ -39,14 +39,14 @@ main: host: 127.0.0.1 port: 6660 linger: 500 - publisher: - protocol: tcp - host: 127.0.0.1 - port: 13598 websocket-server: host: local.phrasea port: 9090 ip: 0.0.0.0 + subscriber: + protocol: tcp + host: 127.0.0.1 + port: 13598 session: type: 'file' options: [] diff --git a/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php b/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php index 67e928b608..1d2181b2c3 100644 --- a/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php +++ b/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php @@ -105,7 +105,7 @@ class SetupTest extends \PhraseanetWebTestCase ->disableOriginalConstructor() ->getMock(); - $user->expects($this->exactly(2)) + $user->expects($this->exactly(1)) ->method('get_id') ->will($this->returnValue(self::$DI['user']->get_id()));