PHRAS-3237 record mover as worker

This commit is contained in:
aynsix
2021-08-03 10:39:58 +03:00
parent 8777bd7176
commit 6b7f467a3d
19 changed files with 876 additions and 88 deletions

View File

@@ -12,6 +12,7 @@ use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Form\WorkerConfigurationType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerFtpType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerPullAssetsType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerRecordMoverType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerSearchengineType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerValidationReminderType;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
@@ -334,12 +335,71 @@ class AdminConfigurationController extends Controller
$running = true;
}
}
return $this->render('admin/worker-manager/worker_validation_reminder.html.twig', [
'form' => $form->createView(),
'running' => $running
]);
}
public function recordMoverAction(PhraseaApplication $app, Request $request)
{
$config = $this->getConf()->get(['workers', 'record_mover'], []);
$ttl_retry = $this->getConf()->get(['workers','queues', MessagePublisher::RECORD_MOVER_TYPE, 'ttl_retry'], null);
if(!is_null($ttl_retry)) {
$ttl_retry /= 1000; // form is in sec
}
$config['ttl_retry'] = $ttl_retry;
$form = $app->form(new WorkerRecordMoverType(), $config);
$form->handleRequest($request);
if ($form->isSubmitted() && $form->isValid()) {
$data = $form->getData();
switch($data['act']) {
case 'save' : // save the form content (settings) in 2 places
$ttl_retry = $data['ttl_retry'];
unset($data['act'], $data['ttl_retry'], $config['ttl_retry']);
// save most data under workers/record_mover
$app['conf']->set(['workers', 'record_mover'], array_merge($config, $data));
// save ttl in the q settings
if(!is_null($ttl_retry)) {
$this->getConf()->set(['workers','queues', MessagePublisher::RECORD_MOVER_TYPE, 'ttl_retry'], 1000 * (int)$ttl_retry);
}
$this->getAMQPConnection()->reinitializeQueue([MessagePublisher::RECORD_MOVER_TYPE]);
break;
case 'start':
// reinitialize the validation reminder queues
$this->getAMQPConnection()->setQueue(MessagePublisher::RECORD_MOVER_TYPE);
$this->getAMQPConnection()->reinitializeQueue([MessagePublisher::RECORD_MOVER_TYPE]);
$this->getMessagePublisher()->initializeLoopQueue(MessagePublisher::RECORD_MOVER_TYPE);
break;
case 'stop':
$this->getAMQPConnection()->reinitializeQueue([MessagePublisher::RECORD_MOVER_TYPE]);
break;
}
return $app->redirectPath('worker_admin', ['_fragment'=>'worker-record-mover']);
}
// guess if the q is "running" = check if there are pending message on Q or loop-Q
$running = false;
$qStatuses = $this->getAMQPConnection()->getQueuesStatus();
foreach([
MessagePublisher::RECORD_MOVER_TYPE,
$this->getAMQPConnection()->getLoopQueueName(MessagePublisher::RECORD_MOVER_TYPE)
] as $qName) {
if(isset($qStatuses[$qName]) && $qStatuses[$qName]['messageCount'] > 0) {
$running = true;
}
}
return $this->render('admin/worker-manager/worker_record_mover.html.twig', [
'form' => $form->createView(),
'running' => $running
]);
}
public function populateStatusAction(PhraseaApplication $app, Request $request)
{
$databoxIds = $request->get('sbasIds');