From c0b152d73af940e84ec09caef8b55211f7240744 Mon Sep 17 00:00:00 2001 From: Romain Neutron Date: Thu, 13 Feb 2014 16:07:04 +0100 Subject: [PATCH] Add authorization strategy for WAMP topics --- composer.lock | 12 +- .../WebsocketServerServiceProvider.php | 43 ++-- .../Phrasea/Websocket/Consumer/Consumer.php | 43 ++++ .../Websocket/Consumer/ConsumerInterface.php | 31 +++ .../Websocket/Consumer/ConsumerManager.php | 32 +++ .../Websocket/PhraseanetWampServer.php | 74 ++----- .../TaskManagerBroadcasterSubscriber.php | 10 +- .../Phrasea/Websocket/Topics/Directive.php | 75 +++++++ .../Websocket/Topics/DirectivesManager.php | 61 ++++++ .../Topics/Plugin/PluginInterface.php | 24 +++ .../Plugin/TaskManagerSubscriberPlugin.php | 63 ++++++ .../Websocket/Topics/TopicsManager.php | 163 +++++++++++++++ .../WebsocketServerServiceProviderTest.php | 12 +- .../Consumer/ConsumerManagerTest.php | 58 ++++++ .../Websocket/Consumer/ConsumerTest.php | 32 +++ .../Websocket/PhraseanetWampServerTest.php | 36 +--- .../TaskManagerBroadcasterSubscriberTest.php | 12 +- .../Websocket/Topics/DirectiveTest.php | 50 +++++ .../Topics/DirectivesManagerTest.php | 46 +++++ .../Websocket/Topics/TopicsManagerTest.php | 187 ++++++++++++++++++ 20 files changed, 942 insertions(+), 122 deletions(-) create mode 100644 lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerManager.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/Directive.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/Plugin/TaskManagerSubscriberPlugin.php create mode 100644 lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php create mode 100644 tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php diff --git a/composer.lock b/composer.lock index df484c5331..5d037ecfcc 100644 --- a/composer.lock +++ b/composer.lock @@ -294,16 +294,16 @@ }, { "name": "alchemy/task-manager", - "version": "1.0.1", + "version": "1.0.2", "source": { "type": "git", "url": "https://github.com/alchemy-fr/task-manager.git", - "reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df" + "reference": "795b9d9781c01cfd82651f66cf3306f53661540c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/58cc74d41e89cabf1f76c81fdc4e477569e709df", - "reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df", + "url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/795b9d9781c01cfd82651f66cf3306f53661540c", + "reference": "795b9d9781c01cfd82651f66cf3306f53661540c", "shasum": "" }, "require": { @@ -323,7 +323,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "0.1-dev" + "dev-master": "1.0-dev" } }, "autoload": { @@ -353,7 +353,7 @@ "parallel", "process" ], - "time": "2013-12-03 18:53:49" + "time": "2014-02-12 11:21:06" }, { "name": "cboden/ratchet", diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php index 5488072054..dbc88ddf64 100644 --- a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php @@ -11,12 +11,16 @@ namespace Alchemy\Phrasea\Core\CLIProvider; +use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager; +use Alchemy\Phrasea\Websocket\Topics\Directive; +use Alchemy\Phrasea\Websocket\Topics\DirectivesManager; use Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber; use Alchemy\Phrasea\Websocket\PhraseanetWampServer; +use Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin; +use Alchemy\Phrasea\Websocket\Topics\TopicsManager; use Ratchet\App; use Ratchet\Session\SessionProvider; use Ratchet\Wamp\WampServer; -use React\ZMQ\Context; use Silex\Application; use Silex\ServiceProviderInterface; use React\EventLoop\Factory as EventLoopFactory; @@ -42,19 +46,7 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface }); $app['ws.server.subscriber'] = $app->share(function (Application $app) { - $options = $app['ws.publisher.options']; - $context = new Context($app['ws.event-loop']); - - $pull = $context->getSocket(\ZMQ::SOCKET_SUB); - $pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, ""); - $pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port'])); - - $logger = $app['ws.server.logger']; - $pull->on('error', function ($e) use ($logger) { - $logger->error('TaskManager Subscriber received an error.', ['exception' => $e]); - }); - - return $pull; + return new TaskManagerSubscriberPlugin($app['ws.publisher.options'], $app['ws.event-loop'], $app['ws.server.logger']); }); $app['ws.server.application'] = $app->share(function (Application $app) { @@ -64,13 +56,34 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface }); $app['ws.server.phraseanet-server'] = $app->share(function (Application $app) { - return new PhraseanetWampServer($app['ws.server.subscriber'], $app['ws.server.logger']); + return new PhraseanetWampServer($app['ws.server.topics-manager'], $app['ws.server.logger']); }); $app['ws.server.logger'] = $app->share(function (Application $app) { return $app['task-manager.logger']; }); + $app['ws.server.topics-manager.directives.conf'] = $app->share(function (Application $app) { + return [ + new Directive(TopicsManager::TOPIC_TASK_MANAGER, true, ['task-manager']), + ]; + }); + + $app['ws.server.topics-manager.directives'] = $app->share(function (Application $app) { + return new DirectivesManager($app['ws.server.topics-manager.directives.conf']); + }); + + $app['ws.server.consumer-manager'] = $app->share(function (Application $app) { + return new ConsumerManager(); + }); + + $app['ws.server.topics-manager'] = $app->share(function (Application $app) { + $manager = new TopicsManager($app['ws.server.topics-manager.directives'], $app['ws.server.consumer-manager']); + $manager->attach($app['ws.server.subscriber']); + + return $manager; + }); + $app['ws.server.options'] = $app->share(function (Application $app) { return array_replace([ 'host' => 'localhost', diff --git a/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php b/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php new file mode 100644 index 0000000000..685d311e20 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php @@ -0,0 +1,43 @@ +usrId = $usrId; + $this->rights = $rights; + } + + /** + * {@inheritdoc} + */ + public function isAuthenticated() + { + return $this->usrId !== null; + } + + /** + * {@inheritdoc} + */ + public function hasRights($rights) + { + return count(array_intersect($this->rights, (array) $rights)) === count($rights); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php b/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php new file mode 100644 index 0000000000..998f62547e --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php @@ -0,0 +1,31 @@ +has('usr_id') ? $session->get('usr_id') : null; + $rights = $session->has('websockets_rights') ? $session->get('websockets_rights') : []; + + return new Consumer($usrId, $rights);; + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php index c65ce22d0b..1b7fe32b76 100644 --- a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php +++ b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php @@ -11,45 +11,20 @@ namespace Alchemy\Phrasea\Websocket; +use Alchemy\Phrasea\Websocket\Topics\TopicsManager; use Psr\Log\LoggerInterface; -use React\ZMQ\SocketWrapper; use Ratchet\ConnectionInterface as Conn; use Ratchet\Wamp\WampServerInterface; class PhraseanetWampServer implements WampServerInterface { - const TOPIC_TASK_MANAGER = 'http://phraseanet.com/topics/admin/task-manager'; - - private $pull; private $logger; - private $topics = []; + private $manager; - public function __construct(SocketWrapper $pull, LoggerInterface $logger) + public function __construct(TopicsManager $manager, LoggerInterface $logger) { - $this->pull = $pull; $this->logger = $logger; - - $pull->on('message', function ($msg) { - $data = @json_decode($msg, true); - - if (json_last_error() !== JSON_ERROR_NONE) { - $this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg)); - - return; - } - - if (!isset($data['topic'])) { - $this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg)); - - return; - } - - $this->logger->debug(sprintf('[WS] Received message %s', $msg)); - - if (isset($this->topics[$data['topic']])) { - $this->topics[$data['topic']]->broadcast(json_encode($msg)); - } - }); + $this->manager = $manager; } public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible) @@ -66,54 +41,33 @@ class PhraseanetWampServer implements WampServerInterface public function onSubscribe(Conn $conn, $topic) { - $this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic)); - $this->topics[$topic->getId()] = $topic; + if ($this->manager->subscribe($conn, $topic)) { + $this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic)); + } else { + $this->logger->error(sprintf('Subscription received on topic %s, user is not allowed', $topic->getId()), array('topic' => $topic)); + } } public function onUnSubscribe(Conn $conn, $topic) { $this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic)); - $this->cleanupReferences($conn, $topic->getId()); + $this->manager->unsubscribe($conn, $topic); } public function onOpen(Conn $conn) { - if (!$conn->Session->has('usr_id')) { - $this->logger->error('[WS] Connection request aborted, no usr_id in session.'); - $conn->close(); - } - $this->logger->error('[WS] Connection request accepted'); + $this->logger->debug('[WS] Connection request accepted'); + $this->manager->openConnection($conn); } public function onClose(Conn $conn) { - $this->cleanupReferences($conn); - $this->logger->error('[WS] Connection closed'); + $this->logger->debug('[WS] Connection closed'); + $this->manager->closeConnection($conn); } public function onError(Conn $conn, \Exception $e) { $this->logger->error('[WS] Connection error', ['exception' => $e]); } - - private function cleanupReferences(Conn $conn, $topicId = null) - { - $storage = $this->topics; - $ret = array(); - - foreach ($storage as $id => $topic) { - if (null !== $topicId && $id !== $topicId) { - continue; - } - if ($topic->has($conn)) { - $topic->remove($conn); - } - if (count($topic) > 0) { - $ret[] = $topic; - } - $this->logger->debug(sprintf('%d subscribers remaining on topic %s', count($topic), $topic->getId()), array('topic' => $topic)); - } - - $this->topics = $ret; - } } diff --git a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php index 705a2f32da..f8939ef60a 100644 --- a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php +++ b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php @@ -11,7 +11,7 @@ namespace Alchemy\Phrasea\Websocket\Subscriber; -use Alchemy\Phrasea\Websocket\PhraseanetWampServer; +use Alchemy\Phrasea\Websocket\Topics\TopicsManager; use Alchemy\TaskManager\Event\StateFormater; use Alchemy\TaskManager\Event\TaskManagerEvent; use Alchemy\TaskManager\Event\TaskManagerRequestEvent; @@ -37,7 +37,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerStart(TaskManagerEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_START, ])); } @@ -45,7 +45,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerStop(TaskManagerEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_STOP, ])); } @@ -53,7 +53,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerRequest(TaskManagerRequestEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_REQUEST, 'request' => $event->getRequest(), 'response' => $event->getResponse(), @@ -63,7 +63,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface public function onManagerTick(TaskManagerEvent $event) { $this->broadcaster->send(json_encode([ - 'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER, + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, 'event' => TaskManagerEvents::MANAGER_TICK, 'message' => $this->formater->toArray( $event->getManager()->getProcessManager()->getManagedProcesses() diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php b/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php new file mode 100644 index 0000000000..8d3e4a2877 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php @@ -0,0 +1,75 @@ +topic = $topic; + $this->requireAuthentication = (Boolean) $requireAuthentication; + $this->requiredRights = $requiredRights; + } + + /** + * @return string + */ + public function getTopic() + { + return $this->topic; + } + + /** + * Returns true if the topic requires an authenticated consumer + * + * @return Boolean + */ + public function requireAuthentication() + { + return $this->requireAuthentication; + } + + /** + * Returns an array of required rights for the authenticated consumer + * + * @return array + */ + public function getRequiredRights() + { + return $this->requiredRights; + } + + /** + * Returns true if the consumer satisfies the directive + * + * @param ConsumerInterface $consumer + * + * @return Boolean + */ + public function isStatisfiedBy(ConsumerInterface $consumer) + { + if ($this->requireAuthentication() && !$consumer->isAuthenticated()) { + return false; + } + + return $consumer->hasRights($this->getRequiredRights()); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php b/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php new file mode 100644 index 0000000000..5ab6fdca9c --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php @@ -0,0 +1,61 @@ +directives = $directives; + } + + /** + * Returns true if the consumer has access to the given topic + * + * @param ConsumerInterface $consumer + * @param Topic $topic + * + * @return Boolean + */ + public function hasAccess(ConsumerInterface $consumer, Topic $topic) + { + foreach ($this->getDirectives($topic) as $directive) { + if (!$directive->isStatisfiedBy($consumer)) { + return false; + } + } + + return true; + } + + /** + * @param Topic $topic + * + * @return Directive[] + */ + private function getDirectives(Topic $topic) + { + return array_filter($this->directives, function (Directive $directive) use ($topic) { + return $directive->getTopic() === $topic->getId(); + }); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php new file mode 100644 index 0000000000..36ef65e4d7 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php @@ -0,0 +1,24 @@ +logger = $logger; + $context = new Context($loop); + + $this->pull = $context->getSocket(\ZMQ::SOCKET_SUB); + $this->pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, ""); + $this->pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port'])); + + $this->pull->on('error', function ($e) use ($logger) { + $logger->error('TaskManager Subscriber received an error.', ['exception' => $e]); + }); + } + + /** + * {@inheritdoc} + */ + public function attach(TopicsManager $manager) + { + $this->pull->on('message', function ($msg) use ($manager) { + $data = @json_decode($msg, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + $this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg)); + + return; + } + + if (!isset($data['topic'])) { + $this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg)); + + return; + } + + $this->logger->debug(sprintf('[WS] Received message %s', $msg)); + + $manager->broadcast($data['topic'], json_encode($msg)); + }); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php b/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php new file mode 100644 index 0000000000..74a95171de --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php @@ -0,0 +1,163 @@ +directives = $directives; + $this->consumerManager = $consumerManagaer; + } + + /** + * Attaches a plugin to the TopicsManager + * + * @param PluginInterface $plugin + * + * @return TopicsManager + */ + public function attach(PluginInterface $plugin) + { + $plugin->attach($this); + + return $this; + } + + /** + * Checks if the consumer related to the connection has access to the topic, + * removes the connection from topic if the consumer is not granted. + * + * @param Conn $conn + * @param Topic $topic + * + * @return Boolean Return true if the consumer is granted, false otherwise + */ + public function subscribe(Conn $conn, Topic $topic) + { + if (!$this->directives->hasAccess($conn->User, $topic)) { + $topic->remove($conn); + + return false; + } + + $this->topics[$topic->getId()] = $topic; + + return true; + } + + /** + * Triggered on unsubscription. + * + * Removes internal references to the topic if no more consumers are listening. + * + * @param Conn $conn + * @param Topic $topic + * + * @return TopicsManager + */ + public function unsubscribe(Conn $conn, Topic $topic) + { + $this->cleanupReferences($conn, $topic); + + return $this; + } + + /** + * Triggered on connection, populates the connection with a consumer. + * + * @param Conn $conn + * + * @return TopicsManager + */ + public function openConnection(Conn $conn) + { + try { + $conn->User = $this->consumerManager->create($conn->Session); + } catch (\RuntimeException $e) { + $conn->close(); + } + + return $this; + } + + /** + * Triggered on deconnexion. + * + * Removes internal references to topics if no more consumers are listening. + * + * @param Conn $conn + * + * @return TopicsManager + */ + public function closeConnection(Conn $conn) + { + $this->cleanupReferences($conn); + + return $this; + } + + /** + * Brodcasts a message to a topic, if it exists + * + * @param $topicId string + * @param $message string + * + * @return TopicsManager + */ + public function broadcast($topicId, $message) + { + if (isset($this->topics[$topicId])) { + $this->topics[$topicId]->broadcast($message); + } + + return $this; + } + + /** + * Removes internal references to topics if they do not contains any reference to an active connection. + * + * @param Conn $conn + * @param null|Topic $topic Restrict to this topic, if provided + */ + private function cleanupReferences(Conn $conn, Topic $topic = null) + { + $storage = $this->topics; + $updated = array(); + + foreach ($storage as $id => $storedTopic) { + if (null !== $topic && $id !== $topic->getId()) { + continue; + } + if ($storedTopic->has($conn)) { + $storedTopic->remove($conn); + } + if (count($storedTopic) > 0) { + $updated[] = $storedTopic; + } + } + + $this->topics = $updated; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php index 033269df6d..faa437783f 100644 --- a/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php +++ b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php @@ -20,7 +20,7 @@ class WebsocketServerServiceProviderTest extends ServiceProviderTestCase [ 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', 'ws.server.subscriber', - 'React\ZMQ\SocketWrapper', + 'Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin', ], [ 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', @@ -42,6 +42,16 @@ class WebsocketServerServiceProviderTest extends ServiceProviderTestCase 'ws.server.logger', 'Psr\Log\LoggerInterface', ], + [ + 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', + 'ws.server.topics-manager.directives', + 'Alchemy\Phrasea\Websocket\Topics\DirectivesManager', + ], + [ + 'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider', + 'ws.server.consumer-manager', + 'Alchemy\Phrasea\Websocket\Consumer\ConsumerManager', + ], ]; } } diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php new file mode 100644 index 0000000000..15bc64e980 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerManagerTest.php @@ -0,0 +1,58 @@ +create($this->createSessionMock($usrId, $rights)); + $this->assertSame($authenticated, $consumer->isAuthenticated()); + $this->assertSame($hasRights, $consumer->hasRights($checkedRights)); + } + + public function provideConsumerManagerData() + { + return [ + [25, ['task-manager'], true, [], true], + [25, ['task-manager'], true, ['task-manager'], true], + [null, ['task-manager'], false, ['task-manager', 'neutron'], false], + [null, ['neutron', 'task-manager'], false, ['task-manager', 'neutron'], true], + [42, ['neutron', 'task-manager', 'romain'], true, ['task-manager', 'neutron'], true], + ]; + } + + private function createSessionMock($usrId, $rights) + { + $session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface'); + $session->expects($this->any()) + ->method('has') + ->will($this->returnCallback(function ($prop) use ($usrId, $rights) { + switch ($prop) { + case 'usr_id': + return $usrId !== null; + case 'websockets_rights': + return $rights !== null; + } + })); + + $session->expects($this->any()) + ->method('get') + ->will($this->returnCallback(function ($prop) use ($usrId, $rights) { + switch ($prop) { + case 'usr_id': + return $usrId; + case 'websockets_rights': + return $rights; + } + })); + + return $session; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php new file mode 100644 index 0000000000..49d0a8df7e --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php @@ -0,0 +1,32 @@ +assertTrue($consumer->isAuthenticated()); + $consumer = new Consumer(null, []); + $this->assertFalse($consumer->isAuthenticated()); + } + + public function testHasRights() + { + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($consumer->hasRights('neutron')); + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($consumer->hasRights(['neutron'])); + $consumer = new Consumer(42, ['romainneutron']); + $this->assertFalse($consumer->hasRights('neutron')); + $consumer = new Consumer(42, ['romainneutron']); + $this->assertFalse($consumer->hasRights(['neutron'])); + $consumer = new Consumer(42, ['neutron']); + $this->assertFalse($consumer->hasRights(['neutron', 'romain'])); + $consumer = new Consumer(42, ['romain', 'neutron', 'bouteille']); + $this->assertTrue($consumer->hasRights(['neutron', 'romain'])); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php index 2200bfbc59..b12278dbfb 100644 --- a/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php +++ b/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php @@ -6,43 +6,21 @@ use Alchemy\Phrasea\Websocket\PhraseanetWampServer; class PhraseanetWampServerTest extends \PhraseanetTestCase { - public function testOpenConnectionNotConnected() - { - $conn = $this->getMock('Ratchet\ConnectionInterface'); - $conn->Session = $this->getMockBuilder('Symfony\Component\HttpFoundation\Session\Session') - ->disableOriginalConstructor() - ->getMock(); - $conn->Session->expects($this->once()) - ->method('has') - ->with('usr_id') - ->will($this->returnValue(false)); - $conn->expects($this->once()) - ->method('close'); - - $server = new PhraseanetWampServer($this->createSocketWrapperMock(), $this->createLoggerMock()); - $server->onOpen($conn); - } - public function testOpenConnectionConnected() { + $topicsManager = $this->createTopicsManagerMock(); $conn = $this->getMock('Ratchet\ConnectionInterface'); - $conn->Session = $this->getMockBuilder('Symfony\Component\HttpFoundation\Session\Session') - ->disableOriginalConstructor() - ->getMock(); - $conn->Session->expects($this->once()) - ->method('has') - ->with('usr_id') - ->will($this->returnValue(true)); - $conn->expects($this->never()) - ->method('close'); + $topicsManager->expects($this->once()) + ->method('openConnection') + ->with($conn); - $server = new PhraseanetWampServer($this->createSocketWrapperMock(), $this->createLoggerMock()); + $server = new PhraseanetWampServer($topicsManager, $this->createLoggerMock()); $server->onOpen($conn); } - private function createSocketWrapperMock() + private function createTopicsManagerMock() { - return $this->getMockBuilder('React\ZMQ\SocketWrapper') + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\TopicsManager') ->disableOriginalConstructor() ->getMock(); } diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php index 6558ea4fcb..f9e0b8089c 100644 --- a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php @@ -1,9 +1,9 @@ onManagerStart($this->createTaskManagerEvent()); - $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START); + $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START); } public function testOnManagerStop() @@ -33,7 +33,7 @@ class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase $subscriber = new TaskManagerBroadcasterSubscriber($socket); $subscriber->onManagerStop($this->createTaskManagerEvent()); - $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP); + $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP); } public function testOnManagerRequest() @@ -46,7 +46,7 @@ class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase $subscriber = new TaskManagerBroadcasterSubscriber($socket); $subscriber->onManagerRequest(new TaskManagerRequestEvent($this->createTaskManagerMock(), 'PING', 'PONG')); - $data = $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST); + $data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST); $this->assertEquals('PING', $data['request']); $this->assertEquals('PONG', $data['response']); @@ -62,7 +62,7 @@ class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase $subscriber = new TaskManagerBroadcasterSubscriber($socket); $subscriber->onManagerTick($this->createTaskManagerEvent()); - $data = $this->assertValidJson($json, PhraseanetWampServer::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK); + $data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK); $this->assertArrayHasKey('message', $data); $this->assertInternalType('array', $data['message']); diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php new file mode 100644 index 0000000000..5e7f4d4862 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php @@ -0,0 +1,50 @@ +assertSame('http://topic', $directive->getTopic()); + $this->assertTrue($directive->requireAuthentication()); + $this->assertSame(['neutron'], $directive->getRequiredRights()); + } + + /** + * @dataProvider provideStatisfiedByCombinaisons + */ + public function testIsSatisfiedBy($authenticationRequired, $requiredRights, $authenticated, $hasRights, $satisfied) + { + $consumer = $this->createConsumerMock($authenticated, $hasRights, $requiredRights); + $directive = new Directive('http://topic', $authenticationRequired, $requiredRights); + $this->assertEquals($satisfied, $directive->isStatisfiedBy($consumer)); + } + + public function provideStatisfiedByCombinaisons() + { + return [ + [true, ['neutron'], true, true, true], + [true, [], false, true, false], + [false, ['neutron'], true, false, false], + [false, ['neutron'], false, false, false], + ]; + } + + private function createConsumerMock($authenticated, $hasRights, $requiredRights) + { + $consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + $consumer->expects($this->any()) + ->method('isAuthenticated') + ->will($this->returnValue($authenticated)); + $consumer->expects($this->any()) + ->method('hasRights') + ->with($requiredRights) + ->will($this->returnValue($hasRights)); + + return $consumer; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php new file mode 100644 index 0000000000..315f6aa054 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php @@ -0,0 +1,46 @@ +assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(null, []); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(42, ['neutron', 'bouteille', 'romain']); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic4'))); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php new file mode 100644 index 0000000000..cd8ef1215d --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php @@ -0,0 +1,187 @@ +createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $plugin = $this->getMock('Alchemy\Phrasea\Websocket\Topics\Plugin\PluginInterface'); + $plugin->expects($this->once()) + ->method('attach') + ->with($manager); + + $this->assertSame($manager, $manager->attach($plugin)); + } + + public function testSubscribeWithAccess() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + + $topic->expects($this->never()) + ->method('remove'); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager->subscribe($conn, $topic); + } + + public function testSubscribeWithoutAccess() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + + $topic->expects($this->once()) + ->method('remove') + ->with($conn); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(false)); + + $manager->subscribe($conn, $topic); + } + + public function testUnsubscribe() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = new Topic('http://topic'); + $topic->add($conn); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->unsubscribe($conn, $topic); + + $this->assertFalse($topic->has($conn)); + } + + public function testOpenConnection() + { + $consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + $session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface'); + + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + $consumerManager->expects($this->once()) + ->method('create') + ->with($session) + ->will($this->returnValue($consumer)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->Session = $session; + + $manager->openConnection($conn); + + $this->assertSame($consumer, $conn->User); + } + + public function testCloseConnection() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = new Topic('http://topic'); + $topic->add($conn); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->closeConnection($conn); + + $this->assertFalse($topic->has($conn)); + } + + public function testBroadcast() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + $topic->expects($this->any()) + ->method('getId') + ->will($this->returnValue('http://topic')); + $topic->expects($this->once()) + ->method('broadcast') + ->with('hello world !'); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->broadcast('http://topic', 'hello world !'); + $manager->broadcast('http://topic2', 'nothing'); + } + + private function createDirectivesManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\DirectivesManager') + ->disableOriginalConstructor() + ->getMock(); + } + + private function createConsumerManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Consumer\ConsumerManager') + ->disableOriginalConstructor() + ->getMock(); + } +}