Merge pull request #944 from romainneutron/websockets

[3.9] Add websocket server
This commit is contained in:
Nicolas Le Goff
2014-02-17 12:02:56 +01:00
35 changed files with 1818 additions and 20 deletions

View File

@@ -13,6 +13,8 @@ namespace KonsoleKommander;
use Alchemy\Phrasea\Command\Plugin\ListPlugin;
use Alchemy\Phrasea\Command\SearchEngine\IndexFull;
use Alchemy\Phrasea\Command\WebsocketServer;
use Alchemy\Phrasea\Core\Version;
use Alchemy\Phrasea\Command\BuildMissingSubdefs;
use Alchemy\Phrasea\Command\CreateCollection;
use Alchemy\Phrasea\Command\MailTest;
@@ -20,7 +22,6 @@ use Alchemy\Phrasea\Command\Compile\Configuration;
use Alchemy\Phrasea\Command\RecordAdd;
use Alchemy\Phrasea\Command\RescanTechnicalDatas;
use Alchemy\Phrasea\Command\UpgradeDBDatas;
use Alchemy\Phrasea\Core\Version;
use Alchemy\Phrasea\CLI;
use Alchemy\Phrasea\Command\Plugin\AddPlugin;
use Alchemy\Phrasea\Command\Plugin\RemovePlugin;
@@ -114,6 +115,8 @@ if ($cli['phraseanet.SE']->getName() === 'ElasticSearch') {
$cli->command(new IndexFull('searchengine:index'));
}
$cli->command(new WebsocketServer('ws-server:run'));
$cli->loadPlugins();
exit(is_int($cli->run()) ? : 1);

View File

@@ -10,6 +10,7 @@
"alchemy/oauth2php" : "1.0.0",
"alchemy/phlickr" : "0.2.7",
"alchemy/task-manager" : "~1.0",
"cboden/ratchet" : "~0.3",
"dailymotion/sdk" : "~1.5",
"data-uri/data-uri" : "~0.1.0",
"doctrine/orm" : "~2.4.0",
@@ -37,6 +38,7 @@
"php-ffmpeg/php-ffmpeg" : "~0.4, >=0.4.3",
"php-xpdf/php-xpdf" : "~0.2.1",
"phpexiftool/phpexiftool" : "~0.3",
"react/zmq" : "~0.2",
"silex/silex" : "1.1.x-dev@dev",
"silex/web-profiler" : "~1.0.0@dev",
"swiftmailer/swiftmailer" : "~4.3.0",

238
composer.lock generated
View File

@@ -3,7 +3,7 @@
"This file locks the dependencies of your project to a known state",
"Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file"
],
"hash": "b824f6f34aa5494f3edbedb0b97ef6c6",
"hash": "6a801036563ebb5473657cb717881f31",
"packages": [
{
"name": "alchemy-fr/tcpdf-clone",
@@ -294,16 +294,16 @@
},
{
"name": "alchemy/task-manager",
"version": "1.0.1",
"version": "1.0.2",
"source": {
"type": "git",
"url": "https://github.com/alchemy-fr/task-manager.git",
"reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df"
"reference": "795b9d9781c01cfd82651f66cf3306f53661540c"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/58cc74d41e89cabf1f76c81fdc4e477569e709df",
"reference": "58cc74d41e89cabf1f76c81fdc4e477569e709df",
"url": "https://api.github.com/repos/alchemy-fr/task-manager/zipball/795b9d9781c01cfd82651f66cf3306f53661540c",
"reference": "795b9d9781c01cfd82651f66cf3306f53661540c",
"shasum": ""
},
"require": {
@@ -323,7 +323,7 @@
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "0.1-dev"
"dev-master": "1.0-dev"
}
},
"autoload": {
@@ -353,7 +353,56 @@
"parallel",
"process"
],
"time": "2013-12-03 18:53:49"
"time": "2014-02-12 11:21:06"
},
{
"name": "cboden/ratchet",
"version": "v0.3.0",
"source": {
"type": "git",
"url": "https://github.com/cboden/Ratchet.git",
"reference": "d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/cboden/Ratchet/zipball/d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58",
"reference": "d756e0b507a5f3cdbf8c59dbb7baf68574dc7d58",
"shasum": ""
},
"require": {
"guzzle/http": ">=3.6.0,<3.8.0-dev",
"php": ">=5.3.9",
"react/socket": "0.3.*",
"symfony/http-foundation": "~2.2",
"symfony/routing": "~2.2"
},
"type": "library",
"autoload": {
"psr-0": {
"Ratchet": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Chris Boden",
"email": "cboden@gmail.com",
"homepage": "http://res.im",
"role": "Developer"
}
],
"description": "PHP WebSocket library",
"homepage": "http://socketo.me",
"keywords": [
"Ratchet",
"WebSockets",
"server",
"sockets"
],
"time": "2013-10-14 14:38:12"
},
{
"name": "dailymotion/sdk",
@@ -2835,6 +2884,181 @@
],
"time": "2012-12-21 11:40:51"
},
{
"name": "react/event-loop",
"version": "v0.3.3",
"target-dir": "React/EventLoop",
"source": {
"type": "git",
"url": "https://github.com/reactphp/event-loop.git",
"reference": "49208fa3a15c9eae4adf8351ee9caa4064fe550d"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/reactphp/event-loop/zipball/49208fa3a15c9eae4adf8351ee9caa4064fe550d",
"reference": "49208fa3a15c9eae4adf8351ee9caa4064fe550d",
"shasum": ""
},
"require": {
"php": ">=5.3.3"
},
"suggest": {
"ext-libev": "*",
"ext-libevent": ">=0.0.5"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "0.3-dev"
}
},
"autoload": {
"psr-0": {
"React\\EventLoop": ""
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "Event loop abstraction layer that libraries can use for evented I/O.",
"keywords": [
"event-loop"
],
"time": "2013-07-08 22:38:22"
},
{
"name": "react/socket",
"version": "v0.3.2",
"target-dir": "React/Socket",
"source": {
"type": "git",
"url": "https://github.com/reactphp/socket.git",
"reference": "9cc32838d25a934ce950d2662cce2ff1b9e9f8aa"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/reactphp/socket/zipball/9cc32838d25a934ce950d2662cce2ff1b9e9f8aa",
"reference": "9cc32838d25a934ce950d2662cce2ff1b9e9f8aa",
"shasum": ""
},
"require": {
"evenement/evenement": "1.0.*",
"php": ">=5.3.3",
"react/event-loop": "0.3.*",
"react/stream": "0.3.*"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "0.3-dev"
}
},
"autoload": {
"psr-0": {
"React\\Socket": ""
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "Library for building an evented socket server.",
"keywords": [
"Socket"
],
"time": "2013-04-26 20:23:10"
},
{
"name": "react/stream",
"version": "v0.3.3",
"target-dir": "React/Stream",
"source": {
"type": "git",
"url": "https://github.com/reactphp/stream.git",
"reference": "b72900ab00513b591fb101faecad38f9c8eab8da"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/reactphp/stream/zipball/b72900ab00513b591fb101faecad38f9c8eab8da",
"reference": "b72900ab00513b591fb101faecad38f9c8eab8da",
"shasum": ""
},
"require": {
"evenement/evenement": "1.0.*",
"php": ">=5.3.3"
},
"suggest": {
"react/event-loop": "0.3.*",
"react/promise": "~1.0"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "0.3-dev"
}
},
"autoload": {
"psr-0": {
"React\\Stream": ""
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "Basic readable and writable stream interfaces that support piping.",
"keywords": [
"pipe",
"stream"
],
"time": "2013-07-09 00:44:12"
},
{
"name": "react/zmq",
"version": "v0.2.0",
"source": {
"type": "git",
"url": "https://github.com/reactphp/zmq.git",
"reference": "b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/reactphp/zmq/zipball/b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e",
"reference": "b69d97f99f2127e27d130d7fe3b2cec2a63b2c4e",
"shasum": ""
},
"require": {
"evenement/evenement": "~1.0",
"ext-zmq": "*",
"php": ">=5.3.2",
"react/event-loop": "0.3.*"
},
"require-dev": {
"ext-pcntl": "*"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "0.2-dev"
}
},
"autoload": {
"psr-0": {
"React\\ZMQ": "src"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"description": "ZeroMQ bindings for React.",
"keywords": [
"zeromq",
"zmq"
],
"time": "2013-09-18 11:02:06"
},
{
"name": "silex/silex",
"version": "1.1.x-dev",

View File

@@ -29,10 +29,24 @@ main:
type: Alchemy\Phrasea\SearchEngine\Phrasea\PhraseaEngine
options: []
task-manager:
status: started
logger:
max-files: 10
enabled: true
level: INFO
listener:
protocol: tcp
host: 127.0.0.1
port: 6660
linger: 500
websocket-server:
host: local.phrasea
port: 9090
ip: 0.0.0.0
subscriber:
protocol: tcp
host: 127.0.0.1
port: 13598
session:
type: 'file'
options: []

View File

@@ -64,8 +64,6 @@ class Authenticator
$this->session->remove('usr_id');
$this->session->remove('session_id');
$this->session->set('usr_id', $user->get_id());
$session = new Session();
$session->setBrowserName($this->browser->getBrowser())
->setBrowserVersion($this->browser->getVersion())
@@ -76,7 +74,7 @@ class Authenticator
$this->em->persist($session);
$this->em->flush();
$this->session->set('session_id', $session->getId());
$this->populateSession($session);
foreach ($this->app['acl']->get($user)->get_granted_sbas() as $databox) {
\cache_databox::insertClient($this->app, $databox);
@@ -86,6 +84,20 @@ class Authenticator
return $session;
}
private function populateSession(Session $session)
{
$user = $session->getUser($this->app);
$rights = [];
if ($this->app['acl']->get($user)->has_right('taskmanager')) {
$rights[] = 'task-manager';
}
$this->session->set('usr_id', $user->get_id());
$this->session->set('websockets_rights', $rights);
$this->session->set('session_id', $session->getId());
}
public function refreshAccount(Session $session)
{
if (!$this->em->getRepository('Alchemy\Phrasea\Model\Entities\Session')->findOneBy(['id' => $session->getId()])) {
@@ -99,8 +111,7 @@ class Authenticator
}
$this->session->clear();
$this->session->set('usr_id', $session->getUsrId());
$this->session->set('session_id', $session->getId());
$this->populateSession($session);
foreach ($this->app['acl']->get($user)->get_granted_sbas() as $databox) {
\cache_databox::insertClient($this->app, $databox);

View File

@@ -13,6 +13,7 @@ namespace Alchemy\Phrasea;
use Alchemy\Phrasea\Command\CommandInterface;
use Alchemy\Phrasea\Core\CLIProvider\TranslationExtractorServiceProvider;
use Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider;
use Alchemy\Phrasea\Exception\RuntimeException;
use Symfony\Component\Console;
use Alchemy\Phrasea\Core\CLIProvider\CLIDriversServiceProvider;
@@ -55,6 +56,7 @@ class CLI extends Application
});
$this->register(new PluginServiceProvider());
$this->register(new WebsocketServerServiceProvider());
$this->register(new ComposerSetupServiceProvider());
$this->register(new CLIDriversServiceProvider());
$this->register(new LessBuilderServiceProvider());

View File

@@ -32,6 +32,7 @@ class SchedulerRun extends Command
switch ($signal) {
case SIGTERM:
case SIGINT:
$this->container['signal-handler']->unregisterAll();
$this->container['task-manager']->stop();
break;
}

View File

@@ -0,0 +1,37 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Command;
use Alchemy\Phrasea\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class WebsocketServer extends Command
{
public function __construct($name = null)
{
parent::__construct($name);
$this->setDescription("Runs the websocket server");
}
public function doExecute(InputInterface $input, OutputInterface $output)
{
$sessionConf = $this->container['conf']->get(['main', 'session', 'type'], 'file');
if (!in_array($sessionConf, ['memcached', 'memcache', 'redis'])) {
throw new RuntimeException(sprintf('Running the websocket server requires a server session storage, type `%s` provided', $sessionConf));
}
$this->container['ws.server']->run();
}
}

View File

@@ -33,7 +33,7 @@ class TaskManagerServiceProvider implements ServiceProviderInterface
$app['task-manager'] = $app->share(function (Application $app) {
$options = $app['task-manager.listener.options'];
return TaskManager::create(
$manager = TaskManager::create(
$app['dispatcher'],
$app['task-manager.logger'],
$app['task-manager.task-list'],
@@ -41,8 +41,12 @@ class TaskManagerServiceProvider implements ServiceProviderInterface
'listener_protocol' => $options['protocol'],
'listener_host' => $options['host'],
'listener_port' => $options['port'],
'tick_period' => 1,
]
);
$manager->addSubscriber($app['ws.task-manager.broadcaster']);
return $manager;
});
$app['task-manager.logger.configuration'] = $app->share(function (Application $app) {

View File

@@ -0,0 +1,108 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Core\CLIProvider;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager;
use Alchemy\Phrasea\Websocket\Topics\Directive;
use Alchemy\Phrasea\Websocket\Topics\DirectivesManager;
use Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber;
use Alchemy\Phrasea\Websocket\PhraseanetWampServer;
use Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Ratchet\App;
use Ratchet\Session\SessionProvider;
use Ratchet\Wamp\WampServer;
use Silex\Application;
use Silex\ServiceProviderInterface;
use React\EventLoop\Factory as EventLoopFactory;
class WebsocketServerServiceProvider implements ServiceProviderInterface
{
public function register(Application $app)
{
$app['ws.publisher.options'] = $app->share(function (Application $app) {
return array_replace([
'protocol' => 'tcp',
'host' => '127.0.0.1',
'port' => 13598,
], $app['conf']->get(['main', 'websocket-server', 'subscriber'], []));
});
$app['ws.task-manager.broadcaster'] = $app->share(function (Application $app) {
return TaskManagerBroadcasterSubscriber::create($app['ws.publisher.options']);
});
$app['ws.event-loop'] = $app->share(function () {
return EventLoopFactory::create();
});
$app['ws.server.subscriber'] = $app->share(function (Application $app) {
return new TaskManagerSubscriberPlugin($app['ws.publisher.options'], $app['ws.event-loop'], $app['ws.server.logger']);
});
$app['ws.server.application'] = $app->share(function (Application $app) {
return new SessionProvider(
new WampServer($app['ws.server.phraseanet-server']), $app['session.storage.handler']
);
});
$app['ws.server.phraseanet-server'] = $app->share(function (Application $app) {
return new PhraseanetWampServer($app['ws.server.topics-manager'], $app['ws.server.logger']);
});
$app['ws.server.logger'] = $app->share(function (Application $app) {
return $app['task-manager.logger'];
});
$app['ws.server.topics-manager.directives.conf'] = $app->share(function (Application $app) {
return [
new Directive(TopicsManager::TOPIC_TASK_MANAGER, true, ['task-manager']),
];
});
$app['ws.server.topics-manager.directives'] = $app->share(function (Application $app) {
return new DirectivesManager($app['ws.server.topics-manager.directives.conf']);
});
$app['ws.server.consumer-manager'] = $app->share(function (Application $app) {
return new ConsumerManager();
});
$app['ws.server.topics-manager'] = $app->share(function (Application $app) {
$manager = new TopicsManager($app['ws.server.topics-manager.directives'], $app['ws.server.consumer-manager']);
$manager->attach($app['ws.server.subscriber']);
return $manager;
});
$app['ws.server.options'] = $app->share(function (Application $app) {
return array_replace([
'host' => 'localhost',
'port' => 9090,
'ip' => '127.0.0.1',
], $app['conf']->get(['main', 'websocket-server'], []));
});
$app['ws.server'] = $app->share(function (Application $app) {
$options = $app['ws.server.options'];
$server = new App($options['host'], $options['port'], $options['ip'], $app['ws.event-loop']);
$server->route('/websockets', $app['ws.server.application']);
return $server;
});
}
public function boot(Application $app)
{
}
}

View File

@@ -13,7 +13,7 @@ namespace Alchemy\Phrasea\Core;
class Version
{
protected static $number = '3.9.0-alpha.10';
protected static $number = '3.9.0-alpha.11';
protected static $name = 'Epanterias';
public static function getNumber()

View File

@@ -237,9 +237,15 @@ class SystemRequirements extends RequirementCollection implements RequirementInt
);
$this->addRecommendation(
extension_loaded('twig'),
'Twig extension is strongly recommended in production',
'Install and enable the <strong>twig</strong> extension.'
extension_loaded('twig'),
'Twig extension is strongly recommended in production',
'Install and enable the <strong>twig</strong> extension.'
);
$this->addRecommendation(
function_exists('event_base_new'),
'LibEvent extension is strongly recommended in production',
'Install and enable the <strong>LibEvent</strong> extension.'
);
$this->addRequirement(

View File

@@ -32,7 +32,7 @@ class FinishedJobRemoverSubscriber implements EventSubscriberInterface
public static function getSubscribedEvents()
{
return [
JobEvents::FINISHED => [$this, 'onJobFinish'],
JobEvents::FINISHED => 'onJobFinish',
];
}

View File

@@ -0,0 +1,43 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Consumer;
/**
* Websocket consumer
*/
class Consumer implements ConsumerInterface
{
private $usrId;
private $rights;
public function __construct($usrId, array $rights)
{
$this->usrId = $usrId;
$this->rights = $rights;
}
/**
* {@inheritdoc}
*/
public function isAuthenticated()
{
return $this->usrId !== null;
}
/**
* {@inheritdoc}
*/
public function hasRights($rights)
{
return count(array_intersect($this->rights, (array) $rights)) === count($rights);
}
}

View File

@@ -0,0 +1,31 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Consumer;
interface ConsumerInterface
{
/**
* Return true if the consumer is authenticated in Phraseanet
*
* @return Boolean
*/
public function isAuthenticated();
/**
* Return true if the user has the given rights
*
* @param string\array $rights A right or an array of rights
*
* @return Boolean
*/
public function hasRights($rights);
}

View File

@@ -0,0 +1,32 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Consumer;
use Symfony\Component\HttpFoundation\Session\SessionInterface;
class ConsumerManager
{
/**
* Creates a consumer given a Session
*
* @param Session $session
*
* @return Consumer
*/
public function create(SessionInterface $session)
{
$usrId = $session->has('usr_id') ? $session->get('usr_id') : null;
$rights = $session->has('websockets_rights') ? $session->get('websockets_rights') : [];
return new Consumer($usrId, $rights);;
}
}

View File

@@ -0,0 +1,94 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Psr\Log\LoggerInterface;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\WampServerInterface;
class PhraseanetWampServer implements WampServerInterface
{
private $logger;
private $manager;
public function __construct(TopicsManager $manager, LoggerInterface $logger)
{
$this->logger = $logger;
$this->manager = $manager;
}
/**
* {@inheritdoc}
*/
public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible)
{
$this->logger->error(sprintf('Publishing on topic %s', $topic->getId()), array('event' => $event, 'topic' => $topic));
$topic->broadcast($event);
}
/**
* {@inheritdoc}
*/
public function onCall(Conn $conn, $id, $topic, array $params)
{
$this->logger->error(sprintf('Received RPC call on topic %s', $topic->getId()), array('topic' => $topic));
$conn->callError($id, $topic, 'RPC not supported on this demo');
}
/**
* {@inheritdoc}
*/
public function onSubscribe(Conn $conn, $topic)
{
if ($this->manager->subscribe($conn, $topic)) {
$this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic));
} else {
$this->logger->error(sprintf('Subscription received on topic %s, user is not allowed', $topic->getId()), array('topic' => $topic));
}
}
/**
* {@inheritdoc}
*/
public function onUnSubscribe(Conn $conn, $topic)
{
$this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic));
$this->manager->unsubscribe($conn, $topic);
}
/**
* {@inheritdoc}
*/
public function onOpen(Conn $conn)
{
$this->logger->debug('[WS] Connection request accepted');
$this->manager->openConnection($conn);
}
/**
* {@inheritdoc}
*/
public function onClose(Conn $conn)
{
$this->logger->debug('[WS] Connection closed');
$this->manager->closeConnection($conn);
}
/**
* {@inheritdoc}
*/
public function onError(Conn $conn, \Exception $e)
{
$this->logger->error('[WS] Connection error', ['exception' => $e]);
}
}

View File

@@ -0,0 +1,88 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Subscriber;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Alchemy\TaskManager\Event\StateFormater;
use Alchemy\TaskManager\Event\TaskManagerEvent;
use Alchemy\TaskManager\Event\TaskManagerRequestEvent;
use Alchemy\TaskManager\Event\TaskManagerEvents;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Alchemy\TaskManager\ZMQSocket;
class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface
{
private $broadcaster;
private $formater;
public function __construct(ZMQSocket $broadcaster)
{
$this->formater = new StateFormater();
$this->broadcaster = $broadcaster;
$this->broadcaster->bind();
usleep(300000);
}
public function onManagerStart(TaskManagerEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_START,
]));
}
public function onManagerStop(TaskManagerEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_STOP,
]));
}
public function onManagerRequest(TaskManagerRequestEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_REQUEST,
'request' => $event->getRequest(),
'response' => $event->getResponse(),
]));
}
public function onManagerTick(TaskManagerEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_TICK,
'message' => $this->formater->toArray(
$event->getManager()->getProcessManager()->getManagedProcesses()
),
]));
}
public static function getSubscribedEvents()
{
return [
TaskManagerEvents::MANAGER_START => 'onManagerStart',
TaskManagerEvents::MANAGER_STOP => 'onManagerStop',
TaskManagerEvents::MANAGER_REQUEST => 'onManagerRequest',
TaskManagerEvents::MANAGER_TICK => 'onManagerTick',
];
}
public static function create(array $options)
{
return new static(ZMQSocket::create(new \ZMQContext(), \ZMQ::SOCKET_PUB, $options['protocol'], $options['host'], $options['port']));
}
}

View File

@@ -0,0 +1,75 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface;
/**
* Stores consumer required settings for a topic
*/
class Directive
{
private $topic;
private $requireAuthentication;
private $requiredRights;
public function __construct($topic, $requireAuthentication, array $requiredRights)
{
$this->topic = $topic;
$this->requireAuthentication = (Boolean) $requireAuthentication;
$this->requiredRights = $requiredRights;
}
/**
* @return string
*/
public function getTopic()
{
return $this->topic;
}
/**
* Returns true if the topic requires an authenticated consumer
*
* @return Boolean
*/
public function requireAuthentication()
{
return $this->requireAuthentication;
}
/**
* Returns an array of required rights for the authenticated consumer
*
* @return array
*/
public function getRequiredRights()
{
return $this->requiredRights;
}
/**
* Returns true if the consumer satisfies the directive
*
* @param ConsumerInterface $consumer
*
* @return Boolean
*/
public function isStatisfiedBy(ConsumerInterface $consumer)
{
if ($this->requireAuthentication() && !$consumer->isAuthenticated()) {
return false;
}
return $consumer->hasRights($this->getRequiredRights());
}
}

View File

@@ -0,0 +1,61 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface;
use Ratchet\Wamp\Topic;
class DirectivesManager
{
private $directives;
public function __construct(array $directives)
{
array_walk($directives, function ($directive) {
if (!$directive instanceof Directive) {
throw new \InvalidArgumentException('Websocket configuration only accepts configuration directives.');
}
});
$this->directives = $directives;
}
/**
* Returns true if the consumer has access to the given topic
*
* @param ConsumerInterface $consumer
* @param Topic $topic
*
* @return Boolean
*/
public function hasAccess(ConsumerInterface $consumer, Topic $topic)
{
foreach ($this->getDirectives($topic) as $directive) {
if (!$directive->isStatisfiedBy($consumer)) {
return false;
}
}
return true;
}
/**
* @param Topic $topic
*
* @return Directive[]
*/
private function getDirectives(Topic $topic)
{
return array_filter($this->directives, function (Directive $directive) use ($topic) {
return $directive->getTopic() === $topic->getId();
});
}
}

View File

@@ -0,0 +1,24 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics\Plugin;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
interface PluginInterface
{
/**
* Attaches a Plugin to the TopicsManager
*
* @param TopicsManager $manager
*/
public function attach(TopicsManager $manager);
}

View File

@@ -0,0 +1,63 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics\Plugin;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Psr\Log\LoggerInterface;
use React\EventLoop\LoopInterface;
use React\ZMQ\Context;
class TaskManagerSubscriberPlugin implements PluginInterface
{
private $logger;
private $pull;
public function __construct($options, LoopInterface $loop, LoggerInterface $logger)
{
$this->logger = $logger;
$context = new Context($loop);
$this->pull = $context->getSocket(\ZMQ::SOCKET_SUB);
$this->pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, "");
$this->pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port']));
$this->pull->on('error', function ($e) use ($logger) {
$logger->error('TaskManager Subscriber received an error.', ['exception' => $e]);
});
}
/**
* {@inheritdoc}
*/
public function attach(TopicsManager $manager)
{
$this->pull->on('message', function ($msg) use ($manager) {
$data = @json_decode($msg, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg));
return;
}
if (!isset($data['topic'])) {
$this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg));
return;
}
$this->logger->debug(sprintf('[WS] Received message %s', $msg));
$manager->broadcast($data['topic'], json_encode($msg));
});
}
}

View File

@@ -0,0 +1,163 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\Consumer;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager;
use Alchemy\Phrasea\Websocket\Topics\Plugin\PluginInterface;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\Topic;
class TopicsManager
{
const TOPIC_TASK_MANAGER = 'http://phraseanet.com/topics/admin/task-manager';
private $topics = [];
private $directives;
private $consumerManager;
public function __construct(DirectivesManager $directives, ConsumerManager $consumerManagaer)
{
$this->directives = $directives;
$this->consumerManager = $consumerManagaer;
}
/**
* Attaches a plugin to the TopicsManager
*
* @param PluginInterface $plugin
*
* @return TopicsManager
*/
public function attach(PluginInterface $plugin)
{
$plugin->attach($this);
return $this;
}
/**
* Checks if the consumer related to the connection has access to the topic,
* removes the connection from topic if the consumer is not granted.
*
* @param Conn $conn
* @param Topic $topic
*
* @return Boolean Return true if the consumer is granted, false otherwise
*/
public function subscribe(Conn $conn, Topic $topic)
{
if (!$this->directives->hasAccess($conn->User, $topic)) {
$topic->remove($conn);
return false;
}
$this->topics[$topic->getId()] = $topic;
return true;
}
/**
* Triggered on unsubscription.
*
* Removes internal references to the topic if no more consumers are listening.
*
* @param Conn $conn
* @param Topic $topic
*
* @return TopicsManager
*/
public function unsubscribe(Conn $conn, Topic $topic)
{
$this->cleanupReferences($conn, $topic);
return $this;
}
/**
* Triggered on connection, populates the connection with a consumer.
*
* @param Conn $conn
*
* @return TopicsManager
*/
public function openConnection(Conn $conn)
{
try {
$conn->User = $this->consumerManager->create($conn->Session);
} catch (\RuntimeException $e) {
$conn->close();
}
return $this;
}
/**
* Triggered on deconnexion.
*
* Removes internal references to topics if no more consumers are listening.
*
* @param Conn $conn
*
* @return TopicsManager
*/
public function closeConnection(Conn $conn)
{
$this->cleanupReferences($conn);
return $this;
}
/**
* Brodcasts a message to a topic, if it exists
*
* @param $topicId string
* @param $message string
*
* @return TopicsManager
*/
public function broadcast($topicId, $message)
{
if (isset($this->topics[$topicId])) {
$this->topics[$topicId]->broadcast($message);
}
return $this;
}
/**
* Removes internal references to topics if they do not contains any reference to an active connection.
*
* @param Conn $conn
* @param null|Topic $topic Restrict to this topic, if provided
*/
private function cleanupReferences(Conn $conn, Topic $topic = null)
{
$storage = $this->topics;
$updated = array();
foreach ($storage as $id => $storedTopic) {
if (null !== $topic && $id !== $topic->getId()) {
continue;
}
if ($storedTopic->has($conn)) {
$storedTopic->remove($conn);
}
if (count($storedTopic) > 0) {
$updated[] = $storedTopic;
}
}
$this->topics = $updated;
}
}

View File

@@ -0,0 +1,78 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Model\Entities\Task;
class patch_390alpha11a implements patchInterface
{
/** @var string */
private $release = '3.9.0-alpha.11';
/** @var array */
private $concern = [base::APPLICATION_BOX];
/**
* {@inheritdoc}
*/
public function get_release()
{
return $this->release;
}
/**
* {@inheritdoc}
*/
public function require_all_upgrades()
{
return false;
}
/**
* {@inheritdoc}
*/
public function concern()
{
return $this->concern;
}
/**
* {@inheritdoc}
*/
public function getDoctrineMigrations()
{
return [];
}
/**
* {@inheritdoc}
*/
public function apply(base $appbox, Application $app)
{
$app['conf']->set(['main', 'task-manager', 'status'], 'started');
$app['conf']->set(['main', 'websocket-server'], [
'host' => parse_url($app['conf']->get('servername'), PHP_URL_HOST),
'port' => 9090,
'ip' => '0.0.0.0',
]);
$app['conf']->set(['main', 'task-manager', 'listener'], [
'protocol' => 'tcp',
'host' => '127.0.0.1',
'port' => 6660,
'linger' => 500,
]);
$app['conf']->set(['main', 'websocket-server', 'subscriber'], [
'protocol' => 'tcp',
'host' => '127.0.0.1',
'port' => 13598,
]);
}
}

View File

@@ -29,10 +29,24 @@ main:
type: Alchemy\Phrasea\SearchEngine\Phrasea\PhraseaEngine
options: []
task-manager:
status: started
logger:
max-files: 10
enabled: true
level: INFO
listener:
protocol: tcp
host: 127.0.0.1
port: 6660
linger: 500
websocket-server:
host: local.phrasea
port: 9090
ip: 0.0.0.0
subscriber:
protocol: tcp
host: 127.0.0.1
port: 13598
session:
type: 'file'
options: []

View File

@@ -0,0 +1,29 @@
<?php
namespace Alchemy\Tests\Phrasea\Command;
use Alchemy\Phrasea\Command\WebsocketServer;
class WebsocketServerTest extends \PhraseanetTestCase
{
public function testRunWithoutProblems()
{
$input = $this->getMock('Symfony\Component\Console\Input\InputInterface');
$output = $this->getMock('Symfony\Component\Console\Output\OutputInterface');
$sessionType = self::$DI['cli']['conf']->get(['main', 'session', 'type'], 'file');
self::$DI['cli']['conf']->set(['main', 'session', 'type'], 'memcached');
self::$DI['cli']['ws.server'] = $this->getMockBuilder('Ratchet\App')
->disableOriginalConstructor()
->getMock();
self::$DI['cli']['ws.server']->expects($this->once())
->method('run');
$command = new WebsocketServer('websocketserver');
$command->setContainer(self::$DI['cli']);
$command->execute($input, $output);
self::$DI['cli']['conf']->set(['main', 'session', 'type'], $sessionType);
}
}

View File

@@ -105,7 +105,7 @@ class SetupTest extends \PhraseanetWebTestCase
->disableOriginalConstructor()
->getMock();
$user->expects($this->exactly(2))
$user->expects($this->exactly(1))
->method('get_id')
->will($this->returnValue(self::$DI['user']->get_id()));

View File

@@ -0,0 +1,57 @@
<?php
namespace Alchemy\Tests\Phrasea\Core\CLIProvider;
class WebsocketServerServiceProviderTest extends ServiceProviderTestCase
{
public function provideServiceDescription()
{
return [
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.task-manager.broadcaster',
'Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.event-loop',
'React\EventLoop\LoopInterface',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server.subscriber',
'Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server.application',
'Ratchet\WebSocket\WsServerInterface',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server',
'Ratchet\App',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server.phraseanet-server',
'Alchemy\Phrasea\Websocket\PhraseanetWampServer',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server.logger',
'Psr\Log\LoggerInterface',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server.topics-manager.directives',
'Alchemy\Phrasea\Websocket\Topics\DirectivesManager',
],
[
'Alchemy\Phrasea\Core\CLIProvider\WebsocketServerServiceProvider',
'ws.server.consumer-manager',
'Alchemy\Phrasea\Websocket\Consumer\ConsumerManager',
],
];
}
}

View File

@@ -0,0 +1,58 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket\Consumer;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager;
class ConsumerManagerTest extends \PhraseanetTestCase
{
/**
* @dataProvider provideConsumerManagerData
*/
public function testCreate($usrId, $rights, $authenticated, $checkedRights, $hasRights)
{
$manager = new ConsumerManager();
$consumer = $manager->create($this->createSessionMock($usrId, $rights));
$this->assertSame($authenticated, $consumer->isAuthenticated());
$this->assertSame($hasRights, $consumer->hasRights($checkedRights));
}
public function provideConsumerManagerData()
{
return [
[25, ['task-manager'], true, [], true],
[25, ['task-manager'], true, ['task-manager'], true],
[null, ['task-manager'], false, ['task-manager', 'neutron'], false],
[null, ['neutron', 'task-manager'], false, ['task-manager', 'neutron'], true],
[42, ['neutron', 'task-manager', 'romain'], true, ['task-manager', 'neutron'], true],
];
}
private function createSessionMock($usrId, $rights)
{
$session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface');
$session->expects($this->any())
->method('has')
->will($this->returnCallback(function ($prop) use ($usrId, $rights) {
switch ($prop) {
case 'usr_id':
return $usrId !== null;
case 'websockets_rights':
return $rights !== null;
}
}));
$session->expects($this->any())
->method('get')
->will($this->returnCallback(function ($prop) use ($usrId, $rights) {
switch ($prop) {
case 'usr_id':
return $usrId;
case 'websockets_rights':
return $rights;
}
}));
return $session;
}
}

View File

@@ -0,0 +1,32 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket\Consumer;
use Alchemy\Phrasea\Websocket\Consumer\Consumer;
class ConsumerTest extends \PhraseanetTestCase
{
public function testIsAuthenticated()
{
$consumer = new Consumer(42, []);
$this->assertTrue($consumer->isAuthenticated());
$consumer = new Consumer(null, []);
$this->assertFalse($consumer->isAuthenticated());
}
public function testHasRights()
{
$consumer = new Consumer(42, ['neutron']);
$this->assertTrue($consumer->hasRights('neutron'));
$consumer = new Consumer(42, ['neutron']);
$this->assertTrue($consumer->hasRights(['neutron']));
$consumer = new Consumer(42, ['romainneutron']);
$this->assertFalse($consumer->hasRights('neutron'));
$consumer = new Consumer(42, ['romainneutron']);
$this->assertFalse($consumer->hasRights(['neutron']));
$consumer = new Consumer(42, ['neutron']);
$this->assertFalse($consumer->hasRights(['neutron', 'romain']));
$consumer = new Consumer(42, ['romain', 'neutron', 'bouteille']);
$this->assertTrue($consumer->hasRights(['neutron', 'romain']));
}
}

View File

@@ -0,0 +1,34 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket;
use Alchemy\Phrasea\Websocket\PhraseanetWampServer;
class PhraseanetWampServerTest extends \PhraseanetTestCase
{
public function testOpenConnectionConnected()
{
$topicsManager = $this->createTopicsManagerMock();
$conn = $this->getMock('Ratchet\ConnectionInterface');
$topicsManager->expects($this->once())
->method('openConnection')
->with($conn);
$server = new PhraseanetWampServer($topicsManager, $this->createLoggerMock());
$server->onOpen($conn);
}
private function createTopicsManagerMock()
{
return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\TopicsManager')
->disableOriginalConstructor()
->getMock();
}
private function createLoggerMock()
{
return $this->getMockBuilder('Psr\Log\LoggerInterface')
->disableOriginalConstructor()
->getMock();
}
}

View File

@@ -0,0 +1,127 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket\Subscriber;
use Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Alchemy\TaskManager\Event\TaskManagerEvent;
use Alchemy\TaskManager\Event\TaskManagerEvents;
use Alchemy\TaskManager\Event\TaskManagerRequestEvent;
class TaskManagerBroadcasterSubscriberTest extends \PhraseanetTestCase
{
public function testOnManagerStart()
{
$socket = $this->createZMQSocketMock();
$socket->expects($this->once())
->method('send')
->will($this->jsonCapture($json));
$subscriber = new TaskManagerBroadcasterSubscriber($socket);
$subscriber->onManagerStart($this->createTaskManagerEvent());
$this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_START);
}
public function testOnManagerStop()
{
$socket = $this->createZMQSocketMock();
$socket->expects($this->once())
->method('send')
->will($this->jsonCapture($json));
$subscriber = new TaskManagerBroadcasterSubscriber($socket);
$subscriber->onManagerStop($this->createTaskManagerEvent());
$this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_STOP);
}
public function testOnManagerRequest()
{
$socket = $this->createZMQSocketMock();
$socket->expects($this->once())
->method('send')
->will($this->jsonCapture($json));
$subscriber = new TaskManagerBroadcasterSubscriber($socket);
$subscriber->onManagerRequest(new TaskManagerRequestEvent($this->createTaskManagerMock(), 'PING', 'PONG'));
$data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_REQUEST);
$this->assertEquals('PING', $data['request']);
$this->assertEquals('PONG', $data['response']);
}
public function testOnManagerTick()
{
$socket = $this->createZMQSocketMock();
$socket->expects($this->once())
->method('send')
->will($this->jsonCapture($json));
$subscriber = new TaskManagerBroadcasterSubscriber($socket);
$subscriber->onManagerTick($this->createTaskManagerEvent());
$data = $this->assertValidJson($json, TopicsManager::TOPIC_TASK_MANAGER, TaskManagerEvents::MANAGER_TICK);
$this->assertArrayHasKey('message', $data);
$this->assertInternalType('array', $data['message']);
}
private function assertValidJson($json, $topic, $event)
{
$data = json_decode($json, true);
$this->assertTrue(json_last_error() === JSON_ERROR_NONE);
$this->assertArrayHasKey('event', $data);
$this->assertArrayHasKey('topic', $data);
$this->assertEquals($event, $data['event']);
$this->assertEquals($topic, $data['topic']);
return $data;
}
private function jsonCapture(&$json)
{
return $this->returnCallback(function ($arg) use (&$json) { $json = $arg; return 'lala'; });
}
private function createZMQSocketMock()
{
$socket = $this->getMockBuilder('Alchemy\TaskManager\ZMQSocket')
->setMethods(['send', 'bind'])
->disableOriginalConstructor()
->getMock();
$socket->expects($this->once())
->method('bind');
return $socket;
}
private function createTaskManagerMock()
{
$manager = $this->getMockBuilder('Alchemy\TaskManager\TaskManager')
->disableOriginalConstructor()
->getMock();
$processManager = $this->getMockBuilder('Neutron\ProcessManager\ProcessManager')
->disableOriginalConstructor()
->getMock();
$processManager->expects($this->any())
->method('getManagedProcesses')
->will($this->returnValue([]));
$manager->expects($this->any())
->method('getProcessManager')
->will($this->returnValue($processManager));
return $manager;
}
private function createTaskManagerEvent()
{
return new TaskManagerEvent($this->createTaskManagerMock());
}
}

View File

@@ -0,0 +1,50 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Topics\Directive;
class DirectiveTest extends \PhraseanetTestCase
{
public function testGetters()
{
$directive = new Directive('http://topic', true, ['neutron']);
$this->assertSame('http://topic', $directive->getTopic());
$this->assertTrue($directive->requireAuthentication());
$this->assertSame(['neutron'], $directive->getRequiredRights());
}
/**
* @dataProvider provideStatisfiedByCombinaisons
*/
public function testIsSatisfiedBy($authenticationRequired, $requiredRights, $authenticated, $hasRights, $satisfied)
{
$consumer = $this->createConsumerMock($authenticated, $hasRights, $requiredRights);
$directive = new Directive('http://topic', $authenticationRequired, $requiredRights);
$this->assertEquals($satisfied, $directive->isStatisfiedBy($consumer));
}
public function provideStatisfiedByCombinaisons()
{
return [
[true, ['neutron'], true, true, true],
[true, [], false, true, false],
[false, ['neutron'], true, false, false],
[false, ['neutron'], false, false, false],
];
}
private function createConsumerMock($authenticated, $hasRights, $requiredRights)
{
$consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$consumer->expects($this->any())
->method('isAuthenticated')
->will($this->returnValue($authenticated));
$consumer->expects($this->any())
->method('hasRights')
->with($requiredRights)
->will($this->returnValue($hasRights));
return $consumer;
}
}

View File

@@ -0,0 +1,46 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\Consumer;
use Alchemy\Phrasea\Websocket\Topics\Directive;
use Alchemy\Phrasea\Websocket\Topics\DirectivesManager;
use Ratchet\Wamp\Topic;
class DirectivesManagerTest extends \PhraseanetTestCase
{
public function testHasAccess()
{
$manager = new DirectivesManager([
new Directive('http://topic', false, []),
new Directive('http://topic2', true, []),
new Directive('http://topic3', true, ['neutron']),
new Directive('http://topic4', true, ['bouteille']),
new Directive('http://topic4', true, ['neutron', 'romain']),
]);
$consumer = new Consumer(42, []);
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic')));
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2')));
$this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3')));
$this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4')));
$consumer = new Consumer(null, []);
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic')));
$this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic2')));
$this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic3')));
$this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4')));
$consumer = new Consumer(42, ['neutron']);
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic')));
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2')));
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3')));
$this->assertFalse($manager->hasAccess($consumer, new Topic('http://topic4')));
$consumer = new Consumer(42, ['neutron', 'bouteille', 'romain']);
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic')));
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic2')));
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic3')));
$this->assertTrue($manager->hasAccess($consumer, new Topic('http://topic4')));
}
}

View File

@@ -0,0 +1,187 @@
<?php
namespace Alchemy\Tests\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Ratchet\Wamp\Topic;
class TopicsManagerTest extends \PhraseanetTestCase
{
public function testAttach()
{
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$manager = new TopicsManager($directivesManager, $consumerManager);
$plugin = $this->getMock('Alchemy\Phrasea\Websocket\Topics\Plugin\PluginInterface');
$plugin->expects($this->once())
->method('attach')
->with($manager);
$this->assertSame($manager, $manager->attach($plugin));
}
public function testSubscribeWithAccess()
{
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$manager = new TopicsManager($directivesManager, $consumerManager);
$conn = $this->getMock('Ratchet\ConnectionInterface');
$conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$topic = $this->getMockBuilder('Ratchet\Wamp\Topic')
->disableOriginalConstructor()
->getMock();
$topic->expects($this->never())
->method('remove');
$directivesManager->expects($this->once())
->method('hasAccess')
->will($this->returnValue(true));
$manager->subscribe($conn, $topic);
}
public function testSubscribeWithoutAccess()
{
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$manager = new TopicsManager($directivesManager, $consumerManager);
$conn = $this->getMock('Ratchet\ConnectionInterface');
$conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$topic = $this->getMockBuilder('Ratchet\Wamp\Topic')
->disableOriginalConstructor()
->getMock();
$topic->expects($this->once())
->method('remove')
->with($conn);
$directivesManager->expects($this->once())
->method('hasAccess')
->will($this->returnValue(false));
$manager->subscribe($conn, $topic);
}
public function testUnsubscribe()
{
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$directivesManager->expects($this->once())
->method('hasAccess')
->will($this->returnValue(true));
$manager = new TopicsManager($directivesManager, $consumerManager);
$conn = $this->getMock('Ratchet\ConnectionInterface');
$conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$topic = new Topic('http://topic');
$topic->add($conn);
// should be subscribed to be unsubscribed
$manager->subscribe($conn, $topic);
$manager->unsubscribe($conn, $topic);
$this->assertFalse($topic->has($conn));
}
public function testOpenConnection()
{
$consumer = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$session = $this->getMock('Symfony\Component\HttpFoundation\Session\SessionInterface');
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$consumerManager->expects($this->once())
->method('create')
->with($session)
->will($this->returnValue($consumer));
$manager = new TopicsManager($directivesManager, $consumerManager);
$conn = $this->getMock('Ratchet\ConnectionInterface');
$conn->Session = $session;
$manager->openConnection($conn);
$this->assertSame($consumer, $conn->User);
}
public function testCloseConnection()
{
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$directivesManager->expects($this->once())
->method('hasAccess')
->will($this->returnValue(true));
$manager = new TopicsManager($directivesManager, $consumerManager);
$conn = $this->getMock('Ratchet\ConnectionInterface');
$conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$topic = new Topic('http://topic');
$topic->add($conn);
// should be subscribed to be unsubscribed
$manager->subscribe($conn, $topic);
$manager->closeConnection($conn);
$this->assertFalse($topic->has($conn));
}
public function testBroadcast()
{
$directivesManager = $this->createDirectivesManagerMock();
$consumerManager = $this->createConsumerManagerMock();
$directivesManager->expects($this->once())
->method('hasAccess')
->will($this->returnValue(true));
$manager = new TopicsManager($directivesManager, $consumerManager);
$conn = $this->getMock('Ratchet\ConnectionInterface');
$conn->User = $this->getMock('Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface');
$topic = $this->getMockBuilder('Ratchet\Wamp\Topic')
->disableOriginalConstructor()
->getMock();
$topic->expects($this->any())
->method('getId')
->will($this->returnValue('http://topic'));
$topic->expects($this->once())
->method('broadcast')
->with('hello world !');
// should be subscribed to be unsubscribed
$manager->subscribe($conn, $topic);
$manager->broadcast('http://topic', 'hello world !');
$manager->broadcast('http://topic2', 'nothing');
}
private function createDirectivesManagerMock()
{
return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Topics\DirectivesManager')
->disableOriginalConstructor()
->getMock();
}
private function createConsumerManagerMock()
{
return $this->getMockBuilder('Alchemy\Phrasea\Websocket\Consumer\ConsumerManager')
->disableOriginalConstructor()
->getMock();
}
}