PHRAS-714_thesaurus_indexation

- new : if the thesaurus has been modified since its last indexation, it will be re-indexed as soon as a record is to be indexed.
 - nb : after a thesaurus modification, the databox MUST be manually re-indexed (which will reindex the thesaurus first)
 - todo : remove useless dialog boxes from thesaurus app (... "this term was..."  "...add with/out reindexing ?..."). To be done in the future app using future routes.
This commit is contained in:
Jean-Yves Gaulier
2016-10-12 18:42:16 +02:00
parent 15d5780201
commit cf40c99ea7
9 changed files with 162 additions and 121 deletions

View File

@@ -59,8 +59,13 @@ class IndexPopulateCommand extends Command
throw new \RuntimeException("Could not provide --thesaurus and --records option at the same time.");
}
$databoxes = $input->getOption('databox_id');
$databoxes_id = $input->getOption('databox_id');
$this->container['elasticsearch.indexer']->populateIndex($what, $databoxes);
$app = $this->container;
foreach($app->getDataboxes() as $databox) {
if(!$databoxes_id || in_array($databox->get_sbas_id(), $databoxes_id)) {
$this->container['elasticsearch.indexer']->populateIndex($what, $databox);
}
}
}
}

View File

@@ -89,12 +89,17 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
$app['elasticsearch.options'],
$app['elasticsearch.indexer.term_indexer'],
$app['elasticsearch.indexer.record_indexer'],
$app['phraseanet.appbox']
$app['phraseanet.appbox'],
new Logger('es.indexer')
);
});
$app['elasticsearch.indexer.term_indexer'] = $app->share(function ($app) {
return new TermIndexer($app['phraseanet.appbox'], array_keys($app['locales.available']));
return new TermIndexer(
$app['phraseanet.appbox'],
array_keys($app['locales.available']),
new Logger('term.indexer')
);
});
$app['elasticsearch.indexer.record_indexer'] = $app->share(function ($app) {

View File

@@ -22,6 +22,7 @@ use Elasticsearch\Client;
use Psr\Log\LoggerInterface;
use igorw;
use Psr\Log\NullLogger;
use record_adapter;
use Symfony\Component\Stopwatch\Stopwatch;
use SplObjectStorage;
@@ -41,7 +42,9 @@ class Indexer
private $recordIndexer;
private $termIndexer;
private $indexQueue; // contains RecordInterface(s)
/** @var SplObjectStorage */
private $indexQueue; // contains record_adapter(s)
/** @var SplObjectStorage */
private $deleteQueue;
public function __construct(Client $client, ElasticsearchOptions $options, TermIndexer $termIndexer, RecordIndexer $recordIndexer, appbox $appbox, LoggerInterface $logger = null)
@@ -98,21 +101,15 @@ class Indexer
return $this->client->indices()->exists($params);
}
public function populateIndex($what, array $databoxes_id = [])
public function populateIndex($what, \databox $databox)
{
$stopwatch = new Stopwatch();
$stopwatch->start('populate');
if ($databoxes_id) {
// If databoxes are given, only use those
$databoxes = array_map(array($this->appbox, 'get_databox'), $databoxes_id);
} else {
$databoxes = $this->appbox->get_databoxes();
}
$this->apply(function (BulkOperation $bulk) use ($what, $databox) {
$this->apply(function(BulkOperation $bulk) use ($what, $databoxes) {
if ($what & self::THESAURUS) {
$this->termIndexer->populateIndex($bulk, $databoxes);
$this->termIndexer->populateIndex($bulk, $databox);
// Record indexing depends on indexed terms so we need to make
// everything ready to search
@@ -121,7 +118,7 @@ class Indexer
}
if ($what & self::RECORDS) {
$this->recordIndexer->populateIndex($bulk, $databoxes);
$this->recordIndexer->populateIndex($this, $bulk, $databox);
// Final flush
$bulk->flush();
@@ -133,7 +130,7 @@ class Indexer
});
$event = $stopwatch->stop('populate');
printf("Indexation finished in %s min (Mem. %s Mo)", ($event->getDuration()/1000/60), bcdiv($event->getMemory(), 1048576, 2));
$this->logger->info(sprintf("Indexation finished in %0.02f sec (Mem. %0.02f Mo)", ($event->getDuration()/1000), $event->getMemory()/1048576));
}
public function migrateMappingForDatabox($databox)
@@ -160,24 +157,24 @@ class Indexer
RecordQueuer::queueRecordsFromCollection($collection);
}
public function indexRecord(RecordInterface $record)
public function indexRecord(record_adapter $record)
{
$this->indexQueue->attach($record);
}
public function deleteRecord(RecordInterface $record)
public function deleteRecord(record_adapter $record)
{
$this->deleteQueue->attach($record);
}
/**
* @param \databox[] $databoxes databoxes to index
* @param \databox $databox databox to index
* @throws \Exception
*/
public function indexScheduledRecords(array $databoxes)
public function indexScheduledRecords(\databox $databox)
{
$this->apply(function(BulkOperation $bulk) use($databoxes) {
$this->recordIndexer->indexScheduled($bulk, $databoxes);
$this->apply(function(BulkOperation $bulk) use ($databox) {
$this->recordIndexer->indexScheduled($this, $bulk, $databox);
});
}
@@ -192,7 +189,7 @@ class Indexer
}
$this->apply(function(BulkOperation $bulk) {
$this->recordIndexer->index($bulk, $this->indexQueue);
$this->recordIndexer->index($this, $bulk, $this->indexQueue);
$this->recordIndexer->delete($bulk, $this->deleteQueue);
$bulk->flush();
});

View File

@@ -10,7 +10,7 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\Model\RecordInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\RecordListFetcherDelegate;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\ScheduledFetcherDelegate;
@@ -30,6 +30,7 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\CandidateTerms;
use databox;
use Iterator;
use Psr\Log\LoggerInterface;
use record_adapter;
class RecordIndexer
{
@@ -93,52 +94,44 @@ class RecordIndexer
* index whole databox(es), don't test actual "jetons"
* called by command "populate"
*
* @param Indexer $indexer
* @param BulkOperation $bulk
* @param databox[] $databoxes
* @param databox $databox
*/
public function populateIndex(BulkOperation $bulk, array $databoxes)
public function populateIndex(Indexer $indexer, BulkOperation $bulk, databox $databox)
{
foreach ($databoxes as $databox) {
$submited_records = [];
$submited_records = [];
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
$fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records
$fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records
// post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
RecordQueuer::didStartIndexingRecords($records, $databox);
// do not restart the fetcher since it has no clause on jetons
});
// post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
RecordQueuer::didStartIndexingRecords($records, $databox);
// do not restart the fetcher since it has no clause on jetons
});
// bulk flush : flag records as "indexed"
$bulk->onFlush(function($operation_identifiers) use ($databox, &$submited_records) {
$this->onBulkFlush($databox, $operation_identifiers, $submited_records);
});
// bulk flush : flag records as "indexed"
$bulk->onFlush(function($operation_identifiers) use ($databox, &$submited_records) {
$this->onBulkFlush($databox, $operation_identifiers, $submited_records);
});
// Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records);
// Perform indexing
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
}
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
}
/**
* Index the records flagged as "to_index" on databoxes
* Index the records flagged as "to_index" on databox
* called by task "indexer"
*
* @param Indexer $indexer
* @param BulkOperation $bulk
* @param databox[] $databoxes
* @param databox $databox
*/
public function indexScheduled(BulkOperation $bulk, array $databoxes)
{
foreach ($databoxes as $databox) {
$this->indexScheduledInDatabox($bulk, $databox);
}
}
private function indexScheduledInDatabox(BulkOperation $bulk, databox $databox)
public function indexScheduled(Indexer $indexer, BulkOperation $bulk, databox $databox)
{
$submited_records = [];
@@ -148,6 +141,7 @@ class RecordIndexer
// post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
$this->logger->debug(sprintf("indexing %d records", count($records)));
RecordQueuer::didStartIndexingRecords($records, $databox);
// because changing the flag on the records affects the "where" clause of the fetcher,
// restart it each time
@@ -160,16 +154,17 @@ class RecordIndexer
});
// Perform indexing
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records);
}
/**
* Index a list of records
*
* @param Indexer $indexer
* @param BulkOperation $bulk
* @param Iterator $records
*/
public function index(BulkOperation $bulk, Iterator $records)
public function index(Indexer $indexer, BulkOperation $bulk, Iterator $records)
{
foreach ($this->createFetchersForRecords($records) as $fetcher) {
$submited_records = [];
@@ -187,7 +182,7 @@ class RecordIndexer
});
// Perform indexing
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records);
}
}
@@ -227,14 +222,18 @@ class RecordIndexer
{
$connection = $databox->get_connection();
$candidateTerms = new CandidateTerms($databox);
$fetcher = new Fetcher($databox, array(
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper),
new TitleHydrator($connection),
new MetadataHydrator($connection, $this->structure, $this->helper),
new FlagHydrator($this->structure, $databox),
new ThesaurusHydrator($this->structure, $this->thesaurus, $candidateTerms),
new SubDefinitionHydrator($connection)
), $delegate);
$fetcher = new Fetcher(
$databox,
array(
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper),
new TitleHydrator($connection),
new MetadataHydrator($connection, $this->structure, $this->helper),
new FlagHydrator($this->structure, $databox),
new ThesaurusHydrator($this->structure, $this->thesaurus, $candidateTerms),
new SubDefinitionHydrator($connection)
),
$delegate
);
$fetcher->setBatchSize(200);
$fetcher->onDrain(function() use ($candidateTerms) {
$candidateTerms->save();
@@ -247,21 +246,41 @@ class RecordIndexer
{
$databoxes = array();
foreach ($records as $record) {
$databox = $record->get_databox();
$hash = spl_object_hash($databox);
$databoxes[$hash]['databox'] = $databox;
$databoxes[$hash]['records'][] = $record;
/** @var record_adapter $record */
$databox = $record->getDatabox();
$k = $databox->get_sbas_id();
if(!array_key_exists($k, $databoxes)) {
$databoxes[$k] = [
'databox' => $databox,
'records' => []
];
}
$databoxes[$k]['records'][] = $record;
}
return array_values($databoxes);
}
private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher, array &$submited_records)
private function indexFromFetcher(Indexer $indexer, BulkOperation $bulk, Fetcher $fetcher, array &$submited_records)
{
/** @var RecordInterface $record */
$databox = $fetcher->getDatabox();
$first = true;
/** @var record_adapter $record */
while ($record = $fetcher->fetch()) {
if($first) {
$sql = "SELECT prop FROM pref WHERE prop IN('thesaurus','thesaurus_index')"
. " ORDER BY updated_on DESC, IF(prop='thesaurus', 'a', 'z') DESC LIMIT 1";
if($databox->get_connection()->fetchColumn($sql) == 'thesaurus') {
// the thesaurus was modified, enforce index
$indexer->populateIndex(Indexer::THESAURUS, $databox);
}
$first = false;
}
$op_identifier = $this->getUniqueOperationId($record['id']);
$this->logger->debug(sprintf("indexing record %s of databox %s", $record['record_id'], $databox->get_sbas_id()));
$params = array();
$params['id'] = $record['id'];
unset($record['id']);

View File

@@ -11,13 +11,12 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\BulkOperation;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\Helper;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\Navigator;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\TermVisitor;
use databox;
use DOMDocument;
use Psr\Log\LoggerInterface;
class TermIndexer
{
@@ -30,42 +29,54 @@ class TermIndexer
private $navigator;
private $locales;
private $logger;
public function __construct(\appbox $appbox, array $locales)
public function __construct(\appbox $appbox, array $locales, LoggerInterface $logger)
{
$this->appbox = $appbox;
$this->navigator = new Navigator();
$this->locales = $locales;
$this->logger = $logger;
}
public function populateIndex(BulkOperation $bulk, array $databoxes)
public function populateIndex(BulkOperation $bulk, databox $databox)
{
foreach ($databoxes as $databox) {
/** @var databox $databox */
$databoxId = $databox->get_sbas_id();
$databoxId = $databox->get_sbas_id();
$visitor = new TermVisitor(function ($term) use ($bulk, $databoxId) {
// Path and id are prefixed with a databox identifier to not
// collide with other databoxes terms
$visitor = new TermVisitor(function ($term) use ($bulk, $databoxId) {
// Path and id are prefixed with a databox identifier to not
// collide with other databoxes terms
// Term structure
$id = sprintf('%s_%s', $databoxId, $term['id']);
unset($term['id']);
$term['path'] = sprintf('/%s%s', $databoxId, $term['path']);
$term['databox_id'] = $databoxId;
// Term structure
$id = sprintf('%s_%s', $databoxId, $term['id']);
unset($term['id']);
$term['path'] = sprintf('/%s%s', $databoxId, $term['path']);
// Index request
$params = array();
$params['id'] = $id;
$params['type'] = self::TYPE_NAME;
$params['body'] = $term;
$this->logger->debug(sprintf("Indexing term \"%s\"", $term['path']));
$bulk->index($params, null);
});
$term['databox_id'] = $databoxId;
$document = Helper::thesaurusFromDatabox($databox);
$this->navigator->walk($document, $visitor);
}
// Index request
$params = array();
$params['id'] = $id;
$params['type'] = self::TYPE_NAME;
$params['body'] = $term;
$bulk->index($params, null);
});
$indexDate = $databox->get_connection()->fetchColumn("SELECT updated_on FROM pref WHERE prop='thesaurus'");
$document = Helper::thesaurusFromDatabox($databox);
$this->navigator->walk($document, $visitor);
$databox->get_connection()->executeUpdate(
"INSERT INTO pref (prop, value, locale, updated_on, created_on)"
. " VALUES ('thesaurus_index', '', '-', ?, NOW())"
. " ON DUPLICATE KEY UPDATE updated_on=?",
[$indexDate, $indexDate]
);
}
public function getMapping()

View File

@@ -110,7 +110,7 @@ class IndexerSubscriber implements EventSubscriberInterface
public function onThesaurusChange(ThesaurusEvent $event)
{
$databox = $event->getDatabox();
$this->getIndexer()->scheduleRecordsFromDataboxForIndexing($databox);
$databox->delete_data_from_cache(\databox::CACHE_THESAURUS);
}
public function onCollectionChange(CollectionEvent $event)

View File

@@ -67,6 +67,10 @@ class Helper
return $parents;
}
/**
* @param databox $databox
* @return DOMDocument
*/
public static function thesaurusFromDatabox(databox $databox)
{
return self::document($databox->get_dom_thesaurus());
@@ -89,6 +93,10 @@ class Helper
return $document;
}
/**
* @param $document
* @return DOMDocument
*/
private static function document($document)
{
if (!$document) {

View File

@@ -11,6 +11,10 @@ namespace Alchemy\Phrasea\TaskManager\Job;
use Alchemy\Phrasea\TaskManager\Editor\IndexerEditor;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\Core\Version;
use Silex\Application;
use Psr\Log\LoggerInterface;
class IndexerJob extends AbstractJob
{
@@ -47,12 +51,16 @@ class IndexerJob extends AbstractJob
*/
protected function doJob(JobData $data)
{
$app = $data->getApplication();
/** @var Indexer $indexer */
$indexer = $app['elasticsearch.indexer'];
$databoxes = array_filter($app->getDataboxes(), function (\databox $databox) use ($app) {
return $app->getApplicationBox()->is_databox_indexable($databox);
});
$indexer->indexScheduledRecords($databoxes);
foreach($app->getDataboxes() as $databox) {
if($app->getApplicationBox()->is_databox_indexable($databox)) {
$indexer->indexScheduledRecords($databox);
}
}
}
}
}

View File

@@ -14,7 +14,7 @@
{% set opener = 'opener' %}
{% endif %}
{% if context is not none %}
{% if context %}
{% set zterm %}
{% trans with {'%term%' : term, '%context%' : context} %}thesaurus:: le terme %term% avec contexte %context%{% endtrans %}
{% endset %}
@@ -57,14 +57,13 @@
</center>
{% else %}
{% if nb_candidates_bad > 0 %}
// present dans les candidats, mais aucun champ acceptable : on informe
{% set prop_label = 'thesaurus:: est candidat en provenance des champs mais ne peut etre accepte a cet emplacement du thesaurus' | trans %}
{% else %}
// pas present dans les candidats
{% set prop_label = 'thesaurus:: n\'est pas present dans les candidats' | trans %}
{% endif %}
<br/>
<h3>{{ 'thesaurus:: attention :' | trans }}</h3>
<br/>
<br/>
<br/>
<br/>
{{ zterm }}
@@ -72,14 +71,10 @@
<br/>
{{ prop_label }}
<br/>
<br/>
<br/>
<br/>
<form>
<center>
<div class='x3Dbox' style='margin:15px; height:90px; overflow:auto;'>
<input type="radio" name="reindex" value="0" id="rad0" checked><label for="rad0">{{ 'thesaurus:: Ajouter le terme dans reindexer' | trans }}</label><br/>
<br/>
<input type="radio" name="reindex" value="1" id="rad1"><label for="rad1">{{ 'thesaurus:: ajouter le terme et reindexer' | trans }}</label><br/>
</div>
</center>
<input type="button" id="cancel_button" value="{{ 'boutton::annuler' | trans }}" onclick="clkBut('cancel');" style="width:100px;">
&nbsp;&nbsp;&nbsp;
<input type="button" id="submit_button" value="{{ 'boutton::valider' | trans }}" onclick="clkBut('submit');" style="width:100px;">
@@ -159,15 +154,8 @@
parms += "&k={{ context | url_encode }}";
{% endif %}
parms += "&sylng={{ sylng }}";
parms += "&reindex=0";
for(i=0; i<(n=document.getElementsByName("reindex")).length; i++)
{
if(n[i].checked)
{
parms += "&reindex=" + encodeURIComponent(n[i].value);
break;
}
}
ret = loadXMLDoc(url, parms, true);
refresh = ret.getElementsByTagName("refresh");
for(i=0; i<refresh.length; i++)