PHRAS-1251 Replace webhook job by queue worker

This commit is contained in:
Thibaud Fabre
2016-10-06 14:26:39 +02:00
parent c48b490e18
commit 15c533f709
9 changed files with 443 additions and 201 deletions

View File

@@ -3,6 +3,11 @@
namespace Alchemy\Phrasea\Core\Provider; namespace Alchemy\Phrasea\Core\Provider;
use Alchemy\Phrasea\Webhook\EventProcessorFactory; use Alchemy\Phrasea\Webhook\EventProcessorFactory;
use Alchemy\Phrasea\Webhook\EventProcessorWorker;
use Alchemy\Phrasea\Webhook\WebhookInvoker;
use Alchemy\Phrasea\Webhook\WebhookPublisher;
use Alchemy\Worker\CallableWorkerFactory;
use Alchemy\Worker\TypeBasedWorkerResolver;
use Silex\Application; use Silex\Application;
use Silex\ServiceProviderInterface; use Silex\ServiceProviderInterface;
@@ -11,9 +16,50 @@ class WebhookServiceProvider implements ServiceProviderInterface
public function register(Application $app) public function register(Application $app)
{ {
$this->createAlias($app, 'webhook.event_repository', 'repo.webhook-event');
$this->createAlias($app, 'webhook.event_manipulator', 'manipulator.webhook-event');
$this->createAlias($app, 'webhook.delivery_repository', 'repo.webhook-delivery');
$this->createAlias($app, 'webhook.delivery_manipulator', 'manipulator.webhook-delivery');
$app['webhook.processor_factory'] = $app->share(function ($app) { $app['webhook.processor_factory'] = $app->share(function ($app) {
return new EventProcessorFactory($app); return new EventProcessorFactory($app);
}); });
$app['webhook.invoker'] = $app->share(function ($app) {
return new WebhookInvoker(
$app['repo.api-applications'],
$app['webhook.processor_factory'],
$app['webhook.event_repository'],
$app['webhook.event_manipulator'],
$app['webhook.delivery_repository'],
$app['webhook.delivery_manipulator']
);
});
$app['webhook.publisher'] = $app->share(function ($app) {
return new WebhookPublisher($app['alchemy_worker.queue_registry'], $app['alchemy_worker.queue_name']);
});
$app['alchemy_worker.worker_resolver'] = $app->extend(
'alchemy_worker.type_based_worker_resolver',
function (TypeBasedWorkerResolver $resolver, Application $app) {
$resolver->setFactory('webhook', new CallableWorkerFactory(function () use ($app) {
return new EventProcessorWorker(
$app['webhook.event_repository'],
$app['webhook.invoker']
);
}));
return $resolver;
}
);
}
private function createAlias(Application $app, $alias, $targetServiceKey)
{
$app[$alias] = $app->share(function () use ($app, $targetServiceKey) {
return $app[$targetServiceKey];
});
} }
public function boot(Application $app) public function boot(Application $app)

View File

@@ -4,6 +4,8 @@ namespace Alchemy\Phrasea\Core\Provider;
use Alchemy\Phrasea\Core\Configuration\PropertyAccess; use Alchemy\Phrasea\Core\Configuration\PropertyAccess;
use Alchemy\Phrasea\Exception\RuntimeException; use Alchemy\Phrasea\Exception\RuntimeException;
use Alchemy\Worker\CallableWorkerFactory;
use Alchemy\Worker\TypeBasedWorkerResolver;
use Silex\Application; use Silex\Application;
use Silex\ServiceProviderInterface; use Silex\ServiceProviderInterface;

View File

@@ -12,29 +12,46 @@
namespace Alchemy\Phrasea\Model\Manipulator; namespace Alchemy\Phrasea\Model\Manipulator;
use Alchemy\Phrasea\Model\Entities\WebhookEvent; use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Phrasea\Webhook\WebhookPublisher;
use Doctrine\Common\Persistence\ObjectManager; use Doctrine\Common\Persistence\ObjectManager;
use Doctrine\ORM\EntityRepository; use Doctrine\ORM\EntityRepository;
class WebhookEventManipulator implements ManipulatorInterface class WebhookEventManipulator implements ManipulatorInterface
{ {
/**
* @var ObjectManager
*/
private $om; private $om;
/**
* @var EntityRepository
*/
private $repository; private $repository;
public function __construct(ObjectManager $om, EntityRepository $repo) /**
* @var WebhookPublisher
*/
private $publisher;
public function __construct(ObjectManager $om, EntityRepository $repo, WebhookPublisher $publisher)
{ {
$this->om = $om; $this->om = $om;
$this->repository = $repo; $this->repository = $repo;
$this->publisher = $publisher;
} }
public function create($eventName, $type, array $data) public function create($eventName, $type, array $data)
{ {
$event = new WebhookEvent(); $event = new WebhookEvent();
$event->setName($eventName); $event->setName($eventName);
$event->setType($type); $event->setType($type);
$event->setData($data); $event->setData($data);
$this->update($event); $this->update($event);
$this->publisher->publishWebhookEvent($event);
return $event; return $event;
} }

View File

@@ -12,21 +12,11 @@
namespace Alchemy\Phrasea\TaskManager\Job; namespace Alchemy\Phrasea\TaskManager\Job;
use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\Core\Version;
use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Phrasea\Model\Entities\WebhookEventDelivery;
use Alchemy\Phrasea\Model\Entities\ApiApplication;
use Alchemy\Phrasea\TaskManager\Editor\DefaultEditor; use Alchemy\Phrasea\TaskManager\Editor\DefaultEditor;
use Alchemy\Phrasea\Webhook\EventProcessorFactory; use Alchemy\Phrasea\Webhook\EventProcessorFactory;
use Guzzle\Http\Client as GuzzleClient; use Guzzle\Http\Client as GuzzleClient;
use Guzzle\Batch\BatchBuilder;
use Guzzle\Http\Message\Request;
use Silex\Application;
use Guzzle\Common\Event;
use Guzzle\Plugin\Backoff\BackoffPlugin;
use Guzzle\Plugin\Backoff\TruncatedBackoffStrategy;
use Guzzle\Plugin\Backoff\CallbackBackoffStrategy;
use Guzzle\Plugin\Backoff\CurlBackoffStrategy;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Silex\Application;
use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Translation\TranslatorInterface; use Symfony\Component\Translation\TranslatorInterface;
@@ -34,15 +24,12 @@ class WebhookJob extends AbstractJob
{ {
private $httpClient; private $httpClient;
private $firstRun = true;
public function __construct( public function __construct(
TranslatorInterface $translator, TranslatorInterface $translator,
EventDispatcherInterface $dispatcher = null, EventDispatcherInterface $dispatcher = null,
LoggerInterface $logger = null, LoggerInterface $logger = null,
GuzzleClient $httpClient = null GuzzleClient $httpClient = null
) ) {
{
parent::__construct($translator, $dispatcher, $logger); parent::__construct($translator, $dispatcher, $logger);
$this->httpClient = $httpClient ?: new GuzzleClient(); $this->httpClient = $httpClient ?: new GuzzleClient();
@@ -89,57 +76,6 @@ class WebhookJob extends AbstractJob
{ {
$app = $data->getApplication(); $app = $data->getApplication();
$thirdPartyApplications = $app['repo.api-applications']->findWithDefinedWebhookCallback(); $thirdPartyApplications = $app['repo.api-applications']->findWithDefinedWebhookCallback();
$that = $this;
if ($this->firstRun) {
$this->httpClient->getEventDispatcher()->addListener('request.error', function (Event $event) {
// override guzzle default behavior of throwing exceptions
// when 4xx & 5xx responses are encountered
$event->stopPropagation();
}, -254);
// Set callback which logs success or failure
$subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) use ($app, $that) {
$retry = true;
if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) {
$delivery = $app['repo.webhook-delivery']->find($deliverId);
$logContext = [ 'host' => $request->getHost() ];
if ($response->isSuccessful()) {
$app['manipulator.webhook-delivery']->deliverySuccess($delivery);
$logType = 'info';
$logEntry = sprintf('Deliver success event "%d:%s" for app "%s"',
$delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(),
$delivery->getThirdPartyApplication()->getName()
);
$retry = false;
} else {
$app['manipulator.webhook-delivery']->deliveryFailure($delivery);
$logType = 'error';
$logEntry = sprintf('Deliver failure event "%d:%s" for app "%s"',
$delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(),
$delivery->getThirdPartyApplication()->getName()
);
}
$that->log($logType, $logEntry, $logContext);
return $retry;
}
}, true, new CurlBackoffStrategy());
// set max retries
$subscriber = new TruncatedBackoffStrategy(WebhookEventDelivery::MAX_DELIVERY_TRIES, $subscriber);
$subscriber = new BackoffPlugin($subscriber);
$this->httpClient->addSubscriber($subscriber);
$this->firstRun = false;
}
/** @var EventProcessorFactory $eventFactory */ /** @var EventProcessorFactory $eventFactory */
$eventFactory = $app['webhook.processor_factory']; $eventFactory = $app['webhook.processor_factory'];
@@ -155,41 +91,4 @@ class WebhookJob extends AbstractJob
$this->deliverEvent($eventFactory, $app, $thirdPartyApplications, $event); $this->deliverEvent($eventFactory, $app, $thirdPartyApplications, $event);
} }
} }
private function deliverEvent(EventProcessorFactory $eventFactory, Application $app, array $thirdPartyApplications, WebhookEvent $event)
{
if (count($thirdPartyApplications) === 0) {
$this->log('info', sprintf('No applications defined to listen for webhook events'));
return;
}
// format event data
$eventProcessor = $eventFactory->get($event);
$data = $eventProcessor->process($event);
// batch requests
$batch = BatchBuilder::factory()
->transferRequests(10)
->build();
foreach ($thirdPartyApplications as $thirdPartyApplication) {
$delivery = $app['manipulator.webhook-delivery']->create($thirdPartyApplication, $event);
// append delivery id as url anchor
$uniqueUrl = $this->getUrl($thirdPartyApplication, $delivery);
// create http request with data as request body
$batch->add($this->httpClient->createRequest('POST', $uniqueUrl, [
'Content-Type' => 'application/vnd.phraseanet.event+json'
], json_encode($data)));
}
$batch->flush();
}
private function getUrl(ApiApplication $application, WebhookEventDelivery $delivery)
{
return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId());
}
} }

View File

@@ -9,13 +9,14 @@ use Alchemy\Phrasea\Webhook\Processor\CallableProcessorFactory;
use Alchemy\Phrasea\Webhook\Processor\FeedEntryProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\FeedEntryProcessorFactory;
use Alchemy\Phrasea\Webhook\Processor\OrderNotificationProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\OrderNotificationProcessorFactory;
use Alchemy\Phrasea\Webhook\Processor\ProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\ProcessorFactory;
use Alchemy\Phrasea\Webhook\Processor\ProcessorInterface;
use Alchemy\Phrasea\Webhook\Processor\UserRegistrationProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\UserRegistrationProcessorFactory;
class EventProcessorFactory class EventProcessorFactory
{ {
/** /**
* @var ProcessorFactory * @var ProcessorFactory[]
*/ */
private $processorFactories = []; private $processorFactories = [];
@@ -57,10 +58,20 @@ class EventProcessorFactory
/** /**
* @param WebhookEvent $event * @param WebhookEvent $event
* @return Processor\ProcessorInterface * @return Processor\ProcessorInterface
* @deprecated Use getProcessor() instead
*/ */
public function get(WebhookEvent $event) public function get(WebhookEvent $event)
{ {
if (! isset($this->processorFactories[$event->getType()])) { return $this->getProcessor($event);
}
/**
* @param WebhookEvent $event
* @return ProcessorInterface
*/
public function getProcessor(WebhookEvent $event)
{
if (!isset($this->processorFactories[$event->getType()])) {
throw new \RuntimeException(sprintf('No processor found for %s', $event->getType())); throw new \RuntimeException(sprintf('No processor found for %s', $event->getType()));
} }

View File

@@ -0,0 +1,57 @@
<?php
/*
* This file is part of phrasea-4.1.
*
* (c) Alchemy <info@alchemy.fr>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Webhook;
use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Phrasea\Model\Repositories\WebhookEventRepository;
use Alchemy\Worker\Worker;
class EventProcessorWorker implements Worker
{
/**
* @var WebhookEventRepository
*/
private $eventRepository;
/**
* @var WebhookInvoker
*/
private $invoker;
/**
* @param WebhookEventRepository $eventRepository
* @param WebhookInvoker $invoke
*/
public function __construct(WebhookEventRepository $eventRepository, WebhookInvoker $invoke)
{
$this->eventRepository = $eventRepository;
$this->invoker = $invoke;
}
/**
* @param array $payload
* @return void
*/
public function process(array $payload)
{
$eventId = $payload['id'];
/** @var WebhookEvent $event */
$event = $this->eventRepository->find($eventId);
if ($event === null || $event->isProcessed()) {
return;
}
$this->invoker->invoke($event);
}
}

View File

@@ -0,0 +1,248 @@
<?php
/*
* This file is part of phrasea-4.1.
*
* (c) Alchemy <info@alchemy.fr>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Webhook;
use Alchemy\Phrasea\Model\Entities\ApiApplication;
use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Phrasea\Model\Entities\WebhookEventDelivery;
use Alchemy\Phrasea\Model\Manipulator\WebhookEventDeliveryManipulator;
use Alchemy\Phrasea\Model\Manipulator\WebhookEventManipulator;
use Alchemy\Phrasea\Model\Repositories\ApiApplicationRepository;
use Alchemy\Phrasea\Model\Repositories\WebhookEventDeliveryRepository;
use Alchemy\Phrasea\Model\Repositories\WebhookEventRepository;
use Guzzle\Batch\BatchBuilder;
use Guzzle\Common\Event;
use Guzzle\Http\Client;
use Guzzle\Http\Message\Request;
use Guzzle\Plugin\Backoff\BackoffPlugin;
use Guzzle\Plugin\Backoff\CallbackBackoffStrategy;
use Guzzle\Plugin\Backoff\CurlBackoffStrategy;
use Guzzle\Plugin\Backoff\TruncatedBackoffStrategy;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
* Class WebhookInvoker invokes remote endpoints with webhook event data
* @package Alchemy\Phrasea\Webhook
*/
class WebhookInvoker implements LoggerAwareInterface
{
/**
* @var ApiApplicationRepository
*/
private $applicationRepository;
/**
* @var WebhookEventDeliveryManipulator
*/
private $eventDeliveryManipulator;
/**
* @var WebhookEventDeliveryRepository
*/
private $eventDeliveryRepository;
/**
* @var Client
*/
private $client;
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var EventProcessorFactory
*/
private $processorFactory;
/**
* @var WebhookEventRepository
*/
private $eventRepository;
/**
* @var WebhookEventManipulator
*/
private $eventManipulator;
/**
* @param ApiApplicationRepository $applicationRepository
* @param EventProcessorFactory $processorFactory
* @param WebhookEventRepository $eventRepository
* @param WebhookEventManipulator $eventManipulator
* @param WebhookEventDeliveryManipulator $eventDeliveryManipulator
* @param WebhookEventDeliveryRepository $eventDeliveryRepository
* @param Client $client
*
* @todo Extract classes to reduce number of required dependencies
*/
public function __construct(
ApiApplicationRepository $applicationRepository,
EventProcessorFactory $processorFactory,
WebhookEventRepository $eventRepository,
WebhookEventManipulator $eventManipulator,
WebhookEventDeliveryManipulator $eventDeliveryManipulator,
WebhookEventDeliveryRepository $eventDeliveryRepository,
Client $client = null
) {
$this->applicationRepository = $applicationRepository;
$this->processorFactory = $processorFactory;
$this->eventRepository = $eventRepository;
$this->eventManipulator = $eventManipulator;
$this->eventDeliveryManipulator = $eventDeliveryManipulator;
$this->eventDeliveryRepository = $eventDeliveryRepository;
$this->client = $client;
$this->logger = new NullLogger();
$this->configureClient();
}
/**
* Sets a logger instance on the object.
*
* @param LoggerInterface $logger
*
* @return null
*/
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function invoke(WebhookEvent $event)
{
$this->doInvoke($event, $this->applicationRepository->findWithDefinedWebhookCallback());
}
public function invokeUnprocessedEvents()
{
$targetApplications = $this->applicationRepository->findWithDefinedWebhookCallback();
foreach ($this->eventRepository->getUnprocessedEventIterator() as $row) {
/** @var WebhookEvent $event */
$event = $row[0];
$this->doInvoke($event, $targetApplications);
}
}
/**
* @param WebhookEvent $event
* @param ApiApplication[] $targets
*/
private function doInvoke(WebhookEvent $event, array $targets)
{
$this->eventManipulator->processed($event);
$this->logger->info(sprintf('Processing event "%s" with id %d', $event->getName(), $event->getId()));
// send requests
$this->doHttpDelivery($event, $targets);
}
private function configureClient()
{
$this->client->getEventDispatcher()->addListener('request.error', function (Event $event) {
// Override guzzle default behavior of throwing exceptions
// when 4xx & 5xx responses are encountered
$event->stopPropagation();
}, -254);
// Set callback which logs success or failure
$subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) {
$retry = true;
if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) {
$delivery = $this->eventDeliveryRepository->find($deliverId);
$logContext = ['host' => $request->getHost()];
if ($response->isSuccessful()) {
$this->eventDeliveryManipulator->deliverySuccess($delivery);
$logType = 'info';
$logEntry = sprintf('Deliver success event "%d:%s" for app "%s"',
$delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(),
$delivery->getThirdPartyApplication()->getName()
);
$retry = false;
} else {
$this->eventDeliveryManipulator->deliveryFailure($delivery);
$logType = 'error';
$logEntry = sprintf('Deliver failure event "%d:%s" for app "%s"',
$delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(),
$delivery->getThirdPartyApplication()->getName()
);
}
$this->logger->log($logType, $logEntry, $logContext);
return $retry;
}
}, true, new CurlBackoffStrategy());
// Set max retries
$subscriber = new TruncatedBackoffStrategy(WebhookEventDelivery::MAX_DELIVERY_TRIES, $subscriber);
$subscriber = new BackoffPlugin($subscriber);
$this->client->addSubscriber($subscriber);
}
/**
* @param WebhookEvent $event
* @param ApiApplication[] $targets
*/
private function doHttpDelivery(
WebhookEvent $event,
array $targets
) {
if (count($targets) === 0) {
$this->logger->info(sprintf('No applications defined to listen for webhook events'));
return;
}
// Format event data
$eventProcessor = $this->processorFactory->getProcessor($event);
$data = $eventProcessor->process($event);
// Batch requests
$batch = BatchBuilder::factory()
->transferRequests(10)
->build();
foreach ($targets as $thirdPartyApplication) {
$delivery = $this->eventDeliveryManipulator->create($thirdPartyApplication, $event);
// append delivery id as url anchor
$uniqueUrl = $this->buildUrl($thirdPartyApplication, $delivery);
// create http request with data as request body
$batch->add($this->client->createRequest('POST', $uniqueUrl, [
'Content-Type' => 'application/vnd.phraseanet.event+json'
], json_encode($data)));
}
$batch->flush();
}
/**
* @param ApiApplication $application
* @param WebhookEventDelivery $delivery
* @return string
*/
private function buildUrl(ApiApplication $application, WebhookEventDelivery $delivery)
{
return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId());
}
}

View File

@@ -0,0 +1,57 @@
<?php
/*
* This file is part of phrasea-4.1.
*
* (c) Alchemy <info@alchemy.fr>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Webhook;
use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Queue\Message;
use Alchemy\Queue\MessageQueueRegistry;
/**
* Class WebhookPublisher publishes webhook event notifications in message queues
* @package Alchemy\Phrasea\Webhook
*/
class WebhookPublisher
{
/**
* @var MessageQueueRegistry
*/
private $queueRegistry;
/**
* @var string
*/
private $queueName;
/**
* @param MessageQueueRegistry $queueRegistry
* @param $queueName
*/
public function __construct(MessageQueueRegistry $queueRegistry, $queueName)
{
$this->queueRegistry = $queueRegistry;
$this->queueName = $queueName;
}
/**
* @param WebhookEvent $event
*/
public function publishWebhookEvent(WebhookEvent $event)
{
$queue = $this->queueRegistry->getQueue($this->queueName);
$payload = [
'message_type' => $event->getId(),
'payload' => [ 'id' => $event->getId() ]
];
$queue->publish(new Message($payload));
}
}

View File

@@ -1,95 +0,0 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2016 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
use Alchemy\Phrasea\Exception\RuntimeException;
class API_Webhook
{
const NEW_FEED_ENTRY = "new_feed_entry";
protected $appbox;
protected $id;
protected $type;
protected $data;
protected $created;
public function __construct(appbox $appbox, $id)
{
$this->appbox = $appbox;
$this->id = $id;
$sql = 'SELECT `type`, `data`, created
FROM api_webhooks
WHERE id = :id';
$stmt = $this->appbox->get_connection()->prepare($sql);
$stmt->execute([':id' => $id]);
$row = $stmt->fetch(\PDO::FETCH_ASSOC);
if (!$row) {
throw new RuntimeException('Webhooks not found');
}
$stmt->closeCursor();
$this->type = $row['type'];
$this->data = json_decode($row['data']);
$this->created = new \DateTime($row['created']);
}
public function delete()
{
$sql = 'DELETE FROM api_webhooks WHERE id = :id';
$stmt = $this->appbox->get_connection()->prepare($sql);
$stmt->execute([':id' => $this->id]);
$stmt->closeCursor();
return;
}
public static function create(appbox $appbox, $type, array $data)
{
$sql = 'INSERT INTO api_webhooks (id, `type`, `data`, created)
VALUES (null, :type, :data, NOW())';
$stmt = $appbox->get_connection()->prepare($sql);
$stmt->execute([
'type' => $type,
'data' => json_encode($data),
]);
$stmt->closeCursor();
return new API_Webhook($appbox, $appbox->get_connection()->lastInsertId());
}
/**
* @return \DateTime
*/
public function getCreated()
{
return $this->created;
}
/**
* @return mixed
*/
public function getData()
{
return $this->data;
}
/**
* @return mixed
*/
public function getType()
{
return $this->type;
}
}