add time to message, ad status info in worker table, fix subdef elastic flush

This commit is contained in:
aynsix
2020-05-14 18:48:00 +03:00
parent 67c8bf2125
commit 0b45ae377c
11 changed files with 219 additions and 9 deletions

View File

@@ -3,6 +3,7 @@
namespace Alchemy\Phrasea\Model\Entities; namespace Alchemy\Phrasea\Model\Entities;
use Doctrine\ORM\Mapping as ORM; use Doctrine\ORM\Mapping as ORM;
use Gedmo\Mapping\Annotation as Gedmo;
/** /**
* @ORM\Table(name="WorkerRunningJob", * @ORM\Table(name="WorkerRunningJob",
@@ -15,6 +16,9 @@ use Doctrine\ORM\Mapping as ORM;
*/ */
class WorkerRunningJob class WorkerRunningJob
{ {
const FINISHED = 'finished';
const RUNNING = 'running';
/** /**
* @ORM\Column(type="integer") * @ORM\Column(type="integer")
* @ORM\Id * @ORM\Id
@@ -43,6 +47,22 @@ class WorkerRunningJob
*/ */
private $workOn; private $workOn;
/**
* @Gedmo\Timestampable(on="create")
* @ORM\Column(type="datetime")
*/
private $created;
/**
* @ORM\Column(type="datetime")
*/
private $published;
/**
* @ORM\Column(type="string", name="status")
*/
private $status;
/** /**
* @return integer * @return integer
*/ */
@@ -130,4 +150,50 @@ class WorkerRunningJob
{ {
return $this->workOn; return $this->workOn;
} }
/**
* @return \DateTime
*/
public function getCreated()
{
return $this->created;
}
/**
* @param \DateTime $published
* @return $this
*/
public function setPublished(\DateTime $published)
{
$this->published = $published;
return $this;
}
/**
* @return mixed
*/
public function getPublished()
{
return $this->published;
}
/**
* @param $status
* @return $this
*/
public function setStatus($status)
{
$this->status = $status;
return $this;
}
/**
* @return mixed
*/
public function getStatus()
{
return $this->status;
}
} }

View File

@@ -3,6 +3,7 @@
namespace Alchemy\Phrasea\Model\Repositories; namespace Alchemy\Phrasea\Model\Repositories;
use Alchemy\Phrasea\Core\PhraseaTokens; use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Doctrine\ORM\EntityRepository; use Doctrine\ORM\EntityRepository;
class WorkerRunningJobRepository extends EntityRepository class WorkerRunningJobRepository extends EntityRepository
@@ -23,7 +24,8 @@ class WorkerRunningJobRepository extends EntityRepository
FROM WorkerRunningJob FROM WorkerRunningJob
WHERE ((work & :write_meta) > 0 OR ((work & :make_subdef) > 0 AND work_on = :work_on) ) WHERE ((work & :write_meta) > 0 OR ((work & :make_subdef) > 0 AND work_on = :work_on) )
AND record_id = :record_id AND record_id = :record_id
AND databox_id = :databox_id'; AND databox_id = :databox_id
AND status = :status';
$query = $this->_em->createNativeQuery($sql, $rsm); $query = $this->_em->createNativeQuery($sql, $rsm);
$query->setParameters([ $query->setParameters([
@@ -31,7 +33,8 @@ class WorkerRunningJobRepository extends EntityRepository
'make_subdef'=> PhraseaTokens::MAKE_SUBDEF, 'make_subdef'=> PhraseaTokens::MAKE_SUBDEF,
'work_on' => $subdefName, 'work_on' => $subdefName,
'record_id' => $recordId, 'record_id' => $recordId,
'databox_id' => $databoxId 'databox_id' => $databoxId,
'status' => WorkerRunningJob::RUNNING
] ]
); );
@@ -55,7 +58,8 @@ class WorkerRunningJobRepository extends EntityRepository
FROM WorkerRunningJob FROM WorkerRunningJob
WHERE ((work & :make_subdef) > 0 OR ((work & :write_meta) > 0 AND work_on = :work_on) ) WHERE ((work & :make_subdef) > 0 OR ((work & :write_meta) > 0 AND work_on = :work_on) )
AND record_id = :record_id AND record_id = :record_id
AND databox_id = :databox_id'; AND databox_id = :databox_id
AND status = :status';
$query = $this->_em->createNativeQuery($sql, $rsm); $query = $this->_em->createNativeQuery($sql, $rsm);
$query->setParameters([ $query->setParameters([
@@ -63,10 +67,34 @@ class WorkerRunningJobRepository extends EntityRepository
'write_meta' => PhraseaTokens::WRITE_META, 'write_meta' => PhraseaTokens::WRITE_META,
'work_on' => $subdefName, 'work_on' => $subdefName,
'record_id' => $recordId, 'record_id' => $recordId,
'databox_id' => $databoxId 'databox_id' => $databoxId,
'status' => WorkerRunningJob::RUNNING
] ]
); );
return count($query->getResult()) == 0; return count($query->getResult()) == 0;
} }
public function truncateWorkerTable()
{
$connection = $this->_em->getConnection();
$platform = $connection->getDatabasePlatform();
$this->_em->beginTransaction();
try {
$connection->executeUpdate($platform->getTruncateTableSQL('WorkerRunningJob'));
} catch (\Exception $e) {
$this->_em->rollback();
}
}
public function deleteFinishedWorks()
{
$this->_em->beginTransaction();
try {
$this->_em->getConnection()->delete('WorkerRunningJob', ['status' => WorkerRunningJob::FINISHED]);
$this->_em->commit();
} catch (\Exception $e) {
$this->_em->rollback();
}
}
} }

View File

@@ -4,6 +4,7 @@ namespace Alchemy\Phrasea\WorkerManager\Controller;
use Alchemy\Phrasea\Application as PhraseaApplication; use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Controller\Controller; use Alchemy\Phrasea\Controller\Controller;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions; use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent; use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents; use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
@@ -63,6 +64,34 @@ class AdminConfigurationController extends Controller
]); ]);
} }
public function infoAction(PhraseaApplication $app, Request $request)
{
/** @var WorkerRunningJobRepository $repoWorker */
$repoWorker = $app['repo.worker-running-job'];
return $this->render('admin/worker-manager/worker_info.html.twig', [
'workerRunningJob' => $repoWorker->findAll()
]);
}
public function truncateTableAction(PhraseaApplication $app, Request $request)
{
/** @var WorkerRunningJobRepository $repoWorker */
$repoWorker = $app['repo.worker-running-job'];
$repoWorker->truncateWorkerTable();
return $app->redirectPath('worker_admin');
}
public function deleteFinishedAction(PhraseaApplication $app, Request $request)
{
/** @var WorkerRunningJobRepository $repoWorker */
$repoWorker = $app['repo.worker-running-job'];
$repoWorker->deleteFinishedWorks();
return $app->redirectPath('worker_admin');
}
public function searchengineAction(PhraseaApplication $app, Request $request) public function searchengineAction(PhraseaApplication $app, Request $request)
{ {
$options = $this->getElasticsearchOptions(); $options = $this->getElasticsearchOptions();

View File

@@ -69,7 +69,8 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface
$app['alchemy_worker.logger'], $app['alchemy_worker.logger'],
$app['dispatcher'], $app['dispatcher'],
$app['phraseanet.filesystem'], $app['phraseanet.filesystem'],
$app['repo.worker-running-job'] $app['repo.worker-running-job'],
$app['elasticsearch.indexer']
)) ))
->setApplicationBox($app['phraseanet.appbox']) ->setApplicationBox($app['phraseanet.appbox'])
->setEntityManagerLocator(new LazyLocator($app, 'orm.em')) ->setEntityManagerLocator(new LazyLocator($app, 'orm.em'))

View File

@@ -53,6 +53,18 @@ class ControllerServiceProvider implements ControllerProviderInterface, ServiceP
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_configuration'); ->bind('worker_admin_configuration');
$controllers->match('/info', 'controller.worker.admin.configuration:infoAction')
->method('GET')
->bind('worker_admin_info');
$controllers->match('/truncate', 'controller.worker.admin.configuration:truncateTableAction')
->method('POST')
->bind('worker_admin_truncate');
$controllers->match('/delete-finished', 'controller.worker.admin.configuration:deleteFinishedAction')
->method('POST')
->bind('worker_admin_delete_finished');
$controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction') $controllers->match('/searchengine', 'controller.worker.admin.configuration:searchengineAction')
->method('GET|POST') ->method('GET|POST')
->bind('worker_admin_searchengine'); ->bind('worker_admin_searchengine');

View File

@@ -72,6 +72,8 @@ class MessagePublisher
public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '') public function publishMessage(array $payload, $queueName, $retryCount = null, $workerMessage = '')
{ {
// add published timestamp to all message payload
$payload['payload']['published'] = time();
$msg = new AMQPMessage(json_encode($payload)); $msg = new AMQPMessage(json_encode($payload));
$routing = array_search($queueName, AMQPConnection::$defaultRetryQueues); $routing = array_search($queueName, AMQPConnection::$defaultRetryQueues);

View File

@@ -9,6 +9,7 @@ use Alchemy\Phrasea\Filesystem\FilesystemService;
use Alchemy\Phrasea\Media\SubdefGenerator; use Alchemy\Phrasea\Media\SubdefGenerator;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob; use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository; use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent; use Alchemy\Phrasea\WorkerManager\Event\StoryCreateCoverEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent; use Alchemy\Phrasea\WorkerManager\Event\SubdefinitionWritemetaEvent;
@@ -31,6 +32,7 @@ class SubdefCreationWorker implements WorkerInterface
private $dispatcher; private $dispatcher;
private $filesystem; private $filesystem;
private $repoWorker; private $repoWorker;
private $indexer;
public function __construct( public function __construct(
SubdefGenerator $subdefGenerator, SubdefGenerator $subdefGenerator,
@@ -38,7 +40,8 @@ class SubdefCreationWorker implements WorkerInterface
LoggerInterface $logger, LoggerInterface $logger,
EventDispatcherInterface $dispatcher, EventDispatcherInterface $dispatcher,
FilesystemService $filesystem, FilesystemService $filesystem,
WorkerRunningJobRepository $repoWorker WorkerRunningJobRepository $repoWorker,
Indexer $indexer
) )
{ {
$this->subdefGenerator = $subdefGenerator; $this->subdefGenerator = $subdefGenerator;
@@ -47,6 +50,7 @@ class SubdefCreationWorker implements WorkerInterface
$this->dispatcher = $dispatcher; $this->dispatcher = $dispatcher;
$this->filesystem = $filesystem; $this->filesystem = $filesystem;
$this->repoWorker = $repoWorker; $this->repoWorker = $repoWorker;
$this->indexer = $indexer;
} }
public function process(array $payload) public function process(array $payload)
@@ -84,12 +88,15 @@ class SubdefCreationWorker implements WorkerInterface
$em->beginTransaction(); $em->beginTransaction();
try { try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob(); $workerRunningJob = new WorkerRunningJob();
$workerRunningJob $workerRunningJob
->setDataboxId($databoxId) ->setDataboxId($databoxId)
->setRecordId($recordId) ->setRecordId($recordId)
->setWork(PhraseaTokens::MAKE_SUBDEF) ->setWork(PhraseaTokens::MAKE_SUBDEF)
->setWorkOn($payload['subdefName']) ->setWorkOn($payload['subdefName'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
; ;
$em->persist($workerRunningJob); $em->persist($workerRunningJob);
@@ -152,10 +159,14 @@ class SubdefCreationWorker implements WorkerInterface
} }
} }
// update elastic
$this->indexer->flushQueue();
// tell that we have finished to work on this file // tell that we have finished to work on this file
$em->beginTransaction(); $em->beginTransaction();
try { try {
$em->remove($workerRunningJob); $workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$em->persist($workerRunningJob);
$em->flush(); $em->flush();
$em->commit(); $em->commit();
} catch (\Exception $e) { } catch (\Exception $e) {

View File

@@ -88,12 +88,15 @@ class WriteMetadatasWorker implements WorkerInterface
$em->beginTransaction(); $em->beginTransaction();
try { try {
$date = new \DateTime();
$workerRunningJob = new WorkerRunningJob(); $workerRunningJob = new WorkerRunningJob();
$workerRunningJob $workerRunningJob
->setDataboxId($databoxId) ->setDataboxId($databoxId)
->setRecordId($recordId) ->setRecordId($recordId)
->setWork($param) ->setWork($param)
->setWorkOn($payload['subdefName']) ->setWorkOn($payload['subdefName'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningJob::RUNNING)
; ;
$em->persist($workerRunningJob); $em->persist($workerRunningJob);
@@ -246,7 +249,8 @@ class WriteMetadatasWorker implements WorkerInterface
// tell that we have finished to work on this file // tell that we have finished to work on this file
$em->beginTransaction(); $em->beginTransaction();
try { try {
$em->remove($workerRunningJob); $workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$em->persist($workerRunningJob);
$em->flush(); $em->flush();
$em->commit(); $em->commit();
} catch (\Exception $e) { } catch (\Exception $e) {

View File

@@ -10,6 +10,11 @@
{{ "Configuration" | trans }} {{ "Configuration" | trans }}
</a> </a>
</li> </li>
<li class="worker-info" role="presentation">
<a href="#worker-info" aria-controls="worker-info" role="tab" data-toggle="tab" data-url="/admin/worker-manager/info">
{{ "Working info" | trans }}
</a>
</li>
<li class="worker-searchengine" role="presentation"> <li class="worker-searchengine" role="presentation">
<a href="#worker-searchengine" aria-controls="worker-searchengine" role="tab" data-toggle="tab" data-url="/admin/worker-manager/searchengine"> <a href="#worker-searchengine" aria-controls="worker-searchengine" role="tab" data-toggle="tab" data-url="/admin/worker-manager/searchengine">
{{ "Searchengine" | trans }} {{ "Searchengine" | trans }}
@@ -36,6 +41,7 @@
<!-- Tab panes --> <!-- Tab panes -->
<div class="tab-content"> <div class="tab-content">
<div role="tabpanel" class="tab-pane fade" id="worker-configuration"></div> <div role="tabpanel" class="tab-pane fade" id="worker-configuration"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-info"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-searchengine"></div> <div role="tabpanel" class="tab-pane fade" id="worker-searchengine"></div>
<div role="tabpanel" class="tab-pane fade" id="worker-pull-assets"></div> <div role="tabpanel" class="tab-pane fade" id="worker-pull-assets"></div>
<div role="tabpanel" class="tab-pane fade in active" id="worker-subview"></div> <div role="tabpanel" class="tab-pane fade in active" id="worker-subview"></div>

View File

@@ -0,0 +1,49 @@
<h1> worker information</h1>
<form action="{{ path("worker_admin_truncate") }}" method="POST">
<p class="alert alert-danger">
<strong>Warning!</strong>
It's truncate all work table content !
</p>
<button class="btn btn-primary">
{{ 'Truncate table' }}
</button>
</form>
<form action="{{ path("worker_admin_delete_finished") }}" method="POST">
<p class="alert alert-danger">
<strong>Warning!</strong>
It's delete all finished works !
</p>
<button class="btn btn-primary">
{{ 'Delete finished' }}
</button>
</form>
<table class="admintable">
<thead>
<tr>
<th class="sortable">databox_id</th>
<th class="sortable">record_id</th>
<th class="sortable">work</th>
<th class="sortable">work_on</th>
<th class="sortable">created</th>
<th class="sortable">published</th>
<th class="sortable">status</th>
</tr>
</thead>
<tbody>
{% for workerRow in workerRunningJob %}
<tr>
<td>{{ workerRow.databoxId }}</td>
<td>{{ workerRow.recordId }}</td>
<td>{{ workerRow.work }}</td>
<td>{{ workerRow.workOn }}</td>
<td>{{ workerRow.created|date('Y-m-d H:i:s') }}</td>
<td>{{ workerRow.published|date('Y-m-d H:i:s') }}</td>
<td>{{ workerRow.status }}</td>
</tr>
{% endfor %}
</tbody>
</table>

View File

@@ -29,6 +29,7 @@ class WorkerServiceTest extends \PHPUnit_Framework_TestCase
$app['dispatcher'] = $this->prophesize('Symfony\Component\EventDispatcher\EventDispatcherInterface')->reveal(); $app['dispatcher'] = $this->prophesize('Symfony\Component\EventDispatcher\EventDispatcherInterface')->reveal();
$app['phraseanet.filesystem'] = $this->prophesize('Alchemy\Phrasea\Filesystem\FilesystemService')->reveal(); $app['phraseanet.filesystem'] = $this->prophesize('Alchemy\Phrasea\Filesystem\FilesystemService')->reveal();
$app['repo.worker-running-job'] = $this->prophesize('Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository')->reveal(); $app['repo.worker-running-job'] = $this->prophesize('Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository')->reveal();
$app['elasticsearch.indexer'] = $this->prophesize('Alchemy\Phrasea\SearchEngine\Elastic\Indexer')->reveal();
$writer = $this->prophesize('PHPExiftool\Writer')->reveal(); $writer = $this->prophesize('PHPExiftool\Writer')->reveal();
@@ -38,7 +39,8 @@ class WorkerServiceTest extends \PHPUnit_Framework_TestCase
$app['alchemy_worker.logger'], $app['alchemy_worker.logger'],
$app['dispatcher'], $app['dispatcher'],
$app['phraseanet.filesystem'], $app['phraseanet.filesystem'],
$app['repo.worker-running-job'] $app['repo.worker-running-job'],
$app['elasticsearch.indexer']
); );
$this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $subdefCreationWorker); $this->assertInstanceOf('Alchemy\\Phrasea\\WorkerManager\\Worker\\WorkerInterface', $subdefCreationWorker);