Merge pull request #1988 from alchemy-fr/jygaulier-PHRAS-714

Improve architecture
This commit is contained in:
Thibaud Fabre
2016-10-18 22:17:32 +02:00
committed by GitHub
6 changed files with 65 additions and 21 deletions

View File

@@ -0,0 +1,17 @@
<?php
/*
* This file is part of phrasea-4.0.
*
* (c) Alchemy <info@alchemy.fr>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Core\Event\Thesaurus;
class ReindexRequiredEvent extends ThesaurusEvent
{
}

View File

@@ -24,4 +24,5 @@ final class ThesaurusEvents
const CONCEPT_DELETED = 'thesaurus.concept-deleted';
const SYNONYM_ADDED = 'thesaurus.synonym-added';
const CONCEPT_ADDED = 'thesaurus.concept-added';
const REINDEX_REQUIRED = 'thesaurus.reindex-required';
}

View File

@@ -12,6 +12,7 @@
namespace Alchemy\Phrasea\Core\Provider;
use Alchemy\Phrasea\Controller\LazyLocator;
use Alchemy\Phrasea\Core\Event\Subscriber\Thesaurus\ReindexRequiredEventSubscriber;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\SearchEngine\Elastic\Search\QueryVisitor;
use Alchemy\Phrasea\SearchEngine\SearchEngineLogger;
@@ -111,7 +112,8 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
$app['elasticsearch.record_helper'],
$app['thesaurus'],
array_keys($app['locales.available']),
$logger
$logger,
$app['dispatcher']
);
});

View File

@@ -157,12 +157,12 @@ class Indexer
RecordQueuer::queueRecordsFromCollection($collection);
}
public function indexRecord(record_adapter $record)
public function indexRecord(RecordInterface $record)
{
$this->indexQueue->attach($record);
}
public function deleteRecord(record_adapter $record)
public function deleteRecord(RecordInterface $record)
{
$this->deleteQueue->attach($record);
}
@@ -174,7 +174,7 @@ class Indexer
public function indexScheduledRecords(\databox $databox)
{
$this->apply(function(BulkOperation $bulk) use ($databox) {
$this->recordIndexer->indexScheduled($this, $bulk, $databox);
$this->recordIndexer->indexScheduled($bulk, $databox);
});
}
@@ -189,7 +189,7 @@ class Indexer
}
$this->apply(function(BulkOperation $bulk) {
$this->recordIndexer->index($this, $bulk, $this->indexQueue);
$this->recordIndexer->index($bulk, $this->indexQueue);
$this->recordIndexer->delete($bulk, $this->deleteQueue);
$bulk->flush();
});

View File

@@ -10,6 +10,8 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\Core\Event\Thesaurus\ReindexRequiredEvent;
use Alchemy\Phrasea\Core\Event\Thesaurus\ThesaurusEvents;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\RecordListFetcherDelegate;
@@ -31,6 +33,7 @@ use databox;
use Iterator;
use Psr\Log\LoggerInterface;
use record_adapter;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class RecordIndexer
{
@@ -49,19 +52,32 @@ class RecordIndexer
private $logger;
/**
* @var EventDispatcherInterface
*/
private $eventDispatcher;
private function getUniqueOperationId($record_key)
{
$_key = dechex(mt_rand());
return $_key . '_' . $record_key;
}
public function __construct(Structure $structure, RecordHelper $helper, Thesaurus $thesaurus, array $locales, LoggerInterface $logger)
public function __construct(
Structure $structure,
RecordHelper $helper,
Thesaurus $thesaurus,
array $locales,
LoggerInterface $logger,
EventDispatcherInterface $eventDispatcher
)
{
$this->structure = $structure;
$this->helper = $helper;
$this->thesaurus = $thesaurus;
$this->locales = $locales;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
}
/**
@@ -94,11 +110,10 @@ class RecordIndexer
* index whole databox(es), don't test actual "jetons"
* called by command "populate"
*
* @param Indexer $indexer
* @param BulkOperation $bulk
* @param databox $databox
*/
public function populateIndex(Indexer $indexer, BulkOperation $bulk, databox $databox)
public function populateIndex(BulkOperation $bulk, databox $databox)
{
$submited_records = [];
@@ -118,7 +133,7 @@ class RecordIndexer
});
// Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records);
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
}
@@ -127,11 +142,10 @@ class RecordIndexer
* Index the records flagged as "to_index" on databox
* called by task "indexer"
*
* @param Indexer $indexer
* @param BulkOperation $bulk
* @param databox $databox
*/
public function indexScheduled(Indexer $indexer, BulkOperation $bulk, databox $databox)
public function indexScheduled(BulkOperation $bulk, databox $databox)
{
$submited_records = [];
@@ -154,17 +168,16 @@ class RecordIndexer
});
// Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records);
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
}
/**
* Index a list of records
*
* @param Indexer $indexer
* @param BulkOperation $bulk
* @param Iterator $records
*/
public function index(Indexer $indexer, BulkOperation $bulk, Iterator $records)
public function index(BulkOperation $bulk, Iterator $records)
{
foreach ($this->createFetchersForRecords($records) as $fetcher) {
$submited_records = [];
@@ -182,7 +195,7 @@ class RecordIndexer
});
// Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records);
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
}
}
@@ -261,18 +274,22 @@ class RecordIndexer
return array_values($databoxes);
}
private function indexFromFetcher(Indexer $indexer, BulkOperation $bulk, Fetcher $fetcher, array &$submited_records)
private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher, array &$submited_records)
{
$databox = $fetcher->getDatabox();
$first = true;
/** @var record_adapter $record */
while ($record = $fetcher->fetch()) {
if($first) {
if ($first) {
$sql = "SELECT prop FROM pref WHERE prop IN('thesaurus','thesaurus_index')"
. " ORDER BY updated_on DESC, IF(prop='thesaurus', 'a', 'z') DESC LIMIT 1";
if($databox->get_connection()->fetchColumn($sql) == 'thesaurus') {
// the thesaurus was modified, enforce index
$indexer->populateIndex(Indexer::THESAURUS, $databox);
if ($databox->get_connection()->fetchColumn($sql) == 'thesaurus') {
// The thesaurus was modified, enforce index
$this->eventDispatcher->dispatch(
ThesaurusEvents::REINDEX_REQUIRED,
new ReindexRequiredEvent($databox)
);
}
$first = false;
}

View File

@@ -17,6 +17,7 @@ use Alchemy\Phrasea\Core\Event\Record\RecordEvent;
use Alchemy\Phrasea\Core\Event\Record\RecordEvents;
use Alchemy\Phrasea\Core\Event\Record\Structure\RecordStructureEvent;
use Alchemy\Phrasea\Core\Event\Record\Structure\RecordStructureEvents;
use Alchemy\Phrasea\Core\Event\Thesaurus\ReindexRequiredEvent;
use Alchemy\Phrasea\Core\Event\Thesaurus\ThesaurusEvent;
use Alchemy\Phrasea\Core\Event\Thesaurus\ThesaurusEvents;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
@@ -97,7 +98,8 @@ class IndexerSubscriber implements EventSubscriberInterface
ThesaurusEvents::CONCEPT_TRASHED => 'onThesaurusChange',
ThesaurusEvents::CONCEPT_DELETED => 'onThesaurusChange',
ThesaurusEvents::SYNONYM_ADDED => 'onThesaurusChange',
ThesaurusEvents::CONCEPT_ADDED => 'onThesaurusChange'
ThesaurusEvents::CONCEPT_ADDED => 'onThesaurusChange',
ThesaurusEvents::REINDEX_REQUIRED => 'onReindexRequired'
];
}
@@ -138,4 +140,9 @@ class IndexerSubscriber implements EventSubscriberInterface
$this->getIndexer()->flushQueue();
}
}
public function onReindexRequired(ReindexRequiredEvent $event)
{
$this->getIndexer()->populateIndex(Indexer::THESAURUS, $event->getDatabox());
}
}