Revert "PHRAS-714_thesaurus_indexation"

This commit is contained in:
Thibaud Fabre
2016-10-18 20:15:23 +02:00
committed by GitHub
parent eba83584f4
commit ba8be47423
9 changed files with 121 additions and 162 deletions

View File

@@ -59,13 +59,8 @@ class IndexPopulateCommand extends Command
throw new \RuntimeException("Could not provide --thesaurus and --records option at the same time."); throw new \RuntimeException("Could not provide --thesaurus and --records option at the same time.");
} }
$databoxes_id = $input->getOption('databox_id'); $databoxes = $input->getOption('databox_id');
$app = $this->container; $this->container['elasticsearch.indexer']->populateIndex($what, $databoxes);
foreach($app->getDataboxes() as $databox) {
if(!$databoxes_id || in_array($databox->get_sbas_id(), $databoxes_id)) {
$this->container['elasticsearch.indexer']->populateIndex($what, $databox);
}
}
} }
} }

View File

@@ -89,17 +89,12 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
$app['elasticsearch.options'], $app['elasticsearch.options'],
$app['elasticsearch.indexer.term_indexer'], $app['elasticsearch.indexer.term_indexer'],
$app['elasticsearch.indexer.record_indexer'], $app['elasticsearch.indexer.record_indexer'],
$app['phraseanet.appbox'], $app['phraseanet.appbox']
new Logger('es.indexer')
); );
}); });
$app['elasticsearch.indexer.term_indexer'] = $app->share(function ($app) { $app['elasticsearch.indexer.term_indexer'] = $app->share(function ($app) {
return new TermIndexer( return new TermIndexer($app['phraseanet.appbox'], array_keys($app['locales.available']));
$app['phraseanet.appbox'],
array_keys($app['locales.available']),
new Logger('term.indexer')
);
}); });
$app['elasticsearch.indexer.record_indexer'] = $app->share(function ($app) { $app['elasticsearch.indexer.record_indexer'] = $app->share(function ($app) {

View File

@@ -22,7 +22,6 @@ use Elasticsearch\Client;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use igorw; use igorw;
use Psr\Log\NullLogger; use Psr\Log\NullLogger;
use record_adapter;
use Symfony\Component\Stopwatch\Stopwatch; use Symfony\Component\Stopwatch\Stopwatch;
use SplObjectStorage; use SplObjectStorage;
@@ -42,9 +41,7 @@ class Indexer
private $recordIndexer; private $recordIndexer;
private $termIndexer; private $termIndexer;
/** @var SplObjectStorage */ private $indexQueue; // contains RecordInterface(s)
private $indexQueue; // contains record_adapter(s)
/** @var SplObjectStorage */
private $deleteQueue; private $deleteQueue;
public function __construct(Client $client, ElasticsearchOptions $options, TermIndexer $termIndexer, RecordIndexer $recordIndexer, appbox $appbox, LoggerInterface $logger = null) public function __construct(Client $client, ElasticsearchOptions $options, TermIndexer $termIndexer, RecordIndexer $recordIndexer, appbox $appbox, LoggerInterface $logger = null)
@@ -101,15 +98,21 @@ class Indexer
return $this->client->indices()->exists($params); return $this->client->indices()->exists($params);
} }
public function populateIndex($what, \databox $databox) public function populateIndex($what, array $databoxes_id = [])
{ {
$stopwatch = new Stopwatch(); $stopwatch = new Stopwatch();
$stopwatch->start('populate'); $stopwatch->start('populate');
$this->apply(function (BulkOperation $bulk) use ($what, $databox) { if ($databoxes_id) {
// If databoxes are given, only use those
$databoxes = array_map(array($this->appbox, 'get_databox'), $databoxes_id);
} else {
$databoxes = $this->appbox->get_databoxes();
}
$this->apply(function(BulkOperation $bulk) use ($what, $databoxes) {
if ($what & self::THESAURUS) { if ($what & self::THESAURUS) {
$this->termIndexer->populateIndex($bulk, $databox); $this->termIndexer->populateIndex($bulk, $databoxes);
// Record indexing depends on indexed terms so we need to make // Record indexing depends on indexed terms so we need to make
// everything ready to search // everything ready to search
@@ -118,7 +121,7 @@ class Indexer
} }
if ($what & self::RECORDS) { if ($what & self::RECORDS) {
$this->recordIndexer->populateIndex($this, $bulk, $databox); $this->recordIndexer->populateIndex($bulk, $databoxes);
// Final flush // Final flush
$bulk->flush(); $bulk->flush();
@@ -130,7 +133,7 @@ class Indexer
}); });
$event = $stopwatch->stop('populate'); $event = $stopwatch->stop('populate');
$this->logger->info(sprintf("Indexation finished in %0.02f sec (Mem. %0.02f Mo)", ($event->getDuration()/1000), $event->getMemory()/1048576)); printf("Indexation finished in %s min (Mem. %s Mo)", ($event->getDuration()/1000/60), bcdiv($event->getMemory(), 1048576, 2));
} }
public function migrateMappingForDatabox($databox) public function migrateMappingForDatabox($databox)
@@ -157,24 +160,24 @@ class Indexer
RecordQueuer::queueRecordsFromCollection($collection); RecordQueuer::queueRecordsFromCollection($collection);
} }
public function indexRecord(record_adapter $record) public function indexRecord(RecordInterface $record)
{ {
$this->indexQueue->attach($record); $this->indexQueue->attach($record);
} }
public function deleteRecord(record_adapter $record) public function deleteRecord(RecordInterface $record)
{ {
$this->deleteQueue->attach($record); $this->deleteQueue->attach($record);
} }
/** /**
* @param \databox $databox databox to index * @param \databox[] $databoxes databoxes to index
* @throws \Exception * @throws \Exception
*/ */
public function indexScheduledRecords(\databox $databox) public function indexScheduledRecords(array $databoxes)
{ {
$this->apply(function(BulkOperation $bulk) use ($databox) { $this->apply(function(BulkOperation $bulk) use($databoxes) {
$this->recordIndexer->indexScheduled($this, $bulk, $databox); $this->recordIndexer->indexScheduled($bulk, $databoxes);
}); });
} }
@@ -189,7 +192,7 @@ class Indexer
} }
$this->apply(function(BulkOperation $bulk) { $this->apply(function(BulkOperation $bulk) {
$this->recordIndexer->index($this, $bulk, $this->indexQueue); $this->recordIndexer->index($bulk, $this->indexQueue);
$this->recordIndexer->delete($bulk, $this->deleteQueue); $this->recordIndexer->delete($bulk, $this->deleteQueue);
$bulk->flush(); $bulk->flush();
}); });

View File

@@ -10,7 +10,7 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer; namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Alchemy\Phrasea\Model\RecordInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\RecordListFetcherDelegate; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\RecordListFetcherDelegate;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\ScheduledFetcherDelegate; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\ScheduledFetcherDelegate;
@@ -30,7 +30,6 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\CandidateTerms;
use databox; use databox;
use Iterator; use Iterator;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use record_adapter;
class RecordIndexer class RecordIndexer
{ {
@@ -94,12 +93,13 @@ class RecordIndexer
* index whole databox(es), don't test actual "jetons" * index whole databox(es), don't test actual "jetons"
* called by command "populate" * called by command "populate"
* *
* @param Indexer $indexer
* @param BulkOperation $bulk * @param BulkOperation $bulk
* @param databox $databox * @param databox[] $databoxes
*/ */
public function populateIndex(Indexer $indexer, BulkOperation $bulk, databox $databox) public function populateIndex(BulkOperation $bulk, array $databoxes)
{ {
foreach ($databoxes as $databox) {
$submited_records = []; $submited_records = [];
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname())); $this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
@@ -118,20 +118,27 @@ class RecordIndexer
}); });
// Perform indexing // Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records); $this->indexFromFetcher($bulk, $fetcher, $submited_records);
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname())); $this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
} }
}
/** /**
* Index the records flagged as "to_index" on databox * Index the records flagged as "to_index" on databoxes
* called by task "indexer" * called by task "indexer"
* *
* @param Indexer $indexer
* @param BulkOperation $bulk * @param BulkOperation $bulk
* @param databox $databox * @param databox[] $databoxes
*/ */
public function indexScheduled(Indexer $indexer, BulkOperation $bulk, databox $databox) public function indexScheduled(BulkOperation $bulk, array $databoxes)
{
foreach ($databoxes as $databox) {
$this->indexScheduledInDatabox($bulk, $databox);
}
}
private function indexScheduledInDatabox(BulkOperation $bulk, databox $databox)
{ {
$submited_records = []; $submited_records = [];
@@ -141,7 +148,6 @@ class RecordIndexer
// post fetch : flag records as "indexing" // post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) { $fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
$this->logger->debug(sprintf("indexing %d records", count($records)));
RecordQueuer::didStartIndexingRecords($records, $databox); RecordQueuer::didStartIndexingRecords($records, $databox);
// because changing the flag on the records affects the "where" clause of the fetcher, // because changing the flag on the records affects the "where" clause of the fetcher,
// restart it each time // restart it each time
@@ -154,17 +160,16 @@ class RecordIndexer
}); });
// Perform indexing // Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records); $this->indexFromFetcher($bulk, $fetcher, $submited_records);
} }
/** /**
* Index a list of records * Index a list of records
* *
* @param Indexer $indexer
* @param BulkOperation $bulk * @param BulkOperation $bulk
* @param Iterator $records * @param Iterator $records
*/ */
public function index(Indexer $indexer, BulkOperation $bulk, Iterator $records) public function index(BulkOperation $bulk, Iterator $records)
{ {
foreach ($this->createFetchersForRecords($records) as $fetcher) { foreach ($this->createFetchersForRecords($records) as $fetcher) {
$submited_records = []; $submited_records = [];
@@ -182,7 +187,7 @@ class RecordIndexer
}); });
// Perform indexing // Perform indexing
$this->indexFromFetcher($indexer, $bulk, $fetcher, $submited_records); $this->indexFromFetcher($bulk, $fetcher, $submited_records);
} }
} }
@@ -222,18 +227,14 @@ class RecordIndexer
{ {
$connection = $databox->get_connection(); $connection = $databox->get_connection();
$candidateTerms = new CandidateTerms($databox); $candidateTerms = new CandidateTerms($databox);
$fetcher = new Fetcher( $fetcher = new Fetcher($databox, array(
$databox,
array(
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper), new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper),
new TitleHydrator($connection), new TitleHydrator($connection),
new MetadataHydrator($connection, $this->structure, $this->helper), new MetadataHydrator($connection, $this->structure, $this->helper),
new FlagHydrator($this->structure, $databox), new FlagHydrator($this->structure, $databox),
new ThesaurusHydrator($this->structure, $this->thesaurus, $candidateTerms), new ThesaurusHydrator($this->structure, $this->thesaurus, $candidateTerms),
new SubDefinitionHydrator($connection) new SubDefinitionHydrator($connection)
), ), $delegate);
$delegate
);
$fetcher->setBatchSize(200); $fetcher->setBatchSize(200);
$fetcher->onDrain(function() use ($candidateTerms) { $fetcher->onDrain(function() use ($candidateTerms) {
$candidateTerms->save(); $candidateTerms->save();
@@ -246,41 +247,21 @@ class RecordIndexer
{ {
$databoxes = array(); $databoxes = array();
foreach ($records as $record) { foreach ($records as $record) {
/** @var record_adapter $record */ $databox = $record->get_databox();
$databox = $record->getDatabox(); $hash = spl_object_hash($databox);
$k = $databox->get_sbas_id(); $databoxes[$hash]['databox'] = $databox;
if(!array_key_exists($k, $databoxes)) { $databoxes[$hash]['records'][] = $record;
$databoxes[$k] = [
'databox' => $databox,
'records' => []
];
}
$databoxes[$k]['records'][] = $record;
} }
return array_values($databoxes); return array_values($databoxes);
} }
private function indexFromFetcher(Indexer $indexer, BulkOperation $bulk, Fetcher $fetcher, array &$submited_records) private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher, array &$submited_records)
{ {
$databox = $fetcher->getDatabox(); /** @var RecordInterface $record */
$first = true;
/** @var record_adapter $record */
while ($record = $fetcher->fetch()) { while ($record = $fetcher->fetch()) {
if($first) {
$sql = "SELECT prop FROM pref WHERE prop IN('thesaurus','thesaurus_index')"
. " ORDER BY updated_on DESC, IF(prop='thesaurus', 'a', 'z') DESC LIMIT 1";
if($databox->get_connection()->fetchColumn($sql) == 'thesaurus') {
// the thesaurus was modified, enforce index
$indexer->populateIndex(Indexer::THESAURUS, $databox);
}
$first = false;
}
$op_identifier = $this->getUniqueOperationId($record['id']); $op_identifier = $this->getUniqueOperationId($record['id']);
$this->logger->debug(sprintf("indexing record %s of databox %s", $record['record_id'], $databox->get_sbas_id()));
$params = array(); $params = array();
$params['id'] = $record['id']; $params['id'] = $record['id'];
unset($record['id']); unset($record['id']);

View File

@@ -11,12 +11,13 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer; namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\BulkOperation;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping; use Alchemy\Phrasea\SearchEngine\Elastic\Mapping;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\Helper; use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\Helper;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\Navigator; use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\Navigator;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\TermVisitor; use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\TermVisitor;
use databox; use databox;
use Psr\Log\LoggerInterface; use DOMDocument;
class TermIndexer class TermIndexer
{ {
@@ -29,18 +30,18 @@ class TermIndexer
private $navigator; private $navigator;
private $locales; private $locales;
private $logger;
public function __construct(\appbox $appbox, array $locales, LoggerInterface $logger) public function __construct(\appbox $appbox, array $locales)
{ {
$this->appbox = $appbox; $this->appbox = $appbox;
$this->navigator = new Navigator(); $this->navigator = new Navigator();
$this->locales = $locales; $this->locales = $locales;
$this->logger = $logger;
} }
public function populateIndex(BulkOperation $bulk, databox $databox) public function populateIndex(BulkOperation $bulk, array $databoxes)
{ {
foreach ($databoxes as $databox) {
/** @var databox $databox */
$databoxId = $databox->get_sbas_id(); $databoxId = $databox->get_sbas_id();
$visitor = new TermVisitor(function ($term) use ($bulk, $databoxId) { $visitor = new TermVisitor(function ($term) use ($bulk, $databoxId) {
@@ -51,9 +52,6 @@ class TermIndexer
$id = sprintf('%s_%s', $databoxId, $term['id']); $id = sprintf('%s_%s', $databoxId, $term['id']);
unset($term['id']); unset($term['id']);
$term['path'] = sprintf('/%s%s', $databoxId, $term['path']); $term['path'] = sprintf('/%s%s', $databoxId, $term['path']);
$this->logger->debug(sprintf("Indexing term \"%s\"", $term['path']));
$term['databox_id'] = $databoxId; $term['databox_id'] = $databoxId;
// Index request // Index request
@@ -65,18 +63,9 @@ class TermIndexer
$bulk->index($params, null); $bulk->index($params, null);
}); });
$indexDate = $databox->get_connection()->fetchColumn("SELECT updated_on FROM pref WHERE prop='thesaurus'");
$document = Helper::thesaurusFromDatabox($databox); $document = Helper::thesaurusFromDatabox($databox);
$this->navigator->walk($document, $visitor); $this->navigator->walk($document, $visitor);
}
$databox->get_connection()->executeUpdate(
"INSERT INTO pref (prop, value, locale, updated_on, created_on)"
. " VALUES ('thesaurus_index', '', '-', ?, NOW())"
. " ON DUPLICATE KEY UPDATE updated_on=?",
[$indexDate, $indexDate]
);
} }
public function getMapping() public function getMapping()

View File

@@ -110,7 +110,7 @@ class IndexerSubscriber implements EventSubscriberInterface
public function onThesaurusChange(ThesaurusEvent $event) public function onThesaurusChange(ThesaurusEvent $event)
{ {
$databox = $event->getDatabox(); $databox = $event->getDatabox();
$databox->delete_data_from_cache(\databox::CACHE_THESAURUS); $this->getIndexer()->scheduleRecordsFromDataboxForIndexing($databox);
} }
public function onCollectionChange(CollectionEvent $event) public function onCollectionChange(CollectionEvent $event)

View File

@@ -67,10 +67,6 @@ class Helper
return $parents; return $parents;
} }
/**
* @param databox $databox
* @return DOMDocument
*/
public static function thesaurusFromDatabox(databox $databox) public static function thesaurusFromDatabox(databox $databox)
{ {
return self::document($databox->get_dom_thesaurus()); return self::document($databox->get_dom_thesaurus());
@@ -93,10 +89,6 @@ class Helper
return $document; return $document;
} }
/**
* @param $document
* @return DOMDocument
*/
private static function document($document) private static function document($document)
{ {
if (!$document) { if (!$document) {

View File

@@ -11,10 +11,6 @@ namespace Alchemy\Phrasea\TaskManager\Job;
use Alchemy\Phrasea\TaskManager\Editor\IndexerEditor; use Alchemy\Phrasea\TaskManager\Editor\IndexerEditor;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\Core\Version;
use Silex\Application;
use Psr\Log\LoggerInterface;
class IndexerJob extends AbstractJob class IndexerJob extends AbstractJob
{ {
@@ -51,16 +47,12 @@ class IndexerJob extends AbstractJob
*/ */
protected function doJob(JobData $data) protected function doJob(JobData $data)
{ {
$app = $data->getApplication(); $app = $data->getApplication();
/** @var Indexer $indexer */ /** @var Indexer $indexer */
$indexer = $app['elasticsearch.indexer']; $indexer = $app['elasticsearch.indexer'];
$databoxes = array_filter($app->getDataboxes(), function (\databox $databox) use ($app) {
foreach($app->getDataboxes() as $databox) { return $app->getApplicationBox()->is_databox_indexable($databox);
if($app->getApplicationBox()->is_databox_indexable($databox)) { });
$indexer->indexScheduledRecords($databox); $indexer->indexScheduledRecords($databoxes);
} }
} }
}
}

View File

@@ -14,7 +14,7 @@
{% set opener = 'opener' %} {% set opener = 'opener' %}
{% endif %} {% endif %}
{% if context %} {% if context is not none %}
{% set zterm %} {% set zterm %}
{% trans with {'%term%' : term, '%context%' : context} %}thesaurus:: le terme %term% avec contexte %context%{% endtrans %} {% trans with {'%term%' : term, '%context%' : context} %}thesaurus:: le terme %term% avec contexte %context%{% endtrans %}
{% endset %} {% endset %}
@@ -57,13 +57,14 @@
</center> </center>
{% else %} {% else %}
{% if nb_candidates_bad > 0 %} {% if nb_candidates_bad > 0 %}
// present dans les candidats, mais aucun champ acceptable : on informe
{% set prop_label = 'thesaurus:: est candidat en provenance des champs mais ne peut etre accepte a cet emplacement du thesaurus' | trans %} {% set prop_label = 'thesaurus:: est candidat en provenance des champs mais ne peut etre accepte a cet emplacement du thesaurus' | trans %}
{% else %} {% else %}
// pas present dans les candidats
{% set prop_label = 'thesaurus:: n\'est pas present dans les candidats' | trans %} {% set prop_label = 'thesaurus:: n\'est pas present dans les candidats' | trans %}
{% endif %} {% endif %}
<br/> <br/>
<br/> <h3>{{ 'thesaurus:: attention :' | trans }}</h3>
<br/>
<br/> <br/>
<br/> <br/>
{{ zterm }} {{ zterm }}
@@ -71,10 +72,14 @@
<br/> <br/>
{{ prop_label }} {{ prop_label }}
<br/> <br/>
<br/>
<br/>
<br/>
<form> <form>
<center>
<div class='x3Dbox' style='margin:15px; height:90px; overflow:auto;'>
<input type="radio" name="reindex" value="0" id="rad0" checked><label for="rad0">{{ 'thesaurus:: Ajouter le terme dans reindexer' | trans }}</label><br/>
<br/>
<input type="radio" name="reindex" value="1" id="rad1"><label for="rad1">{{ 'thesaurus:: ajouter le terme et reindexer' | trans }}</label><br/>
</div>
</center>
<input type="button" id="cancel_button" value="{{ 'boutton::annuler' | trans }}" onclick="clkBut('cancel');" style="width:100px;"> <input type="button" id="cancel_button" value="{{ 'boutton::annuler' | trans }}" onclick="clkBut('cancel');" style="width:100px;">
&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;
<input type="button" id="submit_button" value="{{ 'boutton::valider' | trans }}" onclick="clkBut('submit');" style="width:100px;"> <input type="button" id="submit_button" value="{{ 'boutton::valider' | trans }}" onclick="clkBut('submit');" style="width:100px;">
@@ -154,8 +159,15 @@
parms += "&k={{ context | url_encode }}"; parms += "&k={{ context | url_encode }}";
{% endif %} {% endif %}
parms += "&sylng={{ sylng }}"; parms += "&sylng={{ sylng }}";
parms += "&reindex=0";
for(i=0; i<(n=document.getElementsByName("reindex")).length; i++)
{
if(n[i].checked)
{
parms += "&reindex=" + encodeURIComponent(n[i].value);
break;
}
}
ret = loadXMLDoc(url, parms, true); ret = loadXMLDoc(url, parms, true);
refresh = ret.getElementsByTagName("refresh"); refresh = ret.getElementsByTagName("refresh");
for(i=0; i<refresh.length; i++) for(i=0; i<refresh.length; i++)