Add authorization strategy for WAMP topics

This commit is contained in:
Romain Neutron
2014-02-13 16:07:04 +01:00
parent a834b3a4df
commit c0b152d73a
20 changed files with 942 additions and 122 deletions

View File

@@ -11,12 +11,16 @@
namespace Alchemy\Phrasea\Core\CLIProvider;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager;
use Alchemy\Phrasea\Websocket\Topics\Directive;
use Alchemy\Phrasea\Websocket\Topics\DirectivesManager;
use Alchemy\Phrasea\Websocket\Subscriber\TaskManagerBroadcasterSubscriber;
use Alchemy\Phrasea\Websocket\PhraseanetWampServer;
use Alchemy\Phrasea\Websocket\Topics\Plugin\TaskManagerSubscriberPlugin;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Ratchet\App;
use Ratchet\Session\SessionProvider;
use Ratchet\Wamp\WampServer;
use React\ZMQ\Context;
use Silex\Application;
use Silex\ServiceProviderInterface;
use React\EventLoop\Factory as EventLoopFactory;
@@ -42,19 +46,7 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface
});
$app['ws.server.subscriber'] = $app->share(function (Application $app) {
$options = $app['ws.publisher.options'];
$context = new Context($app['ws.event-loop']);
$pull = $context->getSocket(\ZMQ::SOCKET_SUB);
$pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, "");
$pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port']));
$logger = $app['ws.server.logger'];
$pull->on('error', function ($e) use ($logger) {
$logger->error('TaskManager Subscriber received an error.', ['exception' => $e]);
});
return $pull;
return new TaskManagerSubscriberPlugin($app['ws.publisher.options'], $app['ws.event-loop'], $app['ws.server.logger']);
});
$app['ws.server.application'] = $app->share(function (Application $app) {
@@ -64,13 +56,34 @@ class WebsocketServerServiceProvider implements ServiceProviderInterface
});
$app['ws.server.phraseanet-server'] = $app->share(function (Application $app) {
return new PhraseanetWampServer($app['ws.server.subscriber'], $app['ws.server.logger']);
return new PhraseanetWampServer($app['ws.server.topics-manager'], $app['ws.server.logger']);
});
$app['ws.server.logger'] = $app->share(function (Application $app) {
return $app['task-manager.logger'];
});
$app['ws.server.topics-manager.directives.conf'] = $app->share(function (Application $app) {
return [
new Directive(TopicsManager::TOPIC_TASK_MANAGER, true, ['task-manager']),
];
});
$app['ws.server.topics-manager.directives'] = $app->share(function (Application $app) {
return new DirectivesManager($app['ws.server.topics-manager.directives.conf']);
});
$app['ws.server.consumer-manager'] = $app->share(function (Application $app) {
return new ConsumerManager();
});
$app['ws.server.topics-manager'] = $app->share(function (Application $app) {
$manager = new TopicsManager($app['ws.server.topics-manager.directives'], $app['ws.server.consumer-manager']);
$manager->attach($app['ws.server.subscriber']);
return $manager;
});
$app['ws.server.options'] = $app->share(function (Application $app) {
return array_replace([
'host' => 'localhost',

View File

@@ -0,0 +1,43 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Consumer;
/**
* Websocket consumer
*/
class Consumer implements ConsumerInterface
{
private $usrId;
private $rights;
public function __construct($usrId, array $rights)
{
$this->usrId = $usrId;
$this->rights = $rights;
}
/**
* {@inheritdoc}
*/
public function isAuthenticated()
{
return $this->usrId !== null;
}
/**
* {@inheritdoc}
*/
public function hasRights($rights)
{
return count(array_intersect($this->rights, (array) $rights)) === count($rights);
}
}

View File

@@ -0,0 +1,31 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Consumer;
interface ConsumerInterface
{
/**
* Return true if the consumer is authenticated in Phraseanet
*
* @return Boolean
*/
public function isAuthenticated();
/**
* Return true if the user has the given rights
*
* @param string\array $rights A right or an array of rights
*
* @return Boolean
*/
public function hasRights($rights);
}

View File

@@ -0,0 +1,32 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Consumer;
use Symfony\Component\HttpFoundation\Session\SessionInterface;
class ConsumerManager
{
/**
* Creates a consumer given a Session
*
* @param Session $session
*
* @return Consumer
*/
public function create(SessionInterface $session)
{
$usrId = $session->has('usr_id') ? $session->get('usr_id') : null;
$rights = $session->has('websockets_rights') ? $session->get('websockets_rights') : [];
return new Consumer($usrId, $rights);;
}
}

View File

@@ -11,45 +11,20 @@
namespace Alchemy\Phrasea\Websocket;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Psr\Log\LoggerInterface;
use React\ZMQ\SocketWrapper;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\WampServerInterface;
class PhraseanetWampServer implements WampServerInterface
{
const TOPIC_TASK_MANAGER = 'http://phraseanet.com/topics/admin/task-manager';
private $pull;
private $logger;
private $topics = [];
private $manager;
public function __construct(SocketWrapper $pull, LoggerInterface $logger)
public function __construct(TopicsManager $manager, LoggerInterface $logger)
{
$this->pull = $pull;
$this->logger = $logger;
$pull->on('message', function ($msg) {
$data = @json_decode($msg, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg));
return;
}
if (!isset($data['topic'])) {
$this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg));
return;
}
$this->logger->debug(sprintf('[WS] Received message %s', $msg));
if (isset($this->topics[$data['topic']])) {
$this->topics[$data['topic']]->broadcast(json_encode($msg));
}
});
$this->manager = $manager;
}
public function onPublish(Conn $conn, $topic, $event, array $exclude, array $eligible)
@@ -66,54 +41,33 @@ class PhraseanetWampServer implements WampServerInterface
public function onSubscribe(Conn $conn, $topic)
{
$this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic));
$this->topics[$topic->getId()] = $topic;
if ($this->manager->subscribe($conn, $topic)) {
$this->logger->debug(sprintf('Subscription received on topic %s', $topic->getId()), array('topic' => $topic));
} else {
$this->logger->error(sprintf('Subscription received on topic %s, user is not allowed', $topic->getId()), array('topic' => $topic));
}
}
public function onUnSubscribe(Conn $conn, $topic)
{
$this->logger->debug(sprintf('Unsubscription received on topic %s', $topic->getId()), array('topic' => $topic));
$this->cleanupReferences($conn, $topic->getId());
$this->manager->unsubscribe($conn, $topic);
}
public function onOpen(Conn $conn)
{
if (!$conn->Session->has('usr_id')) {
$this->logger->error('[WS] Connection request aborted, no usr_id in session.');
$conn->close();
}
$this->logger->error('[WS] Connection request accepted');
$this->logger->debug('[WS] Connection request accepted');
$this->manager->openConnection($conn);
}
public function onClose(Conn $conn)
{
$this->cleanupReferences($conn);
$this->logger->error('[WS] Connection closed');
$this->logger->debug('[WS] Connection closed');
$this->manager->closeConnection($conn);
}
public function onError(Conn $conn, \Exception $e)
{
$this->logger->error('[WS] Connection error', ['exception' => $e]);
}
private function cleanupReferences(Conn $conn, $topicId = null)
{
$storage = $this->topics;
$ret = array();
foreach ($storage as $id => $topic) {
if (null !== $topicId && $id !== $topicId) {
continue;
}
if ($topic->has($conn)) {
$topic->remove($conn);
}
if (count($topic) > 0) {
$ret[] = $topic;
}
$this->logger->debug(sprintf('%d subscribers remaining on topic %s', count($topic), $topic->getId()), array('topic' => $topic));
}
$this->topics = $ret;
}
}

View File

@@ -11,7 +11,7 @@
namespace Alchemy\Phrasea\Websocket\Subscriber;
use Alchemy\Phrasea\Websocket\PhraseanetWampServer;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Alchemy\TaskManager\Event\StateFormater;
use Alchemy\TaskManager\Event\TaskManagerEvent;
use Alchemy\TaskManager\Event\TaskManagerRequestEvent;
@@ -37,7 +37,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface
public function onManagerStart(TaskManagerEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER,
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_START,
]));
}
@@ -45,7 +45,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface
public function onManagerStop(TaskManagerEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER,
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_STOP,
]));
}
@@ -53,7 +53,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface
public function onManagerRequest(TaskManagerRequestEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER,
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_REQUEST,
'request' => $event->getRequest(),
'response' => $event->getResponse(),
@@ -63,7 +63,7 @@ class TaskManagerBroadcasterSubscriber implements EventSubscriberInterface
public function onManagerTick(TaskManagerEvent $event)
{
$this->broadcaster->send(json_encode([
'topic' => PhraseanetWampServer::TOPIC_TASK_MANAGER,
'topic' => TopicsManager::TOPIC_TASK_MANAGER,
'event' => TaskManagerEvents::MANAGER_TICK,
'message' => $this->formater->toArray(
$event->getManager()->getProcessManager()->getManagedProcesses()

View File

@@ -0,0 +1,75 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface;
/**
* Stores consumer required settings for a topic
*/
class Directive
{
private $topic;
private $requireAuthentication;
private $requiredRights;
public function __construct($topic, $requireAuthentication, array $requiredRights)
{
$this->topic = $topic;
$this->requireAuthentication = (Boolean) $requireAuthentication;
$this->requiredRights = $requiredRights;
}
/**
* @return string
*/
public function getTopic()
{
return $this->topic;
}
/**
* Returns true if the topic requires an authenticated consumer
*
* @return Boolean
*/
public function requireAuthentication()
{
return $this->requireAuthentication;
}
/**
* Returns an array of required rights for the authenticated consumer
*
* @return array
*/
public function getRequiredRights()
{
return $this->requiredRights;
}
/**
* Returns true if the consumer satisfies the directive
*
* @param ConsumerInterface $consumer
*
* @return Boolean
*/
public function isStatisfiedBy(ConsumerInterface $consumer)
{
if ($this->requireAuthentication() && !$consumer->isAuthenticated()) {
return false;
}
return $consumer->hasRights($this->getRequiredRights());
}
}

View File

@@ -0,0 +1,61 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerInterface;
use Ratchet\Wamp\Topic;
class DirectivesManager
{
private $directives;
public function __construct(array $directives)
{
array_walk($directives, function ($directive) {
if (!$directive instanceof Directive) {
throw new \InvalidArgumentException('Websocket configuration only accepts configuration directives.');
}
});
$this->directives = $directives;
}
/**
* Returns true if the consumer has access to the given topic
*
* @param ConsumerInterface $consumer
* @param Topic $topic
*
* @return Boolean
*/
public function hasAccess(ConsumerInterface $consumer, Topic $topic)
{
foreach ($this->getDirectives($topic) as $directive) {
if (!$directive->isStatisfiedBy($consumer)) {
return false;
}
}
return true;
}
/**
* @param Topic $topic
*
* @return Directive[]
*/
private function getDirectives(Topic $topic)
{
return array_filter($this->directives, function (Directive $directive) use ($topic) {
return $directive->getTopic() === $topic->getId();
});
}
}

View File

@@ -0,0 +1,24 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics\Plugin;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
interface PluginInterface
{
/**
* Attaches a Plugn to the TopicsManager
*
* @param TopicsManager $manager
*/
public function attach(TopicsManager $manager);
}

View File

@@ -0,0 +1,63 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics\Plugin;
use Alchemy\Phrasea\Websocket\Topics\TopicsManager;
use Psr\Log\LoggerInterface;
use React\EventLoop\LoopInterface;
use React\ZMQ\Context;
class TaskManagerSubscriberPlugin implements PluginInterface
{
private $logger;
private $pull;
public function __construct($options, LoopInterface $loop, LoggerInterface $logger)
{
$this->logger = $logger;
$context = new Context($loop);
$this->pull = $context->getSocket(\ZMQ::SOCKET_SUB);
$this->pull->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, "");
$this->pull->connect(sprintf('%s://%s:%s', $options['protocol'], $options['host'], $options['port']));
$this->pull->on('error', function ($e) use ($logger) {
$logger->error('TaskManager Subscriber received an error.', ['exception' => $e]);
});
}
/**
* {@inheritdoc}
*/
public function attach(TopicsManager $manager)
{
$this->pull->on('message', function ($msg) use ($manager) {
$data = @json_decode($msg, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$this->logger->error(sprintf('[WS] Received invalid message %s : invalid json', $msg));
return;
}
if (!isset($data['topic'])) {
$this->logger->error(sprintf('[WS] Received invalid message %s : no topic', $msg));
return;
}
$this->logger->debug(sprintf('[WS] Received message %s', $msg));
$manager->broadcast($data['topic'], json_encode($msg));
});
}
}

View File

@@ -0,0 +1,163 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Websocket\Topics;
use Alchemy\Phrasea\Websocket\Consumer\Consumer;
use Alchemy\Phrasea\Websocket\Consumer\ConsumerManager;
use Alchemy\Phrasea\Websocket\Topics\Plugin\PluginInterface;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\Topic;
class TopicsManager
{
const TOPIC_TASK_MANAGER = 'http://phraseanet.com/topics/admin/task-manager';
private $topics = [];
private $directives;
private $consumerManager;
public function __construct(DirectivesManager $directives, ConsumerManager $consumerManagaer)
{
$this->directives = $directives;
$this->consumerManager = $consumerManagaer;
}
/**
* Attaches a plugin to the TopicsManager
*
* @param PluginInterface $plugin
*
* @return TopicsManager
*/
public function attach(PluginInterface $plugin)
{
$plugin->attach($this);
return $this;
}
/**
* Checks if the consumer related to the connection has access to the topic,
* removes the connection from topic if the consumer is not granted.
*
* @param Conn $conn
* @param Topic $topic
*
* @return Boolean Return true if the consumer is granted, false otherwise
*/
public function subscribe(Conn $conn, Topic $topic)
{
if (!$this->directives->hasAccess($conn->User, $topic)) {
$topic->remove($conn);
return false;
}
$this->topics[$topic->getId()] = $topic;
return true;
}
/**
* Triggered on unsubscription.
*
* Removes internal references to the topic if no more consumers are listening.
*
* @param Conn $conn
* @param Topic $topic
*
* @return TopicsManager
*/
public function unsubscribe(Conn $conn, Topic $topic)
{
$this->cleanupReferences($conn, $topic);
return $this;
}
/**
* Triggered on connection, populates the connection with a consumer.
*
* @param Conn $conn
*
* @return TopicsManager
*/
public function openConnection(Conn $conn)
{
try {
$conn->User = $this->consumerManager->create($conn->Session);
} catch (\RuntimeException $e) {
$conn->close();
}
return $this;
}
/**
* Triggered on deconnexion.
*
* Removes internal references to topics if no more consumers are listening.
*
* @param Conn $conn
*
* @return TopicsManager
*/
public function closeConnection(Conn $conn)
{
$this->cleanupReferences($conn);
return $this;
}
/**
* Brodcasts a message to a topic, if it exists
*
* @param $topicId string
* @param $message string
*
* @return TopicsManager
*/
public function broadcast($topicId, $message)
{
if (isset($this->topics[$topicId])) {
$this->topics[$topicId]->broadcast($message);
}
return $this;
}
/**
* Removes internal references to topics if they do not contains any reference to an active connection.
*
* @param Conn $conn
* @param null|Topic $topic Restrict to this topic, if provided
*/
private function cleanupReferences(Conn $conn, Topic $topic = null)
{
$storage = $this->topics;
$updated = array();
foreach ($storage as $id => $storedTopic) {
if (null !== $topic && $id !== $topic->getId()) {
continue;
}
if ($storedTopic->has($conn)) {
$storedTopic->remove($conn);
}
if (count($storedTopic) > 0) {
$updated[] = $storedTopic;
}
}
$this->topics = $updated;
}
}