diff --git a/bin/console b/bin/console index 2ee888d69c..65b3a681fd 100755 --- a/bin/console +++ b/bin/console @@ -13,6 +13,8 @@ namespace KonsoleKommander; use Alchemy\Phrasea\Command\Plugin\ListPlugin; use Alchemy\Phrasea\Command\SearchEngine\IndexFull; +use Alchemy\Phrasea\Command\WebsocketServer; +use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\Command\BuildMissingSubdefs; use Alchemy\Phrasea\Command\CreateCollection; use Alchemy\Phrasea\Command\MailTest; @@ -20,7 +22,6 @@ use Alchemy\Phrasea\Command\Compile\Configuration; use Alchemy\Phrasea\Command\RecordAdd; use Alchemy\Phrasea\Command\RescanTechnicalDatas; use Alchemy\Phrasea\Command\UpgradeDBDatas; -use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\CLI; use Alchemy\Phrasea\Command\Plugin\AddPlugin; use Alchemy\Phrasea\Command\Plugin\RemovePlugin; @@ -114,6 +115,8 @@ if ($cli['phraseanet.SE']->getName() === 'ElasticSearch') { $cli->command(new IndexFull('searchengine:index')); } +$cli->command(new WebsocketServer('ws-server:run')); + $cli->loadPlugins(); exit(is_int($cli->run()) ? : 1); diff --git a/composer.json b/composer.json index 4384e0b7e6..631bea274d 100644 --- a/composer.json +++ b/composer.json @@ -10,6 +10,7 @@ "alchemy/oauth2php" : "1.0.0", "alchemy/phlickr" : "0.2.7", "alchemy/task-manager" : "~1.0", + "cboden/ratchet" : "~0.3", "dailymotion/sdk" : "~1.5", "data-uri/data-uri" : "~0.1.0", "doctrine/orm" : "~2.4.0", @@ -37,6 +38,7 @@ "php-ffmpeg/php-ffmpeg" : "~0.4, >=0.4.3", "php-xpdf/php-xpdf" : "~0.2.1", "phpexiftool/phpexiftool" : "~0.3", + "react/zmq" : "~0.2", "silex/silex" : "1.1.x-dev@dev", "silex/web-profiler" : "~1.0.0@dev", "swiftmailer/swiftmailer" : "~4.3.0", diff --git a/composer.lock b/composer.lock index 18614b3c61..5d037ecfcc 100644 --- a/composer.lock +++ b/composer.lock @@ -3,7 +3,7 @@ "This file locks the dependencies of your project to a known state", "Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file" ], - "hash": "b824f6f34aa5494f3edbedb0b97ef6c6", + "hash": "6a801036563ebb5473657cb717881f31", "packages": [ { "name": "alchemy-fr/tcpdf-clone", @@ -294,16 +294,16 @@ }, { "name": "alchemy/task-manager", - "version": "1.0.1", + "version": "1.0.2", "source": { "type": "git", "url": "https://github.com/alchemy-fr/task-manager.git", - "reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df" + "reference": "795b9d9781c01cfd82651f66cf3306f53661540c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/58cc74d41e89cabf1f76c81fdc4e477569e709df", - "reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df", + "url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/795b9d9781c01cfd82651f66cf3306f53661540c", + "reference": "795b9d9781c01cfd82651f66cf3306f53661540c", "shasum": "" }, "require": { @@ -323,7 +323,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "0.1-dev" + "dev-master": "1.0-dev" } }, "autoload": { @@ -353,7 +353,56 @@ "parallel", "process" ], - "time": "2013-12-03 18:53:49" + "time": "2014-02-12 11:21:06" + }, + { + "name": "cboden/ratchet", + "version": "v0.3.0", + "source": { + "type": "git", + "url": "https://github.com/cboden/Ratchet.git", + "reference": "d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/cboden/Ratchet/zipball/d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58", + "reference": "d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58", + "shasum": "" + }, + "require": { + "guzzle/http": ">=3.6.0,<3.8.0-dev", + "php": ">=5.3.9", + "react/socket": "0.3.*", + "symfony/http-foundation": "~2.2", + "symfony/routing": "~2.2" + }, + "type": "library", + "autoload": { + "psr-0": { + "Ratchet": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Chris Boden", + "email": "cboden@gmail.com", + "homepage": "http://res.im", + "role": "Developer" + } + ], + "description": "PHP WebSocket library", + "homepage": "http://socketo.me", + "keywords": [ + "Ratchet", + "WebSockets", + "server", + "sockets" + ], + "time": "2013-10-14 14:38:12" }, { "name": "dailymotion/sdk", @@ -2835,6 +2884,181 @@ ], "time": "2012-12-21 11:40:51" }, + { + "name": "react/event-loop", + "version": "v0.3.3", + "target-dir": "React/EventLoop", + "source": { + "type": "git", + "url": "https://github.com/reactphp/event-loop.git", + "reference": "49208fa3a15c9eae4adf8351ee9caa4064fe550d" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/event-loop/zipball/49208fa3a15c9eae4adf8351ee9caa4064fe550d", + "reference": "49208fa3a15c9eae4adf8351ee9caa4064fe550d", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "suggest": { + "ext-libev": "*", + "ext-libevent": ">=0.0.5" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.3-dev" + } + }, + "autoload": { + "psr-0": { + "React\\EventLoop": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Event loop abstraction layer that libraries can use for evented I/O.", + "keywords": [ + "event-loop" + ], + "time": "2013-07-08 22:38:22" + }, + { + "name": "react/socket", + "version": "v0.3.2", + "target-dir": "React/Socket", + "source": { + "type": "git", + "url": "https://github.com/reactphp/socket.git", + "reference": "9cc32838d25a934ce950d2662cce2ff1b9e9f8aa" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/socket/zipball/9cc32838d25a934ce950d2662cce2ff1b9e9f8aa", + "reference": "9cc32838d25a934ce950d2662cce2ff1b9e9f8aa", + "shasum": "" + }, + "require": { + "evenement/evenement": "1.0.*", + "php": ">=5.3.3", + "react/event-loop": "0.3.*", + "react/stream": "0.3.*" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.3-dev" + } + }, + "autoload": { + "psr-0": { + "React\\Socket": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Library for building an evented socket server.", + "keywords": [ + "Socket" + ], + "time": "2013-04-26 20:23:10" + }, + { + "name": "react/stream", + "version": "v0.3.3", + "target-dir": "React/Stream", + "source": { + "type": "git", + "url": "https://github.com/reactphp/stream.git", + "reference": "b72900ab00513b591fb101faecad38f9c8eab8da" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/stream/zipball/b72900ab00513b591fb101faecad38f9c8eab8da", + "reference": "b72900ab00513b591fb101faecad38f9c8eab8da", + "shasum": "" + }, + "require": { + "evenement/evenement": "1.0.*", + "php": ">=5.3.3" + }, + "suggest": { + "react/event-loop": "0.3.*", + "react/promise": "~1.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.3-dev" + } + }, + "autoload": { + "psr-0": { + "React\\Stream": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Basic readable and writable stream interfaces that support piping.", + "keywords": [ + "pipe", + "stream" + ], + "time": "2013-07-09 00:44:12" + }, + { + "name": "react/zmq", + "version": "v0.2.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/zmq.git", + "reference": "b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/zmq/zipball/b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e", + "reference": "b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e", + "shasum": "" + }, + "require": { + "evenement/evenement": "~1.0", + "ext-zmq": "*", + "php": ">=5.3.2", + "react/event-loop": "0.3.*" + }, + "require-dev": { + "ext-pcntl": "*" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "0.2-dev" + } + }, + "autoload": { + "psr-0": { + "React\\ZMQ": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "ZeroMQ bindings for React.", + "keywords": [ + "zeromq", + "zmq" + ], + "time": "2013-09-18 11:02:06" + }, { "name": "silex/silex", "version": "1.1.x-dev", diff --git a/config/configuration.sample.yml b/config/configuration.sample.yml index bc00f45118..56aee3c5be 100644 --- a/config/configuration.sample.yml +++ b/config/configuration.sample.yml @@ -29,10 +29,24 @@ main: type: Alchemy\Phrasea\SearchEngine\Phrasea\PhraseaEngine options: [] task-manager: + status: started logger: max-files: 10 enabled: true level: INFO + listener: + protocol: tcp + host: 127.0.0.1 + port: 6660 + linger: 500 + websocket-server: + host: local.phrasea + port: 9090 + ip: 0.0.0.0 + subscriber: + protocol: tcp + host: 127.0.0.1 + port: 13598 session: type: 'file' options: [] diff --git a/lib/Alchemy/Phrasea/Authentication/Authenticator.php b/lib/Alchemy/Phrasea/Authentication/Authenticator.php index 71ec4afbc4..2d8c636886 100644 --- a/lib/Alchemy/Phrasea/Authentication/Authenticator.php +++ b/lib/Alchemy/Phrasea/Authentication/Authenticator.php @@ -64,8 +64,6 @@ class Authenticator $this->session->remove('usr_id'); $this->session->remove('session_id'); - $this->session->set('usr_id', $user->get_id()); - $session = new Session(); $session->setBrowserName($this->browser->getBrowser()) ->setBrowserVersion($this->browser->getVersion()) @@ -76,7 +74,7 @@ class Authenticator $this->em->persist($session); $this->em->flush(); - $this->session->set('session_id', $session->getId()); + $this->populateSession($session); foreach ($this->app['acl']->get($user)->get_granted_sbas() as $databox) { \cache_databox::insertClient($this->app, $databox); @@ -86,6 +84,20 @@ class Authenticator return $session; } + private function populateSession(Session $session) + { + $user = $session->getUser($this->app); + + $rights = []; + if ($this->app['acl']->get($user)->has_right('taskmanager')) { + $rights[] = 'task-manager'; + } + + $this->session->set('usr_id', $user->get_id()); + $this->session->set('websockets_rights', $rights); + $this->session->set('session_id', $session->getId()); + } + public function refreshAccount(Session $session) { if (!$this->em->getRepository('Alchemy\Phrasea\Model\Entities\Session')->findOneBy(['id' => $session->getId()])) { @@ -99,8 +111,7 @@ class Authenticator } $this->session->clear(); - $this->session->set('usr_id', $session->getUsrId()); - $this->session->set('session_id', $session->getId()); + $this->populateSession($session); foreach ($this->app['acl']->get($user)->get_granted_sbas() as $databox) { \cache_databox::insertClient($this->app, $databox); diff --git a/lib/Alchemy/Phrasea/CLI.php b/lib/Alchemy/Phrasea/CLI.php index 27c826a6c7..317c36b892 100644 --- a/lib/Alchemy/Phrasea/CLI.php +++ b/lib/Alchemy/Phrasea/CLI.php @@ -13,6 +13,7 @@ namespace Alchemy\Phrasea; use Alchemy\Phrasea\Command\CommandInterface; use Alchemy\Phrasea\Core\CLIProvider\TranslationExtractorServiceProvider; +use Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider; use Alchemy\Phrasea\Exception\RuntimeException; use Symfony\Component\Console; use Alchemy\Phrasea\Core\CLIProvider\CLIDriversServiceProvider; @@ -55,6 +56,7 @@ class CLI extends Application }); $this->register(new PluginServiceProvider()); + $this->register(new WebsocketServerServiceProvider()); $this->register(new ComposerSetupServiceProvider()); $this->register(new CLIDriversServiceProvider()); $this->register(new LessBuilderServiceProvider()); diff --git a/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php b/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php index e782a0d9c3..5c4adce37a 100644 --- a/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php +++ b/lib/Alchemy/Phrasea/Command/Task/SchedulerRun.php @@ -32,6 +32,7 @@ class SchedulerRun extends Command switch ($signal) { case SIGTERM: case SIGINT: + $this->container['signal-handler']->unregisterAll(); $this->container['task-manager']->stop(); break; } diff --git a/lib/Alchemy/Phrasea/Command/WebsocketServer.php b/lib/Alchemy/Phrasea/Command/WebsocketServer.php new file mode 100644 index 0000000000..af506ada8e --- /dev/null +++ b/lib/Alchemy/Phrasea/Command/WebsocketServer.php @@ -0,0 +1,37 @@ +setDescription("Runs the websocket server"); + } + + public function doExecute(InputInterface $input, OutputInterface $output) + { + $sessionConf = $this->container['conf']->get(['main', 'session', 'type'], 'file'); + + if (!in_array($sessionConf, ['memcached', 'memcache', 'redis'])) { + throw new RuntimeException(sprintf('Running the websocket server requires a server session storage, type `%s` provided', $sessionConf)); + } + + $this->container['ws.server']->run(); + } +} diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php index ea85adabcc..db74014b05 100644 --- a/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/TaskManagerServiceProvider.php @@ -33,7 +33,7 @@ class TaskManagerServiceProvider implements ServiceProviderInterface $app['task-manager'] = $app->share(function (Application $app) { $options = $app['task-manager.listener.options']; - return TaskManager::create( + $manager = TaskManager::create( $app['dispatcher'], $app['task-manager.logger'], $app['task-manager.task-list'], @@ -41,8 +41,12 @@ class TaskManagerServiceProvider implements ServiceProviderInterface 'listener_protocol' => $options['protocol'], 'listener_host' => $options['host'], 'listener_port' => $options['port'], + 'tick_period' => 1, ] ); + $manager->addSubscriber($app['ws.task-manager.broadcaster']); + + return $manager; }); $app['task-manager.logger.configuration'] = $app->share(function (Application $app) { diff --git a/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php new file mode 100644 index 0000000000..94e5dcc75f --- /dev/null +++ b/lib/Alchemy/Phrasea/Core/CLIProvider/WebsocketServerServiceProvider.php @@ -0,0 +1,108 @@ +share(function (Application $app) { + return array_replace([ + 'protocol' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 13598, + ], $app['conf']->get(['main', 'websocket-server', 'subscriber'], [])); + }); + + $app['ws.task-manager.broadcaster'] = $app->share(function (Application $app) { + return TaskManagerBroadcasterSubscriber::create($app['ws.publisher.options']); + }); + + $app['ws.event-loop'] = $app->share(function () { + return EventLoopFactory::create(); + }); + + $app['ws.server.subscriber'] = $app->share(function (Application $app) { + return new TaskManagerSubscriberPlugin($app['ws.publisher.options'], $app['ws.event-loop'], $app['ws.server.logger']); + }); + + $app['ws.server.application'] = $app->share(function (Application $app) { + return new SessionProvider( + new WampServer($app['ws.server.phraseanet-server']), $app['session.storage.handler'] + ); + }); + + $app['ws.server.phraseanet-server'] = $app->share(function (Application $app) { + return new PhraseanetWampServer($app['ws.server.topics-manager'], $app['ws.server.logger']); + }); + + $app['ws.server.logger'] = $app->share(function (Application $app) { + return $app['task-manager.logger']; + }); + + $app['ws.server.topics-manager.directives.conf'] = $app->share(function (Application $app) { + return [ + new Directive(TopicsManager::TOPIC_TASK_MANAGER, true, ['task-manager']), + ]; + }); + + $app['ws.server.topics-manager.directives'] = $app->share(function (Application $app) { + return new DirectivesManager($app['ws.server.topics-manager.directives.conf']); + }); + + $app['ws.server.consumer-manager'] = $app->share(function (Application $app) { + return new ConsumerManager(); + }); + + $app['ws.server.topics-manager'] = $app->share(function (Application $app) { + $manager = new TopicsManager($app['ws.server.topics-manager.directives'], $app['ws.server.consumer-manager']); + $manager->attach($app['ws.server.subscriber']); + + return $manager; + }); + + $app['ws.server.options'] = $app->share(function (Application $app) { + return array_replace([ + 'host' => 'localhost', + 'port' => 9090, + 'ip' => '127.0.0.1', + ], $app['conf']->get(['main', 'websocket-server'], [])); + }); + + $app['ws.server'] = $app->share(function (Application $app) { + $options = $app['ws.server.options']; + + $server = new App($options['host'], $options['port'], $options['ip'], $app['ws.event-loop']); + $server->route('/websockets', $app['ws.server.application']); + + return $server; + }); + } + + public function boot(Application $app) + { + } +} diff --git a/lib/Alchemy/Phrasea/Core/Version.php b/lib/Alchemy/Phrasea/Core/Version.php index cf692fe6da..dc4a318741 100644 --- a/lib/Alchemy/Phrasea/Core/Version.php +++ b/lib/Alchemy/Phrasea/Core/Version.php @@ -13,7 +13,7 @@ namespace Alchemy\Phrasea\Core; class Version { - protected static $number = '3.9.0-alpha.10'; + protected static $number = '3.9.0-alpha.11'; protected static $name = 'Epanterias'; public static function getNumber() diff --git a/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php b/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php index 7007d787b1..667b632018 100644 --- a/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php +++ b/lib/Alchemy/Phrasea/Setup/Requirements/SystemRequirements.php @@ -237,9 +237,15 @@ class SystemRequirements extends RequirementCollection implements RequirementInt ); $this->addRecommendation( - extension_loaded('twig'), - 'Twig extension is strongly recommended in production', - 'Install and enable the twig extension.' + extension_loaded('twig'), + 'Twig extension is strongly recommended in production', + 'Install and enable the twig extension.' + ); + + $this->addRecommendation( + function_exists('event_base_new'), + 'LibEvent extension is strongly recommended in production', + 'Install and enable the LibEvent extension.' ); $this->addRequirement( diff --git a/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php b/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php index 8e3d966419..47e15871cb 100644 --- a/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php +++ b/lib/Alchemy/Phrasea/TaskManager/Event/FinishedJobRemoverSubscriber.php @@ -32,7 +32,7 @@ class FinishedJobRemoverSubscriber implements EventSubscriberInterface public static function getSubscribedEvents() { return [ - JobEvents::FINISHED => [$this, 'onJobFinish'], + JobEvents::FINISHED => 'onJobFinish', ]; } diff --git a/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php b/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php new file mode 100644 index 0000000000..685d311e20 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Consumer/Consumer.php @@ -0,0 +1,43 @@ +usrId = $usrId; + $this->rights = $rights; + } + + /** + * {@inheritdoc} + */ + public function isAuthenticated() + { + return $this->usrId !== null; + } + + /** + * {@inheritdoc} + */ + public function hasRights($rights) + { + return count(array_intersect($this->rights, (array) $rights)) === count($rights); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php b/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php new file mode 100644 index 0000000000..998f62547e --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Consumer/ConsumerInterface.php @@ -0,0 +1,31 @@ +has('usr_id') ? $session->get('usr_id') : null; + $rights = $session->has('websockets_rights') ? $session->get('websockets_rights') : []; + + return new Consumer($usrId, $rights);; + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php new file mode 100644 index 0000000000..e0e6538e78 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/PhraseanetWampServer.php @@ -0,0 +1,94 @@ +logger = $logger; + $this->manager = $manager; + } + + /** + * {@inheritdoc} + */ + public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible) + { + $this->logger->error(sprintf('Publishing on topic %s', $topic->getId()), array('event' => $event, 'topic' => $topic)); + $topic->broadcast($event); + } + + /** + * {@inheritdoc} + */ + public function onCall(Conn $conn, $id, $topic, array $params) + { + $this->logger->error(sprintf('Received RPC call on topic %s', $topic->getId()), array('topic' => $topic)); + $conn->callError($id, $topic, 'RPC not supported on this demo'); + } + + /** + * {@inheritdoc} + */ + public function onSubscribe(Conn $conn, $topic) + { + if ($this->manager->subscribe($conn, $topic)) { + $this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic)); + } else { + $this->logger->error(sprintf('Subscription received on topic %s, user is not allowed', $topic->getId()), array('topic' => $topic)); + } + } + + /** + * {@inheritdoc} + */ + public function onUnSubscribe(Conn $conn, $topic) + { + $this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic)); + $this->manager->unsubscribe($conn, $topic); + } + + /** + * {@inheritdoc} + */ + public function onOpen(Conn $conn) + { + $this->logger->debug('[WS] Connection request accepted'); + $this->manager->openConnection($conn); + } + + /** + * {@inheritdoc} + */ + public function onClose(Conn $conn) + { + $this->logger->debug('[WS] Connection closed'); + $this->manager->closeConnection($conn); + } + + /** + * {@inheritdoc} + */ + public function onError(Conn $conn, \Exception $e) + { + $this->logger->error('[WS] Connection error', ['exception' => $e]); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php new file mode 100644 index 0000000000..f8939ef60a --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriber.php @@ -0,0 +1,88 @@ +formater = new StateFormater(); + + $this->broadcaster = $broadcaster; + $this->broadcaster->bind(); + + usleep(300000); + } + + public function onManagerStart(TaskManagerEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_START, + ])); + } + + public function onManagerStop(TaskManagerEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_STOP, + ])); + } + + public function onManagerRequest(TaskManagerRequestEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_REQUEST, + 'request' => $event->getRequest(), + 'response' => $event->getResponse(), + ])); + } + + public function onManagerTick(TaskManagerEvent $event) + { + $this->broadcaster->send(json_encode([ + 'topic' => TopicsManager::TOPIC_TASK_MANAGER, + 'event' => TaskManagerEvents::MANAGER_TICK, + 'message' => $this->formater->toArray( + $event->getManager()->getProcessManager()->getManagedProcesses() + ), + ])); + } + + public static function getSubscribedEvents() + { + return [ + TaskManagerEvents::MANAGER_START => 'onManagerStart', + TaskManagerEvents::MANAGER_STOP => 'onManagerStop', + TaskManagerEvents::MANAGER_REQUEST => 'onManagerRequest', + TaskManagerEvents::MANAGER_TICK => 'onManagerTick', + ]; + } + + public static function create(array $options) + { + return new static(ZMQSocket::create(new \ZMQContext(), \ZMQ::SOCKET_PUB, $options['protocol'], $options['host'], $options['port'])); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php b/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php new file mode 100644 index 0000000000..8d3e4a2877 --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Directive.php @@ -0,0 +1,75 @@ +topic = $topic; + $this->requireAuthentication = (Boolean) $requireAuthentication; + $this->requiredRights = $requiredRights; + } + + /** + * @return string + */ + public function getTopic() + { + return $this->topic; + } + + /** + * Returns true if the topic requires an authenticated consumer + * + * @return Boolean + */ + public function requireAuthentication() + { + return $this->requireAuthentication; + } + + /** + * Returns an array of required rights for the authenticated consumer + * + * @return array + */ + public function getRequiredRights() + { + return $this->requiredRights; + } + + /** + * Returns true if the consumer satisfies the directive + * + * @param ConsumerInterface $consumer + * + * @return Boolean + */ + public function isStatisfiedBy(ConsumerInterface $consumer) + { + if ($this->requireAuthentication() && !$consumer->isAuthenticated()) { + return false; + } + + return $consumer->hasRights($this->getRequiredRights()); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php b/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php new file mode 100644 index 0000000000..5ab6fdca9c --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/DirectivesManager.php @@ -0,0 +1,61 @@ +directives = $directives; + } + + /** + * Returns true if the consumer has access to the given topic + * + * @param ConsumerInterface $consumer + * @param Topic $topic + * + * @return Boolean + */ + public function hasAccess(ConsumerInterface $consumer, Topic $topic) + { + foreach ($this->getDirectives($topic) as $directive) { + if (!$directive->isStatisfiedBy($consumer)) { + return false; + } + } + + return true; + } + + /** + * @param Topic $topic + * + * @return Directive[] + */ + private function getDirectives(Topic $topic) + { + return array_filter($this->directives, function (Directive $directive) use ($topic) { + return $directive->getTopic() === $topic->getId(); + }); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php new file mode 100644 index 0000000000..2edd0eb74c --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/Plugin/PluginInterface.php @@ -0,0 +1,24 @@ +logger = $logger; + $context = new Context($loop); + + $this->pull = $context->getSocket(\ZMQ::SOCKET_SUB); + $this->pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, ""); + $this->pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port'])); + + $this->pull->on('error', function ($e) use ($logger) { + $logger->error('TaskManager Subscriber received an error.', ['exception' => $e]); + }); + } + + /** + * {@inheritdoc} + */ + public function attach(TopicsManager $manager) + { + $this->pull->on('message', function ($msg) use ($manager) { + $data = @json_decode($msg, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + $this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg)); + + return; + } + + if (!isset($data['topic'])) { + $this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg)); + + return; + } + + $this->logger->debug(sprintf('[WS] Received message %s', $msg)); + + $manager->broadcast($data['topic'], json_encode($msg)); + }); + } +} diff --git a/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php b/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php new file mode 100644 index 0000000000..74a95171de --- /dev/null +++ b/lib/Alchemy/Phrasea/Websocket/Topics/TopicsManager.php @@ -0,0 +1,163 @@ +directives = $directives; + $this->consumerManager = $consumerManagaer; + } + + /** + * Attaches a plugin to the TopicsManager + * + * @param PluginInterface $plugin + * + * @return TopicsManager + */ + public function attach(PluginInterface $plugin) + { + $plugin->attach($this); + + return $this; + } + + /** + * Checks if the consumer related to the connection has access to the topic, + * removes the connection from topic if the consumer is not granted. + * + * @param Conn $conn + * @param Topic $topic + * + * @return Boolean Return true if the consumer is granted, false otherwise + */ + public function subscribe(Conn $conn, Topic $topic) + { + if (!$this->directives->hasAccess($conn->User, $topic)) { + $topic->remove($conn); + + return false; + } + + $this->topics[$topic->getId()] = $topic; + + return true; + } + + /** + * Triggered on unsubscription. + * + * Removes internal references to the topic if no more consumers are listening. + * + * @param Conn $conn + * @param Topic $topic + * + * @return TopicsManager + */ + public function unsubscribe(Conn $conn, Topic $topic) + { + $this->cleanupReferences($conn, $topic); + + return $this; + } + + /** + * Triggered on connection, populates the connection with a consumer. + * + * @param Conn $conn + * + * @return TopicsManager + */ + public function openConnection(Conn $conn) + { + try { + $conn->User = $this->consumerManager->create($conn->Session); + } catch (\RuntimeException $e) { + $conn->close(); + } + + return $this; + } + + /** + * Triggered on deconnexion. + * + * Removes internal references to topics if no more consumers are listening. + * + * @param Conn $conn + * + * @return TopicsManager + */ + public function closeConnection(Conn $conn) + { + $this->cleanupReferences($conn); + + return $this; + } + + /** + * Brodcasts a message to a topic, if it exists + * + * @param $topicId string + * @param $message string + * + * @return TopicsManager + */ + public function broadcast($topicId, $message) + { + if (isset($this->topics[$topicId])) { + $this->topics[$topicId]->broadcast($message); + } + + return $this; + } + + /** + * Removes internal references to topics if they do not contains any reference to an active connection. + * + * @param Conn $conn + * @param null|Topic $topic Restrict to this topic, if provided + */ + private function cleanupReferences(Conn $conn, Topic $topic = null) + { + $storage = $this->topics; + $updated = array(); + + foreach ($storage as $id => $storedTopic) { + if (null !== $topic && $id !== $topic->getId()) { + continue; + } + if ($storedTopic->has($conn)) { + $storedTopic->remove($conn); + } + if (count($storedTopic) > 0) { + $updated[] = $storedTopic; + } + } + + $this->topics = $updated; + } +} diff --git a/lib/classes/patch/390alpha11a.php b/lib/classes/patch/390alpha11a.php new file mode 100644 index 0000000000..643d7a79c3 --- /dev/null +++ b/lib/classes/patch/390alpha11a.php @@ -0,0 +1,78 @@ +release; + } + + /** + * {@inheritdoc} + */ + public function require_all_upgrades() + { + return false; + } + + /** + * {@inheritdoc} + */ + public function concern() + { + return $this->concern; + } + + /** + * {@inheritdoc} + */ + public function getDoctrineMigrations() + { + return []; + } + + /** + * {@inheritdoc} + */ + public function apply(base $appbox, Application $app) + { + $app['conf']->set(['main', 'task-manager', 'status'], 'started'); + $app['conf']->set(['main', 'websocket-server'], [ + 'host' => parse_url($app['conf']->get('servername'), PHP_URL_HOST), + 'port' => 9090, + 'ip' => '0.0.0.0', + ]); + $app['conf']->set(['main', 'task-manager', 'listener'], [ + 'protocol' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 6660, + 'linger' => 500, + ]); + $app['conf']->set(['main', 'websocket-server', 'subscriber'], [ + 'protocol' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 13598, + ]); + } +} diff --git a/lib/conf.d/configuration.yml b/lib/conf.d/configuration.yml index 481d633047..56273b61a5 100644 --- a/lib/conf.d/configuration.yml +++ b/lib/conf.d/configuration.yml @@ -29,10 +29,24 @@ main: type: Alchemy\Phrasea\SearchEngine\Phrasea\PhraseaEngine options: [] task-manager: + status: started logger: max-files: 10 enabled: true level: INFO + listener: + protocol: tcp + host: 127.0.0.1 + port: 6660 + linger: 500 + websocket-server: + host: local.phrasea + port: 9090 + ip: 0.0.0.0 + subscriber: + protocol: tcp + host: 127.0.0.1 + port: 13598 session: type: 'file' options: [] diff --git a/tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php b/tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php new file mode 100644 index 0000000000..1f16cdd90a --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Command/WebsocketServerTest.php @@ -0,0 +1,29 @@ +getMock('Symfony\Component\Console\Input\InputInterface'); + $output = $this->getMock('Symfony\Component\Console\Output\OutputInterface'); + + $sessionType = self::$DI['cli']['conf']->get(['main', 'session', 'type'], 'file'); + self::$DI['cli']['conf']->set(['main', 'session', 'type'], 'memcached'); + + self::$DI['cli']['ws.server'] = $this->getMockBuilder('Ratchet\App') + ->disableOriginalConstructor() + ->getMock(); + self::$DI['cli']['ws.server']->expects($this->once()) + ->method('run'); + + $command = new WebsocketServer('websocketserver'); + $command->setContainer(self::$DI['cli']); + $command->execute($input, $output); + + self::$DI['cli']['conf']->set(['main', 'session', 'type'], $sessionType); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php b/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php index 67e928b608..1d2181b2c3 100644 --- a/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php +++ b/tests/Alchemy/Tests/Phrasea/Controller/SetupTest.php @@ -105,7 +105,7 @@ class SetupTest extends \PhraseanetWebTestCase ->disableOriginalConstructor() ->getMock(); - $user->expects($this->exactly(2)) + $user->expects($this->exactly(1)) ->method('get_id') ->will($this->returnValue(self::$DI['user']->get_id())); diff --git a/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php new file mode 100644 index 0000000000..faa437783f --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Core/CLIProvider/WebsocketServerServiceProviderTest.php @@ -0,0 +1,57 @@ +create($this->createSessionMock($usrId, $rights)); + $this->assertSame($authenticated, $consumer->isAuthenticated()); + $this->assertSame($hasRights, $consumer->hasRights($checkedRights)); + } + + public function provideConsumerManagerData() + { + return [ + [25, ['task-manager'], true, [], true], + [25, ['task-manager'], true, ['task-manager'], true], + [null, ['task-manager'], false, ['task-manager', 'neutron'], false], + [null, ['neutron', 'task-manager'], false, ['task-manager', 'neutron'], true], + [42, ['neutron', 'task-manager', 'romain'], true, ['task-manager', 'neutron'], true], + ]; + } + + private function createSessionMock($usrId, $rights) + { + $session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface'); + $session->expects($this->any()) + ->method('has') + ->will($this->returnCallback(function ($prop) use ($usrId, $rights) { + switch ($prop) { + case 'usr_id': + return $usrId !== null; + case 'websockets_rights': + return $rights !== null; + } + })); + + $session->expects($this->any()) + ->method('get') + ->will($this->returnCallback(function ($prop) use ($usrId, $rights) { + switch ($prop) { + case 'usr_id': + return $usrId; + case 'websockets_rights': + return $rights; + } + })); + + return $session; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php new file mode 100644 index 0000000000..49d0a8df7e --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Consumer/ConsumerTest.php @@ -0,0 +1,32 @@ +assertTrue($consumer->isAuthenticated()); + $consumer = new Consumer(null, []); + $this->assertFalse($consumer->isAuthenticated()); + } + + public function testHasRights() + { + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($consumer->hasRights('neutron')); + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($consumer->hasRights(['neutron'])); + $consumer = new Consumer(42, ['romainneutron']); + $this->assertFalse($consumer->hasRights('neutron')); + $consumer = new Consumer(42, ['romainneutron']); + $this->assertFalse($consumer->hasRights(['neutron'])); + $consumer = new Consumer(42, ['neutron']); + $this->assertFalse($consumer->hasRights(['neutron', 'romain'])); + $consumer = new Consumer(42, ['romain', 'neutron', 'bouteille']); + $this->assertTrue($consumer->hasRights(['neutron', 'romain'])); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php new file mode 100644 index 0000000000..b12278dbfb --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/PhraseanetWampServerTest.php @@ -0,0 +1,34 @@ +createTopicsManagerMock(); + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $topicsManager->expects($this->once()) + ->method('openConnection') + ->with($conn); + + $server = new PhraseanetWampServer($topicsManager, $this->createLoggerMock()); + $server->onOpen($conn); + } + + private function createTopicsManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\TopicsManager') + ->disableOriginalConstructor() + ->getMock(); + } + + private function createLoggerMock() + { + return $this->getMockBuilder('Psr\Log\LoggerInterface') + ->disableOriginalConstructor() + ->getMock(); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php new file mode 100644 index 0000000000..f9e0b8089c --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Subscriber/TaskManagerBroadcasterSubscriberTest.php @@ -0,0 +1,127 @@ +createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerStart($this->createTaskManagerEvent()); + + $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START); + } + + public function testOnManagerStop() + { + $socket = $this->createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerStop($this->createTaskManagerEvent()); + + $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP); + } + + public function testOnManagerRequest() + { + $socket = $this->createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerRequest(new TaskManagerRequestEvent($this->createTaskManagerMock(), 'PING', 'PONG')); + + $data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST); + + $this->assertEquals('PING', $data['request']); + $this->assertEquals('PONG', $data['response']); + } + + public function testOnManagerTick() + { + $socket = $this->createZMQSocketMock(); + $socket->expects($this->once()) + ->method('send') + ->will($this->jsonCapture($json)); + + $subscriber = new TaskManagerBroadcasterSubscriber($socket); + $subscriber->onManagerTick($this->createTaskManagerEvent()); + + $data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK); + + $this->assertArrayHasKey('message', $data); + $this->assertInternalType('array', $data['message']); + } + + private function assertValidJson($json, $topic, $event) + { + $data = json_decode($json, true); + + $this->assertTrue(json_last_error() === JSON_ERROR_NONE); + $this->assertArrayHasKey('event', $data); + $this->assertArrayHasKey('topic', $data); + + $this->assertEquals($event, $data['event']); + $this->assertEquals($topic, $data['topic']); + + return $data; + } + + private function jsonCapture(&$json) + { + return $this->returnCallback(function ($arg) use (&$json) { $json = $arg; return 'lala'; }); + } + + private function createZMQSocketMock() + { + $socket = $this->getMockBuilder('Alchemy\TaskManager\ZMQSocket') + ->setMethods(['send', 'bind']) + ->disableOriginalConstructor() + ->getMock(); + $socket->expects($this->once()) + ->method('bind'); + + return $socket; + } + + private function createTaskManagerMock() + { + $manager = $this->getMockBuilder('Alchemy\TaskManager\TaskManager') + ->disableOriginalConstructor() + ->getMock(); + + $processManager = $this->getMockBuilder('Neutron\ProcessManager\ProcessManager') + ->disableOriginalConstructor() + ->getMock(); + + $processManager->expects($this->any()) + ->method('getManagedProcesses') + ->will($this->returnValue([])); + + $manager->expects($this->any()) + ->method('getProcessManager') + ->will($this->returnValue($processManager)); + + return $manager; + } + + private function createTaskManagerEvent() + { + return new TaskManagerEvent($this->createTaskManagerMock()); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php new file mode 100644 index 0000000000..5e7f4d4862 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectiveTest.php @@ -0,0 +1,50 @@ +assertSame('http://topic', $directive->getTopic()); + $this->assertTrue($directive->requireAuthentication()); + $this->assertSame(['neutron'], $directive->getRequiredRights()); + } + + /** + * @dataProvider provideStatisfiedByCombinaisons + */ + public function testIsSatisfiedBy($authenticationRequired, $requiredRights, $authenticated, $hasRights, $satisfied) + { + $consumer = $this->createConsumerMock($authenticated, $hasRights, $requiredRights); + $directive = new Directive('http://topic', $authenticationRequired, $requiredRights); + $this->assertEquals($satisfied, $directive->isStatisfiedBy($consumer)); + } + + public function provideStatisfiedByCombinaisons() + { + return [ + [true, ['neutron'], true, true, true], + [true, [], false, true, false], + [false, ['neutron'], true, false, false], + [false, ['neutron'], false, false, false], + ]; + } + + private function createConsumerMock($authenticated, $hasRights, $requiredRights) + { + $consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + $consumer->expects($this->any()) + ->method('isAuthenticated') + ->will($this->returnValue($authenticated)); + $consumer->expects($this->any()) + ->method('hasRights') + ->with($requiredRights) + ->will($this->returnValue($hasRights)); + + return $consumer; + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php new file mode 100644 index 0000000000..315f6aa054 --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/DirectivesManagerTest.php @@ -0,0 +1,46 @@ +assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(null, []); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(42, ['neutron']); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4'))); + + $consumer = new Consumer(42, ['neutron', 'bouteille', 'romain']); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3'))); + $this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic4'))); + } +} diff --git a/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php new file mode 100644 index 0000000000..cd8ef1215d --- /dev/null +++ b/tests/Alchemy/Tests/Phrasea/Websocket/Topics/TopicsManagerTest.php @@ -0,0 +1,187 @@ +createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $plugin = $this->getMock('Alchemy\Phrasea\Websocket\Topics\Plugin\PluginInterface'); + $plugin->expects($this->once()) + ->method('attach') + ->with($manager); + + $this->assertSame($manager, $manager->attach($plugin)); + } + + public function testSubscribeWithAccess() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + + $topic->expects($this->never()) + ->method('remove'); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager->subscribe($conn, $topic); + } + + public function testSubscribeWithoutAccess() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + + $topic->expects($this->once()) + ->method('remove') + ->with($conn); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(false)); + + $manager->subscribe($conn, $topic); + } + + public function testUnsubscribe() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = new Topic('http://topic'); + $topic->add($conn); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->unsubscribe($conn, $topic); + + $this->assertFalse($topic->has($conn)); + } + + public function testOpenConnection() + { + $consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + $session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface'); + + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + $consumerManager->expects($this->once()) + ->method('create') + ->with($session) + ->will($this->returnValue($consumer)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->Session = $session; + + $manager->openConnection($conn); + + $this->assertSame($consumer, $conn->User); + } + + public function testCloseConnection() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = new Topic('http://topic'); + $topic->add($conn); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->closeConnection($conn); + + $this->assertFalse($topic->has($conn)); + } + + public function testBroadcast() + { + $directivesManager = $this->createDirectivesManagerMock(); + $consumerManager = $this->createConsumerManagerMock(); + + $directivesManager->expects($this->once()) + ->method('hasAccess') + ->will($this->returnValue(true)); + + $manager = new TopicsManager($directivesManager, $consumerManager); + + $conn = $this->getMock('Ratchet\ConnectionInterface'); + $conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface'); + + $topic = $this->getMockBuilder('Ratchet\Wamp\Topic') + ->disableOriginalConstructor() + ->getMock(); + $topic->expects($this->any()) + ->method('getId') + ->will($this->returnValue('http://topic')); + $topic->expects($this->once()) + ->method('broadcast') + ->with('hello world !'); + + // should be subscribed to be unsubscribed + $manager->subscribe($conn, $topic); + $manager->broadcast('http://topic', 'hello world !'); + $manager->broadcast('http://topic2', 'nothing'); + } + + private function createDirectivesManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\DirectivesManager') + ->disableOriginalConstructor() + ->getMock(); + } + + private function createConsumerManagerMock() + { + return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Consumer\ConsumerManager') + ->disableOriginalConstructor() + ->getMock(); + } +}