Merge branch 'master' into PHRAS-3272_cli-plinks-creation_MASTER

This commit is contained in:
jygaulier
2020-11-18 16:11:30 +01:00
committed by GitHub
17 changed files with 603 additions and 198 deletions

View File

@@ -210,6 +210,29 @@ class AdminConfigurationController extends Controller
]);
}
public function validationReminderAction(PhraseaApplication $app, Request $request)
{
$interval = $app['conf']->get(['workers', 'validationReminder', 'interval'], 7200);
if ($request->getMethod() == 'POST') {
$reminderInterval = (int)$request->request->get('worker_reminder_interval');
// save the period interval in second
$app['conf']->set(['workers', 'validationReminder', 'interval'], $reminderInterval);
/** @var AMQPConnection $serverConnection */
$serverConnection = $this->app['alchemy_worker.amqp.connection'];
// reinitialize the validation reminder queues
$serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_QUEUE]);
$this->app['alchemy_worker.message.publisher']->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE);
return $app->redirectPath('worker_admin');
}
return $this->render('admin/worker-manager/worker_validation_reminder.html.twig', [
'interval' => $interval
]);
}
public function populateStatusAction(PhraseaApplication $app, Request $request)
{
$databoxIds = $request->get('sbasIds');
@@ -236,7 +259,7 @@ class AdminConfigurationController extends Controller
// reinitialize the pull queues
$serverConnection->reinitializeQueue([MessagePublisher::PULL_QUEUE]);
$this->app['alchemy_worker.message.publisher']->initializePullAssets();
$this->app['alchemy_worker.message.publisher']->initializeLoopQueue(MessagePublisher::PULL_ASSETS_TYPE);
return $app->redirectPath('worker_admin');
}

View File

@@ -19,6 +19,7 @@ use Alchemy\Phrasea\WorkerManager\Worker\PullAssetsWorker;
use Alchemy\Phrasea\WorkerManager\Worker\Resolver\TypeBasedWorkerResolver;
use Alchemy\Phrasea\WorkerManager\Worker\SubdefCreationWorker;
use Alchemy\Phrasea\WorkerManager\Worker\SubtitleWorker;
use Alchemy\Phrasea\WorkerManager\Worker\ValidationReminderWorker;
use Alchemy\Phrasea\WorkerManager\Worker\WebhookWorker;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use Alchemy\Phrasea\WorkerManager\Worker\WriteMetadatasWorker;
@@ -146,6 +147,10 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::MAIN_QUEUE_TYPE, new CallableWorkerFactory(function () use ($app) {
return new MainQueueWorker($app['alchemy_worker.message.publisher'], $app['repo.worker-job']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::VALIDATION_REMINDER_TYPE, new CallableWorkerFactory(function () use ($app) {
return new ValidationReminderWorker($app);
}));
}
/**

View File

@@ -84,6 +84,10 @@ class ControllerServiceProvider implements ControllerProviderInterface, ServiceP
->method('GET|POST')
->bind('worker_admin_pullAssets');
$controllers->match('/validation-reminder', 'controller.worker.admin.configuration:validationReminderAction')
->method('GET|POST')
->bind('worker_admin_validationReminder');
$controllers->match('/queue-monitor', 'controller.worker.admin.configuration:queueMonitorAction')
->method('GET')
->bind('worker_admin_queue_monitor');

View File

@@ -27,11 +27,12 @@ class AMQPConnection
MessagePublisher::WEBHOOK_TYPE => MessagePublisher::WEBHOOK_QUEUE,
MessagePublisher::ASSETS_INGEST_TYPE => MessagePublisher::ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_TYPE => MessagePublisher::CREATE_RECORD_QUEUE,
MessagePublisher::PULL_QUEUE => MessagePublisher::PULL_QUEUE,
MessagePublisher::PULL_ASSETS_TYPE => MessagePublisher::PULL_QUEUE,
MessagePublisher::POPULATE_INDEX_TYPE => MessagePublisher::POPULATE_INDEX_QUEUE,
MessagePublisher::DELETE_RECORD_TYPE => MessagePublisher::DELETE_RECORD_QUEUE,
MessagePublisher::MAIN_QUEUE_TYPE => MessagePublisher::MAIN_QUEUE,
MessagePublisher::SUBTITLE_TYPE => MessagePublisher::SUBTITLE_QUEUE,
MessagePublisher::VALIDATION_REMINDER_TYPE => MessagePublisher::VALIDATION_REMINDER_QUEUE,
MessagePublisher::EXPOSE_UPLOAD_TYPE => MessagePublisher::EXPOSE_UPLOAD_QUEUE
];
@@ -44,7 +45,8 @@ class AMQPConnection
MessagePublisher::ASSETS_INGEST_QUEUE => MessagePublisher::RETRY_ASSETS_INGEST_QUEUE,
MessagePublisher::CREATE_RECORD_QUEUE => MessagePublisher::RETRY_CREATE_RECORD_QUEUE,
MessagePublisher::POPULATE_INDEX_QUEUE => MessagePublisher::RETRY_POPULATE_INDEX_QUEUE,
MessagePublisher::PULL_QUEUE => MessagePublisher::LOOP_PULL_QUEUE
MessagePublisher::PULL_QUEUE => MessagePublisher::LOOP_PULL_QUEUE,
MessagePublisher::VALIDATION_REMINDER_QUEUE => MessagePublisher::LOOP_VALIDATION_REMINDER_QUEUE
];
public static $defaultFailedQueues = [
@@ -62,6 +64,11 @@ class AMQPConnection
MessagePublisher::SUBDEF_QUEUE => MessagePublisher::DELAYED_SUBDEF_QUEUE
];
public static $defaultLoopTypes = [
MessagePublisher::PULL_ASSETS_TYPE,
MessagePublisher::VALIDATION_REMINDER_TYPE
];
// default message TTL in retry queue in millisecond
const RETRY_DELAY = 10000;
@@ -265,6 +272,18 @@ class AMQPConnection
isset($config['pull_assets']['pullInterval']) ) {
// convert in milli second
return (int)($config['pull_assets']['pullInterval']) * 1000;
} elseif ($routing == MessagePublisher::VALIDATION_REMINDER_QUEUE) {
if (isset($config['validationReminder']) &&
isset($config['validationReminder']['interval'])) {
// convert in milli second
return (int)($config['validationReminder']['interval']) * 1000;
}
// default value to 2 hour if not set
return (int) 7200 * 1000;
} elseif (isset($config['retry_queue']) &&
isset($config['retry_queue'][array_search($routing, AMQPConnection::$defaultQueues)])) {

View File

@@ -4,6 +4,7 @@ namespace Alchemy\Phrasea\WorkerManager\Queue;
use Alchemy\Phrasea\WorkerManager\Worker\ProcessPool;
use Alchemy\Phrasea\WorkerManager\Worker\WorkerInvoker;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Ramsey\Uuid\Uuid;
@@ -61,7 +62,7 @@ class MessageHandler
}
// if message is yet executed 3 times, save the unprocessed message in the corresponding failed queues
if ($count > self::MAX_OF_TRY && $data['message_type'] != MessagePublisher::PULL_ASSETS_TYPE) {
if ($count > self::MAX_OF_TRY && !in_array($data['message_type'], AMQPConnection::$defaultLoopTypes)) {
$this->messagePublisher->publishFailedMessage($data['payload'], $headers, AMQPConnection::$defaultFailedQueues[$data['message_type']]);
$logMessage = sprintf("Rabbit message executed 3 times, it's to be saved in %s , payload >>> %s",
@@ -75,8 +76,8 @@ class MessageHandler
try {
$workerInvoker->invokeWorker($data['message_type'], json_encode($data['payload']));
if ($data['message_type'] == MessagePublisher::PULL_ASSETS_TYPE) {
// make a loop for the pull assets
if (in_array($data['message_type'], AMQPConnection::$defaultLoopTypes)) {
// make a loop for the loop type
$channel->basic_nack($message->delivery_info['delivery_tag']);
} else {
$channel->basic_ack($message->delivery_info['delivery_tag']);
@@ -101,19 +102,26 @@ class MessageHandler
foreach (AMQPConnection::$defaultQueues as $queueName) {
if ($argQueueName ) {
if (in_array($queueName, $argQueueName)) {
$serverConnection->setQueue($queueName);
// give prefetch message to a worker consumer at a time
$channel->basic_qos(null, $prefetchCount, null);
$channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback);
$this->runConsumer($queueName, $serverConnection, $channel, $prefetchCount, $callback);
}
} else {
$serverConnection->setQueue($queueName);
// give prefetch message to a worker consumer at a time
$channel->basic_qos(null, $prefetchCount, null);
$channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback);
$this->runConsumer($queueName, $serverConnection, $channel, $prefetchCount, $callback);
}
}
}
private function runConsumer($queueName, AMQPConnection $serverConnection, AMQPChannel $channel, $prefetchCount, $callback)
{
// initialize validation reminder when starting consumer
if ($queueName == MessagePublisher::VALIDATION_REMINDER_QUEUE) {
$serverConnection->reinitializeQueue([MessagePublisher::VALIDATION_REMINDER_QUEUE]);
$this->messagePublisher->initializeLoopQueue(MessagePublisher::VALIDATION_REMINDER_TYPE);
}
$serverConnection->setQueue($queueName);
// give prefetch message to a worker consumer at a time
$channel->basic_qos(null, $prefetchCount, null);
$channel->basic_consume($queueName, Uuid::uuid4(), false, false, false, false, $callback);
}
}

View File

@@ -9,20 +9,20 @@ use Psr\Log\LoggerInterface;
class MessagePublisher
{
const EXPORT_MAIL_TYPE = 'exportMail';
const SUBDEF_CREATION_TYPE = 'subdefCreation';
const WRITE_METADATAS_TYPE = 'writeMetadatas';
const ASSETS_INGEST_TYPE = 'assetsIngest';
const CREATE_RECORD_TYPE = 'createRecord';
const DELETE_RECORD_TYPE = 'deleteRecord';
const WEBHOOK_TYPE = 'webhook';
const POPULATE_INDEX_TYPE = 'populateIndex';
const PULL_ASSETS_TYPE = 'pullAssets';
const SUBTITLE_TYPE = 'subtitle';
const MAIN_QUEUE_TYPE = 'mainQueue';
const EXPORT_MAIL_TYPE = 'exportMail';
const SUBDEF_CREATION_TYPE = 'subdefCreation';
const WRITE_METADATAS_TYPE = 'writeMetadatas';
const ASSETS_INGEST_TYPE = 'assetsIngest';
const CREATE_RECORD_TYPE = 'createRecord';
const DELETE_RECORD_TYPE = 'deleteRecord';
const WEBHOOK_TYPE = 'webhook';
const POPULATE_INDEX_TYPE = 'populateIndex';
const PULL_ASSETS_TYPE = 'pullAssets';
const VALIDATION_REMINDER_TYPE = 'validationReminder';
const SUBTITLE_TYPE = 'subtitle';
const MAIN_QUEUE_TYPE = 'mainQueue';
const EXPOSE_UPLOAD_TYPE = 'exposeUpload';
const MAIN_QUEUE = 'main-queue';
const SUBTITLE_QUEUE = 'subtitle-queue';
@@ -36,6 +36,7 @@ class MessagePublisher
const DELETE_RECORD_QUEUE = 'deleterecord-queue';
const POPULATE_INDEX_QUEUE = 'populateindex-queue';
const PULL_QUEUE = 'pull-queue';
const VALIDATION_REMINDER_QUEUE = 'validationReminder-queue';
const EXPOSE_UPLOAD_QUEUE = 'exposeupload-queue';
// retry queue
@@ -47,8 +48,9 @@ class MessagePublisher
const RETRY_ASSETS_INGEST_QUEUE = 'retry-ingest-queue';
const RETRY_CREATE_RECORD_QUEUE = 'retry-createrecord-queue';
const RETRY_POPULATE_INDEX_QUEUE = 'retry-populateindex-queue';
// use this queue to make a loop on a consumer
const LOOP_PULL_QUEUE = 'loop-pull-queue';
// use those queue to make a loop on a consumer
const LOOP_PULL_QUEUE = 'loop-pull-queue';
const LOOP_VALIDATION_REMINDER_QUEUE = 'loop-validationReminder-queue';
// all failed queue, if message is treated over 3 times it goes to the failed queue
const FAILED_EXPORT_QUEUE = 'failed-export-queue';
@@ -118,16 +120,16 @@ class MessagePublisher
return true;
}
public function initializePullAssets()
public function initializeLoopQueue($type)
{
$payload = [
'message_type' => self::PULL_ASSETS_TYPE,
'message_type' => $type,
'payload' => [
'initTimestamp' => new \DateTime('now', new \DateTimeZone('UTC'))
]
];
$this->publishMessage($payload, self::PULL_QUEUE);
$this->publishMessage($payload, AMQPConnection::$defaultQueues[$type]);
}
public function connectionClose()

View File

@@ -0,0 +1,193 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Application\Helper\NotifierAware;
use Alchemy\Phrasea\Core\Configuration\PropertyAccess;
use Alchemy\Phrasea\Core\LazyLocator;
use Alchemy\Phrasea\Model\Entities\Basket;
use Alchemy\Phrasea\Model\Entities\User;
use Alchemy\Phrasea\Model\Entities\ValidationParticipant;
use Alchemy\Phrasea\Model\Repositories\TokenRepository;
use Alchemy\Phrasea\Model\Repositories\ValidationParticipantRepository;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\Notification\Emitter;
use Alchemy\Phrasea\Notification\Mail\MailInfoValidationReminder;
use Alchemy\Phrasea\Notification\Receiver;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use DateTime;
use DateInterval;
use Doctrine\ORM\EntityManagerInterface;
class ValidationReminderWorker implements WorkerInterface
{
use NotifierAware;
private $app;
private $logger;
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
/** @var WorkerRunningJobRepository $repoWorkerJob */
private $repoWorkerJob;
public function __construct(PhraseaApplication $app)
{
$this->app = $app;
$this->messagePublisher = $this->app['alchemy_worker.message.publisher'];
$this->logger = $this->app['alchemy_worker.logger'];
}
public function process(array $payload)
{
$this->setDelivererLocator(new LazyLocator($this->app, 'notification.deliverer'));
$days = (int)$this->getConf()->get(['registry', 'actions', 'validation-reminder-days']);
$interval = sprintf('P%dD', $days);
$now = new DateTime();
$dateTo = clone($now);
try {
$dateTo->add(new DateInterval($interval));
} catch(\Exception $e) {
$this->logger->error(sprintf('<error>Bad interval "%s" ?</error>', $interval));
return ;
}
foreach ($this->getValidationParticipantRepository()->findNotConfirmedAndNotRemindedParticipantsByExpireDate($dateTo, $now) as $participant) {
$validationSession = $participant->getSession();
$basket = $validationSession->getBasket();
$canSend = true;
$user = $participant->getUser(); // always ok !
try {
$str_email = $user->getEmail(); // force to hydrate
} catch (\Exception $e) {
$this->logger->error('user not found!');
$canSend = false;
}
$emails[] =
// find the token if exists
// nb : a validation may have not generated tokens if forcing auth was required upon creation
$token = null;
try {
$token = $this->getTokenRepository()->findValidationToken($basket, $user);
}
catch (\Exception $e) {
// not unique token ? should not happen
$canSend = false;
}
if(!$canSend) {
continue;
}
if(!is_null($token)) {
$url = $this->app->url('lightbox_validation', ['basket' => $basket->getId(), 'LOG' => $token->getValue()]);
} else {
$url = $this->app->url('lightbox_validation', ['basket' => $basket->getId()]);
}
$this->doRemind($participant, $basket, $url);
}
$this->getEntityManager()->flush();
}
private function doRemind(ValidationParticipant $participant, Basket $basket, $url)
{
$params = [
'from' => $basket->getValidation()->getInitiator()->getId(),
'to' => $participant->getUser()->getId(),
'ssel_id' => $basket->getId(),
'url' => $url,
];
$datas = json_encode($params);
$mailed = false;
$userFrom = $basket->getValidation()->getInitiator();
$userTo = $participant->getUser();
if ($this->shouldSendNotificationFor($participant->getUser(), 'eventsmanager_notify_validationreminder')) {
$readyToSend = false;
$title = $receiver = $emitter = null;
try {
$title = $basket->getName();
$receiver = Receiver::fromUser($userTo);
$emitter = Emitter::fromUser($userFrom);
$readyToSend = true;
}
catch (\Exception $e) {
// no-op
}
if ($readyToSend) {
$this->logger->info(sprintf(' -> remind "%s" from "%s" to "%s"', $title, $emitter->getEmail(), $receiver->getEmail()));
$mail = MailInfoValidationReminder::create($this->app, $receiver, $emitter);
$mail->setButtonUrl($params['url']);
$mail->setTitle($title);
$this->deliver($mail);
$mailed = true;
$participant->setReminded(new DateTime('now'));
$this->getEntityManager()->persist($participant);
}
}
return $this->app['events-manager']->notify($params['to'], 'eventsmanager_notify_validationreminder', $datas, $mailed);
}
/**
* @param User $user
* @param $type
* @return mixed
*/
private function shouldSendNotificationFor(User $user, $type)
{
return $this->app['settings']->getUserNotificationSetting($user, $type);
}
/**
* @return PropertyAccess
*/
private function getConf()
{
return $this->app['conf'];
}
/**
* @return EntityManagerInterface
*/
private function getEntityManager()
{
return $this->app['orm.em'];
}
/**
* @return ValidationParticipantRepository
*/
private function getValidationParticipantRepository()
{
return $this->app['repo.validation-participants'];
}
/**
* @return TokenRepository
*/
private function getTokenRepository()
{
return $this->app['repo.tokens'];
}
}