diff --git a/lib/Alchemy/Phrasea/Core/Provider/WebhookServiceProvider.php b/lib/Alchemy/Phrasea/Core/Provider/WebhookServiceProvider.php index e224d8a43d..def052f1d6 100644 --- a/lib/Alchemy/Phrasea/Core/Provider/WebhookServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/Provider/WebhookServiceProvider.php @@ -3,6 +3,11 @@ namespace Alchemy\Phrasea\Core\Provider; use Alchemy\Phrasea\Webhook\EventProcessorFactory; +use Alchemy\Phrasea\Webhook\EventProcessorWorker; +use Alchemy\Phrasea\Webhook\WebhookInvoker; +use Alchemy\Phrasea\Webhook\WebhookPublisher; +use Alchemy\Worker\CallableWorkerFactory; +use Alchemy\Worker\TypeBasedWorkerResolver; use Silex\Application; use Silex\ServiceProviderInterface; @@ -11,9 +16,50 @@ class WebhookServiceProvider implements ServiceProviderInterface public function register(Application $app) { + $this->createAlias($app, 'webhook.event_repository', 'repo.webhook-event'); + $this->createAlias($app, 'webhook.event_manipulator', 'manipulator.webhook-event'); + $this->createAlias($app, 'webhook.delivery_repository', 'repo.webhook-delivery'); + $this->createAlias($app, 'webhook.delivery_manipulator', 'manipulator.webhook-delivery'); + $app['webhook.processor_factory'] = $app->share(function ($app) { return new EventProcessorFactory($app); }); + + $app['webhook.invoker'] = $app->share(function ($app) { + return new WebhookInvoker( + $app['repo.api-applications'], + $app['webhook.processor_factory'], + $app['webhook.event_repository'], + $app['webhook.event_manipulator'], + $app['webhook.delivery_repository'], + $app['webhook.delivery_manipulator'] + ); + }); + + $app['webhook.publisher'] = $app->share(function ($app) { + return new WebhookPublisher($app['alchemy_worker.queue_registry'], $app['alchemy_worker.queue_name']); + }); + + $app['alchemy_worker.worker_resolver'] = $app->extend( + 'alchemy_worker.type_based_worker_resolver', + function (TypeBasedWorkerResolver $resolver, Application $app) { + $resolver->setFactory('webhook', new CallableWorkerFactory(function () use ($app) { + return new EventProcessorWorker( + $app['webhook.event_repository'], + $app['webhook.invoker'] + ); + })); + + return $resolver; + } + ); + } + + private function createAlias(Application $app, $alias, $targetServiceKey) + { + $app[$alias] = $app->share(function () use ($app, $targetServiceKey) { + return $app[$targetServiceKey]; + }); } public function boot(Application $app) diff --git a/lib/Alchemy/Phrasea/Core/Provider/WorkerConfigurationServiceProvider.php b/lib/Alchemy/Phrasea/Core/Provider/WorkerConfigurationServiceProvider.php index cc0ac0a2a3..1b2e15d4f0 100644 --- a/lib/Alchemy/Phrasea/Core/Provider/WorkerConfigurationServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/Provider/WorkerConfigurationServiceProvider.php @@ -4,6 +4,8 @@ namespace Alchemy\Phrasea\Core\Provider; use Alchemy\Phrasea\Core\Configuration\PropertyAccess; use Alchemy\Phrasea\Exception\RuntimeException; +use Alchemy\Worker\CallableWorkerFactory; +use Alchemy\Worker\TypeBasedWorkerResolver; use Silex\Application; use Silex\ServiceProviderInterface; diff --git a/lib/Alchemy/Phrasea/Model/Manipulator/WebhookEventManipulator.php b/lib/Alchemy/Phrasea/Model/Manipulator/WebhookEventManipulator.php index 9fe2c59779..40e7f812a7 100644 --- a/lib/Alchemy/Phrasea/Model/Manipulator/WebhookEventManipulator.php +++ b/lib/Alchemy/Phrasea/Model/Manipulator/WebhookEventManipulator.php @@ -12,29 +12,46 @@ namespace Alchemy\Phrasea\Model\Manipulator; use Alchemy\Phrasea\Model\Entities\WebhookEvent; +use Alchemy\Phrasea\Webhook\WebhookPublisher; use Doctrine\Common\Persistence\ObjectManager; use Doctrine\ORM\EntityRepository; class WebhookEventManipulator implements ManipulatorInterface { + /** + * @var ObjectManager + */ private $om; + + /** + * @var EntityRepository + */ private $repository; - public function __construct(ObjectManager $om, EntityRepository $repo) + /** + * @var WebhookPublisher + */ + private $publisher; + + public function __construct(ObjectManager $om, EntityRepository $repo, WebhookPublisher $publisher) { $this->om = $om; $this->repository = $repo; + $this->publisher = $publisher; } public function create($eventName, $type, array $data) { $event = new WebhookEvent(); + $event->setName($eventName); $event->setType($type); $event->setData($data); $this->update($event); + $this->publisher->publishWebhookEvent($event); + return $event; } diff --git a/lib/Alchemy/Phrasea/TaskManager/Job/WebhookJob.php b/lib/Alchemy/Phrasea/TaskManager/Job/WebhookJob.php index 8d56af93d1..dbe4d7bd41 100644 --- a/lib/Alchemy/Phrasea/TaskManager/Job/WebhookJob.php +++ b/lib/Alchemy/Phrasea/TaskManager/Job/WebhookJob.php @@ -12,21 +12,11 @@ namespace Alchemy\Phrasea\TaskManager\Job; use Alchemy\Phrasea\Core\Version; -use Alchemy\Phrasea\Model\Entities\WebhookEvent; -use Alchemy\Phrasea\Model\Entities\WebhookEventDelivery; -use Alchemy\Phrasea\Model\Entities\ApiApplication; use Alchemy\Phrasea\TaskManager\Editor\DefaultEditor; use Alchemy\Phrasea\Webhook\EventProcessorFactory; use Guzzle\Http\Client as GuzzleClient; -use Guzzle\Batch\BatchBuilder; -use Guzzle\Http\Message\Request; -use Silex\Application; -use Guzzle\Common\Event; -use Guzzle\Plugin\Backoff\BackoffPlugin; -use Guzzle\Plugin\Backoff\TruncatedBackoffStrategy; -use Guzzle\Plugin\Backoff\CallbackBackoffStrategy; -use Guzzle\Plugin\Backoff\CurlBackoffStrategy; use Psr\Log\LoggerInterface; +use Silex\Application; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Translation\TranslatorInterface; @@ -34,15 +24,12 @@ class WebhookJob extends AbstractJob { private $httpClient; - private $firstRun = true; - public function __construct( TranslatorInterface $translator, EventDispatcherInterface $dispatcher = null, LoggerInterface $logger = null, GuzzleClient $httpClient = null - ) - { + ) { parent::__construct($translator, $dispatcher, $logger); $this->httpClient = $httpClient ?: new GuzzleClient(); @@ -89,57 +76,6 @@ class WebhookJob extends AbstractJob { $app = $data->getApplication(); $thirdPartyApplications = $app['repo.api-applications']->findWithDefinedWebhookCallback(); - $that = $this; - - if ($this->firstRun) { - $this->httpClient->getEventDispatcher()->addListener('request.error', function (Event $event) { - // override guzzle default behavior of throwing exceptions - // when 4xx & 5xx responses are encountered - $event->stopPropagation(); - }, -254); - - // Set callback which logs success or failure - $subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) use ($app, $that) { - $retry = true; - if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) { - $delivery = $app['repo.webhook-delivery']->find($deliverId); - - $logContext = [ 'host' => $request->getHost() ]; - - if ($response->isSuccessful()) { - $app['manipulator.webhook-delivery']->deliverySuccess($delivery); - - $logType = 'info'; - $logEntry = sprintf('Deliver success event "%d:%s" for app "%s"', - $delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(), - $delivery->getThirdPartyApplication()->getName() - ); - - $retry = false; - } else { - $app['manipulator.webhook-delivery']->deliveryFailure($delivery); - - $logType = 'error'; - $logEntry = sprintf('Deliver failure event "%d:%s" for app "%s"', - $delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(), - $delivery->getThirdPartyApplication()->getName() - ); - } - - $that->log($logType, $logEntry, $logContext); - - return $retry; - } - }, true, new CurlBackoffStrategy()); - - // set max retries - $subscriber = new TruncatedBackoffStrategy(WebhookEventDelivery::MAX_DELIVERY_TRIES, $subscriber); - $subscriber = new BackoffPlugin($subscriber); - - $this->httpClient->addSubscriber($subscriber); - - $this->firstRun = false; - } /** @var EventProcessorFactory $eventFactory */ $eventFactory = $app['webhook.processor_factory']; @@ -155,41 +91,4 @@ class WebhookJob extends AbstractJob $this->deliverEvent($eventFactory, $app, $thirdPartyApplications, $event); } } - - private function deliverEvent(EventProcessorFactory $eventFactory, Application $app, array $thirdPartyApplications, WebhookEvent $event) - { - if (count($thirdPartyApplications) === 0) { - $this->log('info', sprintf('No applications defined to listen for webhook events')); - - return; - } - - // format event data - $eventProcessor = $eventFactory->get($event); - $data = $eventProcessor->process($event); - - // batch requests - $batch = BatchBuilder::factory() - ->transferRequests(10) - ->build(); - - foreach ($thirdPartyApplications as $thirdPartyApplication) { - $delivery = $app['manipulator.webhook-delivery']->create($thirdPartyApplication, $event); - - // append delivery id as url anchor - $uniqueUrl = $this->getUrl($thirdPartyApplication, $delivery); - - // create http request with data as request body - $batch->add($this->httpClient->createRequest('POST', $uniqueUrl, [ - 'Content-Type' => 'application/vnd.phraseanet.event+json' - ], json_encode($data))); - } - - $batch->flush(); - } - - private function getUrl(ApiApplication $application, WebhookEventDelivery $delivery) - { - return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId()); - } } diff --git a/lib/Alchemy/Phrasea/Webhook/EventProcessorFactory.php b/lib/Alchemy/Phrasea/Webhook/EventProcessorFactory.php index e547271804..2665ef27b5 100644 --- a/lib/Alchemy/Phrasea/Webhook/EventProcessorFactory.php +++ b/lib/Alchemy/Phrasea/Webhook/EventProcessorFactory.php @@ -9,13 +9,14 @@ use Alchemy\Phrasea\Webhook\Processor\CallableProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\FeedEntryProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\OrderNotificationProcessorFactory; use Alchemy\Phrasea\Webhook\Processor\ProcessorFactory; +use Alchemy\Phrasea\Webhook\Processor\ProcessorInterface; use Alchemy\Phrasea\Webhook\Processor\UserRegistrationProcessorFactory; class EventProcessorFactory { /** - * @var ProcessorFactory + * @var ProcessorFactory[] */ private $processorFactories = []; @@ -57,10 +58,20 @@ class EventProcessorFactory /** * @param WebhookEvent $event * @return Processor\ProcessorInterface + * @deprecated Use getProcessor() instead */ public function get(WebhookEvent $event) { - if (! isset($this->processorFactories[$event->getType()])) { + return $this->getProcessor($event); + } + + /** + * @param WebhookEvent $event + * @return ProcessorInterface + */ + public function getProcessor(WebhookEvent $event) + { + if (!isset($this->processorFactories[$event->getType()])) { throw new \RuntimeException(sprintf('No processor found for %s', $event->getType())); } diff --git a/lib/Alchemy/Phrasea/Webhook/EventProcessorWorker.php b/lib/Alchemy/Phrasea/Webhook/EventProcessorWorker.php new file mode 100644 index 0000000000..be78a8e5ad --- /dev/null +++ b/lib/Alchemy/Phrasea/Webhook/EventProcessorWorker.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Alchemy\Phrasea\Webhook; + +use Alchemy\Phrasea\Model\Entities\WebhookEvent; +use Alchemy\Phrasea\Model\Repositories\WebhookEventRepository; +use Alchemy\Worker\Worker; + +class EventProcessorWorker implements Worker +{ + + /** + * @var WebhookEventRepository + */ + private $eventRepository; + + /** + * @var WebhookInvoker + */ + private $invoker; + + /** + * @param WebhookEventRepository $eventRepository + * @param WebhookInvoker $invoke + */ + public function __construct(WebhookEventRepository $eventRepository, WebhookInvoker $invoke) + { + $this->eventRepository = $eventRepository; + $this->invoker = $invoke; + } + + /** + * @param array $payload + * @return void + */ + public function process(array $payload) + { + $eventId = $payload['id']; + /** @var WebhookEvent $event */ + $event = $this->eventRepository->find($eventId); + + if ($event === null || $event->isProcessed()) { + return; + } + + $this->invoker->invoke($event); + } +} diff --git a/lib/Alchemy/Phrasea/Webhook/WebhookInvoker.php b/lib/Alchemy/Phrasea/Webhook/WebhookInvoker.php new file mode 100644 index 0000000000..2f03b14380 --- /dev/null +++ b/lib/Alchemy/Phrasea/Webhook/WebhookInvoker.php @@ -0,0 +1,248 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Alchemy\Phrasea\Webhook; + +use Alchemy\Phrasea\Model\Entities\ApiApplication; +use Alchemy\Phrasea\Model\Entities\WebhookEvent; +use Alchemy\Phrasea\Model\Entities\WebhookEventDelivery; +use Alchemy\Phrasea\Model\Manipulator\WebhookEventDeliveryManipulator; +use Alchemy\Phrasea\Model\Manipulator\WebhookEventManipulator; +use Alchemy\Phrasea\Model\Repositories\ApiApplicationRepository; +use Alchemy\Phrasea\Model\Repositories\WebhookEventDeliveryRepository; +use Alchemy\Phrasea\Model\Repositories\WebhookEventRepository; +use Guzzle\Batch\BatchBuilder; +use Guzzle\Common\Event; +use Guzzle\Http\Client; +use Guzzle\Http\Message\Request; +use Guzzle\Plugin\Backoff\BackoffPlugin; +use Guzzle\Plugin\Backoff\CallbackBackoffStrategy; +use Guzzle\Plugin\Backoff\CurlBackoffStrategy; +use Guzzle\Plugin\Backoff\TruncatedBackoffStrategy; +use Psr\Log\LoggerAwareInterface; +use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; + +/** + * Class WebhookInvoker invokes remote endpoints with webhook event data + * @package Alchemy\Phrasea\Webhook + */ +class WebhookInvoker implements LoggerAwareInterface +{ + + /** + * @var ApiApplicationRepository + */ + private $applicationRepository; + + /** + * @var WebhookEventDeliveryManipulator + */ + private $eventDeliveryManipulator; + + /** + * @var WebhookEventDeliveryRepository + */ + private $eventDeliveryRepository; + + /** + * @var Client + */ + private $client; + + /** + * @var LoggerInterface + */ + private $logger; + /** + * @var EventProcessorFactory + */ + private $processorFactory; + /** + * @var WebhookEventRepository + */ + private $eventRepository; + /** + * @var WebhookEventManipulator + */ + private $eventManipulator; + + /** + * @param ApiApplicationRepository $applicationRepository + * @param EventProcessorFactory $processorFactory + * @param WebhookEventRepository $eventRepository + * @param WebhookEventManipulator $eventManipulator + * @param WebhookEventDeliveryManipulator $eventDeliveryManipulator + * @param WebhookEventDeliveryRepository $eventDeliveryRepository + * @param Client $client + * + * @todo Extract classes to reduce number of required dependencies + */ + public function __construct( + ApiApplicationRepository $applicationRepository, + EventProcessorFactory $processorFactory, + WebhookEventRepository $eventRepository, + WebhookEventManipulator $eventManipulator, + WebhookEventDeliveryManipulator $eventDeliveryManipulator, + WebhookEventDeliveryRepository $eventDeliveryRepository, + Client $client = null + ) { + $this->applicationRepository = $applicationRepository; + $this->processorFactory = $processorFactory; + $this->eventRepository = $eventRepository; + $this->eventManipulator = $eventManipulator; + $this->eventDeliveryManipulator = $eventDeliveryManipulator; + $this->eventDeliveryRepository = $eventDeliveryRepository; + $this->client = $client; + $this->logger = new NullLogger(); + + $this->configureClient(); + } + + /** + * Sets a logger instance on the object. + * + * @param LoggerInterface $logger + * + * @return null + */ + public function setLogger(LoggerInterface $logger) + { + $this->logger = $logger; + } + + public function invoke(WebhookEvent $event) + { + $this->doInvoke($event, $this->applicationRepository->findWithDefinedWebhookCallback()); + } + + public function invokeUnprocessedEvents() + { + $targetApplications = $this->applicationRepository->findWithDefinedWebhookCallback(); + + foreach ($this->eventRepository->getUnprocessedEventIterator() as $row) { + /** @var WebhookEvent $event */ + $event = $row[0]; + + $this->doInvoke($event, $targetApplications); + } + } + + /** + * @param WebhookEvent $event + * @param ApiApplication[] $targets + */ + private function doInvoke(WebhookEvent $event, array $targets) + { + $this->eventManipulator->processed($event); + $this->logger->info(sprintf('Processing event "%s" with id %d', $event->getName(), $event->getId())); + + // send requests + $this->doHttpDelivery($event, $targets); + } + + private function configureClient() + { + $this->client->getEventDispatcher()->addListener('request.error', function (Event $event) { + // Override guzzle default behavior of throwing exceptions + // when 4xx & 5xx responses are encountered + $event->stopPropagation(); + }, -254); + + // Set callback which logs success or failure + $subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) { + $retry = true; + if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) { + $delivery = $this->eventDeliveryRepository->find($deliverId); + + $logContext = ['host' => $request->getHost()]; + + if ($response->isSuccessful()) { + $this->eventDeliveryManipulator->deliverySuccess($delivery); + + $logType = 'info'; + $logEntry = sprintf('Deliver success event "%d:%s" for app "%s"', + $delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(), + $delivery->getThirdPartyApplication()->getName() + ); + + $retry = false; + } else { + $this->eventDeliveryManipulator->deliveryFailure($delivery); + + $logType = 'error'; + $logEntry = sprintf('Deliver failure event "%d:%s" for app "%s"', + $delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(), + $delivery->getThirdPartyApplication()->getName() + ); + } + + $this->logger->log($logType, $logEntry, $logContext); + + return $retry; + } + }, true, new CurlBackoffStrategy()); + + // Set max retries + $subscriber = new TruncatedBackoffStrategy(WebhookEventDelivery::MAX_DELIVERY_TRIES, $subscriber); + $subscriber = new BackoffPlugin($subscriber); + + $this->client->addSubscriber($subscriber); + } + + /** + * @param WebhookEvent $event + * @param ApiApplication[] $targets + */ + private function doHttpDelivery( + WebhookEvent $event, + array $targets + ) { + if (count($targets) === 0) { + $this->logger->info(sprintf('No applications defined to listen for webhook events')); + + return; + } + + // Format event data + $eventProcessor = $this->processorFactory->getProcessor($event); + $data = $eventProcessor->process($event); + + // Batch requests + $batch = BatchBuilder::factory() + ->transferRequests(10) + ->build(); + + foreach ($targets as $thirdPartyApplication) { + $delivery = $this->eventDeliveryManipulator->create($thirdPartyApplication, $event); + + // append delivery id as url anchor + $uniqueUrl = $this->buildUrl($thirdPartyApplication, $delivery); + + // create http request with data as request body + $batch->add($this->client->createRequest('POST', $uniqueUrl, [ + 'Content-Type' => 'application/vnd.phraseanet.event+json' + ], json_encode($data))); + } + + $batch->flush(); + } + + /** + * @param ApiApplication $application + * @param WebhookEventDelivery $delivery + * @return string + */ + private function buildUrl(ApiApplication $application, WebhookEventDelivery $delivery) + { + return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId()); + } +} diff --git a/lib/Alchemy/Phrasea/Webhook/WebhookPublisher.php b/lib/Alchemy/Phrasea/Webhook/WebhookPublisher.php new file mode 100644 index 0000000000..29a6a9511e --- /dev/null +++ b/lib/Alchemy/Phrasea/Webhook/WebhookPublisher.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Alchemy\Phrasea\Webhook; + +use Alchemy\Phrasea\Model\Entities\WebhookEvent; +use Alchemy\Queue\Message; +use Alchemy\Queue\MessageQueueRegistry; + +/** + * Class WebhookPublisher publishes webhook event notifications in message queues + * @package Alchemy\Phrasea\Webhook + */ +class WebhookPublisher +{ + /** + * @var MessageQueueRegistry + */ + private $queueRegistry; + + /** + * @var string + */ + private $queueName; + + /** + * @param MessageQueueRegistry $queueRegistry + * @param $queueName + */ + public function __construct(MessageQueueRegistry $queueRegistry, $queueName) + { + $this->queueRegistry = $queueRegistry; + $this->queueName = $queueName; + } + + /** + * @param WebhookEvent $event + */ + public function publishWebhookEvent(WebhookEvent $event) + { + $queue = $this->queueRegistry->getQueue($this->queueName); + $payload = [ + 'message_type' => $event->getId(), + 'payload' => [ 'id' => $event->getId() ] + ]; + + $queue->publish(new Message($payload)); + } +} diff --git a/lib/classes/API/Webhook.php b/lib/classes/API/Webhook.php deleted file mode 100644 index 8d347a9322..0000000000 --- a/lib/classes/API/Webhook.php +++ /dev/null @@ -1,95 +0,0 @@ -appbox = $appbox; - $this->id = $id; - $sql = 'SELECT `type`, `data`, created - FROM api_webhooks - WHERE id = :id'; - $stmt = $this->appbox->get_connection()->prepare($sql); - $stmt->execute([':id' => $id]); - $row = $stmt->fetch(\PDO::FETCH_ASSOC); - - if (!$row) { - throw new RuntimeException('Webhooks not found'); - } - - $stmt->closeCursor(); - - $this->type = $row['type']; - $this->data = json_decode($row['data']); - $this->created = new \DateTime($row['created']); - } - - public function delete() - { - $sql = 'DELETE FROM api_webhooks WHERE id = :id'; - - $stmt = $this->appbox->get_connection()->prepare($sql); - $stmt->execute([':id' => $this->id]); - $stmt->closeCursor(); - - return; - } - - public static function create(appbox $appbox, $type, array $data) - { - $sql = 'INSERT INTO api_webhooks (id, `type`, `data`, created) - VALUES (null, :type, :data, NOW())'; - - $stmt = $appbox->get_connection()->prepare($sql); - $stmt->execute([ - 'type' => $type, - 'data' => json_encode($data), - ]); - $stmt->closeCursor(); - - return new API_Webhook($appbox, $appbox->get_connection()->lastInsertId()); - } - - /** - * @return \DateTime - */ - public function getCreated() - { - return $this->created; - } - - /** - * @return mixed - */ - public function getData() - { - return $this->data; - } - - /** - * @return mixed - */ - public function getType() - { - return $this->type; - } -}