include plugin worker in phraseanet core

This commit is contained in:
aynsix
2020-05-12 18:06:45 +03:00
parent e4a872dfeb
commit bb2123df5a
77 changed files with 5185 additions and 68 deletions

View File

@@ -58,6 +58,9 @@ use Alchemy\Phrasea\Command\User\UserPasswordCommand;
use Alchemy\Phrasea\Command\User\UserListCommand;
use Alchemy\Phrasea\Command\UpgradeDBDatas;
use Alchemy\Phrasea\Command\ApplyRightsCommand;
use Alchemy\Phrasea\WorkerManager\Command\WorkerExecuteCommand;
use Alchemy\Phrasea\WorkerManager\Command\WorkerRunServiceCommand;
use Alchemy\Phrasea\WorkerManager\Command\WorkerShowConfigCommand;
require_once __DIR__ . '/../lib/autoload.php';
@@ -162,9 +165,9 @@ $cli->command(new QueryParseCommand());
$cli->command(new QuerySampleCommand());
$cli->command(new FindConceptsCommand());
//$cli->command($cli['alchemy_worker.commands.run_dispatcher_command']);
//$cli->command($cli['alchemy_worker.commands.run_worker_command']);
//$cli->command($cli['alchemy_worker.commands.show_configuration']);
$cli->command(new WorkerExecuteCommand());
$cli->command(new WorkerRunServiceCommand());
$cli->command(new WorkerShowConfigCommand());
$cli->loadPlugins();

View File

@@ -133,7 +133,8 @@
"facebook/graph-sdk": "^5.6",
"box/spout": "^2.7",
"paragonie/random-lib": "^2.0",
"czproject/git-php": "^3.17"
"czproject/git-php": "^3.17",
"php-amqplib/php-amqplib": "2.9"
},
"require-dev": {
"mikey179/vfsstream": "~1.5",

76
composer.lock generated
View File

@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "008ff0b5d3d13b4f0ce5d34348ded83a",
"content-hash": "1fda2bd48bdb1ad3a2cf0bfca723d775",
"packages": [
{
"name": "alchemy-fr/tcpdf-clone",
@@ -5145,6 +5145,80 @@
],
"time": "2019-03-20T17:19:05+00:00"
},
{
"name": "php-amqplib/php-amqplib",
"version": "v2.9.0",
"source": {
"type": "git",
"url": "https://github.com/php-amqplib/php-amqplib.git",
"reference": "08d105d6b69ff4d8fa4fe090c49afdc568a5665b"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/08d105d6b69ff4d8fa4fe090c49afdc568a5665b",
"reference": "08d105d6b69ff4d8fa4fe090c49afdc568a5665b",
"shasum": ""
},
"require": {
"ext-bcmath": "*",
"ext-sockets": "*",
"php": ">=5.4.0"
},
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"require-dev": {
"ext-curl": "*",
"nategood/httpful": "^0.2.20",
"phpdocumentor/phpdocumentor": "^2.9",
"phpunit/phpunit": "^4.8",
"squizlabs/php_codesniffer": "^2.5"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "2.8-dev"
}
},
"autoload": {
"psr-4": {
"PhpAmqpLib\\": "PhpAmqpLib/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"LGPL-2.1-or-later"
],
"authors": [
{
"name": "Alvaro Videla",
"role": "Original Maintainer"
},
{
"name": "John Kelly",
"email": "johnmkelly86@gmail.com",
"role": "Maintainer"
},
{
"name": "Raúl Araya",
"email": "nubeiro@gmail.com",
"role": "Maintainer"
},
{
"name": "Luke Bakken",
"email": "luke@bakken.io",
"role": "Maintainer"
}
],
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"homepage": "https://github.com/php-amqplib/php-amqplib/",
"keywords": [
"message",
"queue",
"rabbitmq"
],
"time": "2019-03-22T16:02:15+00:00"
},
{
"name": "php-ffmpeg/php-ffmpeg",
"version": "v0.15",

View File

@@ -87,6 +87,8 @@ use Alchemy\Phrasea\Media\MediaAccessorResolver;
use Alchemy\Phrasea\Media\PermalinkMediaResolver;
use Alchemy\Phrasea\Media\TechnicalDataServiceProvider;
use Alchemy\Phrasea\Model\Entities\User;
use Alchemy\Phrasea\WorkerManager\Provider\AlchemyWorkerServiceProvider;
use Alchemy\Phrasea\WorkerManager\Provider\QueueWorkerServiceProvider;
use Alchemy\QueueProvider\QueueServiceProvider;
use Alchemy\WorkerProvider\WorkerServiceProvider;
use Doctrine\DBAL\Event\ConnectionEventArgs;
@@ -268,6 +270,9 @@ class Application extends SilexApplication
$this->register(new OrderServiceProvider());
$this->register(new WebhookServiceProvider());
$this->register(new QueueWorkerServiceProvider());
$this->register(new AlchemyWorkerServiceProvider());
$this['monolog'] = $this->share(
$this->extend('monolog', function (LoggerInterface $logger, Application $app) {

View File

@@ -6,6 +6,7 @@ use Alchemy\EmbedProvider\EmbedServiceProvider;
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\ControllerProvider as Providers;
use Alchemy\Phrasea\Report\ControllerProvider\ProdReportControllerProvider;
use Alchemy\Phrasea\WorkerManager\Provider\ControllerServiceProvider as WorkerManagerProvider;
use Assert\Assertion;
use Silex\ControllerProviderInterface;
@@ -28,6 +29,7 @@ class RouteLoader
'/admin/setup' => Providers\Admin\Setup::class,
'/admin/subdefs' => Providers\Admin\Subdefs::class,
'/admin/task-manager' => Providers\Admin\TaskManager::class,
'/admin/worker-manager' => WorkerManagerProvider::class,
'/admin/users' => Providers\Admin\Users::class,
'/client/' => Providers\Client\Root::class,
'/datafiles' => Providers\Datafiles::class,

View File

@@ -368,6 +368,7 @@ class RootController extends Controller
'collection',
'user',
'users',
'workermanager'
];
$feature = 'connected';

View File

@@ -87,6 +87,8 @@ use Alchemy\Phrasea\SearchEngine\SearchEngineResult;
use Alchemy\Phrasea\Status\StatusStructure;
use Alchemy\Phrasea\TaskManager\LiveInformation;
use Alchemy\Phrasea\Utilities\NullableDateTime;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Doctrine\ORM\EntityManager;
use Guzzle\Http\Client as Guzzle;
use League\Fractal\Resource\Item;
@@ -217,6 +219,30 @@ class V1Controller extends Controller
return $this->showTaskAction($request, $task);
}
/**
* Use with the uploader service
* @param Request $request
* @return Response
*/
public function sendAssetsInQueue(Request $request)
{
$jsonBodyHelper = $this->getJsonBodyHelper();
$schema = $this->app['json-schema.ref_resolver']->resolve($this->app['json-schema.base_uri']. 'assets_enqueue.json');
$data = $request->getContent();
$errors = $jsonBodyHelper->validateJson(json_decode($data), $schema);
if (count($errors) > 0) {
return Result::createError($request, 422, $errors[0])->createResponse();
}
$this->dispatch(WorkerEvents::ASSETS_CREATE, new AssetsCreateEvent(json_decode($data)));
return Result::create($request, [
"data" => json_decode($data),
])->createResponse();
}
private function getCacheInformation()
{
$caches = [

View File

@@ -284,6 +284,9 @@ class V1 extends Api implements ControllerProviderInterface, ServiceProviderInte
$controllers->post('/accounts/unlock/{token}/', 'controller.api.v1:unlockAccount')
->before('controller.api.v1:ensureUserManagementRights');
// the api route for the uploader service
$controllers->post('/upload/enqueue', 'controller.api.v1:sendAssetsInQueue');
return $controllers;
}
}

View File

@@ -54,6 +54,7 @@ class ControllerProviderServiceProvider implements ServiceProviderInterface
Admin\Setup::class => [],
Admin\Subdefs::class => [],
Admin\TaskManager::class => [],
\Alchemy\Phrasea\WorkerManager\Provider\ControllerServiceProvider::class => [],
Admin\Users::class => [],
Client\Root::class => [],
Datafiles::class => [],

View File

@@ -47,65 +47,10 @@ class ExportSubscriber extends AbstractNotificationSubscriber
$this->app['event-manager']->notify($params['usr_id'], 'eventsmanager_notify_downloadmailfail', $datas, $mailed);
}
public function onCreateExportMail(ExportMailEvent $event)
{
$destMails = $event->getDestinationMails();
$params = $event->getParams();
/** @var UserRepository $userRepository */
$userRepository = $this->app['repo.users'];
$user = $userRepository->find($event->getEmitterUserId());
/** @var TokenRepository $tokenRepository */
$tokenRepository = $this->app['repo.tokens'];
/** @var Token $token */
$token = $tokenRepository->findValidToken($event->getTokenValue());
$list = unserialize($token->getData());
//zip documents
\set_export::build_zip(
$this->app,
$token,
$list,
$this->app['tmp.download.path'].'/'. $token->getValue() . '.zip'
);
$remaingEmails = $destMails;
$emitter = new Emitter($user->getDisplayName(), $user->getEmail());
foreach ($destMails as $key => $mail) {
try {
$receiver = new Receiver(null, trim($mail));
} catch (InvalidArgumentException $e) {
continue;
}
$mail = MailRecordsExport::create($this->app, $receiver, $emitter, $params['textmail']);
$mail->setButtonUrl($params['url']);
$mail->setExpiration($token->getExpiration());
$this->deliver($mail, $params['reading_confirm']);
unset($remaingEmails[$key]);
}
//some mails failed
if (count($remaingEmails) > 0) {
foreach ($remaingEmails as $mail) {
$this->app['dispatcher']->dispatch(PhraseaEvents::EXPORT_MAIL_FAILURE, new ExportFailureEvent($user, $params['ssttid'], $params['lst'], \eventsmanager_notify_downloadmailfail::MAIL_FAIL, $mail));
}
}
}
public static function getSubscribedEvents()
{
return [
PhraseaEvents::EXPORT_MAIL_FAILURE => 'onMailExportFailure',
PhraseaEvents::EXPORT_MAIL_CREATE => 'onCreateExportMail',
];
}
}

View File

@@ -33,7 +33,6 @@ class RecordEditSubscriber implements EventSubscriberInterface
RecordEvents::ROTATE => 'onRecordChange',
RecordEvents::COLLECTION_CHANGED => 'onCollectionChanged',
RecordEvents::SUBDEFINITION_CREATE => 'onSubdefinitionCreate',
RecordEvents::DELETE => 'onDelete',
);
}
@@ -59,12 +58,6 @@ class RecordEditSubscriber implements EventSubscriberInterface
$recordAdapter->rebuild_subdefs();
}
public function onDelete(DeleteEvent $event)
{
$recordAdapter = $this->convertToRecordAdapter($event->getRecord());
$recordAdapter->delete();
}
public function onEdit(RecordEdit $event)
{
static $into = false;

View File

@@ -147,6 +147,9 @@ class RepositoriesServiceProvider implements ServiceProviderInterface
$app['repo.webhook-delivery'] = $app->share(function (PhraseaApplication $app) {
return $app['orm.em']->getRepository('Phraseanet:WebhookEventDelivery');
});
$app['repo.worker-running-job'] = $app->share(function (PhraseaApplication $app) {
return $app['orm.em']->getRepository('Phraseanet:WorkerRunningJob');
});
$app['repo.databoxes'] = $app->share(function (PhraseaApplication $app) {
$appbox = $app->getApplicationBox();

View File

@@ -17,7 +17,7 @@ class Version
* @var string
*/
private $number = '4.1.0-alpha.26a';
private $number = '4.1.0-alpha.27a';
/**
* @var string

View File

@@ -0,0 +1,133 @@
<?php
namespace Alchemy\Phrasea\Model\Entities;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Table(name="WorkerRunningJob",
* indexes={
* @ORM\index(name="databox_id", columns={"databox_id"}),
* @ORM\index(name="record_id", columns={"record_id"}),
* }
* )
* @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository")
*/
class WorkerRunningJob
{
/**
* @ORM\Column(type="integer")
* @ORM\Id
* @ORM\GeneratedValue
*/
private $id;
/**
* @ORM\Column(type="integer", name="databox_id")
*/
private $databoxId;
/**
* @ORM\Column(type="integer", name="record_id")
*/
private $recordId;
/**
* @ORM\Column(type="integer", name="work")
*/
private $work;
/**
* @ORM\Column(type="string", name="work_on")
*/
private $workOn;
/**
* @return integer
*/
public function getId()
{
return $this->id;
}
/**
* @param $databoxId
* @return $this
*/
public function setDataboxId($databoxId)
{
$this->databoxId = $databoxId;
return $this;
}
/**
* @return mixed
*/
public function getDataboxId()
{
return $this->databoxId;
}
/**
* @param $recordId
* @return $this
*/
public function setRecordId($recordId)
{
$this->recordId = $recordId;
return $this;
}
/**
* @return mixed
*/
public function getRecordId()
{
return $this->recordId;
}
/**
* @param $work
* @return $this
*/
public function setWork($work)
{
$this->work = $work;
return $this;
}
/**
* @return mixed
*/
public function getWork()
{
return $this->work;
}
/**
* @param $workOn
* @return $this
*/
public function setWorkOn($workOn)
{
$this->workOn = $workOn;
return $this;
}
/**
* @return mixed
*/
public function getWorkOn()
{
return $this->workOn;
}
}

View File

@@ -0,0 +1,72 @@
<?php
namespace Alchemy\Phrasea\Model\Repositories;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Doctrine\ORM\EntityRepository;
class WorkerRunningJobRepository extends EntityRepository
{
/**
* return true if we can create subdef
* @param $subdefName
* @param $recordId
* @param $databoxId
* @return bool
*/
public function canCreateSubdef($subdefName, $recordId, $databoxId)
{
$rsm = $this->createResultSetMappingBuilder('w');
$rsm->addScalarResult('work_on','work_on');
$sql = 'SELECT work_on
FROM WorkerRunningJob
WHERE ((work & :write_meta) > 0 OR ((work & :make_subdef) > 0 AND work_on = :work_on) )
AND record_id = :record_id
AND databox_id = :databox_id';
$query = $this->_em->createNativeQuery($sql, $rsm);
$query->setParameters([
'write_meta' => PhraseaTokens::WRITE_META,
'make_subdef'=> PhraseaTokens::MAKE_SUBDEF,
'work_on' => $subdefName,
'record_id' => $recordId,
'databox_id' => $databoxId
]
);
return count($query->getResult()) == 0;
}
/**
* return true if we can write meta
*
* @param $subdefName
* @param $recordId
* @param $databoxId
* @return bool
*/
public function canWriteMetadata($subdefName, $recordId, $databoxId)
{
$rsm = $this->createResultSetMappingBuilder('w');
$rsm->addScalarResult('work_on','work_on');
$sql = 'SELECT work_on
FROM WorkerRunningJob
WHERE ((work & :make_subdef) > 0 OR ((work & :write_meta) > 0 AND work_on = :work_on) )
AND record_id = :record_id
AND databox_id = :databox_id';
$query = $this->_em->createNativeQuery($sql, $rsm);
$query->setParameters([
'make_subdef'=> PhraseaTokens::MAKE_SUBDEF,
'write_meta' => PhraseaTokens::WRITE_META,
'work_on' => $subdefName,
'record_id' => $recordId,
'databox_id' => $databoxId
]
);
return count($query->getResult()) == 0;
}
}

View File

@@ -0,0 +1,83 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Command;
use Alchemy\Phrasea\Command\Command;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessageHandler;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use PhpAmqpLib\Channel\AMQPChannel;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
class WorkerExecuteCommand extends Command
{
/**
* Constructor
*/
public function __construct()
{
parent::__construct('worker:execute');
$this->setDescription('Listen queues define on configuration, launch corresponding service for execution')
->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file')
->addOption('queue-name', '', InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'The name of queues to be consuming')
->addOption('max-processes', 'm', InputOption::VALUE_REQUIRED, 'The max number of process allow to run (default 4) ')
->addOption('MWG', '', InputOption::VALUE_NONE, 'Enable MWG metadata compatibility (use only for write metadata service)')
->addOption('clear-metadatas', '', InputOption::VALUE_NONE, 'Delete metadatas from documents if not compliant with Database structure (use only for write metadata service)')
->setHelp('');
return $this;
}
protected function doExecute(InputInterface $input, OutputInterface $output)
{
$MWG = false;
$clearMetadatas = false;
$argQueueName = $input->getOption('queue-name');
$maxProcesses = intval($input->getOption('max-processes'));
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->container['alchemy_worker.amqp.connection'];
/** @var AMQPChannel $channel */
$channel = $serverConnection->getChannel();
/** @var WorkerInvoker $workerInvoker */
$workerInvoker = $this->container['alchemy_worker.worker_invoker'];
if ($input->getOption('max-processes') != null && $maxProcesses == 0) {
$output->writeln('<error>Invalid max-processes option.Need an integer</error>');
return;
} elseif($maxProcesses) {
$workerInvoker->setMaxProcessPoolValue($maxProcesses);
}
if ($input->getOption('MWG')) {
$MWG = true;
}
if ($input->getOption('clear-metadatas')) {
$clearMetadatas = true;
}
if ($input->getOption('preserve-payload')) {
$workerInvoker->preservePayloads();
}
/** @var MessageHandler $messageHandler */
$messageHandler = $this->container['alchemy_worker.message.handler'];
$messageHandler->consume($serverConnection, $workerInvoker, $argQueueName, $maxProcesses);
while (count($channel->callbacks)) {
$output->writeln("[*] Waiting for messages. To exit press CTRL+C");
$channel->wait();
}
$serverConnection->connectionClose();
}
}

View File

@@ -0,0 +1,56 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Command;
use Alchemy\Phrasea\Command\Command;
use Alchemy\Phrasea\WorkerManager\Worker\Resolver\WorkerResolverInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
class WorkerRunServiceCommand extends Command
{
public function __construct()
{
parent::__construct('worker:run-service');
$this->setDescription('Execute a service')
->addArgument('type')
->addArgument('body')
->addOption('preserve-payload', 'p', InputOption::VALUE_NONE, 'Preserve temporary payload file');
return $this;
}
protected function doExecute(InputInterface $input, OutputInterface $output)
{
/** @var WorkerResolverInterface $workerResolver */
$workerResolver = $this->container['alchemy_worker.type_based_worker_resolver'];
$type = $input->getArgument('type');
$body = file_get_contents($input->getArgument('body'));
if ($body === false) {
$output->writeln('Unable to read payload file');
return;
}
$body = json_decode($body, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$output->writeln('<error>Invalid message body</error>');
return;
}
$worker = $workerResolver->getWorker($type, $body);
$worker->process($body);
if (! $input->getOption('preserve-payload')) {
unlink($input->getArgument('body'));
}
}
}

View File

@@ -0,0 +1,27 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Command;
use Alchemy\Phrasea\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Yaml\Yaml;
class WorkerShowConfigCommand extends Command
{
public function __construct()
{
parent::__construct('worker:show-configuration');
$this->setDescription('Show queues configuration');
}
public function doExecute(InputInterface $input, OutputInterface $output)
{
$serverConfiguration = $this->container['conf']->get(['workers', 'queue', 'worker-queue']);
$output->writeln(['', 'Configured server: ']);
$output->writeln(['Rabbit Server : ' . Yaml::dump($serverConfiguration, 0), '']);
}
}

View File

@@ -0,0 +1,33 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Configuration;
class Config
{
const WORKER_DATABASE_FILE = 'worker.db';
public static function getPluginDatabaseFile()
{
$dbDir = realpath(dirname(__FILE__) . "/../") . "/Db" ;
if (!is_dir($dbDir)) {
mkdir($dbDir, 0755, true);
}
$dbFile = $dbDir . '/' . self::WORKER_DATABASE_FILE;
if (!is_file($dbFile)) {
file_put_contents($dbFile, '');
}
return $dbFile;
}
public static function getWorkerSqliteConnection()
{
$db_conn = 'sqlite:'. self::getPluginDatabaseFile();
$pdo = new \PDO($db_conn);
return $pdo;
}
}

View File

@@ -0,0 +1,168 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Controller;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Controller\Controller;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Form\WorkerConfigurationType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerPullAssetsType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerSearchengineType;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Form\FormInterface;
use Symfony\Component\HttpFoundation\Request;
class AdminConfigurationController extends Controller
{
public function indexAction(PhraseaApplication $app, Request $request)
{
return $this->render('admin/worker-manager/index.html.twig');
}
/**
* @param PhraseaApplication $app
* @param Request $request
* @return mixed
*/
public function configurationAction(PhraseaApplication $app, Request $request)
{
$retryQueueConfig = $this->getRetryQueueConfiguration();
$form = $app->form(new WorkerConfigurationType(), $retryQueueConfig);
$form->handleRequest($request);
if ($form->isValid()) {
// save config in file
$app['conf']->set(['workers', 'retry_queue'], $form->getData());
$queues = array_intersect_key(AMQPConnection::$defaultQueues, $retryQueueConfig);
$retryQueuesToReset = array_intersect_key(AMQPConnection::$defaultRetryQueues, array_flip($queues));
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
// change the queue TTL
$serverConnection->reinitializeQueue($retryQueuesToReset);
return $app->redirectPath('worker_admin');
}
return $this->render('admin/worker-manager/worker_configuration.html.twig', [
'form' => $form->createView()
]);
}
public function searchengineAction(PhraseaApplication $app, Request $request)
{
$options = $this->getElasticsearchOptions();
$form = $app->form(new WorkerSearchengineType(), $options);
$form->handleRequest($request);
if ($form->isValid()) {
$populateInfo = $this->getData($form);
$this->getDispatcher()->dispatch(WorkerEvents::POPULATE_INDEX, new PopulateIndexEvent($populateInfo));
return $app->redirectPath('worker_admin');
}
return $this->render('admin/worker-manager/worker_searchengine.html.twig', [
'form' => $form->createView()
]);
}
public function subviewAction(PhraseaApplication $app)
{
return $this->render('admin/worker-manager/worker_subview.html.twig', [
]);
}
public function metadataAction(PhraseaApplication $app)
{
return $this->render('admin/worker-manager/worker_metadata.html.twig', [
]);
}
public function populateStatusAction(PhraseaApplication $app, Request $request)
{
$databoxIds = $request->get('sbasIds');
return DBManipulator::checkPopulateIndexStatusByDataboxId($databoxIds);
}
public function pullAssetsAction(PhraseaApplication $app, Request $request)
{
$pullAssetsConfig = $this->getPullAssetsConfiguration();
$form = $app->form(new WorkerPullAssetsType(), $pullAssetsConfig);
$form->handleRequest($request);
if ($form->isValid()) {
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
$serverConnection->setQueue(MessagePublisher::PULL_QUEUE);
// save new pull config
$app['conf']->set(['workers', 'pull_assets'], array_merge($pullAssetsConfig, $form->getData()));
// reinitialize the pull queues
$serverConnection->reinitializeQueue([MessagePublisher::PULL_QUEUE]);
$this->app['alchemy_worker.message.publisher']->initializePullAssets();
return $app->redirectPath('worker_admin');
}
return $this->render('admin/worker-manager/worker_pull_assets.html.twig', [
'form' => $form->createView()
]);
}
/**
* @return EventDispatcherInterface
*/
private function getDispatcher()
{
return $this->app['dispatcher'];
}
/**
* @return ElasticsearchOptions
*/
private function getElasticsearchOptions()
{
return $this->app['elasticsearch.options'];
}
/**
* @param FormInterface $form
* @return array
*/
private function getData(FormInterface $form)
{
/** @var ElasticsearchOptions $options */
$options = $form->getData();
$data['host'] = $options->getHost();
$data['port'] = $options->getPort();
$data['indexName'] = $options->getIndexName();
$data['databoxIds'] = $form->getExtraData()['sbas'];
return $data;
}
private function getPullAssetsConfiguration()
{
return $this->app['conf']->get(['workers', 'pull_assets'], []);
}
private function getRetryQueueConfiguration()
{
return $this->app['conf']->get(['workers', 'retry_queue'], []);
}
}

View File

@@ -0,0 +1,21 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class AssetsCreateEvent extends SfEvent
{
private $data;
public function __construct($data)
{
$this->data = $data;
}
public function getData()
{
return $this->data;
}
}

View File

@@ -0,0 +1,34 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class AssetsCreationFailureEvent extends SfEvent
{
private $payload;
private $workerMessage;
private $count;
public function __construct($payload, $workerMessage, $count = 2)
{
$this->payload = $payload;
$this->workerMessage = $workerMessage;
$this->count = $count;
}
public function getPayload()
{
return $this->payload;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
}

View File

@@ -0,0 +1,35 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class AssetsCreationRecordFailureEvent extends SfEvent
{
/** @var array */
private $payload;
private $workerMessage;
private $count;
public function __construct($payload, $workerMessage = '', $count = 2)
{
$this->payload = $payload;
$this->workerMessage = $workerMessage;
$this->count = $count;
}
public function getPayload()
{
return $this->payload;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
}

View File

@@ -0,0 +1,55 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class ExportMailFailureEvent extends SfEvent
{
private $emitterUserId;
private $tokenValue;
private $destinationMails;
private $params;
private $workerMessage;
private $count;
public function __construct($emitterUserId, $tokenValue, $destinationMails, $params, $workerMessage = '', $count = 2)
{
$this->emitterUserId = $emitterUserId;
$this->tokenValue = $tokenValue;
$this->destinationMails = $destinationMails;
$this->params = $params;
$this->workerMessage = $workerMessage;
$this->count = $count;
}
public function getEmitterUserId()
{
return $this->emitterUserId;
}
public function getTokenValue()
{
return $this->tokenValue;
}
public function getDestinationMails()
{
return $this->destinationMails;
}
public function getParams()
{
return $this->params;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
}

View File

@@ -0,0 +1,25 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class PopulateIndexEvent extends SfEvent
{
/** @var array */
private $data;
public function __construct($data)
{
$this->data = $data;
}
/**
* @return array
*/
public function getData()
{
return $this->data;
}
}

View File

@@ -0,0 +1,55 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class PopulateIndexFailureEvent extends SfEvent
{
private $host;
private $port;
private $indexName;
private $databoxId;
private $workerMessage;
private $count;
public function __construct($host, $port, $indexName, $databoxId, $workerMessage = '', $count = 2)
{
$this->host = $host;
$this->port = $port;
$this->indexName = $indexName;
$this->databoxId = $databoxId;
$this->workerMessage = $workerMessage;
$this->count = $count;
}
public function getHost()
{
return $this->host;
}
public function getPort()
{
return $this->port;
}
public function getIndexName()
{
return $this->indexName;
}
public function getDataboxId()
{
return $this->databoxId;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
}

View File

@@ -0,0 +1,21 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class StoryCreateCoverEvent extends SfEvent
{
private $data;
public function __construct($data)
{
$this->data = $data;
}
public function getData()
{
return $this->data;
}
}

View File

@@ -0,0 +1,37 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Model\RecordInterface;
class SubdefinitionCreationFailureEvent extends RecordEvent
{
private $subdefName;
private $workerMessage;
private $count;
public function __construct(RecordInterface $record, $subdefName, $workerMessage = '', $count = 2)
{
parent::__construct($record);
$this->subdefName = $subdefName;
$this->workerMessage = $workerMessage;
$this->count = $count;
}
public function getSubdefName()
{
return $this->subdefName;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
}

View File

@@ -0,0 +1,47 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Model\RecordInterface;
class SubdefinitionWritemetaEvent extends RecordEvent
{
const CREATE = 'create';
const FAILED = 'failed';
private $status;
private $subdefName;
private $workerMessage;
private $count;
public function __construct(RecordInterface $record, $subdefName, $status = self::CREATE, $workerMessage = '', $count = 2)
{
parent::__construct($record);
$this->subdefName = $subdefName;
$this->status = $status;
$this->workerMessage = $workerMessage;
$this->count = $count;
}
public function getSubdefName()
{
return $this->subdefName;
}
public function getStatus()
{
return $this->status;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
}

View File

@@ -0,0 +1,41 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
use Symfony\Component\EventDispatcher\Event as SfEvent;
class WebhookDeliverFailureEvent extends SfEvent
{
private $webhookEventId;
private $workerMessage;
private $count;
private $deleveryId;
public function __construct($webhookEventId, $workerMessage, $count = 2, $deleveryId = null)
{
$this->webhookEventId = $webhookEventId;
$this->workerMessage = $workerMessage;
$this->count = $count;
$this->deleveryId = $deleveryId;
}
public function getWebhookEventId()
{
return $this->webhookEventId;
}
public function getWorkerMessage()
{
return $this->workerMessage;
}
public function getCount()
{
return $this->count;
}
public function getDeleveryId()
{
return $this->deleveryId;
}
}

View File

@@ -0,0 +1,22 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Event;
final class WorkerEvents
{
const ASSETS_CREATE = 'assets.create';
const ASSETS_CREATION_FAILURE = 'assets.create_failure';
const ASSETS_CREATION_RECORD_FAILURE = 'assets.creation_record_failure';
const STORY_CREATE_COVER = 'story.create_cover';
const POPULATE_INDEX = 'populate.index';
const POPULATE_INDEX_FAILURE = "populate.index_failure";
const SUBDEFINITION_WRITE_META = 'subdefinition.write_meta';
const SUBDEFINITION_CREATION_FAILURE = 'subdefinition.creation_failure';
const EXPORT_MAIL_FAILURE = 'export.mail_failure';
const WEBHOOK_DELIVER_FAILURE = 'webhook.deliver_failure';
}

View File

@@ -0,0 +1,44 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Form;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\FormBuilderInterface;
class WorkerConfigurationType extends AbstractType
{
public function buildForm(FormBuilderInterface $builder, array $options)
{
parent::buildForm($builder, $options);
$builder
->add(MessagePublisher::ASSETS_INGEST_TYPE, 'text', [
'label' => 'Ingest retry delay in ms'
])
->add(MessagePublisher::CREATE_RECORD_TYPE, 'text', [
'label' => 'Create record retry delay in ms'
])
->add(MessagePublisher::SUBDEF_CREATION_TYPE, 'text', [
'label' => 'Subdefinition retry delay in ms'
])
->add(MessagePublisher::WRITE_METADATAS_TYPE, 'text', [
'label' => 'Metadatas retry delay in ms'
])
->add(MessagePublisher::WEBHOOK_TYPE, 'text', [
'label' => 'Webhook retry delay in ms'
])
->add(MessagePublisher::EXPORT_MAIL_TYPE, 'text', [
'label' => 'Export mail retry delay in ms'
])
->add(MessagePublisher::POPULATE_INDEX_TYPE, 'text', [
'label' => 'Populate Index retry delay in ms'
])
;
}
public function getName()
{
return 'worker_configuration';
}
}

View File

@@ -0,0 +1,37 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Form;
use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\FormBuilderInterface;
class WorkerPullAssetsType extends AbstractType
{
public function buildForm(FormBuilderInterface $builder, array $options)
{
parent::buildForm($builder, $options);
$builder
->add('endpointCommit', 'text', [
'label' => 'Endpoint get commit'
])
->add('endpointToken', 'text', [
'label' => 'Endpoint get token'
])
->add('clientSecret', 'text', [
'label' => 'Client secret'
])
->add('clientId', 'text', [
'label' => 'Client ID'
])
->add('pullInterval', 'text', [
'label' => 'Fetching interval in second'
])
;
}
public function getName()
{
return 'worker_pullAssets';
}
}

View File

@@ -0,0 +1,48 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Form;
use Symfony\Component\Form\AbstractType;
use Symfony\Component\Form\FormBuilderInterface;
use Symfony\Component\OptionsResolver\OptionsResolverInterface;
use Symfony\Component\Validator\Constraints\NotBlank;
use Symfony\Component\Validator\Constraints\Range;
class WorkerSearchengineType extends AbstractType
{
public function buildForm(FormBuilderInterface $builder, array $options)
{
parent::buildForm($builder, $options);
$builder
->add('host', 'text', [
'label' => 'Elasticsearch server host',
'constraints' => new NotBlank(),
])
->add('port', 'integer', [
'label' => 'Elasticsearch service port',
'constraints' => [
new Range(['min' => 1, 'max' => 65535]),
new NotBlank()
]
])
->add('indexName', 'text', [
'label' => 'Elasticsearch index name',
'constraints' => new NotBlank(),
'attr' =>['data-class'=>'inline']
])
;
}
public function setDefaultOptions(OptionsResolverInterface $resolver)
{
$resolver->setDefaults([
'allow_extra_fields' => true
]);
}
public function getName()
{
return 'worker_searchengine';
}
}

View File

@@ -0,0 +1,175 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Model;
use Alchemy\Phrasea\WorkerManager\Configuration\Config;
class DBManipulator
{
/**
* @param array $params = [host, port, indexName, databoxId]
*/
public static function savePopulateStatus(array $params)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
$pdo->query("CREATE TABLE IF NOT EXISTS populate_running(host TEXT NOT NULL, port TEXT NOT NULL, index_name TEXT NOT NULL, databox_id TEXT NOT NULL);");
$stmt = $pdo->prepare("INSERT INTO populate_running(host, port, index_name, databox_id) VALUES(:host, :port, :index_name, :databox_id)");
$stmt->execute([
':host' => $params['host'],
':port' => $params['port'],
':index_name' => $params['indexName'],
':databox_id' => $params['databoxId']
]);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
}
/**
* @param array $params = [host, port, indexName, databoxId]
*/
public static function deletePopulateStatus(array $params)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
$stmt = $pdo->prepare("DELETE FROM populate_running WHERE host = :host AND port= :port AND index_name= :index_name AND databox_id= :databox_id");
$stmt->execute([
':host' => $params['host'],
':port' => $params['port'],
':index_name' => $params['indexName'],
':databox_id' => $params['databoxId']
]);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
}
/**
* Update commits table in the temporary sqlite worker.db
* @param $commitId
* @param $assetId
* @return int the number of the remaining assets in the commit
*/
public static function updateRemainingAssetsListByCommit($commitId, $assetId)
{
$row = 1;
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
// remove assetId from the assets list
$stmt = $pdo->prepare("DELETE FROM commits WHERE commit_id = :commit_id AND asset= :assetId");
$stmt->execute([
':commit_id' => $commitId,
':assetId' => $assetId
]);
$stmt = $pdo->prepare("SELECT * FROM commits WHERE commit_id = :commit_id");
$stmt->execute([
':commit_id' => $commitId,
]);
$row = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
return count($row);
}
/**
* @param $commitId
* @return bool
*/
public static function isCommitToBeCreating($commitId)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
$row = 0;
try {
$pdo->query("CREATE TABLE IF NOT EXISTS commits(commit_id TEXT NOT NULL, asset TEXT NOT NULL);");
$stmt = $pdo->prepare("SELECT * FROM commits WHERE commit_id = :commit_id");
$stmt->execute([
':commit_id' => $commitId,
]);
$row = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$pdo->commit();
} catch (\Exception $e) {
//no-op
}
return count($row) ? true : false;
}
public static function saveAssetsList($commitId, $assetIds)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
$pdo->query("CREATE TABLE IF NOT EXISTS commits(commit_id TEXT NOT NULL, asset TEXT NOT NULL);");
// insert all assets ID in the temporary sqlite database
foreach ($assetIds as $assetId) {
$stmt = $pdo->prepare("INSERT INTO commits(commit_id, asset) VALUES(:commit_id, :asset)");
$stmt->execute([
':commit_id' => $commitId,
':asset' => $assetId
]);
}
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
}
/**
* @param array $databoxIds
* @return int
*/
public static function checkPopulateIndexStatusByDataboxId(array $databoxIds)
{
$pdo = Config::getWorkerSqliteConnection();
$in = str_repeat("?,", count($databoxIds)-1) . "?";
$pdo->beginTransaction();
try {
$pdo->query("CREATE TABLE IF NOT EXISTS populate_running(host TEXT NOT NULL, port TEXT NOT NULL, index_name TEXT NOT NULL, databox_id TEXT NOT NULL);");
$stmt = $pdo->prepare("SELECT * FROM populate_running WHERE databox_id IN ($in)");
$stmt->execute($databoxIds);
$row = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
return count($row);
}
}

View File

@@ -0,0 +1,148 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Provider;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Core\LazyLocator;
use Alchemy\Phrasea\Plugin\PluginProviderInterface;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Alchemy\Phrasea\WorkerManager\Worker\AssetsIngestWorker;
use Alchemy\Phrasea\WorkerManager\Worker\CreateRecordWorker;
use Alchemy\Phrasea\WorkerManager\Worker\DeleteRecordWorker;
use Alchemy\Phrasea\WorkerManager\Worker\ExportMailWorker;
use Alchemy\Phrasea\WorkerManager\Worker\Factory\CallableWorkerFactory;
use Alchemy\Phrasea\WorkerManager\Worker\PopulateIndexWorker;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\PullAssetsWorker;
use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver;
use Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker;
use Alchemy\Phrasea\WorkerManager\Worker\WebhookWorker;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use Alchemy\Phrasea\WorkerManager\Worker\WriteMetadatasWorker;
use Monolog\Handler\RotatingFileHandler;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Silex\Application;
class AlchemyWorkerServiceProvider implements PluginProviderInterface
{
public function register(Application $app)
{
$app['alchemy_worker.type_based_worker_resolver'] = $app->share(function () {
return new TypeBasedWorkerResolver();
});
$app['alchemy_worker.logger'] = $app->share(function (Application $app) {
$logger = new $app['monolog.logger.class']('alchemy-service logger');
$logger->pushHandler(new RotatingFileHandler(
$app['log.path'] . DIRECTORY_SEPARATOR . 'worker_service.log',
10,
Logger::INFO
));
return $logger;
});
// use the console logger
$loggerSetter = function (LoggerAwareInterface $loggerAware) use ($app) {
if (isset($app['logger'])) {
$loggerAware->setLogger($app['logger']);
}
return $loggerAware;
};
$app['alchemy_worker.process_pool'] = $app->share(function (Application $app) use ($loggerSetter) {
return $loggerSetter(new ProcessPool());
});
$app['alchemy_worker.worker_invoker'] = $app->share(function (Application $app) use ($loggerSetter) {
return $loggerSetter(new WorkerInvoker($app['alchemy_worker.process_pool']));
});
// register workers
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new SubdefCreationWorker(
$app['subdef.generator'],
$app['alchemy_worker.message.publisher'],
$app['alchemy_worker.logger'],
$app['dispatcher'],
$app['phraseanet.filesystem'],
$app['repo.worker-running-job']
))
->setApplicationBox($app['phraseanet.appbox'])
->setEntityManagerLocator(new LazyLocator($app, 'orm.em'))
;
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::WRITE_METADATAS_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new WriteMetadatasWorker(
$app['exiftool.writer'],
$app['alchemy_worker.logger'],
$app['alchemy_worker.message.publisher'],
$app['repo.worker-running-job']
))
->setApplicationBox($app['phraseanet.appbox'])
->setDispatcher($app['dispatcher'])
->setEntityManagerLocator(new LazyLocator($app, 'orm.em'))
;
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::EXPORT_MAIL_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new ExportMailWorker($app))
->setDelivererLocator(new LazyLocator($app, 'notification.deliverer'));
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::ASSETS_INGEST_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new AssetsIngestWorker($app))
->setEntityManagerLocator(new LazyLocator($app, 'orm.em'));
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::WEBHOOK_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new WebhookWorker($app))
->setDispatcher($app['dispatcher']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::CREATE_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new CreateRecordWorker($app))
->setApplicationBox($app['phraseanet.appbox'])
->setBorderManagerLocator(new LazyLocator($app, 'border-manager'))
->setEntityManagerLocator(new LazyLocator($app, 'orm.em'))
->setFileSystemLocator(new LazyLocator($app, 'filesystem'))
->setTemporaryFileSystemLocator(new LazyLocator($app, 'temporary-filesystem'))
->setDispatcher($app['dispatcher']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::POPULATE_INDEX_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new PopulateIndexWorker($app['alchemy_worker.message.publisher'], $app['elasticsearch.indexer']))
->setApplicationBox($app['phraseanet.appbox'])
->setDispatcher($app['dispatcher']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::PULL_ASSETS_TYPE, new CallableWorkerFactory(function () use ($app) {
return new PullAssetsWorker($app['alchemy_worker.message.publisher']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::DELETE_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new DeleteRecordWorker())
->setApplicationBox($app['phraseanet.appbox']);
}));
}
/**
* {@inheritdoc}
*/
public function boot(Application $app)
{
}
/**
* {@inheritdoc}
*/
public static function create(PhraseaApplication $app)
{
return new static();
}
}

View File

@@ -0,0 +1,103 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Provider;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\ControllerProvider\ControllerProviderTrait;
use Alchemy\Phrasea\Security\Firewall;
use Alchemy\Phrasea\WorkerManager\Controller\AdminConfigurationController;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Silex\Application;
use Silex\ControllerProviderInterface;
use Silex\ServiceProviderInterface;
use Symfony\Component\HttpFoundation\Request;
class ControllerServiceProvider implements ControllerProviderInterface, ServiceProviderInterface
{
use ControllerProviderTrait;
/**
* {@inheritdoc}
*/
public function register(Application $app)
{
$app['controller.worker.admin.configuration'] = $app->share(function (PhraseaApplication $app) {
return new AdminConfigurationController($app);
});
// example of route to check webhook
$app->post('/webhook', array($this, 'getWebhookData'));
}
/**
* {@inheritdoc}
*/
public function boot(Application $app)
{
}
public function connect(Application $app)
{
$controllers = $this->createAuthenticatedCollection($app);
$firewall = $this->getFirewall($app);
$controllers->before(function () use ($firewall) {
$firewall->requireRight(\ACL::TASKMANAGER);
});
$controllers->match('/', 'controller.worker.admin.configuration:indexAction')
->method('GET')
->bind('worker_admin');
$controllers->match('/configuration', 'controller.worker.admin.configuration:configurationAction')
->method('GET|POST')
->bind('worker_admin_configuration');
$controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction')
->method('GET|POST')
->bind('worker_admin_searchengine');
$controllers->match('/subview', 'controller.worker.admin.configuration:subviewAction')
->method('GET|POST')
->bind('worker_admin_subview');
$controllers->match('/metadata', 'controller.worker.admin.configuration:metadataAction')
->method('GET|POST')
->bind('worker_admin_metadata');
$controllers->get('/populate-status', 'controller.worker.admin.configuration:populateStatusAction')
->bind('worker_admin_populate_status');
$controllers->match('/pull-assets', 'controller.worker.admin.configuration:pullAssetsAction')
->method('GET|POST')
->bind('worker_admin_pullAssets');
return $controllers;
}
public function getWebhookData(Application $app, Request $request)
{
$messagePubliser = $this->getMessagePublisher($app);
$messagePubliser->pushLog("RECEIVED ON phraseanet WEBHOOK URL TEST = ". $request->getUri() . " DATA : ". $request->getContent());
return 0;
}
/**
* @param Application $app
* @return Firewall
*/
private function getFirewall(Application $app)
{
return $app['firewall'];
}
/**
* @param Application $app
* @return MessagePublisher
*/
private function getMessagePublisher(Application $app)
{
return $app['alchemy_worker.message.publisher'];
}
}

View File

@@ -0,0 +1,95 @@
<?php
/*
* This file is part of Phraseanet graylog plugin
*
* (c) 2005-2019 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\WorkerManager\Provider;
use Alchemy\Phrasea\Model\Manipulator\WebhookEventManipulator;
use Alchemy\Phrasea\Plugin\PluginProviderInterface;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessageHandler;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Alchemy\Phrasea\WorkerManager\Queue\WebhookPublisher;
use Alchemy\Phrasea\WorkerManager\Subscriber\AssetsIngestSubscriber;
use Alchemy\Phrasea\WorkerManager\Subscriber\ExportSubscriber;
use Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber;
use Alchemy\Phrasea\WorkerManager\Subscriber\SearchengineSubscriber;
use Alchemy\Phrasea\WorkerManager\Subscriber\WebhookSubscriber;
use Silex\Application;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class QueueWorkerServiceProvider implements PluginProviderInterface
{
/**
* {@inheritdoc}
*/
public function register(Application $app)
{
$app['alchemy_worker.amqp.connection'] = $app->share(function (Application $app) {
return new AMQPConnection($app['conf']);
});
$app['alchemy_worker.message.handler'] = $app->share(function (Application $app) {
return new MessageHandler($app['alchemy_worker.message.publisher']);
});
$app['alchemy_worker.message.publisher'] = $app->share(function (Application $app) {
return new MessagePublisher($app['alchemy_worker.amqp.connection'], $app['alchemy_worker.logger']);
});
$app['alchemy_worker.webhook.publisher'] = $app->share(function (Application $app) {
return new WebhookPublisher($app['alchemy_worker.message.publisher']);
});
$app['manipulator.webhook-event'] = $app->share(function (Application $app) {
return new WebhookEventManipulator(
$app['orm.em'],
$app['repo.webhook-event'],
$app['alchemy_worker.webhook.publisher']
);
});
$app['dispatcher'] = $app->share(
$app->extend('dispatcher', function (EventDispatcherInterface $dispatcher, Application $app) {
$dispatcher->addSubscriber(
(new RecordSubscriber($app)
)->setApplicationBox($app['phraseanet.appbox'])
);
$dispatcher->addSubscriber(new ExportSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new AssetsIngestSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new SearchengineSubscriber($app['alchemy_worker.message.publisher']));
$dispatcher->addSubscriber(new WebhookSubscriber($app['alchemy_worker.message.publisher']));
return $dispatcher;
})
);
}
/**
* {@inheritdoc}
*/
public function boot(Application $app)
{
}
/**
* {@inheritdoc}
*/
public static function create(PhraseaApplication $app)
{
return new static();
}
}

View File

@@ -0,0 +1,219 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Queue;
use Alchemy\Phrasea\Core\Configuration\PropertyAccess;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
class AMQPConnection
{
const ALCHEMY_EXCHANGE = 'alchemy-exchange';
const RETRY_ALCHEMY_EXCHANGE = 'retry-alchemy-exchange';
/** @var AMQPStreamConnection */
private $connection;
/** @var AMQPChannel */
private $channel;
private $hostConfig;
private $conf;
public static $defaultQueues = [
MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::METADATAS_QUEUE,
MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::SUBDEF_QUEUE,
MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::EXPORT_QUEUE,
MessagePublisher::WEBHOOK_TYPE => MessagePublisher::WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::CREATE_RECORD_QUEUE,
MessagePublisher::PULL_QUEUE => MessagePublisher::PULL_QUEUE,
MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::POPULATE_INDEX_QUEUE,
MessagePublisher::DELETE_RECORD_TYPE => MessagePublisher::DELETE_RECORD_QUEUE
];
// the corresponding worker queues and retry queues, loop queue
public static $defaultRetryQueues = [
MessagePublisher::METADATAS_QUEUE => MessagePublisher::RETRY_METADATAS_QUEUE,
MessagePublisher::SUBDEF_QUEUE => MessagePublisher::RETRY_SUBDEF_QUEUE,
MessagePublisher::EXPORT_QUEUE => MessagePublisher::RETRY_EXPORT_QUEUE,
MessagePublisher::WEBHOOK_QUEUE => MessagePublisher::RETRY_WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_QUEUE => MessagePublisher::RETRY_ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_QUEUE => MessagePublisher::RETRY_CREATE_RECORD_QUEUE,
MessagePublisher::POPULATE_INDEX_QUEUE => MessagePublisher::RETRY_POPULATE_INDEX_QUEUE,
MessagePublisher::PULL_QUEUE => MessagePublisher::LOOP_PULL_QUEUE
];
// default message TTL in retry queue in millisecond
public static $defaultFailedQueues = [
MessagePublisher::WRITE_METADATAS_TYPE => MessagePublisher::FAILED_METADATAS_QUEUE,
MessagePublisher::SUBDEF_CREATION_TYPE => MessagePublisher::FAILED_SUBDEF_QUEUE,
MessagePublisher::EXPORT_MAIL_TYPE => MessagePublisher::FAILED_EXPORT_QUEUE,
MessagePublisher::WEBHOOK_TYPE => MessagePublisher::FAILED_WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::FAILED_ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::FAILED_CREATE_RECORD_QUEUE,
MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::FAILED_POPULATE_INDEX_QUEUE
];
public static $defaultDelayedQueues = [
MessagePublisher::METADATAS_QUEUE => MessagePublisher::DELAYED_METADATAS_QUEUE,
MessagePublisher::SUBDEF_QUEUE => MessagePublisher::DELAYED_SUBDEF_QUEUE
];
// default message TTL in retry queue in millisecond
const RETRY_DELAY = 10000;
public function __construct(PropertyAccess $conf)
{
$defaultConfiguration = [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/'
];
$this->hostConfig = $conf->get(['workers', 'queue', 'worker-queue'], $defaultConfiguration);
$this->conf = $conf;
$this->getChannel();
$this->declareExchange();
}
public function getConnection()
{
if (!isset($this->connection)) {
$this->connection = new AMQPStreamConnection(
$this->hostConfig['host'],
$this->hostConfig['port'],
$this->hostConfig['user'],
$this->hostConfig['password'],
$this->hostConfig['vhost']);
}
return $this->connection;
}
public function getChannel()
{
if (!isset($this->channel)) {
$this->channel = $this->getConnection()->channel();
}
return $this->channel;
}
public function declareExchange()
{
$this->channel->exchange_declare(self::ALCHEMY_EXCHANGE, 'direct', false, true, false);
$this->channel->exchange_declare(self::RETRY_ALCHEMY_EXCHANGE, 'direct', false, true, false);
}
/**
* @param $queueName
* @return AMQPChannel
*/
public function setQueue($queueName)
{
if (!isset($this->channel)) {
$this->getChannel();
$this->declareExchange();
}
if (isset(self::$defaultRetryQueues[$queueName])) {
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([
'x-dead-letter-exchange' => self::RETRY_ALCHEMY_EXCHANGE, // the exchange to which republish a 'dead' message
'x-dead-letter-routing-key' => self::$defaultRetryQueues[$queueName] // the routing key to apply to this 'dead' message
]));
$this->channel->queue_bind($queueName, self::ALCHEMY_EXCHANGE, $queueName);
// declare also the corresponding retry queue
// use this to delay the delivery of a message to the alchemy-exchange
$this->channel->queue_declare(self::$defaultRetryQueues[$queueName], false, true, false, false, false, new AMQPTable([
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
'x-dead-letter-routing-key' => $queueName,
'x-message-ttl' => $this->getTtlPerRouting($queueName)
]));
$this->channel->queue_bind(self::$defaultRetryQueues[$queueName], AMQPConnection::RETRY_ALCHEMY_EXCHANGE, self::$defaultRetryQueues[$queueName]);
} elseif (in_array($queueName, self::$defaultRetryQueues)) {
// if it's a retry queue
$routing = array_search($queueName, AMQPConnection::$defaultRetryQueues);
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
'x-dead-letter-routing-key' => $routing,
'x-message-ttl' => $this->getTtlPerRouting($routing)
]));
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
} elseif (in_array($queueName, self::$defaultFailedQueues)) {
// if it's a failed queue
$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
} elseif (in_array($queueName, self::$defaultDelayedQueues)) {
// if it's a delayed queue
$routing = array_search($queueName, AMQPConnection::$defaultDelayedQueues);
$this->channel->queue_declare($queueName, false, true, false, false, false, new AMQPTable([
'x-dead-letter-exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
'x-dead-letter-routing-key' => $routing,
'x-message-ttl' => 5000
]));
$this->channel->queue_bind($queueName, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
} else {
$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_bind($queueName, AMQPConnection::ALCHEMY_EXCHANGE, $queueName);
}
return $this->channel;
}
public function reinitializeQueue(array $queuNames)
{
foreach ($queuNames as $queuName) {
if (in_array($queuName, self::$defaultQueues)) {
$this->channel->queue_purge($queuName);
} else {
$this->channel->queue_delete($queuName);
}
if (isset(self::$defaultRetryQueues[$queuName])) {
$this->channel->queue_delete(self::$defaultRetryQueues[$queuName]);
}
$this->setQueue($queuName);
}
}
public function connectionClose()
{
$this->channel->close();
$this->connection->close();
}
/**
* @param $routing
* @return int
*/
private function getTtlPerRouting($routing)
{
$config = $this->conf->get(['workers']);
if ($routing == MessagePublisher::PULL_QUEUE &&
isset($config['pull_assets']) &&
isset($config['pull_assets']['pullInterval']) ) {
// convert in milli second
return (int)($config['pull_assets']['pullInterval']) * 1000;
} elseif (isset($config['retry_queue']) &&
isset($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)])) {
return (int)($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)]);
}
return self::RETRY_DELAY;
}
}

View File

@@ -0,0 +1,111 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Queue;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Ramsey\Uuid\Uuid;
class MessageHandler
{
const MAX_OF_TRY = 3;
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function consume(AMQPConnection $serverConnection, WorkerInvoker $workerInvoker, $argQueueName, $maxProcesses)
{
$publisher = $this->messagePublisher;
$channel = $serverConnection->getChannel();
// define consume callbacks
$callback = function (AMQPMessage $message) use ($channel, $workerInvoker, $publisher) {
$data = json_decode($message->getBody(), true);
$count = 0;
if ($message->has('application_headers')) {
/** @var AMQPTable $headers */
$headers = $message->get('application_headers');
$headerData = $headers->getNativeData();
if (isset($headerData['x-death'])) {
$xDeathHeader = $headerData['x-death'];
foreach ($xDeathHeader as $xdeath) {
$queue = $xdeath['queue'];
if (!in_array($queue, AMQPConnection::$defaultQueues)) {
continue;
}
$count = $xdeath['count'];
$data['payload']['count'] = $count;
}
}
}
// if message is yet executed 3 times, save the unprocessed message in the corresponding failed queues
if ($count > self::MAX_OF_TRY && $data['message_type'] != MessagePublisher::PULL_ASSETS_TYPE) {
$this->messagePublisher->publishFailedMessage($data['payload'], $headers, AMQPConnection::$defaultFailedQueues[$data['message_type']]);
$logMessage = sprintf("Rabbit message executed 3 times, it's to be saved in %s , payload >>> %s",
AMQPConnection::$defaultFailedQueues[$data['message_type']],
json_encode($data['payload'])
);
$this->messagePublisher->pushLog($logMessage);
$channel->basic_ack($message->delivery_info['delivery_tag']);
} else {
try {
$workerInvoker->invokeWorker($data['message_type'], json_encode($data['payload']));
if ($data['message_type'] == MessagePublisher::PULL_ASSETS_TYPE) {
// make a loop for the pull assets
$channel->basic_nack($message->delivery_info['delivery_tag']);
} else {
$channel->basic_ack($message->delivery_info['delivery_tag']);
}
$oldPayload = $data['payload'];
$message = $data['message_type'].' to be consumed! >> Payload ::'. json_encode($oldPayload);
$publisher->pushLog($message);
} catch (\Exception $e) {
$channel->basic_nack($message->delivery_info['delivery_tag']);
}
}
};
$prefetchCount = ProcessPool::MAX_PROCESSES;
if ($maxProcesses) {
$prefetchCount = $maxProcesses;
}
foreach (AMQPConnection::$defaultQueues as $queueName) {
if ($argQueueName ) {
if (in_array($queueName, $argQueueName)) {
$serverConnection->setQueue($queueName);
// give prefetch message to a worker consumer at a time
$channel->basic_qos(null, $prefetchCount, null);
$channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback);
}
} else {
$serverConnection->setQueue($queueName);
// give prefetch message to a worker consumer at a time
$channel->basic_qos(null, $prefetchCount, null);
$channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback);
}
}
}
}

View File

@@ -0,0 +1,142 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Queue;
use Monolog\Logger;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Psr\Log\LoggerInterface;
class MessagePublisher
{
const EXPORT_MAIL_TYPE = 'exportMail';
const SUBDEF_CREATION_TYPE = 'subdefCreation';
const WRITE_METADATAS_TYPE = 'writeMetadatas';
const ASSETS_INGEST_TYPE = 'assetsIngest';
const CREATE_RECORD_TYPE = 'createRecord';
const DELETE_RECORD_TYPE = 'deleteRecord';
const WEBHOOK_TYPE = 'webhook';
const POPULATE_INDEX_TYPE = 'populateIndex';
const PULL_ASSETS_TYPE = 'pullAssets';
// worker queue to be consumed, when no ack , it is requeued to the retry queue
const EXPORT_QUEUE = 'export-queue';
const SUBDEF_QUEUE = 'subdef-queue';
const METADATAS_QUEUE = 'metadatas-queue';
const WEBHOOK_QUEUE = 'webhook-queue';
const ASSETS_INGEST_QUEUE = 'ingest-queue';
const CREATE_RECORD_QUEUE = 'createrecord-queue';
const DELETE_RECORD_QUEUE = 'deleterecord-queue';
const POPULATE_INDEX_QUEUE = 'populateindex-queue';
const PULL_QUEUE = 'pull-queue';
// retry queue
// we can use these retry queue with TTL, so when message expires it is requeued to the corresponding worker queue
const RETRY_EXPORT_QUEUE = 'retry-export-queue';
const RETRY_SUBDEF_QUEUE = 'retry-subdef-queue';
const RETRY_METADATAS_QUEUE = 'retry-metadatas-queue';
const RETRY_WEBHOOK_QUEUE = 'retry-webhook-queue';
const RETRY_ASSETS_INGEST_QUEUE = 'retry-ingest-queue';
const RETRY_CREATE_RECORD_QUEUE = 'retry-createrecord-queue';
const RETRY_POPULATE_INDEX_QUEUE = 'retry-populateindex-queue';
// use this queue to make a loop on a consumer
const LOOP_PULL_QUEUE = 'loop-pull-queue';
// all failed queue, if message is treated over 3 times it goes to the failed queue
const FAILED_EXPORT_QUEUE = 'failed-export-queue';
const FAILED_SUBDEF_QUEUE = 'failed-subdef-queue';
const FAILED_METADATAS_QUEUE = 'failed-metadatas-queue';
const FAILED_WEBHOOK_QUEUE = 'failed-webhook-queue';
const FAILED_ASSETS_INGEST_QUEUE = 'failed-ingest-queue';
const FAILED_CREATE_RECORD_QUEUE = 'failed-createrecord-queue';
const FAILED_POPULATE_INDEX_QUEUE = 'failed-populateindex-queue';
// delayed queue when record is locked
const DELAYED_SUBDEF_QUEUE = 'delayed-subdef-queue';
const DELAYED_METADATAS_QUEUE = 'delayed-metadatas-queue';
const NEW_RECORD_MESSAGE = 'newrecord';
/** @var AMQPConnection $serverConnection */
private $serverConnection;
/** @var Logger */
private $logger;
public function __construct(AMQPConnection $serverConnection, LoggerInterface $logger)
{
$this->serverConnection = $serverConnection;
$this->logger = $logger;
}
public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '')
{
$msg = new AMQPMessage(json_encode($payload));
$routing = array_search($queueName, AMQPConnection::$defaultRetryQueues);
if (count($retryCount) && $routing != false) {
// add a message header information
$headers = new AMQPTable([
'x-death' => [
[
'count' => $retryCount,
'exchange' => AMQPConnection::ALCHEMY_EXCHANGE,
'queue' => $routing,
'routing-keys' => $routing,
'reason' => 'rejected', // rejected is sended like nack
'time' => new \DateTime('now', new \DateTimeZone('UTC'))
]
],
'worker-message' => $workerMessage
]);
$msg->set('application_headers', $headers);
}
$channel = $this->serverConnection->setQueue($queueName);
$exchange = in_array($queueName, AMQPConnection::$defaultQueues) ? AMQPConnection::ALCHEMY_EXCHANGE : AMQPConnection::RETRY_ALCHEMY_EXCHANGE;
$channel->basic_publish($msg, $exchange, $queueName);
return true;
}
public function initializePullAssets()
{
$payload = [
'message_type' => self::PULL_ASSETS_TYPE,
'payload' => [
'initTimestamp' => new \DateTime('now', new \DateTimeZone('UTC'))
]
];
$this->publishMessage($payload, self::PULL_QUEUE);
}
public function connectionClose()
{
$this->serverConnection->connectionClose();
}
/**
* @param $message
* @param string $method
* @param array $context
*/
public function pushLog($message, $method = 'info', $context = [])
{
// write logs directly in file
call_user_func(array($this->logger, $method), $message, $context);
}
public function publishFailedMessage(array $payload, AMQPTable $headers, $queueName)
{
$msg = new AMQPMessage(json_encode($payload));
$msg->set('application_headers', $headers);
$channel = $this->serverConnection->setQueue($queueName);
$channel->basic_publish($msg, AMQPConnection::RETRY_ALCHEMY_EXCHANGE, $queueName);
}
}

View File

@@ -0,0 +1,29 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Queue;
use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Phrasea\Webhook\WebhookPublisherInterface;
class WebhookPublisher implements WebhookPublisherInterface
{
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function publishWebhookEvent(WebhookEvent $event)
{
$payload = [
'message_type' => MessagePublisher::WEBHOOK_TYPE,
'payload' => [
'id' => $event->getId()
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::WEBHOOK_QUEUE);
}
}

View File

@@ -0,0 +1,70 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class AssetsIngestSubscriber implements EventSubscriberInterface
{
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function onAssetsCreate(AssetsCreateEvent $event)
{
$payload = [
'message_type' => MessagePublisher::ASSETS_INGEST_TYPE,
'payload' => $event->getData()
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE);
}
public function onAssetsCreationFailure(AssetsCreationFailureEvent $event)
{
$payload = [
'message_type' => MessagePublisher::ASSETS_INGEST_TYPE,
'payload' => $event->getPayload()
];
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_ASSETS_INGEST_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
}
public function onAssetsCreationRecordFailure(AssetsCreationRecordFailureEvent $event)
{
$payload = [
'message_type' => MessagePublisher::CREATE_RECORD_TYPE,
'payload' => $event->getPayload()
];
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_CREATE_RECORD_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
}
public static function getSubscribedEvents()
{
return [
WorkerEvents::ASSETS_CREATE => 'onAssetsCreate',
WorkerEvents::ASSETS_CREATION_FAILURE => 'onAssetsCreationFailure',
WorkerEvents::ASSETS_CREATION_RECORD_FAILURE => 'onAssetsCreationRecordFailure'
];
}
}

View File

@@ -0,0 +1,64 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Core\Event\ExportMailEvent;
use Alchemy\Phrasea\Core\PhraseaEvents;
use Alchemy\Phrasea\WorkerManager\Event\ExportMailFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class ExportSubscriber implements EventSubscriberInterface
{
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function onExportMailCreate(ExportMailEvent $event)
{
$payload = [
'message_type' => MessagePublisher::EXPORT_MAIL_TYPE,
'payload' => [
'emitterUserId' => $event->getEmitterUserId(),
'tokenValue' => $event->getTokenValue(),
'destinationMails' => serialize($event->getDestinationMails()),
'params' => serialize($event->getParams())
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::EXPORT_QUEUE);
}
public function onExportMailFailure(ExportMailFailureEvent $event)
{
$payload = [
'message_type' => MessagePublisher::EXPORT_MAIL_TYPE,
'payload' => [
'emitterUserId' => $event->getEmitterUserId(),
'tokenValue' => $event->getTokenValue(),
'destinationMails' => serialize($event->getDestinationMails()),
'params' => serialize($event->getParams())
]
];
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_EXPORT_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
}
public static function getSubscribedEvents()
{
return [
PhraseaEvents::EXPORT_MAIL_CREATE => 'onExportMailCreate',
WorkerEvents::EXPORT_MAIL_FAILURE => 'onExportMailFailure'
];
}
}

View File

@@ -0,0 +1,268 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Core\Event\Record\DeletedEvent;
use Alchemy\Phrasea\Core\Event\Record\DeleteEvent;
use Alchemy\Phrasea\Core\Event\Record\MetadataChangedEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvents;
use Alchemy\Phrasea\Core\Event\Record\SubdefinitionCreateEvent;
use Alchemy\Phrasea\Databox\Subdef\MediaSubdefRepository;
use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Alchemy\Phrasea\WorkerManager\Worker\CreateRecordWorker;
use Alchemy\Phrasea\WorkerManager\Worker\Factory\WorkerFactoryInterface;
use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class RecordSubscriber implements EventSubscriberInterface
{
use ApplicationBoxAware;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
/** @var TypeBasedWorkerResolver $workerResolver*/
private $workerResolver;
/** @var Application */
private $app;
public function __construct(Application $app)
{
$this->messagePublisher = $app['alchemy_worker.message.publisher'];
$this->workerResolver = $app['alchemy_worker.type_based_worker_resolver'];
$this->app = $app;
}
public function onSubdefinitionCreate(SubdefinitionCreateEvent $event)
{
$record = $this->findDataboxById($event->getRecord()->getDataboxId())->get_record($event->getRecord()->getRecordId());
$subdefs = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType());
foreach ($subdefs as $subdef) {
$payload = [
'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE,
'payload' => [
'recordId' => $event->getRecord()->getRecordId(),
'databoxId' => $event->getRecord()->getDataboxId(),
'subdefName' => $subdef->get_name(),
'status' => $event->isNewRecord() ? MessagePublisher::NEW_RECORD_MESSAGE : ''
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::SUBDEF_QUEUE);
}
}
public function onDelete(DeleteEvent $event)
{
// first remove record from the grid answer, so first delete the record in the index elastic
$this->app['dispatcher']->dispatch(RecordEvents::DELETED, new DeletedEvent($event->getRecord()));
// publish payload to queue
$payload = [
'message_type' => MessagePublisher::DELETE_RECORD_TYPE,
'payload' => [
'recordId' => $event->getRecord()->getRecordId(),
'databoxId' => $event->getRecord()->getDataboxId(),
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELETE_RECORD_QUEUE);
}
public function onSubdefinitionCreationFailure(SubdefinitionCreationFailureEvent $event)
{
$payload = [
'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE,
'payload' => [
'recordId' => $event->getRecord()->getRecordId(),
'databoxId' => $event->getRecord()->getDataboxId(),
'subdefName' => $event->getSubdefName(),
'status' => ''
]
];
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_SUBDEF_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
}
public function onRecordCreated(RecordEvent $event)
{
$this->messagePublisher->pushLog(sprintf('The %s= %d was successfully created',
($event->getRecord()->isStory() ? "story story_id" : "record record_id"),
$event->getRecord()->getRecordId()
));
}
public function onMetadataChanged(MetadataChangedEvent $event)
{
$databoxId = $event->getRecord()->getDataboxId();
$recordId = $event->getRecord()->getRecordId();
$mediaSubdefRepository = $this->getMediaSubdefRepository($databoxId);
$mediaSubdefs = $mediaSubdefRepository->findByRecordIdsAndNames([$recordId]);
$databox = $this->findDataboxById($databoxId);
$record = $databox->get_record($recordId);
$type = $record->getType();
foreach ($mediaSubdefs as $subdef) {
// check subdefmetadatarequired from the subview setup in admin
if ( $subdef->get_name() == 'document' || $this->isSubdefMetadataUpdateRequired($databox, $type, $subdef->get_name())) {
if ($subdef->is_physically_present()) {
$payload = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => [
'recordId' => $recordId,
'databoxId' => $databoxId,
'subdefName' => $subdef->get_name()
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE);
} else {
$payload = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => [
'recordId' => $recordId,
'databoxId' => $databoxId,
'subdefName' => $subdef->get_name()
]
];
$logMessage = sprintf("Subdef %s is not physically present! to be passed in the %s ! payload >>> %s",
$subdef->get_name(),
MessagePublisher::RETRY_METADATAS_QUEUE,
json_encode($payload)
);
$this->messagePublisher->pushLog($logMessage);
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_METADATAS_QUEUE,
2,
'Subdef is not physically present!'
);
}
}
}
}
public function onStoryCreateCover(StoryCreateCoverEvent $event)
{
/** @var WorkerFactoryInterface[] $factories */
$factories = $this->workerResolver->getFactories();
/** @var CreateRecordWorker $createRecordWorker */
$createRecordWorker = $factories[MessagePublisher::CREATE_RECORD_TYPE]->createWorker();
$createRecordWorker->setStoryCover($event->getData());
}
public function onSubdefinitionWritemeta(SubdefinitionWritemetaEvent $event)
{
if ($event->getStatus() == SubdefinitionWritemetaEvent::FAILED) {
$payload = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => [
'recordId' => $event->getRecord()->getRecordId(),
'databoxId' => $event->getRecord()->getDataboxId(),
'subdefName' => $event->getSubdefName()
]
];
$logMessage = sprintf("Subdef %s write meta failed, error : %s ! to be passed in the %s ! payload >>> %s",
$event->getSubdefName(),
$event->getWorkerMessage(),
MessagePublisher::RETRY_METADATAS_QUEUE,
json_encode($payload)
);
$this->messagePublisher->pushLog($logMessage);
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_METADATAS_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
} else {
$databoxId = $event->getRecord()->getDataboxId();
$recordId = $event->getRecord()->getRecordId();
$databox = $this->findDataboxById($databoxId);
$record = $databox->get_record($recordId);
$type = $record->getType();
$subdef = $record->get_subdef($event->getSubdefName());
// only the required writemetadata from admin > subview setup is to be writing
if ($subdef->get_name() == 'document' || $this->isSubdefMetadataUpdateRequired($databox, $type, $subdef->get_name())) {
$payload = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => [
'recordId' => $recordId,
'databoxId' => $databoxId,
'subdefName' => $event->getSubdefName()
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::METADATAS_QUEUE);
}
}
}
public static function getSubscribedEvents()
{
return [
RecordEvents::CREATED => 'onRecordCreated',
RecordEvents::SUBDEFINITION_CREATE => 'onSubdefinitionCreate',
RecordEvents::DELETE => 'onDelete',
WorkerEvents::SUBDEFINITION_CREATION_FAILURE => 'onSubdefinitionCreationFailure',
RecordEvents::METADATA_CHANGED => 'onMetadataChanged',
WorkerEvents::STORY_CREATE_COVER => 'onStoryCreateCover',
WorkerEvents::SUBDEFINITION_WRITE_META => 'onSubdefinitionWritemeta'
];
}
/**
* @param $databoxId
*
* @return MediaSubdefRepository
*/
private function getMediaSubdefRepository($databoxId)
{
return $this->app['provider.repo.media_subdef']->getRepositoryForDatabox($databoxId);
}
/**
* @param \databox $databox
* @param string $subdefType
* @param string $subdefName
* @return bool
*/
private function isSubdefMetadataUpdateRequired(\databox $databox, $subdefType, $subdefName)
{
if ($databox->get_subdef_structure()->hasSubdef($subdefType, $subdefName)) {
return $databox->get_subdef_structure()->get_subdef($subdefType, $subdefName)->isMetadataUpdateRequired();
}
return false;
}
}

View File

@@ -0,0 +1,69 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class SearchengineSubscriber implements EventSubscriberInterface
{
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function onPopulateIndex(PopulateIndexEvent $event)
{
$populateInfo = $event->getData();
// make payload per databoxId
foreach ($populateInfo['databoxIds'] as $databoxId) {
$payload = [
'message_type' => MessagePublisher::POPULATE_INDEX_TYPE,
'payload' => [
'host' => $populateInfo['host'],
'port' => $populateInfo['port'],
'indexName' => $populateInfo['indexName'],
'databoxId' => $databoxId
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::POPULATE_INDEX_QUEUE);
}
}
public function onPopulateIndexFailure(PopulateIndexFailureEvent $event)
{
$payload = [
'message_type' => MessagePublisher::POPULATE_INDEX_TYPE,
'payload' => [
'host' => $event->getHost(),
'port' => $event->getPort(),
'indexName' => $event->getIndexName(),
'databoxId' => $event->getDataboxId(),
]
];
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_POPULATE_INDEX_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
}
public static function getSubscribedEvents()
{
return [
WorkerEvents::POPULATE_INDEX => 'onPopulateIndex',
WorkerEvents::POPULATE_INDEX_FAILURE => 'onPopulateIndexFailure'
];
}
}

View File

@@ -0,0 +1,48 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\WorkerManager\Event\WebhookDeliverFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class WebhookSubscriber implements EventSubscriberInterface
{
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function onWebhookDeliverFailure(WebhookDeliverFailureEvent $event)
{
// count = 0 mean do not retry because no api application defined
if ($event->getCount() != 0) {
$payload = [
'message_type' => MessagePublisher::WEBHOOK_TYPE,
'payload' => [
'id' => $event->getWebhookEventId(),
'delivery_id' => $event->getDeleveryId(),
]
];
$this->messagePublisher->publishMessage(
$payload,
MessagePublisher::RETRY_WEBHOOK_QUEUE,
$event->getCount(),
$event->getWorkerMessage()
);
}
}
public static function getSubscribedEvents()
{
return [
WorkerEvents::WEBHOOK_DELIVER_FAILURE => 'onWebhookDeliverFailure',
];
}
}

View File

@@ -0,0 +1,123 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Model\Entities\StoryWZ;
use Alchemy\Phrasea\Model\Repositories\UserRepository;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use GuzzleHttp\Client;
class AssetsIngestWorker implements WorkerInterface
{
use EntityManagerAware;
private $app;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(PhraseaApplication $app)
{
$this->app = $app;
$this->messagePublisher = $this->app['alchemy_worker.message.publisher'];
}
public function process(array $payload)
{
$assets = $payload['assets'];
DBManipulator::saveAssetsList($payload['commit_id'], $assets);
$uploaderClient = new Client(['base_uri' => $payload['base_url']]);
//get first asset informations to check if it's a story
try {
$body = $uploaderClient->get('/assets/'.$assets[0], [
'headers' => [
'Authorization' => 'AssetToken '.$payload['token']
]
])->getBody()->getContents();
} catch(\Exception $e) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->app['dispatcher']->dispatch(WorkerEvents::ASSETS_CREATION_FAILURE, new AssetsCreationFailureEvent(
$payload,
'Error when getting assets information !' . $e->getMessage(),
$count
));
return;
}
$body = json_decode($body,true);
$storyId = null;
if (!empty($body['formData']['is_story'])) {
$storyId = $this->createStory($body);
}
foreach ($assets as $assetId) {
$createRecordMessage['message_type'] = MessagePublisher::CREATE_RECORD_TYPE;
$createRecordMessage['payload'] = [
'asset' => $assetId,
'publisher' => $payload['publisher'],
'assetToken' => $payload['token'],
'storyId' => $storyId,
'base_url' => $payload['base_url'],
'commit_id' => $payload['commit_id']
];
$this->messagePublisher->publishMessage($createRecordMessage, MessagePublisher::CREATE_RECORD_QUEUE);
}
}
private function createStory(array $body)
{
$storyId = null;
$userRepository = $this->getUserRepository();
$user = null;
if (!empty($body['formData']['phraseanet_submiter_email'])) {
$user = $userRepository->findByEmail($body['formData']['phraseanet_submiter_email']);
}
if ($user === null && !empty($body['formData']['phraseanet_user_submiter_id'])) {
$user = $userRepository->find($body['formData']['phraseanet_user_submiter_id']);
}
if ($user !== null) {
$base_id = $body['formData']['collection_destination'];
$collection = \collection::getByBaseId($this->app, $base_id);
$story = \record_adapter::createStory($this->app, $collection);
$storyId = $story->getRecordId();
$storyWZ = new StoryWZ();
$storyWZ->setUser($user);
$storyWZ->setRecord($story);
$entityManager = $this->getEntityManager();
$entityManager->persist($storyWZ);
$entityManager->flush();
}
return $storyId;
}
/**
* @return UserRepository
*/
private function getUserRepository()
{
return $this->app['repo.users'];
}
}

View File

@@ -0,0 +1,303 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Application\Helper\BorderManagerAware;
use Alchemy\Phrasea\Application\Helper\DispatcherAware;
use Alchemy\Phrasea\Application\Helper\FilesystemAware;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Border\Attribute\MetaField;
use Alchemy\Phrasea\Border\Attribute\Status;
use Alchemy\Phrasea\Border\File;
use Alchemy\Phrasea\Border\Visa;
use Alchemy\Phrasea\Core\Event\LazaretEvent;
use Alchemy\Phrasea\Core\Event\RecordEdit;
use Alchemy\Phrasea\Core\PhraseaEvents;
use Alchemy\Phrasea\Media\SubdefSubstituer;
use Alchemy\Phrasea\Model\Entities\LazaretFile;
use Alchemy\Phrasea\Model\Entities\LazaretSession;
use Alchemy\Phrasea\Model\Entities\User;
use Alchemy\Phrasea\Model\Repositories\UserRepository;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use GuzzleHttp\Client;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
class CreateRecordWorker implements WorkerInterface
{
use ApplicationBoxAware;
use EntityManagerAware;
use BorderManagerAware;
use DispatcherAware;
use FilesystemAware;
private $app;
private $logger;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(PhraseaApplication $app)
{
$this->app = $app;
$this->logger = $this->app['alchemy_worker.logger'];
$this->messagePublisher = $this->app['alchemy_worker.message.publisher'];
}
public function process(array $payload)
{
$uploaderClient = new Client(['base_uri' => $payload['base_url']]);
//get asset informations
$body = $uploaderClient->get('/assets/'.$payload['asset'], [
'headers' => [
'Authorization' => 'AssetToken '.$payload['assetToken']
]
])->getBody()->getContents();
$body = json_decode($body,true);
$tempfile = $this->getTemporaryFilesystem()->createTemporaryFile('download_', null, pathinfo($body['originalName'], PATHINFO_EXTENSION));
//download the asset
try {
$res = $uploaderClient->get('/assets/'.$payload['asset'].'/download', [
'headers' => [
'Authorization' => 'AssetToken '.$payload['assetToken']
],
'save_to' => $tempfile
]);
} catch (\Exception $e) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
// send to retry queue
$this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent(
$payload,
'Error when downloading assets!',
$count
));
return;
}
if ($res->getStatusCode() !== 200) {
$workerMessage = sprintf('Error %s downloading "%s"', $res->getStatusCode(), $payload['base_url'].'/assets/'.$payload['asset'].'/download');
$this->logger->error($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
// send to retry queue
$this->dispatch(WorkerEvents::ASSETS_CREATION_RECORD_FAILURE, new AssetsCreationRecordFailureEvent(
$payload,
$workerMessage,
$count
));
return;
}
$remainingAssets = DBManipulator::updateRemainingAssetsListByCommit($payload['commit_id'], $payload['asset']);
// if all assets in the commit are downloaded , send ack to the uploader
if ($remainingAssets == 0) {
// post ack to the uploader
$uploaderClient->post('/commits/' . $payload['commit_id'] . '/ack', [
'headers' => [
'Authorization' => 'AssetToken '.$payload['assetToken']
],
'json' => [
'acknowledged' => true
]
]
);
}
$lazaretSession = new LazaretSession();
$userRepository = $this->getUserRepository();
$user = null;
if (!empty($body['formData']['phraseanet_submiter_email'])) {
$user = $userRepository->findByEmail($body['formData']['phraseanet_submiter_email']);
}
if ($user === null && !empty($body['formData']['phraseanet_user_submiter_id'])) {
$user = $userRepository->find($body['formData']['phraseanet_user_submiter_id']);
}
if ($user !== null) {
$lazaretSession->setUser($user);
}
$this->getEntityManager()->persist($lazaretSession);
$renamedFilename = $tempfile;
$media = $this->app->getMediaFromUri($renamedFilename);
if (!isset($body['formData']['collection_destination'])) {
$this->messagePublisher->pushLog("The collection_destination is not defined");
return ;
}
$base_id = $body['formData']['collection_destination'];
$collection = \collection::getByBaseId($this->app, $base_id);
$sbasId = $collection->get_sbas_id();
$packageFile = new File($this->app, $media, $collection, $body['originalName']);
// get metadata and status
$statusbit = null;
foreach ($body['formData'] as $key => $value) {
if (strstr($key, 'metadata')) {
$tMeta = explode('-', $key);
$metaField = $collection->get_databox()->get_meta_structure()->get_element($tMeta[1]);
$packageFile->addAttribute(new MetaField($metaField, [$value]));
}
if (strstr($key, 'statusbit')) {
$tStatus = explode('-', $key);
$statusbit[$tStatus[1]] = $value;
}
}
if (!is_null($statusbit)) {
$status = '';
foreach (range(0, 31) as $i) {
$status .= isset($statusbit[$i]) ? ($statusbit[$i] ? '1' : '0') : '0';
}
$packageFile->addAttribute(new Status($this->app, strrev($status)));
}
$reasons = [];
$elementCreated = null;
$callback = function ($element, Visa $visa) use (&$reasons, &$elementCreated) {
foreach ($visa->getResponses() as $response) {
if (!$response->isOk()) {
$reasons[] = $response->getMessage($this->app['translator']);
}
}
$elementCreated = $element;
};
$this->getBorderManager()->process($lazaretSession, $packageFile, $callback);
if ($elementCreated instanceof \record_adapter) {
$this->dispatch(PhraseaEvents::RECORD_UPLOAD, new RecordEdit($elementCreated));
} else {
$this->messagePublisher->pushLog(sprintf('The file was moved to the quarantine: %s', json_encode($reasons)));
/** @var LazaretFile $elementCreated */
$this->dispatch(PhraseaEvents::LAZARET_CREATE, new LazaretEvent($elementCreated));
}
// add record in a story if story is defined
if (is_int($payload['storyId']) && $elementCreated instanceof \record_adapter) {
$this->addRecordInStory($user, $elementCreated, $sbasId, $payload['storyId'], $body['formData']);
}
}
/**
* @param string $data databoxId_storyId_recordId subdefName
*/
public function setStoryCover($data)
{
// get databoxId , storyId , recordId
$tData = explode('_', $data);
$record = $this->findDataboxById($tData[0])->get_record($tData[2]);
$story = $this->findDataboxById($tData[0])->get_record($tData[1]);
$subdefName = $tData[3];
$subdef = $record->get_subdef($tData[3]);
$media = $this->app->getMediaFromUri($subdef->getRealPath());
$this->getSubdefSubstituer()->substituteSubdef($story, $subdefName, $media); // subdefName = thumbnail | preview
$this->messagePublisher->pushLog(sprintf("Cover %s set for story story_id= %d with the record record_id = %d", $subdefName, $story->getRecordId(), $record->getRecordId()));
}
/**
* @param $user
* @param \record_adapter $elementCreated
* @param $sbasId
* @param $storyId
* @param $formData
*/
private function addRecordInStory($user, $elementCreated, $sbasId, $storyId, $formData)
{
$story = new \record_adapter($this->app, $sbasId, $storyId);
if (!$this->getAclForUser($user)->has_right_on_base($story->getBaseId(), \ACL::CANMODIFRECORD)) {
$this->messagePublisher->pushLog(sprintf("The user %s can not add document to the story story_id = %d", $user->getLogin(), $story->getRecordId()));
throw new AccessDeniedHttpException('You can not add document to this Story');
}
if (!$story->hasChild($elementCreated)) {
$story->appendChild($elementCreated);
if (SubdefCreationWorker::checkIfFirstChild($story, $elementCreated)) {
// add metadata to the story
$metadatas = [];
foreach ($formData as $key => $value) {
if (strstr($key, 'metadata')) {
$tMeta = explode('-', $key);
$metaField = $elementCreated->getDatabox()->get_meta_structure()->get_element($tMeta[1]);
$metadatas[] = [
'meta_struct_id' => $metaField->get_id(),
'meta_id' => null,
'value' => $value,
];
}
}
$story->set_metadatas($metadatas)->rebuild_subdefs();
}
$this->messagePublisher->pushLog(sprintf('The record record_id= %d was successfully added in the story record_id= %d', $elementCreated->getRecordId(), $story->getRecordId()));
$this->dispatch(PhraseaEvents::RECORD_EDIT, new RecordEdit($story));
}
}
/**
* @return UserRepository
*/
private function getUserRepository()
{
return $this->app['repo.users'];
}
/**
* @param User $user
* @return \ACL
*/
private function getAclForUser(User $user)
{
$aclProvider = $this->app['acl'];
return $aclProvider->get($user);
}
/**
* @return SubdefSubstituer
*/
private function getSubdefSubstituer()
{
return $this->app['subdef.substituer'];
}
}

View File

@@ -0,0 +1,17 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
class DeleteRecordWorker implements WorkerInterface
{
use ApplicationBoxAware;
public function process(array $payload)
{
$record = $this->findDataboxById($payload['databoxId'])->get_record($payload['recordId']);
$record->delete();
}
}

View File

@@ -0,0 +1,102 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Core\Event\ExportFailureEvent;
use Alchemy\Phrasea\Core\PhraseaEvents;
use Alchemy\Phrasea\Exception\InvalidArgumentException;
use Alchemy\Phrasea\Model\Entities\Token;
use Alchemy\Phrasea\Model\Repositories\TokenRepository;
use Alchemy\Phrasea\Model\Repositories\UserRepository;
use Alchemy\Phrasea\Notification\Emitter;
use Alchemy\Phrasea\Notification\Mail\MailRecordsExport;
use Alchemy\Phrasea\Notification\Receiver;
use Alchemy\Phrasea\WorkerManager\Event\ExportMailFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
class ExportMailWorker implements WorkerInterface
{
use Application\Helper\NotifierAware;
private $app;
public function __construct(Application $app)
{
$this->app = $app;
}
public function process(array $payload)
{
$destMails = unserialize($payload['destinationMails']);
$params = unserialize($payload['params']);
/** @var UserRepository $userRepository */
$userRepository = $this->app['repo.users'];
$user = $userRepository->find($payload['emitterUserId']);
/** @var TokenRepository $tokenRepository */
$tokenRepository = $this->app['repo.tokens'];
/** @var Token $token */
$token = $tokenRepository->findValidToken($payload['tokenValue']);
$list = unserialize($token->getData());
//zip documents
\set_export::build_zip(
$this->app,
$token,
$list,
$this->app['tmp.download.path'].'/'. $token->getValue() . '.zip'
);
$remaingEmails = $destMails;
$emitter = new Emitter($user->getDisplayName(), $user->getEmail());
foreach ($destMails as $key => $mail) {
try {
$receiver = new Receiver(null, trim($mail));
} catch (InvalidArgumentException $e) {
continue;
}
$mail = MailRecordsExport::create($this->app, $receiver, $emitter, $params['textmail']);
$mail->setButtonUrl($params['url']);
$mail->setExpiration($token->getExpiration());
$this->deliver($mail, $params['reading_confirm']);
unset($remaingEmails[$key]);
}
//some mails failed
if (count($remaingEmails) > 0) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
// notify to send to the retry queue
$this->app['dispatcher']->dispatch(WorkerEvents::EXPORT_MAIL_FAILURE, new ExportMailFailureEvent(
$payload['emitterUserId'],
$payload['tokenValue'],
$remaingEmails,
$payload['params'],
'some mails failed',
$count
));
foreach ($remaingEmails as $mail) {
$this->app['dispatcher']->dispatch(PhraseaEvents::EXPORT_MAIL_FAILURE, new ExportFailureEvent(
$user,
$params['ssttid'],
$params['lst'],
\eventsmanager_notify_downloadmailfail::MAIL_FAIL,
$mail
)
);
}
}
}
}

View File

@@ -0,0 +1,33 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker\Factory;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface;
class CallableWorkerFactory implements WorkerFactoryInterface
{
/**
* @var callable
*/
private $factory;
public function __construct(callable $factory)
{
$this->factory = $factory;
}
/**
* @return WorkerInterface
*/
public function createWorker()
{
$factory = $this->factory;
$worker = $factory();
if (! $worker instanceof WorkerInterface) {
throw new \RuntimeException('Invalid worker created, expected an instance of \Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface');
}
return $worker;
}
}

View File

@@ -0,0 +1,13 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker\Factory;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface;
interface WorkerFactoryInterface
{
/**
* @return WorkerInterface
*/
public function createWorker();
}

View File

@@ -0,0 +1,97 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\DispatcherAware;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
class PopulateIndexWorker implements WorkerInterface
{
use ApplicationBoxAware;
use DispatcherAware;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
/** @var Indexer $indexer */
private $indexer;
public function __construct(MessagePublisher $messagePublisher, Indexer $indexer)
{
$this->indexer = $indexer;
$this->messagePublisher = $messagePublisher;
}
public function process(array $payload)
{
DBManipulator::savePopulateStatus($payload);
/** @var ElasticsearchOptions $options */
$options = $this->indexer->getIndex()->getOptions();
$options->setIndexName($payload['indexName']);
$options->setHost($payload['host']);
$options->setPort($payload['port']);
$databoxId = $payload['databoxId'];
$indexExists = $this->indexer->indexExists();
if (!$indexExists) {
$workerMessage = sprintf("Index %s don't exist!", $payload['indexName']);
$this->messagePublisher->pushLog($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
// send to retry queue
$this->dispatch(WorkerEvents::POPULATE_INDEX_FAILURE, new PopulateIndexFailureEvent(
$payload['host'],
$payload['port'],
$payload['indexName'],
$payload['databoxId'],
$workerMessage,
$count
));
} else {
$databox = $this->findDataboxById($databoxId);
try {
$r = $this->indexer->populateIndex(Indexer::THESAURUS | Indexer::RECORDS, $databox); // , $temporary);
$this->messagePublisher->pushLog(sprintf(
"Indexation of databox \"%s\" finished in %0.2f sec (Mem. %0.2f Mo)",
$databox->get_dbname(),
$r['duration']/1000,
$r['memory']/1048576
));
} catch(\Exception $e) {
DBManipulator::deletePopulateStatus($payload);
$workerMessage = sprintf("Error on indexing : %s ", $e->getMessage());
$this->messagePublisher->pushLog($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
// notify to send a retry
$this->dispatch(WorkerEvents::POPULATE_INDEX_FAILURE, new PopulateIndexFailureEvent(
$payload['host'],
$payload['port'],
$payload['indexName'],
$payload['databoxId'],
$workerMessage,
$count
));
}
}
// delete entry in populate_running
DBManipulator::deletePopulateStatus($payload);
}
}

View File

@@ -0,0 +1,100 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\ProcessBuilder;
class ProcessPool implements LoggerAwareInterface
{
const MAX_PROCESSES = 4;
/**
* @var int
*/
private $maxProcesses = self::MAX_PROCESSES;
/**
* @var Process[]
*/
private $processes = [];
/**
* @var LoggerInterface
*/
private $logger;
public function __construct()
{
$this->logger = new NullLogger();
}
public function setMaxProcesses($maxProcesses)
{
$this->maxProcesses = max(1, $maxProcesses);
}
/**
* Sets a logger instance on the object
*
* @param LoggerInterface $logger
* @return null
*/
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;
}
/**
* @param array $processArguments
* @param string|null $workingDirectory
* @return Process
*/
public function getWorkerProcess(array $processArguments, $workingDirectory = null)
{
$this->detachFinishedProcesses();
$this->waitForNextSlot();
$builder = new ProcessBuilder($processArguments);
$builder->setWorkingDirectory($workingDirectory ?: getcwd());
return ($this->processes[] = $builder->getProcess());
}
private function detachFinishedProcesses()
{
$runningProcesses = [];
foreach ($this->processes as $process) {
if ($process->isRunning()) {
$runningProcesses[] = $process;
} else {
$process->stop(0);
}
}
$this->processes = $runningProcesses;
}
private function waitForNextSlot()
{
$this->logger->debug(
sprintf('Checking for available process slot: %d processes found.', count($this->processes))
);
$interval = 1;
while (count($this->processes) >= $this->maxProcesses) {
$this->logger->debug(sprintf('%d Max process count reached, will retry in %d second.', $this->maxProcesses, $interval));
sleep($interval);
$this->detachFinishedProcesses();
$interval = min(10, $interval + 1);
}
}
}

View File

@@ -0,0 +1,138 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\WorkerManager\Configuration\Config;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use GuzzleHttp\Client;
class PullAssetsWorker implements WorkerInterface
{
private $messagePublisher;
public function __construct(MessagePublisher $messagePublisher)
{
$this->messagePublisher = $messagePublisher;
}
public function process(array $payload)
{
$config = Config::getConfiguration();
if (isset($config['worker_plugin']) && isset($config['worker_plugin']['pull_assets'])) {
$config = $config['worker_plugin']['pull_assets'];
} else {
return;
}
$uploaderClient = new Client();
// if a token exist , use it
if (isset($config['assetToken'])) {
$res = $this->getCommits($uploaderClient, $config);
if ($res == null) {
return;
}
// if Unauthorized get a new token first
if ($res->getStatusCode() == 401) {
if (($config = $this->generateToken($uploaderClient, $config)) === null) {
return;
};
$res = $this->getCommits($uploaderClient, $config);
}
} else { // if there is not a token , get one from the uploader service
if (($config = $this->generateToken($uploaderClient, $config)) === null) {
return;
};
if (($res = $this->getCommits($uploaderClient, $config)) === null) {
return;
}
}
$body = $res->getBody()->getContents();
$body = json_decode($body,true);
$commits = $body['hydra:member'];
$urlInfo = parse_url($config['endpointCommit']);
$baseUrl = $urlInfo['scheme'] . '://' . $urlInfo['host'] .':'.$urlInfo['port'];
foreach ($commits as $commit) {
// send only payload in ingest-queue if the commit is ack false and it is not being creating
if (!$commit['acknowledged'] && !DBManipulator::isCommitToBeCreating($commit['id'])) {
$this->messagePublisher->pushLog("A new commit found in the uploader ! commit_ID : ".$commit['id']);
$payload = [
'message_type' => MessagePublisher::ASSETS_INGEST_TYPE,
'payload' => [
'assets' => array_map(function($asset) {
return str_replace('/assets/', '', $asset);
}, $commit['assets']),
'publisher' => $commit['userId'],
'commit_id' => $commit['id'],
'token' => $commit['token'],
'base_url' => $baseUrl
]
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE);
}
}
}
/**
* @param Client $uploaderClient
* @param array $config
* @return \Psr\Http\Message\ResponseInterface|null
*/
private function getCommits(Client $uploaderClient, array $config)
{
try {
$res = $uploaderClient->get($config['endpointCommit'], [
'headers' => [
'Authorization' => 'AssetToken '.$config['assetToken']
]
]);
} catch(\Exception $e) {
$this->messagePublisher->pushLog("An error occurred when fetching endpointCommit : " . $e->getMessage());
return null;
}
return $res;
}
/**
* @param Client $uploaderClient
* @param array $config
* @return array|null
*/
private function generateToken(Client $uploaderClient, array $config)
{
try {
$tokenBody = $uploaderClient->post($config['endpointToken'], [
'json' => [
'client_id' => $config['clientId'],
'client_secret' => $config['clientSecret'],
'grant_type' => 'client_credentials',
'scope' => 'uploader:commit_list'
]
])->getBody()->getContents();
} catch (\Exception $e) {
$this->messagePublisher->pushLog("An error occurred when fetching endpointToken : " . $e->getMessage());
return null;
}
$tokenBody = json_decode($tokenBody,true);
$config['assetToken'] = $tokenBody['access_token'];
Config::setConfiguration(['pull_assets' => $config]);
$config = Config::getConfiguration();
return $config['worker_plugin']['pull_assets'];
}
}

View File

@@ -0,0 +1,46 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker\Resolver;
use Alchemy\Phrasea\WorkerManager\Worker\Factory\WorkerFactoryInterface;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface;
class TypeBasedWorkerResolver implements WorkerResolverInterface
{
/**
* @var WorkerInterface[]
*/
private $workers = [];
/**
* @var WorkerFactoryInterface[]
*/
private $factories = [];
public function addFactory($messageType, WorkerFactoryInterface $workerFactory)
{
$this->factories[$messageType] = $workerFactory;
}
/**
* @return WorkerFactoryInterface[]
*/
public function getFactories()
{
return $this->factories;
}
public function getWorker($messageType, array $message)
{
if (isset($this->workers[$messageType])) {
return $this->workers[$messageType];
}
if (isset($this->factories[$messageType])) {
return $this->workers[$messageType] = $this->factories[$messageType]->createWorker();
}
throw new \RuntimeException('Invalid worker type requested: ' . $messageType);
}
}

View File

@@ -0,0 +1,15 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker\Resolver;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface;
interface WorkerResolverInterface
{
/**
* @param string $messageType
* @param array $message
* @return WorkerInterface
*/
public function getWorker($messageType, array $message);
}

View File

@@ -0,0 +1,213 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Filesystem\FilesystemService;
use Alchemy\Phrasea\Media\SubdefGenerator;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class SubdefCreationWorker implements WorkerInterface
{
use ApplicationBoxAware;
use EntityManagerAware;
private $subdefGenerator;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
private $logger;
private $dispatcher;
private $filesystem;
private $repoWorker;
public function __construct(
SubdefGenerator $subdefGenerator,
MessagePublisher $messagePublisher,
LoggerInterface $logger,
EventDispatcherInterface $dispatcher,
FilesystemService $filesystem,
WorkerRunningJobRepository $repoWorker
)
{
$this->subdefGenerator = $subdefGenerator;
$this->messagePublisher = $messagePublisher;
$this->logger = $logger;
$this->dispatcher = $dispatcher;
$this->filesystem = $filesystem;
$this->repoWorker = $repoWorker;
}
public function process(array $payload)
{
if(isset($payload['recordId']) && isset($payload['databoxId'])) {
$recordId = $payload['recordId'];
$databoxId = $payload['databoxId'];
$wantedSubdef = [$payload['subdefName']];
$databox = $this->findDataboxById($databoxId);
$record = $databox->get_record($recordId);
$oldLogger = $this->subdefGenerator->getLogger();
if (!$record->isStory()) {
// check if there is a write meta running for the record or the same task running
$canCreateSubdef = $this->repoWorker->canCreateSubdef($payload['subdefName'], $recordId, $databoxId);
if (!$canCreateSubdef) {
// the file is in used to write meta
$payload = [
'message_type' => MessagePublisher::SUBDEF_CREATION_TYPE,
'payload' => $payload
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_SUBDEF_QUEUE);
$message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
$this->messagePublisher->pushLog($message);
return ;
}
// tell that a file is in used to create subdef
$em = $this->getEntityManager();
$em->beginTransaction();
try {
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork(PhraseaTokens::MAKE_SUBDEF)
->setWorkOn($payload['subdefName'])
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$this->subdefGenerator->setLogger($this->logger);
$this->subdefGenerator->generateSubdefs($record, $wantedSubdef);
// begin to check if the subdef is successfully generated
$subdef = $record->getDatabox()->get_subdef_structure()->getSubdefGroup($record->getType())->getSubdef($payload['subdefName']);
$filePathToCheck = null;
if ($record->has_subdef($payload['subdefName']) ) {
$filePathToCheck = $record->get_subdef($payload['subdefName'])->getRealPath();
}
$filePathToCheck = $this->filesystem->generateSubdefPathname($record, $subdef, $filePathToCheck);
if (!$this->filesystem->exists($filePathToCheck)) {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_CREATION_FAILURE, new SubdefinitionCreationFailureEvent(
$record,
$payload['subdefName'],
'Subdef generation failed !',
$count
));
$this->subdefGenerator->setLogger($oldLogger);
return ;
}
// checking ended
// order to write meta for the subdef if needed
$this->dispatcher->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent($record, $payload['subdefName']));
$this->subdefGenerator->setLogger($oldLogger);
// update jeton when subdef is created
$this->updateJeton($record);
$parents = $record->get_grouping_parents();
// create a cover for a story
// used when uploaded via uploader-service and grouped as a story
if (!$parents->is_empty() && isset($payload['status']) && $payload['status'] == MessagePublisher::NEW_RECORD_MESSAGE && in_array($payload['subdefName'], array('thumbnail', 'preview'))) {
foreach ($parents->get_elements() as $story) {
if (self::checkIfFirstChild($story, $record)) {
$data = implode('_', [$databoxId, $story->getRecordId(), $recordId, $payload['subdefName']]);
$this->dispatcher->dispatch(WorkerEvents::STORY_CREATE_COVER, new StoryCreateCoverEvent($data));
}
}
}
// tell that we have finished to work on this file
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
}
}
public static function checkIfFirstChild(\record_adapter $story, \record_adapter $record)
{
$sql = "SELECT * FROM regroup WHERE rid_parent = :parent_record_id AND rid_child = :children_id and ord = :ord";
$connection = $record->getDatabox()->get_connection();
$stmt = $connection->prepare($sql);
$stmt->execute([
':parent_record_id' => $story->getRecordId(),
':children_id' => $record->getRecordId(),
':ord' => 0,
]);
$row = $stmt->fetch(\PDO::FETCH_ASSOC);
$stmt->closeCursor();
if ($row) {
return true;
}
return false;
}
private function updateJeton(\record_adapter $record)
{
$connection = $record->getDatabox()->get_connection();
$connection->beginTransaction();
// mark subdef created
$sql = 'UPDATE record'
. ' SET jeton=(jeton & ~(:token)), moddate=NOW()'
. ' WHERE record_id=:record_id';
$stmt = $connection->prepare($sql);
$stmt->execute([
':record_id' => $record->getRecordId(),
':token' => PhraseaTokens::MAKE_SUBDEF,
]);
$connection->commit();
$stmt->closeCursor();
}
}

View File

@@ -0,0 +1,206 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\Application\Helper\DispatcherAware;
use Alchemy\Phrasea\Core\Version;
use Alchemy\Phrasea\Model\Entities\ApiApplication;
use Alchemy\Phrasea\Model\Entities\WebhookEvent;
use Alchemy\Phrasea\Model\Entities\WebhookEventDelivery;
use Alchemy\Phrasea\Webhook\Processor\ProcessorInterface;
use Alchemy\Phrasea\WorkerManager\Event\WebhookDeliverFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Guzzle\Batch\BatchBuilder;
use Guzzle\Common\Event;
use Guzzle\Http\Client as GuzzleClient;
use Guzzle\Http\Message\Request;
use Guzzle\Plugin\Backoff\BackoffPlugin;
use Guzzle\Plugin\Backoff\CallbackBackoffStrategy;
use Guzzle\Plugin\Backoff\CurlBackoffStrategy;
use Guzzle\Plugin\Backoff\TruncatedBackoffStrategy;
use PhpAmqpLib\Wire\AMQPTable;
class WebhookWorker implements WorkerInterface
{
use DispatcherAware;
private $app;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
public function __construct(Application $app)
{
$this->app = $app;
$this->messagePublisher = $app['alchemy_worker.message.publisher'];
}
/**
* @param array $payload
*/
public function process(array $payload)
{
if (isset($payload['id'])) {
$webhookEventId = $payload['id'];
$app = $this->app;
$httpClient = new GuzzleClient();
$version = new Version();
$httpClient->setUserAgent(sprintf('Phraseanet/%s (%s)', $version->getNumber(), $version->getName()));
$httpClient->getEventDispatcher()->addListener('request.error', function (Event $event) {
// override guzzle default behavior of throwing exceptions
// when 4xx & 5xx responses are encountered
$event->stopPropagation();
}, -254);
// Set callback which logs success or failure
$subscriber = new CallbackBackoffStrategy(function ($retries, Request $request, $response, $e) use ($app, $webhookEventId, $payload) {
$retry = true;
if ($response && (null !== $deliverId = parse_url($request->getUrl(), PHP_URL_FRAGMENT))) {
/** @var WebhookEventDelivery $delivery */
$delivery = $app['repo.webhook-delivery']->find($deliverId);
$logContext = [ 'host' => $request->getHost() ];
if ($response->isSuccessful()) {
$app['manipulator.webhook-delivery']->deliverySuccess($delivery);
$logType = 'info';
$logEntry = sprintf('Deliver success event "%d:%s" for app "%s"',
$delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(),
$delivery->getThirdPartyApplication()->getName()
);
$retry = false;
} else {
$app['manipulator.webhook-delivery']->deliveryFailure($delivery);
$logType = 'error';
$logEntry = sprintf('Deliver failure event "%d:%s" for app "%s"',
$delivery->getWebhookEvent()->getId(), $delivery->getWebhookEvent()->getName(),
$delivery->getThirdPartyApplication()->getName()
);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent(
$webhookEventId,
$logEntry,
$count,
$deliverId
));
}
$app['alchemy_worker.message.publisher']->pushLog($logEntry, $logType, $logContext);
return $retry;
}
}, true, new CurlBackoffStrategy());
// set max retries
$subscriber = new TruncatedBackoffStrategy(1, $subscriber);
$subscriber = new BackoffPlugin($subscriber);
$httpClient->addSubscriber($subscriber);
$thirdPartyApplications = $this->app['repo.api-applications']->findWithDefinedWebhookCallback();
/** @var WebhookEvent|null $webhookevent */
$webhookevent = $this->app['repo.webhook-event']->find($webhookEventId);
if ($webhookevent !== null) {
$app['manipulator.webhook-event']->processed($webhookevent);
$this->messagePublisher->pushLog(sprintf('Processing event "%s" with id %d', $webhookevent->getName(), $webhookevent->getId()));
// send requests
$this->deliverEvent($httpClient, $thirdPartyApplications, $webhookevent, $payload);
}
}
}
private function deliverEvent(GuzzleClient $httpClient, array $thirdPartyApplications, WebhookEvent $webhookevent, $payload)
{
if (count($thirdPartyApplications) === 0) {
$workerMessage = 'No applications defined to listen for webhook events';
$this->messagePublisher->pushLog($workerMessage);
// count = 0 mean do not retry because no api application defined
$this->dispatch(WorkerEvents::WEBHOOK_DELIVER_FAILURE, new WebhookDeliverFailureEvent($webhookevent->getId(), $workerMessage, 0));
return;
}
// format event data
if (!isset($payload['delivery_id'])) {
$webhookData = $webhookevent->getData();
$webhookData['time'] = $webhookevent->getCreated();
$webhookevent->setData($webhookData);
}
/** @var ProcessorInterface $eventProcessor */
$eventProcessor = $this->app['webhook.processor_factory']->get($webhookevent);
$data = $eventProcessor->process($webhookevent);
// batch requests
$batch = BatchBuilder::factory()
->transferRequests(10)
->build();
/** @var ApiApplication $thirdPartyApplication */
foreach ($thirdPartyApplications as $thirdPartyApplication) {
$creator = $thirdPartyApplication->getCreator();
if ($creator == null) {
continue;
}
$creatorGrantedBaseIds = array_keys($this->app['acl']->get($creator)->get_granted_base());
$concernedBaseIds = array_intersect($webhookevent->getCollectionBaseIds(), $creatorGrantedBaseIds);
if (count($webhookevent->getCollectionBaseIds()) != 0 && count($concernedBaseIds) == 0) {
continue;
}
if (isset($payload['delivery_id']) && $payload['delivery_id'] != null) {
/** @var WebhookEventDelivery $delivery */
$delivery = $this->app['repo.webhook-delivery']->find($payload['delivery_id']);
// only the app url to retry
if ($delivery->getThirdPartyApplication()->getId() != $thirdPartyApplication->getId()) {
continue;
}
} else {
$delivery = $this->app['manipulator.webhook-delivery']->create($thirdPartyApplication, $webhookevent);
}
// append delivery id as url anchor
$uniqueUrl = $this->getUrl($thirdPartyApplication, $delivery);
// create http request with data as request body
$batch->add($httpClient->createRequest('POST', $uniqueUrl, [
'Content-Type' => 'application/vnd.phraseanet.event+json'
], json_encode($data)));
}
try {
$batch->flush();
} catch (\Exception $e) {
$this->messagePublisher->pushLog($e->getMessage());
$this->messagePublisher->publishFailedMessage(
$payload,
new AMQPTable(['worker-message' => $e->getMessage()]),
MessagePublisher::FAILED_WEBHOOK_QUEUE
);
}
}
private function getUrl(ApiApplication $application, WebhookEventDelivery $delivery)
{
return sprintf('%s#%s', $application->getWebhookUrl(), $delivery->getId());
}
}

View File

@@ -0,0 +1,12 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
interface WorkerInterface
{
/**
* @param array $payload
* @return mixed
*/
public function process(array $payload);
}

View File

@@ -0,0 +1,144 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Process\Exception\RuntimeException as ProcessRuntimeException;
class WorkerInvoker implements LoggerAwareInterface
{
/**
* @var string
*/
private $environment;
/**
* @var string
*/
private $command = 'worker:run-service';
/**
* @var string
*/
private $binaryPath;
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var ProcessPool
*/
private $processPool;
/**
* @var bool
*/
private $preservePayloads = false;
/**
* payload file prefix
*
* @var string
*/
private $prefix = 'alchemy_wk_';
/**
* WorkerInvoker constructor.
*
* @param ProcessPool $processPool
* @param bool $environment
*/
public function __construct(ProcessPool $processPool, $environment = false)
{
$this->binaryPath = $_SERVER['SCRIPT_NAME'];
$this->environment = $environment;
$this->processPool = $processPool;
$this->logger = new NullLogger();
}
public function preservePayloads()
{
$this->preservePayloads = true;
}
/**
* Sets a logger instance on the object
*
* @param LoggerInterface $logger
* @return null
*/
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function setPrefix($prefix)
{
$this->prefix = $prefix;
}
/**
* @param string $messageType
* @param string $payload
*/
public function invokeWorker($messageType, $payload)
{
$args = [
$this->binaryPath,
$this->command,
'-vv',
$messageType,
$this->createPayloadFile($payload)
];
if ($this->environment) {
$args[] = sprintf('-e=%s', $this->environment);
}
if ($this->preservePayloads) {
$args[] = '--preserve-payload';
}
$process = $this->processPool->getWorkerProcess($args, getcwd());
$this->logger->debug('Invoking shell command: ' . $process->getCommandLine());
try {
$process->start([$this, 'logWorkerOutput']);
} catch (ProcessRuntimeException $e) {
$process->stop();
throw new \RuntimeException(sprintf('Command "%s" failed: %s', $process->getCommandLine(),
$e->getMessage()), 0, $e);
}
}
public function logWorkerOutput($stream, $output)
{
if ($stream == 'err') {
$this->logger->error($output);
} else {
$this->logger->info($output);
}
}
public function setMaxProcessPoolValue($maxProcesses)
{
$this->processPool->setMaxProcesses($maxProcesses);
}
private function createPayloadFile($payload)
{
$path = tempnam(sys_get_temp_dir(), $this->prefix);
if (file_put_contents($path, $payload) === false) {
throw new \RuntimeException('Cannot write payload file to path: ' . $path);
}
return $path;
}
}

View File

@@ -0,0 +1,310 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\DispatcherAware;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Metadata\TagFactory;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Monolog\Logger;
use PHPExiftool\Driver\Metadata\Metadata;
use PHPExiftool\Driver\Metadata\MetadataBag;
use PHPExiftool\Driver\Tag;
use PHPExiftool\Driver\Value\Mono;
use PHPExiftool\Driver\Value\Multi;
use PHPExiftool\Exception\ExceptionInterface as PHPExiftoolException;
use PHPExiftool\Exception\TagUnknown;
use PHPExiftool\Writer;
use Psr\Log\LoggerInterface;
class WriteMetadatasWorker implements WorkerInterface
{
use ApplicationBoxAware;
use DispatcherAware;
use EntityManagerAware;
/** @var Logger */
private $logger;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
/** @var Writer $writer */
private $writer;
private $repoWorker;
public function __construct(
Writer $writer,
LoggerInterface $logger,
MessagePublisher $messagePublisher,
WorkerRunningJobRepository $repoWorker
)
{
$this->writer = $writer;
$this->logger = $logger;
$this->messagePublisher = $messagePublisher;
$this->repoWorker = $repoWorker;
}
public function process(array $payload)
{
if (isset($payload['recordId']) && isset($payload['databoxId'])) {
$recordId = $payload['recordId'];
$databoxId = $payload['databoxId'];
$MWG = isset($payload['MWG']) ? $payload['MWG'] : false;
$clearDoc = isset($payload['clearDoc']) ? $payload['clearDoc'] : false;
$databox = $this->findDataboxById($databoxId);
$param = ($payload['subdefName'] == "document") ? PhraseaTokens::WRITE_META_DOC : PhraseaTokens::WRITE_META_SUBDEF;
// check if there is a make subdef running for the record or the same task running
$canWriteMeta = $this->repoWorker->canWriteMetadata($payload['subdefName'], $recordId, $databoxId);
if (!$canWriteMeta) {
// the file is in used to generate subdef
$payload = [
'message_type' => MessagePublisher::WRITE_METADATAS_TYPE,
'payload' => $payload
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_METADATAS_QUEUE);
$message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
$this->messagePublisher->pushLog($message);
return ;
}
// tell that a file is in used to create subdef
$em = $this->getEntityManager();
$em->beginTransaction();
try {
$workerRunningJob = new WorkerRunningJob();
$workerRunningJob
->setDataboxId($databoxId)
->setRecordId($recordId)
->setWork($param)
->setWorkOn($payload['subdefName'])
;
$em->persist($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
$record = $databox->get_record($recordId);
$subdef = $record->get_subdef($payload['subdefName']);
if ($subdef->is_physically_present()) {
$metadata = new MetadataBag();
// add Uuid in metadatabag
if ($record->getUuid()) {
$metadata->add(
new Metadata(
new Tag\XMPExif\ImageUniqueID(),
new Mono($record->getUuid())
)
);
$metadata->add(
new Metadata(
new Tag\ExifIFD\ImageUniqueID(),
new Mono($record->getUuid())
)
);
$metadata->add(
new Metadata(
new Tag\IPTC\UniqueDocumentID(),
new Mono($record->getUuid())
)
);
}
// read document fields and add to metadatabag
$caption = $record->get_caption();
foreach ($databox->get_meta_structure() as $fieldStructure) {
$tagName = $fieldStructure->get_tag()->getTagname();
$fieldName = $fieldStructure->get_name();
// skip fields with no src
if ($tagName == '' || $tagName == 'Phraseanet:no-source') {
continue;
}
// check exiftool known tags to skip Phraseanet:tf-*
try {
$tag = TagFactory::getFromRDFTagname($tagName);
if(!$tag->isWritable()) {
continue;
}
} catch (TagUnknown $e) {
continue;
}
try {
$field = $caption->get_field($fieldName);
$fieldValues = $field->get_values();
if ($fieldStructure->is_multi()) {
$values = array();
foreach ($fieldValues as $value) {
$values[] = $this->removeNulChar($value->getValue());
}
$value = new Multi($values);
} else {
$fieldValue = array_pop($fieldValues);
$value = $this->removeNulChar($fieldValue->getValue());
// fix the dates edited into phraseanet
if($fieldStructure->get_type() === $fieldStructure::TYPE_DATE) {
try {
$value = self::fixDate($value); // will return NULL if the date is not valid
}
catch (\Exception $e) {
$value = null; // do NOT write back to iptc
}
}
if($value !== null) { // do not write invalid dates
$value = new Mono($value);
}
}
} catch(\Exception $e) {
// the field is not set in the record, erase it
if ($fieldStructure->is_multi()) {
$value = new Multi(array(''));
}
else {
$value = new Mono('');
}
}
if($value !== null) { // do not write invalid data
$metadata->add(
new Metadata($fieldStructure->get_tag(), $value)
);
}
}
$this->writer->reset();
if ($MWG) {
$this->writer->setModule(Writer::MODULE_MWG, true);
}
$this->writer->erase($subdef->get_name() != 'document' || $clearDoc, true);
// write meta in file
try {
$this->writer->write($subdef->getRealPath(), $metadata);
$this->messagePublisher->pushLog(sprintf('meta written for sbasid=%1$d - recordid=%2$d (%3$s)', $databox->get_sbas_id(), $recordId, $subdef->get_name() ));
} catch (PHPExiftoolException $e) {
$workerMessage = sprintf('meta NOT written for sbasid=%1$d - recordid=%2$d (%3$s) because "%s"', $databox->get_sbas_id(), $recordId, $subdef->get_name() , $e->getMessage());
$this->logger->error($workerMessage);
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record,
$payload['subdefName'],
SubdefinitionWritemetaEvent::FAILED,
$workerMessage,
$count
));
}
// mark write metas finished
$this->updateJeton($record);
} else {
$count = isset($payload['count']) ? $payload['count'] + 1 : 2 ;
$this->dispatch(WorkerEvents::SUBDEFINITION_WRITE_META, new SubdefinitionWritemetaEvent(
$record,
$payload['subdefName'],
SubdefinitionWritemetaEvent::FAILED,
'Subdef is not physically present!',
$count
));
}
// tell that we have finished to work on this file
$em->beginTransaction();
try {
$em->remove($workerRunningJob);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
}
private function removeNulChar($value)
{
return str_replace("\0", "", $value);
}
private function updateJeton(\record_adapter $record)
{
$connection = $record->getDatabox()->get_connection();
$connection->beginTransaction();
$stmt = $connection->prepare('UPDATE record SET jeton=(jeton & ~(:token)), moddate=NOW() WHERE record_id = :record_id');
$stmt->execute([
':record_id' => $record->getRecordId(),
':token' => PhraseaTokens::WRITE_META,
]);
$connection->commit();
$stmt->closeCursor();
}
/**
* re-format a phraseanet date for iptc writing
* return NULL if the date is not valid
*
* @param string $value
* @return string|null
*/
private static function fixDate($value)
{
$date = null;
try {
$a = explode(';', preg_replace('/\D+/', ';', trim($value)));
switch (count($a)) {
case 3: // yyyy;mm;dd
$date = new \DateTime($a[0] . '-' . $a[1] . '-' . $a[2]);
$date = $date->format('Y-m-d H:i:s');
break;
case 6: // yyyy;mm;dd;hh;mm;ss
$date = new \DateTime($a[0] . '-' . $a[1] . '-' . $a[2] . ' ' . $a[3] . ':' . $a[4] . ':' . $a[5]);
$date = $date->format('Y-m-d H:i:s');
break;
}
}
catch (\Exception $e) {
$date = null;
}
return $date;
}
}

View File

@@ -0,0 +1,30 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "New assets",
"description": "List of assets to enqueue on Phraseanet",
"type": "object",
"properties": {
"assets": {
"type": "array"
},
"publisher": {
"type": "string"
},
"token": {
"type": "string"
},
"base_url": {
"type": "string"
},
"commit_id": {
"type": "string"
}
},
"required": [
"assets",
"publisher",
"token",
"base_url",
"commit_id"
]
}

View File

@@ -76,6 +76,15 @@
</a>
</li>
{% if app.getAclForUser(app.getAuthenticatedUser()).has_right(constant('\\ACL::TASKMANAGER')) %}
<li class="{% if feature == 'workermanager' %}selected{% endif %}">
<a target="right" href="{{ path('worker_admin') }}" class="ajax">
<img src="/assets/admin/images/TaskManager.png" />
<span>{{ 'Worker Manager' | trans }}</span>
</a>
</li>
{% endif %}
<li class="open">
<div class="{% if feature == 'bases' %}selected{% endif %}" style="padding:0 0 2px 0;">
<a id="TREE_DATABASES" target="right" href="{{ path('admin_databases') }}" class="ajax">

View File

@@ -0,0 +1,77 @@
<h1>Worker</h1>
<div>
<!-- Nav tabs -->
<ul class="nav nav-tabs" id="configurationTabs">
<li class="worker-configuration" role="presentation">
<a href="#worker-configuration" aria-controls="worker-configuration" role="tab" data-toggle="tab" data-url="/admin/worker-manager/configuration">
{{ "Configuration" | trans }}
</a>
</li>
<li class="worker-searchengine" role="presentation">
<a href="#worker-searchengine" aria-controls="worker-searchengine" role="tab" data-toggle="tab" data-url="/admin/worker-manager/searchengine">
{{ "Searchengine" | trans }}
</a>
</li>
<li class="worker-pull-assets" role="presentation">
<a href="#worker-pull-assets" aria-controls="worker-pull-assets" role="tab" data-toggle="tab" data-url="/admin/worker-manager/pull-assets">
{{ "Pull Assets" | trans }}
</a>
</li>
<li class="worker-subview active" role="presentation">
<a href="#worker-subview" aria-controls="worker-subview" role="tab" data-toggle="tab" data-url="/admin/worker-manager/subview">
{{ "Subview" | trans }}
</a>
</li>
<li class="worker-metadata" role="presentation">
<a href="#worker-metadata" aria-controls="worker-metadata" role="tab" data-toggle="tab" data-url="/admin/worker-manager/metadata">
{{ "Metadata" | trans }}
</a>
</li>
</ul>
<!-- Tab panes -->
<div class="tab-content">
<div role="tabpanel" class="tab-pane fade" id="worker-configuration"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-searchengine"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-pull-assets"></div>
<div role="tabpanel" class="tab-pane fade in active" id="worker-subview"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-metadata"></div>
</div>
</div>
<script type="text/javascript">
var contentsDownloaded = {};
var remoteContent = function(url) {
return $.get(url);
};
var tabs = $('#configurationTabs a[data-toggle="tab"]');
tabs.on('click', function(){
$(this).tab('show');
});
$('.nav-tabs li').on('show.bs.tab', function (e) {
if (contentsDownloaded[e.target.hash] === undefined) {
$(e.target.hash).empty().html('<img src="/assets/common/images/icons/main-loader.gif" alt="loading"/>');
}
});
$('.nav-tabs').on('shown.bs.tab', function (e) {
if (contentsDownloaded[e.target.hash] === undefined) {
var targetDiv = $(e.target.hash);
remoteContent($(e.target).attr('data-url')).then(function(response) {
targetDiv.empty().html(response);
contentsDownloaded[e.target.hash] = true;
}, function(error) {
console.log(error);
targetDiv.empty().html('<i class="icon-fire">{{ 'admin:worker Retrieve configuration error'|trans }}</i>');
});
}
});
</script>

View File

@@ -0,0 +1,39 @@
<h3> Config Worker queue retry</h3>
<p>Set up the delay between two attempts per queue!</p>
{{ form_start(form, {'action': path('worker_admin_configuration')}) }}
<div class="control-group">
{{ form_row(form.assetsIngest) }}
</div>
<div class="control-group">
{{ form_row(form.createRecord) }}
</div>
<div class="control-group">
{{ form_row(form.subdefCreation) }}
</div>
<div class="control-group">
{{ form_row(form.writeMetadatas) }}
</div>
<div class="control-group">
{{ form_row(form.webhook) }}
</div>
<div class="control-group">
{{ form_row(form.exportMail) }}
</div>
<div class="control-group">
{{ form_row(form.populateIndex) }}
</div>
<div class="control-group">
<input type="submit" class="btn btn-primary" value={{ "Apply retry delay"|trans }} />
</div>
{{ form_end(form) }}

View File

@@ -0,0 +1 @@
<h1>Write metadata setting</h1>

View File

@@ -0,0 +1,29 @@
<h1>Pull Assets from uploader setting</h1>
{{ form_start(form, {'action': path('worker_admin_pullAssets')}) }}
<div class="control-group">
{{ form_row(form.endpointCommit) }}
</div>
<div class="control-group">
{{ form_row(form.endpointToken) }}
</div>
<div class="control-group">
{{ form_row(form.clientSecret) }}
</div>
<div class="control-group">
{{ form_row(form.clientId) }}
</div>
<div class="control-group">
{{ form_row(form.pullInterval) }}
</div>
<div class="control-group">
<input type="submit" class="btn btn-primary" value={{ "Initialize pull assets"|trans }} />
</div>
{{ form_end(form) }}

View File

@@ -0,0 +1,98 @@
<h1>Populate elasticsearch index</h1>
{{ form_start(form, {'action': path('worker_admin_searchengine')}) }}
<div class="control-group">
{{ form_row(form.host) }}
</div>
<div class="control-group">
{{ form_row(form.port) }}
</div>
<div class="control-group">
{{ form_row(form.indexName) }}
</div>
<div class="control-group">
<label class="control-label"> {{ 'Databox to populate' | trans }}</label>
<div class="controls">
<label class="checkbox inline">
<input class="subdef_sbas" type="checkbox" value="0"> {{ 'All' | trans }}
</label>
{% for databox in app.getApplicationBox().get_databoxes() %}
<label class="checkbox inline">
<input class="subdef_sbas" type="checkbox" name="worker_searchengine[sbas][]" value="{{ databox.get_sbas_id() }}"> {{ databox.get_viewname() }}
({{ databox.get_sbas_id() }})
</label>
{% endfor %}
</div>
</div>
<div class="control-group">
<button type="submit" class="btn btn-primary btn-trigger " id="populateButton" form="worker_searchengine">
{{ "Populate" |trans }}
</button>
</div>
{{ form_end(form) }}
<script type="text/javascript">
$(document).ready(function () {
$(".subdef_sbas").change(function (e) {
var alldb = $(".subdef_sbas[value=0]").prop('checked');
if (alldb) {
$(".subdef_sbas[value!=0]").prop('checked', true);
} else {
$(".subdef_sbas[value!=0]").attr('disabled', false);
}
});
$('#populateButton').on('click', function () {
checked = $('input[name="worker_searchengine[sbas][]"]:checked').length;
if(!checked) {
alert("You must check at least one databox to populate.");
return false;
}
$("form[name='worker_searchengine']").submit();
});
$("form[name='worker_searchengine']").on('submit', function () {
var form = $(this);
var sbasIds = [];
$.each($('input[name="worker_searchengine[sbas][]"]:checked'), function() {
sbasIds.push($(this).val());
});
$.ajax({
type: 'GET',
url: '/admin/worker-manager/populate-status',
data: {
sbasIds: sbasIds
},
success: function (data) {
if (data == 0) {
$('#right-ajax').empty().addClass('loading');
$.ajax({
url: form.attr('action'),
type: 'POST',
data: form.serialize(),
success: function (data) {
$('#right-ajax').removeClass('loading').html(data);
}
});
} else {
alert("Worker is in process to indexing one of the selected databox!!");
}
}
});
return false;
});
});
</script>

View File

@@ -0,0 +1 @@
<h1>Subview setting</h1>

View File

@@ -0,0 +1,45 @@
<?php
namespace Alchemy\Tests\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\WorkerManager\Subscriber\ExportSubscriber;
use Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber;
/**
* @covers Alchemy\Phrasea\WorkerManager\Subscriber\ExportSubscriber
* @covers Alchemy\Phrasea\WorkerManager\Subscriber\RecordSubscriber
*/
class SubscriberTest extends \PhraseanetTestCase
{
public function testCallsImplements()
{
$app = new Application(Application::ENV_TEST);
$app['alchemy_worker.message.publisher'] = $this->prophesize('Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher');
$sexportSubscriber = new ExportSubscriber($app['alchemy_worker.message.publisher']->reveal());
$this->assertInstanceOf('Symfony\\Component\\EventDispatcher\\EventSubscriberInterface', $sexportSubscriber);
$recordSubscriber = new RecordSubscriber($app);
$this->assertInstanceOf('Symfony\\Component\\EventDispatcher\\EventSubscriberInterface', $recordSubscriber);
}
public function testIfPublisheMessageOnSubscribeEvent()
{
$app = new Application(Application::ENV_TEST);
$app['alchemy_worker.message.publisher'] = $this->getMockBuilder('Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher')
->disableOriginalConstructor()
->getMock();
$app['alchemy_worker.type_based_worker_resolver'] = $this->getMockBuilder('Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver')
->disableOriginalConstructor()
->getMock();
$app['alchemy_worker.message.publisher']->expects($this->atLeastOnce())->method('publishMessage');
$event = $this->prophesize('Alchemy\Phrasea\Core\Event\ExportMailEvent');
$sut = new ExportSubscriber($app['alchemy_worker.message.publisher']);
$sut->onExportMailCreate($event->reveal());
}
}

View File

@@ -0,0 +1,15 @@
<?php
namespace Alchemy\Tests\Phrasea\WorkerManager\Worker\Factory;
use Alchemy\Phrasea\WorkerManager\Worker\Factory\CallableWorkerFactory;
class CallableWorkerFactoryTest extends \PHPUnit_Framework_TestCase
{
public function testClassImplements()
{
$sut = new CallableWorkerFactory(function () {});
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\Factory\\WorkerFactoryInterface', $sut);
}
}

View File

@@ -0,0 +1,77 @@
<?php
namespace Alchemy\Tests\Phrasea\WorkerManager\Worker\Resolver;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Alchemy\Phrasea\WorkerManager\Worker\Factory\CallableWorkerFactory;
use Alchemy\Phrasea\WorkerManager\Worker\Factory\WorkerFactoryInterface;
use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInterface;
class TypeBasedWorkerResolverTest extends \PhraseanetTestCase
{
public function testClassImplements()
{
$sut = new TypeBasedWorkerResolver();
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\Resolver\\WorkerResolverInterface', $sut);
}
public function testGetFactories()
{
$workerFactory = $this->getMockBuilder(WorkerFactoryInterface::class)
->disableOriginalConstructor()
->getMock()
;
$sut = new TypeBasedWorkerResolver();
$sut->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, $workerFactory);
$this->assertContainsOnlyInstancesOf(WorkerFactoryInterface::class, $sut->getFactories());
}
public function testGetWorkerSuccess()
{
$worker = $this->getMockBuilder(WorkerInterface::class)
->disableOriginalConstructor()
->getMock();
$workerFactory = $this->getMockBuilder(CallableWorkerFactory::class)
->disableOriginalConstructor()
->getMock();
$workerFactory->method('createWorker')->will($this->returnValue($worker));
$sut = new TypeBasedWorkerResolver();
$sut->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, $workerFactory);
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface',
$sut->getWorker(MessagePublisher::SUBDEF_CREATION_TYPE, ['mock-message']));
}
public function testGetWorkerWrongTypeThrowException()
{
$worker = $this->getMockBuilder(WorkerInterface::class)
->disableOriginalConstructor()
->getMock();
$workerFactory = $this->getMockBuilder(CallableWorkerFactory::class)
->disableOriginalConstructor()
->getMock();
$workerFactory->method('createWorker')->will($this->returnValue($worker));
$sut = new TypeBasedWorkerResolver();
$sut->addFactory(MessagePublisher::SUBDEF_CREATION_TYPE, $workerFactory);
$this->expectException(\RuntimeException::class);
$sut->getWorker(MessagePublisher::WRITE_METADATAS_TYPE, ['mock-message']);
}
}

View File

@@ -0,0 +1,73 @@
<?php
namespace Alchemy\Tests\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use Symfony\Component\Process\Exception\RuntimeException as ProcessRuntimeException;
use Symfony\Component\Process\Process;
class WorkerInvokerTest extends \PhraseanetTestCase
{
public function testClassImplements()
{
$processPool = $this->prophesize(ProcessPool::class);
$sut = new WorkerInvoker($processPool->reveal());
$this->assertInstanceOf('Psr\\Log\\LoggerAwareInterface', $sut);
}
public function testInvokeWorkerSuccess()
{
$process = $this->getMockBuilder(Process::class)
->disableOriginalConstructor()
->getMock()
;
$process ->expects($this->exactly(1))
->method('start')
;
$processPool = $this->getMockBuilder(ProcessPool::class)
->disableOriginalConstructor()
->getMock();
$processPool->method('getWorkerProcess')->will($this->returnValue($process));
$sut = new WorkerInvoker($processPool);
$sut->invokeWorker(MessagePublisher::SUBDEF_CREATION_TYPE, json_encode(['mock-payload']));
}
public function testInvokeWorkerWhenThrowException()
{
$process = $this->getMockBuilder(Process::class)
->disableOriginalConstructor()
->getMock()
;
$process ->expects($this->exactly(1))
->method('start')
->will($this->throwException(new ProcessRuntimeException()))
;
$processPool = $this->getMockBuilder(ProcessPool::class)
->disableOriginalConstructor()
->getMock();
$processPool->method('getWorkerProcess')->will($this->returnValue($process));
$sut = new WorkerInvoker($processPool);
try {
$sut->invokeWorker(MessagePublisher::SUBDEF_CREATION_TYPE, json_encode(['mock-payload']));
$this->fail('Should have raised an exception');
} catch (\Exception $e) {
}
}
}

View File

@@ -0,0 +1,59 @@
<?php
namespace Alchemy\Tests\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application;
use Alchemy\Phrasea\WorkerManager\Worker\AssetsIngestWorker;
use Alchemy\Phrasea\WorkerManager\Worker\CreateRecordWorker;
use Alchemy\Phrasea\WorkerManager\Worker\ExportMailWorker;
use Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker;
use Alchemy\Phrasea\WorkerManager\Worker\WriteMetadatasWorker;
/**
* @covers Alchemy\Phrasea\WorkerManager\Worker\ExportMailWorker
* @covers Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker
* @covers Alchemy\Phrasea\WorkerManager\Worker\WriteMetadatasWorker
*/
class WorkerServiceTest extends \PHPUnit_Framework_TestCase
{
public function testImplementationClass()
{
$app = new Application(Application::ENV_TEST);
$exportMailWorker = new ExportMailWorker($app);
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $exportMailWorker);
$app['subdef.generator'] = $this->prophesize('Alchemy\Phrasea\Media\SubdefGenerator')->reveal();
$app['alchemy_worker.message.publisher'] = $this->prophesize('Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher')->reveal();
$app['alchemy_worker.logger'] = $this->prophesize("Monolog\Logger")->reveal();
$app['dispatcher'] = $this->prophesize('Symfony\Component\EventDispatcher\EventDispatcherInterface')->reveal();
$app['phraseanet.filesystem'] = $this->prophesize('Alchemy\Phrasea\Filesystem\FilesystemService')->reveal();
$app['repo.worker-running-job'] = $this->prophesize('Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository')->reveal();
$writer = $this->prophesize('PHPExiftool\Writer')->reveal();
$subdefCreationWorker = new SubdefCreationWorker(
$app['subdef.generator'],
$app['alchemy_worker.message.publisher'],
$app['alchemy_worker.logger'],
$app['dispatcher'],
$app['phraseanet.filesystem'],
$app['repo.worker-running-job']
);
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $subdefCreationWorker);
$writemetadatasWorker = new WriteMetadatasWorker(
$writer,
$app['alchemy_service.logger'],
$app['alchemy_service.message.publisher'],
$app['repo.worker-running-job']
);
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $writemetadatasWorker);
$assetsWorker = new AssetsIngestWorker($app);
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $assetsWorker);
$createRecordWorker = new CreateRecordWorker($app);
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $createRecordWorker);
}
}