remove sqllite db , add finished colomn

This commit is contained in:
aynsix
2020-05-26 19:35:33 +03:00
parent c8d70838f9
commit d25da3ee0c
27 changed files with 2492 additions and 1980 deletions

View File

@@ -150,6 +150,12 @@ class RepositoriesServiceProvider implements ServiceProviderInterface
$app['repo.worker-running-job'] = $app->share(function (PhraseaApplication $app) {
return $app['orm.em']->getRepository('Phraseanet:WorkerRunningJob');
});
$app['repo.worker-running-populate'] = $app->share(function (PhraseaApplication $app) {
return $app['orm.em']->getRepository('Phraseanet:WorkerRunningPopulate');
});
$app['repo.worker-running-uploader'] = $app->share(function (PhraseaApplication $app) {
return $app['orm.em']->getRepository('Phraseanet:WorkerRunningUploader');
});
$app['repo.databoxes'] = $app->share(function (PhraseaApplication $app) {
$appbox = $app->getApplicationBox();

View File

@@ -17,7 +17,7 @@ class Version
* @var string
*/
private $number = '4.1.0-alpha.27a';
private $number = '4.1.0-alpha.28a';
/**
* @var string

View File

@@ -59,6 +59,11 @@ class WorkerRunningJob
*/
private $published;
/**
* @ORM\Column(type="datetime", nullable=true)
*/
private $finished;
/**
* @ORM\Column(type="string", name="status")
*/
@@ -179,6 +184,25 @@ class WorkerRunningJob
return $this->published;
}
/**
* @param \DateTime $finished
* @return $this
*/
public function setFinished(\DateTime $finished)
{
$this->finished = $finished;
return $this;
}
/**
* @return mixed
*/
public function getFinished()
{
return $this->finished;
}
/**
* @param $status
* @return $this

View File

@@ -0,0 +1,220 @@
<?php
namespace Alchemy\Phrasea\Model\Entities;
use Doctrine\ORM\Mapping as ORM;
use Gedmo\Mapping\Annotation as Gedmo;
/**
* @ORM\Table(name="WorkerRunningPopulate",
* indexes={
* @ORM\index(name="host", columns={"host"}),
* @ORM\index(name="port", columns={"port"}),
* @ORM\index(name="index_name", columns={"index_name"}),
* @ORM\index(name="databox_id", columns={"databox_id"}),
* }
* )
* @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningPopulateRepository")
*/
class WorkerRunningPopulate
{
const FINISHED = 'finished';
const RUNNING = 'running';
/**
* @ORM\Column(type="integer")
* @ORM\Id
* @ORM\GeneratedValue
*/
private $id;
/**
* @ORM\Column(type="string", name="host")
*/
private $host;
/**
* @ORM\Column(type="string", name="port")
*/
private $port;
/**
* @ORM\Column(type="string", name="index_name")
*/
private $indexName;
/**
* @ORM\Column(type="integer", name="databox_id")
*/
private $databoxId;
/**
* @Gedmo\Timestampable(on="create")
* @ORM\Column(type="datetime")
*/
private $created;
/**
* @ORM\Column(type="datetime")
*/
private $published;
/**
* @ORM\Column(type="datetime", nullable=true)
*/
private $finished;
/**
* @ORM\Column(type="string", name="status")
*/
private $status;
/**
* @return integer
*/
public function getId()
{
return $this->id;
}
/**
* @param $host
* @return $this
*/
public function setHost($host)
{
$this->host = $host;
return $this;
}
/**
* @return mixed
*/
public function getHost()
{
return $this->host;
}
/**
* @param $port
* @return $this
*/
public function setPort($port)
{
$this->port = $port;
return $this;
}
/**
* @return mixed
*/
public function getPort()
{
return $this->port;
}
/**
* @param $indexName
* @return $this
*/
public function setIndexName($indexName)
{
$this->indexName = $indexName;
return $this;
}
/**
* @return mixed
*/
public function getIndexName()
{
return $this->indexName;
}
/**
* @param $databoxId
* @return $this
*/
public function setDataboxId($databoxId)
{
$this->databoxId = $databoxId;
return $this;
}
/**
* @return mixed
*/
public function getDataboxId()
{
return $this->databoxId;
}
/**
* @return \DateTime
*/
public function getCreated()
{
return $this->created;
}
/**
* @param \DateTime $published
* @return $this
*/
public function setPublished(\DateTime $published)
{
$this->published = $published;
return $this;
}
/**
* @return mixed
*/
public function getPublished()
{
return $this->published;
}
/**
* @param \DateTime $finished
* @return $this
*/
public function setFinished(\DateTime $finished)
{
$this->finished = $finished;
return $this;
}
/**
* @return mixed
*/
public function getFinished()
{
return $this->finished;
}
/**
* @param $status
* @return $this
*/
public function setStatus($status)
{
$this->status = $status;
return $this;
}
/**
* @return mixed
*/
public function getStatus()
{
return $this->status;
}
}

View File

@@ -0,0 +1,198 @@
<?php
namespace Alchemy\Phrasea\Model\Entities;
use Doctrine\ORM\Mapping as ORM;
use Gedmo\Mapping\Annotation as Gedmo;
/**
* @ORM\Table(name="WorkerRunningUploader",
* indexes={
* @ORM\index(name="commit_id", columns={"commit_id"}),
* @ORM\index(name="asset_id", columns={"asset_id"}),
* }
* )
* @ORM\Entity(repositoryClass="Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository")
*/
class WorkerRunningUploader
{
const DOWNLOADED = 'downloaded';
const RUNNING = 'running';
const TYPE_PULL = 'pull';
const TYPE_PUSH = 'push';
/**
* @ORM\Column(type="integer")
* @ORM\Id
* @ORM\GeneratedValue
*/
private $id;
/**
* @ORM\Column(type="string", name="commit_id")
*/
private $commitId;
/**
* @ORM\Column(type="string", name="asset_id")
*/
private $assetId;
/**
* @ORM\Column(type="string", name="type")
*/
private $type;
/**
* @Gedmo\Timestampable(on="create")
* @ORM\Column(type="datetime")
*/
private $created;
/**
* @ORM\Column(type="datetime")
*/
private $published;
/**
* @ORM\Column(type="datetime", nullable=true)
*/
private $finished;
/**
* @ORM\Column(type="string", name="status")
*/
private $status;
/**
* @return integer
*/
public function getId()
{
return $this->id;
}
/**
* @param $commitId
* @return $this
*/
public function setCommitId($commitId)
{
$this->commitId = $commitId;
return $this;
}
/**
* @return mixed
*/
public function getCommitId()
{
return $this->commitId;
}
/**
* @param $assetId
* @return $this
*/
public function setAssetId($assetId)
{
$this->assetId = $assetId;
return $this;
}
/**
* @return mixed
*/
public function getAssetId()
{
return $this->assetId;
}
/**
* @return \DateTime
*/
public function getCreated()
{
return $this->created;
}
/**
* @param \DateTime $published
* @return $this
*/
public function setPublished(\DateTime $published)
{
$this->published = $published;
return $this;
}
/**
* @return mixed
*/
public function getPublished()
{
return $this->published;
}
/**
* @param \DateTime $finished
* @return $this
*/
public function setFinished(\DateTime $finished)
{
$this->finished = $finished;
return $this;
}
/**
* @return mixed
*/
public function getFinished()
{
return $this->finished;
}
/**
* @param $status
* @return $this
*/
public function setStatus($status)
{
$this->status = $status;
return $this;
}
/**
* @return mixed
*/
public function getStatus()
{
return $this->status;
}
/**
* @param $type
* @return $this
*/
public function setType($type)
{
$this->type = $type;
return $this;
}
/**
* @return mixed
*/
public function getType()
{
return $this->type;
}
}

View File

@@ -0,0 +1,29 @@
<?php
namespace Alchemy\Phrasea\Model\Repositories;
use Alchemy\Phrasea\Model\Entities\WorkerRunningPopulate;
use Doctrine\ORM\EntityRepository;
class WorkerRunningPopulateRepository extends EntityRepository
{
public function getEntityManager()
{
return parent::getEntityManager();
}
/**
* @param array $databoxIds
* @return int
*/
public function checkPopulateStatusByDataboxIds(array $databoxIds)
{
$qb = $this->createQueryBuilder('w');
$qb->where($qb->expr()->in('w.databoxId', $databoxIds))
->andWhere('w.status = :status')
->setParameter('status', WorkerRunningPopulate::RUNNING)
;
return count($qb->getQuery()->getResult());
}
}

View File

@@ -0,0 +1,35 @@
<?php
namespace Alchemy\Phrasea\Model\Repositories;
use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader;
use Doctrine\ORM\EntityRepository;
class WorkerRunningUploaderRepository extends EntityRepository
{
public function getEntityManager()
{
return parent::getEntityManager();
}
/**
* @param $commitId
* @return bool
*/
public function canAck($commitId)
{
$qb = $this->createQueryBuilder('w');
$res = $qb
->where('w.commitId = :commitId')
->andWhere('w.status != :status')
->setParameters([
'commitId' => $commitId,
'status' => WorkerRunningUploader::DOWNLOADED
])
->getQuery()
->getResult()
;
return count($res) == 0;
}
}

View File

@@ -6,13 +6,13 @@ use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Controller\Controller;
use Alchemy\Phrasea\Model\Entities\WorkerRunningJob;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningJobRepository;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningPopulateRepository;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Form\WorkerConfigurationType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerPullAssetsType;
use Alchemy\Phrasea\WorkerManager\Form\WorkerSearchengineType;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\AMQPConnection;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
@@ -147,7 +147,10 @@ class AdminConfigurationController extends Controller
{
$databoxIds = $request->get('sbasIds');
return DBManipulator::checkPopulateIndexStatusByDataboxId($databoxIds);
/** @var WorkerRunningPopulateRepository $repoWorkerPopulate */
$repoWorkerPopulate = $app['repo.worker-running-populate'];
return $repoWorkerPopulate->checkPopulateStatusByDataboxIds($databoxIds);
}
public function pullAssetsAction(PhraseaApplication $app, Request $request)

View File

@@ -1,175 +0,0 @@
<?php
namespace Alchemy\Phrasea\WorkerManager\Model;
use Alchemy\Phrasea\WorkerManager\Configuration\Config;
class DBManipulator
{
/**
* @param array $params = [host, port, indexName, databoxId]
*/
public static function savePopulateStatus(array $params)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
$pdo->query("CREATE TABLE IF NOT EXISTS populate_running(host TEXT NOT NULL, port TEXT NOT NULL, index_name TEXT NOT NULL, databox_id TEXT NOT NULL);");
$stmt = $pdo->prepare("INSERT INTO populate_running(host, port, index_name, databox_id) VALUES(:host, :port, :index_name, :databox_id)");
$stmt->execute([
':host' => $params['host'],
':port' => $params['port'],
':index_name' => $params['indexName'],
':databox_id' => $params['databoxId']
]);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
}
/**
* @param array $params = [host, port, indexName, databoxId]
*/
public static function deletePopulateStatus(array $params)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
$stmt = $pdo->prepare("DELETE FROM populate_running WHERE host = :host AND port= :port AND index_name= :index_name AND databox_id= :databox_id");
$stmt->execute([
':host' => $params['host'],
':port' => $params['port'],
':index_name' => $params['indexName'],
':databox_id' => $params['databoxId']
]);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
}
/**
* Update commits table in the temporary sqlite worker.db
* @param $commitId
* @param $assetId
* @return int the number of the remaining assets in the commit
*/
public static function updateRemainingAssetsListByCommit($commitId, $assetId)
{
$row = 1;
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
// remove assetId from the assets list
$stmt = $pdo->prepare("DELETE FROM commits WHERE commit_id = :commit_id AND asset= :assetId");
$stmt->execute([
':commit_id' => $commitId,
':assetId' => $assetId
]);
$stmt = $pdo->prepare("SELECT * FROM commits WHERE commit_id = :commit_id");
$stmt->execute([
':commit_id' => $commitId,
]);
$row = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
return count($row);
}
/**
* @param $commitId
* @return bool
*/
public static function isCommitToBeCreating($commitId)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
$row = 0;
try {
$pdo->query("CREATE TABLE IF NOT EXISTS commits(commit_id TEXT NOT NULL, asset TEXT NOT NULL);");
$stmt = $pdo->prepare("SELECT * FROM commits WHERE commit_id = :commit_id");
$stmt->execute([
':commit_id' => $commitId,
]);
$row = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$pdo->commit();
} catch (\Exception $e) {
//no-op
}
return count($row) ? true : false;
}
public static function saveAssetsList($commitId, $assetIds)
{
$pdo = Config::getWorkerSqliteConnection();
$pdo->beginTransaction();
try {
$pdo->query("CREATE TABLE IF NOT EXISTS commits(commit_id TEXT NOT NULL, asset TEXT NOT NULL);");
// insert all assets ID in the temporary sqlite database
foreach ($assetIds as $assetId) {
$stmt = $pdo->prepare("INSERT INTO commits(commit_id, asset) VALUES(:commit_id, :asset)");
$stmt->execute([
':commit_id' => $commitId,
':asset' => $assetId
]);
}
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
}
/**
* @param array $databoxIds
* @return int
*/
public static function checkPopulateIndexStatusByDataboxId(array $databoxIds)
{
$pdo = Config::getWorkerSqliteConnection();
$in = str_repeat("?,", count($databoxIds)-1) . "?";
$pdo->beginTransaction();
try {
$pdo->query("CREATE TABLE IF NOT EXISTS populate_running(host TEXT NOT NULL, port TEXT NOT NULL, index_name TEXT NOT NULL, databox_id TEXT NOT NULL);");
$stmt = $pdo->prepare("SELECT * FROM populate_running WHERE databox_id IN ($in)");
$stmt->execute($databoxIds);
$row = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$pdo->commit();
} catch (\Exception $e) {
$pdo->rollBack();
}
return count($row);
}
}

View File

@@ -115,13 +115,13 @@ class AlchemyWorkerServiceProvider implements PluginProviderInterface
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::POPULATE_INDEX_TYPE, new CallableWorkerFactory(function () use ($app) {
return (new PopulateIndexWorker($app['alchemy_worker.message.publisher'], $app['elasticsearch.indexer']))
return (new PopulateIndexWorker($app['alchemy_worker.message.publisher'], $app['elasticsearch.indexer'], $app['repo.worker-running-populate']))
->setApplicationBox($app['phraseanet.appbox'])
->setDispatcher($app['dispatcher']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::PULL_ASSETS_TYPE, new CallableWorkerFactory(function () use ($app) {
return new PullAssetsWorker($app['alchemy_worker.message.publisher'], $app['conf']);
return new PullAssetsWorker($app['alchemy_worker.message.publisher'], $app['conf'], $app['repo.worker-running-uploader']);
}));
$app['alchemy_worker.type_based_worker_resolver']->addFactory(MessagePublisher::DELETE_RECORD_TYPE, new CallableWorkerFactory(function () use ($app) {

View File

@@ -2,6 +2,7 @@
namespace Alchemy\Phrasea\WorkerManager\Subscriber;
use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreateEvent;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent;
@@ -21,11 +22,13 @@ class AssetsIngestSubscriber implements EventSubscriberInterface
public function onAssetsCreate(AssetsCreateEvent $event)
{
// this is an uploader PUSH mode
$payload = [
'message_type' => MessagePublisher::ASSETS_INGEST_TYPE,
'payload' => $event->getData()
'payload' => array_merge($event->getData(), ['type' => WorkerRunningUploader::TYPE_PUSH])
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::ASSETS_INGEST_QUEUE);
}

View File

@@ -5,10 +5,11 @@ namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\EntityManagerAware;
use Alchemy\Phrasea\Application as PhraseaApplication;
use Alchemy\Phrasea\Model\Entities\StoryWZ;
use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader;
use Alchemy\Phrasea\Model\Repositories\UserRepository;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use GuzzleHttp\Client;
@@ -21,6 +22,9 @@ class AssetsIngestWorker implements WorkerInterface
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
/** @var WorkerRunningUploaderRepository $repoWorkerUploader */
private $repoWorkerUploader;
public function __construct(PhraseaApplication $app)
{
$this->app = $app;
@@ -30,8 +34,9 @@ class AssetsIngestWorker implements WorkerInterface
public function process(array $payload)
{
$assets = $payload['assets'];
$this->repoWorkerUploader = $this->getWorkerRunningUploaderRepository();
DBManipulator::saveAssetsList($payload['commit_id'], $assets);
$this->saveAssetsList($payload['commit_id'], $assets, $payload['published'], $payload['type']);
$uploaderClient = new Client(['base_uri' => $payload['base_url']]);
@@ -120,4 +125,42 @@ class AssetsIngestWorker implements WorkerInterface
{
return $this->app['repo.users'];
}
/**
* @return WorkerRunningUploaderRepository
*/
private function getWorkerRunningUploaderRepository()
{
return $this->app['repo.worker-running-uploader'];
}
private function saveAssetsList($commitId, $assetsId, $published, $type)
{
$em = $this->repoWorkerUploader->getEntityManager();
$em->beginTransaction();
$date = new \DateTime();
try {
foreach ($assetsId as $assetId) {
$workerRunningUploader = new WorkerRunningUploader();
$workerRunningUploader
->setCommitId($commitId)
->setAssetId($assetId)
->setPublished($date->setTimestamp($published))
->setStatus(WorkerRunningUploader::RUNNING)
->setType($type)
;
$em->persist($workerRunningUploader);
unset($workerRunningUploader);
}
$em->flush();
$em->commit();
} catch(\Exception $e) {
$em->rollback();
}
}
}

View File

@@ -19,10 +19,11 @@ use Alchemy\Phrasea\Media\SubdefSubstituer;
use Alchemy\Phrasea\Model\Entities\LazaretFile;
use Alchemy\Phrasea\Model\Entities\LazaretSession;
use Alchemy\Phrasea\Model\Entities\User;
use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader;
use Alchemy\Phrasea\Model\Repositories\UserRepository;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository;
use Alchemy\Phrasea\WorkerManager\Event\AssetsCreationRecordFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use GuzzleHttp\Client;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
@@ -41,6 +42,9 @@ class CreateRecordWorker implements WorkerInterface
/** @var MessagePublisher $messagePublisher */
private $messagePublisher;
/** @var WorkerRunningUploaderRepository $repoWorkerUploader */
private $repoWorkerUploader;
public function __construct(PhraseaApplication $app)
{
$this->app = $app;
@@ -50,6 +54,9 @@ class CreateRecordWorker implements WorkerInterface
public function process(array $payload)
{
$this->repoWorkerUploader = $this->getWorkerRunningUploaderRepository();
$em = $this->repoWorkerUploader->getEntityManager();
$uploaderClient = new Client(['base_uri' => $payload['base_url']]);
//get asset informations
@@ -63,6 +70,13 @@ class CreateRecordWorker implements WorkerInterface
$tempfile = $this->getTemporaryFilesystem()->createTemporaryFile('download_', null, pathinfo($body['originalName'], PATHINFO_EXTENSION));
/** @var WorkerRunningUploader $workerRunningUploader */
$workerRunningUploader = $this->repoWorkerUploader->findOneBy([
'commitId' => $payload['commit_id'],
'assetId' => $payload['asset']
]);
//download the asset
try {
$res = $uploaderClient->get('/assets/'.$payload['asset'].'/download', [
@@ -81,6 +95,9 @@ class CreateRecordWorker implements WorkerInterface
$count
));
$em->remove($workerRunningUploader);
$em->flush();
return;
}
@@ -98,13 +115,31 @@ class CreateRecordWorker implements WorkerInterface
$count
));
$em->remove($workerRunningUploader);
$em->flush();
return;
}
$remainingAssets = DBManipulator::updateRemainingAssetsListByCommit($payload['commit_id'], $payload['asset']);
if ($workerRunningUploader != null) {
$em->beginTransaction();
try {
$workerRunningUploader->setStatus(WorkerRunningUploader::DOWNLOADED);
$em->persist($workerRunningUploader);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
}
$canAck = $this->repoWorkerUploader->canAck($payload['commit_id']);
// if all assets in the commit are downloaded , send ack to the uploader
if ($remainingAssets == 0) {
if ($canAck) {
// post ack to the uploader
$uploaderClient->post('/commits/' . $payload['commit_id'] . '/ack', [
'headers' => [
@@ -300,4 +335,12 @@ class CreateRecordWorker implements WorkerInterface
{
return $this->app['subdef.substituer'];
}
/**
* @return WorkerRunningUploaderRepository
*/
private function getWorkerRunningUploaderRepository()
{
return $this->app['repo.worker-running-uploader'];
}
}

View File

@@ -4,11 +4,12 @@ namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Application\Helper\ApplicationBoxAware;
use Alchemy\Phrasea\Application\Helper\DispatcherAware;
use Alchemy\Phrasea\Model\Entities\WorkerRunningPopulate;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningPopulateRepository;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\WorkerManager\Event\PopulateIndexFailureEvent;
use Alchemy\Phrasea\WorkerManager\Event\WorkerEvents;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
class PopulateIndexWorker implements WorkerInterface
@@ -22,15 +23,41 @@ class PopulateIndexWorker implements WorkerInterface
/** @var Indexer $indexer */
private $indexer;
public function __construct(MessagePublisher $messagePublisher, Indexer $indexer)
/** @var WorkerRunningPopulateRepository $repoWorkerPopulate*/
private $repoWorkerPopulate;
public function __construct(MessagePublisher $messagePublisher, Indexer $indexer, WorkerRunningPopulateRepository $repoWorkerPopulate)
{
$this->indexer = $indexer;
$this->messagePublisher = $messagePublisher;
$this->indexer = $indexer;
$this->messagePublisher = $messagePublisher;
$this->repoWorkerPopulate = $repoWorkerPopulate;
}
public function process(array $payload)
{
DBManipulator::savePopulateStatus($payload);
$em = $this->repoWorkerPopulate->getEntityManager();
$em->beginTransaction();
$date = new \DateTime();
try {
$workerRunningPopulate = new WorkerRunningPopulate();
$workerRunningPopulate
->setHost($payload['host'])
->setPort($payload['port'])
->setIndexName($payload['indexName'])
->setDataboxId($payload['databoxId'])
->setPublished($date->setTimestamp($payload['published']))
->setStatus(WorkerRunningPopulate::RUNNING)
;
$em->persist($workerRunningPopulate);
$em->flush();
$em->commit();
} catch (\Exception $e) {
$em->rollback();
}
/** @var ElasticsearchOptions $options */
$options = $this->indexer->getIndex()->getOptions();
@@ -71,7 +98,12 @@ class PopulateIndexWorker implements WorkerInterface
$r['memory']/1048576
));
} catch(\Exception $e) {
DBManipulator::deletePopulateStatus($payload);
if ($workerRunningPopulate != null) {
$em->remove($workerRunningPopulate);
$em->flush();
}
$workerMessage = sprintf("Error on indexing : %s ", $e->getMessage());
$this->messagePublisher->pushLog($workerMessage);
@@ -90,8 +122,17 @@ class PopulateIndexWorker implements WorkerInterface
}
}
// delete entry in populate_running
DBManipulator::deletePopulateStatus($payload);
// tell that the populate is finished
if ($workerRunningPopulate != null) {
$workerRunningPopulate
->setStatus(WorkerRunningPopulate::FINISHED)
->setFinished(new \DateTime('now'))
;
$em->persist($workerRunningPopulate);
$em->flush();
}
}
}

View File

@@ -3,7 +3,8 @@
namespace Alchemy\Phrasea\WorkerManager\Worker;
use Alchemy\Phrasea\Core\Configuration\PropertyAccess;
use Alchemy\Phrasea\WorkerManager\Model\DBManipulator;
use Alchemy\Phrasea\Model\Entities\WorkerRunningUploader;
use Alchemy\Phrasea\Model\Repositories\WorkerRunningUploaderRepository;
use Alchemy\Phrasea\WorkerManager\Queue\MessagePublisher;
use GuzzleHttp\Client;
@@ -12,10 +13,14 @@ class PullAssetsWorker implements WorkerInterface
private $messagePublisher;
private $conf;
public function __construct(MessagePublisher $messagePublisher, PropertyAccess $conf)
/** @var WorkerRunningUploaderRepository $repoWorkerUploader */
private $repoWorkerUploader;
public function __construct(MessagePublisher $messagePublisher, PropertyAccess $conf, WorkerRunningUploaderRepository $repoWorkerUploader)
{
$this->messagePublisher = $messagePublisher;
$this->conf = $conf;
$this->messagePublisher = $messagePublisher;
$this->conf = $conf;
$this->repoWorkerUploader = $repoWorkerUploader;
}
public function process(array $payload)
@@ -62,9 +67,10 @@ class PullAssetsWorker implements WorkerInterface
foreach ($commits as $commit) {
// send only payload in ingest-queue if the commit is ack false and it is not being creating
if (!$commit['acknowledged'] && !DBManipulator::isCommitToBeCreating($commit['id'])) {
if (!$commit['acknowledged'] && !$this->isCommitToBeCreating($commit['id'])) {
$this->messagePublisher->pushLog("A new commit found in the uploader ! commit_ID : ".$commit['id']);
// this is an uploader PULL mode
$payload = [
'message_type' => MessagePublisher::ASSETS_INGEST_TYPE,
'payload' => [
@@ -74,7 +80,8 @@ class PullAssetsWorker implements WorkerInterface
'publisher' => $commit['userId'],
'commit_id' => $commit['id'],
'token' => $commit['token'],
'base_url' => $baseUrl
'base_url' => $baseUrl,
'type' => WorkerRunningUploader::TYPE_PULL
]
];
@@ -134,4 +141,15 @@ class PullAssetsWorker implements WorkerInterface
return $this->conf->get(['workers', 'pull_assets']);
}
/**
* @param $commitId
* @return bool
*/
private function isCommitToBeCreating($commitId)
{
$res = $this->repoWorkerUploader->findBy(['commitId' => $commitId]);
return count($res) != 0;
}
}

View File

@@ -75,9 +75,6 @@ class SubdefCreationWorker implements WorkerInterface
'payload' => $payload
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_SUBDEF_QUEUE);
//
// $message = MessagePublisher::SUBDEF_CREATION_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
// $this->messagePublisher->pushLog($message);
return ;
}
@@ -176,6 +173,7 @@ class SubdefCreationWorker implements WorkerInterface
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();

View File

@@ -77,9 +77,6 @@ class WriteMetadatasWorker implements WorkerInterface
];
$this->messagePublisher->publishMessage($payload, MessagePublisher::DELAYED_METADATAS_QUEUE);
// $message = MessagePublisher::WRITE_METADATAS_TYPE.' to be re-published! >> Payload ::'. json_encode($payload);
// $this->messagePublisher->pushLog($message);
return ;
}
@@ -251,6 +248,7 @@ class WriteMetadatasWorker implements WorkerInterface
$em->beginTransaction();
try {
$workerRunningJob->setStatus(WorkerRunningJob::FINISHED);
$workerRunningJob->setFinished(new \DateTime('now'));
$em->persist($workerRunningJob);
$em->flush();
$em->commit();