PHRAS-1797 porting searchengine:index to 4.1 (#2454)

* portage commande searchengine:index vers 4.1

* FIX Indexer

* Update DataboxFetcherFactory.php
This commit is contained in:
aynsix
2018-01-30 21:50:57 +04:00
committed by jygaulier
parent 06c432535f
commit df85aa8827
10 changed files with 481 additions and 47 deletions

View File

@@ -35,16 +35,21 @@ class DataboxFetcherFactory
*/
private $recordHelper;
/** @var ElasticsearchOptions */
private $options;
/**
* @param RecordHelper $recordHelper
* @param ElasticsearchOptions $options
* @param \ArrayAccess $container
* @param string $structureKey
* @param string $thesaurusKey
*/
public function __construct(RecordHelper $recordHelper, \ArrayAccess $container, $structureKey, $thesaurusKey)
public function __construct(RecordHelper $recordHelper, ElasticsearchOptions $options, \ArrayAccess $container, $structureKey, $thesaurusKey)
{
$this->recordHelper = $recordHelper;
$this->container = $container;
$this->options = $options;
$this->container = $container;
$this->structureKey = $structureKey;
$this->thesaurusKey = $thesaurusKey;
}
@@ -59,14 +64,19 @@ class DataboxFetcherFactory
$connection = $databox->get_connection();
$candidateTerms = new CandidateTerms($databox);
$fetcher = new Fetcher($databox, array(
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->recordHelper),
new TitleHydrator($connection),
new MetadataHydrator($connection, $this->getStructure(), $this->recordHelper),
new FlagHydrator($this->getStructure(), $databox),
new ThesaurusHydrator($this->getStructure(), $this->getThesaurus(), $candidateTerms),
new SubDefinitionHydrator($connection)
), $fetcherDelegate);
$fetcher = new Fetcher(
$databox,
$this->options,
[
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->recordHelper),
new TitleHydrator($connection, $this->recordHelper),
new MetadataHydrator($connection, $this->getStructure(), $this->recordHelper),
new FlagHydrator($this->getStructure(), $databox),
new ThesaurusHydrator($this->getStructure(), $this->getThesaurus(), $candidateTerms),
new SubDefinitionHydrator($connection)
],
$fetcherDelegate
);
$fetcher->setBatchSize(200);
$fetcher->onDrain(function() use ($candidateTerms) {

View File

@@ -11,6 +11,10 @@ namespace Alchemy\Phrasea\SearchEngine\Elastic;
class ElasticsearchOptions
{
const POPULATE_ORDER_RID = "RECORD_ID";
const POPULATE_ORDER_MODDATE = "MODIFICATION_DATE";
const POPULATE_DIRECTION_ASC = "ASC";
const POPULATE_DIRECTION_DESC = "DESC";
/** @var string */
private $host;
/** @var int */
@@ -25,6 +29,10 @@ class ElasticsearchOptions
private $minScore;
/** @var bool */
private $highlight;
/** @var string */
private $populateOrder;
/** @var string */
private $populateDirection;
/** @var int[] */
private $_customValues;
@@ -46,6 +54,8 @@ class ElasticsearchOptions
'replicas' => 0,
'minScore' => 4,
'highlight' => true,
'populate_order' => self::POPULATE_ORDER_RID,
'populate_direction' => self::POPULATE_DIRECTION_DESC,
'activeTab' => null,
];
@@ -63,6 +73,8 @@ class ElasticsearchOptions
$self->setReplicas($options['replicas']);
$self->setMinScore($options['minScore']);
$self->setHighlight($options['highlight']);
$self->setPopulateOrder($options['populate_order']);
$self->setPopulateDirection($options['populate_direction']);
$self->setActiveTab($options['activeTab']);
foreach(self::getAggregableTechnicalFields() as $k => $f) {
$self->setAggregableFieldLimit($k, $options[$k.'_limit']);
@@ -85,6 +97,8 @@ class ElasticsearchOptions
'replicas' => $this->replicas,
'minScore' => $this->minScore,
'highlight' => $this->highlight,
'populate_order' => $this->populateOrder,
'populate_direction' => $this->populateDirection,
'activeTab' => $this->activeTab
];
foreach(self::getAggregableTechnicalFields() as $k => $f) {
@@ -322,4 +336,60 @@ class ElasticsearchOptions
];
}
/**
 * Selects the column used to order records when the index is populated.
 *
 * The value is upper-cased and must then equal one of the POPULATE_ORDER_*
 * constants. An unrecognised value is rejected and the current setting is
 * left untouched.
 *
 * @param string $order
 * @return bool returns false if order is invalid
 */
public function setPopulateOrder($order)
{
    $order = strtoupper($order);

    // strict comparison: only the exact constant strings are accepted
    if (in_array($order, [self::POPULATE_ORDER_RID, self::POPULATE_ORDER_MODDATE], true)) {
        $this->populateOrder = $order;

        return true;
    }

    return false;
}
/**
 * Returns the back-quoted SQL column name matching the configured populate order.
 *
 * setPopulateOrder() rejects unknown values without assigning anything, so the
 * property may never have been set; in that case the column of the default
 * order (POPULATE_ORDER_RID) is returned instead of an empty value.
 *
 * @return string
 */
public function getPopulateOrderAsSQL()
{
    static $orderAsColumn = [
        self::POPULATE_ORDER_RID     => "`record_id`",
        self::POPULATE_ORDER_MODDATE => "`moddate`",
    ];

    // isset() on a null/unknown key is false and raises no notice
    if (!isset($orderAsColumn[$this->populateOrder])) {
        return $orderAsColumn[self::POPULATE_ORDER_RID];
    }

    return $orderAsColumn[$this->populateOrder];
}
/**
 * Selects the sort direction used when the index is populated.
 *
 * The value is upper-cased and must then equal one of the
 * POPULATE_DIRECTION_* constants. An unrecognised value is rejected and the
 * current setting is left untouched.
 *
 * @param string $direction
 * @return bool returns false if direction is invalid
 */
public function setPopulateDirection($direction)
{
    $direction = strtoupper($direction);

    // strict comparison: only the exact constant strings are accepted
    if (in_array($direction, [self::POPULATE_DIRECTION_DESC, self::POPULATE_DIRECTION_ASC], true)) {
        $this->populateDirection = $direction;

        return true;
    }

    return false;
}
/**
 * Returns the configured populate direction as a SQL keyword ("ASC" or "DESC").
 *
 * setPopulateDirection() rejects unknown values without assigning anything, so
 * the property may never have been set; in that case the default direction
 * (POPULATE_DIRECTION_DESC) is returned instead of an empty value.
 *
 * @return string
 */
public function getPopulateDirectionAsSQL()
{
    if ($this->populateDirection === null) {
        return self::POPULATE_DIRECTION_DESC;
    }

    // the stored value is already a SQL keyword
    return $this->populateDirection;
}
}

View File

@@ -16,13 +16,11 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\BulkOperation;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\RecordIndexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\TermIndexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\RecordQueuer;
use appbox;
use Closure;
use Elasticsearch\Client;
use Psr\Log\LoggerInterface;
use igorw;
use Psr\Log\NullLogger;
use record_adapter;
use Symfony\Component\Stopwatch\Stopwatch;
use SplObjectStorage;
@@ -84,20 +82,64 @@ class Indexer
$this->deleteQueue = new SplObjectStorage();
}
public function createIndex($withMapping = true)
/**
* @return Index
*/
public function getIndex()
{
$params = array();
$params['index'] = $this->index->getName();
$params['body']['settings']['number_of_shards'] = $this->index->getOptions()->getShards();
$params['body']['settings']['number_of_replicas'] = $this->index->getOptions()->getReplicas();
$params['body']['settings']['analysis'] = $this->index->getAnalysis();
return $this->index;
}
if ($withMapping) {
$params['body']['mappings'][RecordIndexer::TYPE_NAME] = $this->index->getRecordIndex()->getMapping()->export();
$params['body']['mappings'][TermIndexer::TYPE_NAME] = $this->index->getTermIndex()->getMapping()->export();
/**
 * Creates a new index whose name is suffixed with the current timestamp
 * ("<name>_YmdHis.uuuuuu"), then adds an alias pointing to it.
 *
 * The alias is always the name of the configured index; the base name of the
 * new index defaults to that same name when $indexName is null.
 *
 * @param string|null $indexName base name of the index to create
 * @return array ['index' => name of the created index, 'alias' => name of the alias added to it]
 */
public function createIndex($indexName = null)
{
    $aliasName = $this->index->getName();
    if ($indexName === null) {
        $indexName = $aliasName;
    }

    // Read the clock once so the seconds and microseconds parts of the suffix
    // belong to the same instant (microtime() returns "<fraction> <seconds>").
    list($fraction, $seconds) = explode(' ', microtime());
    $now = sprintf("%s.%06d", date('YmdHis', (int) $seconds), (int) round(1000000 * (float) $fraction));
    $indexName .= ('_' . $now);

    // The alias is not declared here: it is attached by the separate
    // updateAliases() call below.
    $params = [
        'index' => $indexName,
        'body'  => [
            'settings' => [
                'number_of_shards'   => $this->index->getOptions()->getShards(),
                'number_of_replicas' => $this->index->getOptions()->getReplicas(),
                'analysis'           => $this->index->getAnalysis()
            ],
            'mappings' => [
                RecordIndexer::TYPE_NAME => $this->index->getRecordIndex()->getMapping()->export(),
                TermIndexer::TYPE_NAME   => $this->index->getTermIndex()->getMapping()->export()
            ]
        ]
    ];

    $this->client->indices()->create($params);

    $params = [
        'body' => [
            'actions' => [
                [
                    'add' => [
                        'index' => $indexName,
                        'alias' => $aliasName
                    ]
                ]
            ]
        ]
    ];

    $this->client->indices()->updateAliases($params);

    return [
        'index' => $indexName,
        'alias' => $aliasName
    ];
}
public function updateMapping()
@@ -126,38 +168,129 @@ class Indexer
]);
}
/**
 * Makes $newIndexName the index served under the configured alias.
 *
 * In one updateAliases() call: removes every alias found on the indexes
 * currently returned for the configured name, adds the configured name as an
 * alias of $newIndexName, and removes $newAliasName from $newIndexName.
 * The previously returned indexes are then deleted.
 *
 * @param string $newIndexName name of the index to promote
 * @param string $newAliasName alias currently set on the new index, to be removed
 * @return array list of performed actions, each with 'action', 'msg', 'index' and (for alias actions) 'alias'
 */
public function replaceIndex($newIndexName, $newAliasName)
{
    $ret = [];
    $aliasName = $this->index->getName();

    $oldIndexes = $this->client->indices()->getAlias(
        [
            'index' => $aliasName
        ]
    );

    // explicit initialisation instead of relying on array auto-creation
    $params = [
        'body' => [
            'actions' => []
        ]
    ];

    // delete old alias(es), only one alias on one index should exist
    foreach ($oldIndexes as $oldIndexName => $data) {
        foreach ($data['aliases'] as $oldAliasName => $data2) {
            $params['body']['actions'][] = [
                'remove' => [
                    'alias' => $oldAliasName,
                    'index' => $oldIndexName,
                ]
            ];
            $ret[] = [
                'action' => "ALIAS_REMOVE",
                'msg'    => sprintf('alias "%s" -> "%s" removed', $oldAliasName, $oldIndexName),
                'alias'  => $oldAliasName,
                'index'  => $oldIndexName,
            ];
        }
    }

    // create new alias
    $params['body']['actions'][] = [
        'add' => [
            'alias' => $aliasName,
            'index' => $newIndexName,
        ]
    ];
    $ret[] = [
        'action' => "ALIAS_ADD",
        'msg'    => sprintf('alias "%s" -> "%s" added', $aliasName, $newIndexName),
        'alias'  => $aliasName,
        'index'  => $newIndexName,
    ];

    // drop the alias the new index was carrying until now
    $params['body']['actions'][] = [
        'remove' => [
            'alias' => $newAliasName,
            'index' => $newIndexName,
        ]
    ];
    $ret[] = [
        'action' => "ALIAS_REMOVE",
        'msg'    => sprintf('alias "%s" -> "%s" removed', $newAliasName, $newIndexName),
        'alias'  => $newAliasName,
        'index'  => $newIndexName,
    ];

    $this->client->indices()->updateAliases($params);

    // delete old index(es), only one index should exist
    $params = [
        'index' => []
    ];
    foreach ($oldIndexes as $oldIndexName => $data) {
        $params['index'][] = $oldIndexName;
        $ret[] = [
            'action' => "INDEX_DELETE",
            'msg'    => sprintf('index "%s" deleted', $oldIndexName),
            'index'  => $oldIndexName,
        ];
    }

    // never send a delete request with an empty index list
    if (!empty($params['index'])) {
        $this->client->indices()->delete(
            $params
        );
    }

    return $ret;
}
public function populateIndex($what, \databox $databox)
{
$stopwatch = new Stopwatch();
$stopwatch->start('populate');
$this->apply(function (BulkOperation $bulk) use ($what, $databox) {
if ($what & self::THESAURUS) {
$this->termIndexer->populateIndex($bulk, $databox);
$this->apply(
function (BulkOperation $bulk) use ($what, $databox) {
if ($what & self::THESAURUS) {
$this->termIndexer->populateIndex($bulk, $databox);
// Record indexing depends on indexed terms so we need to make
// everything ready to search
$bulk->flush();
$this->client->indices()->refresh();
$this->client->indices()->clearCache();
$this->client->indices()->flushSynced();
}
// Record indexing depends on indexed terms so we need to make
// everything ready to search
$bulk->flush();
$this->client->indices()->refresh();
}
if ($what & self::RECORDS) {
$databox->clearCandidates();
$this->recordIndexer->populateIndex($bulk, $databox);
if ($what & self::RECORDS) {
$databox->clearCandidates();
$this->recordIndexer->populateIndex($bulk, $databox);
// Final flush
$bulk->flush();
}
}, $this->index);
// Final flush
$bulk->flush();
}
},
$this->index
);
// Optimize index
$params = array('index' => $this->index->getName());
$this->client->indices()->optimize($params);
$this->client->indices()->optimize(
[
'index' => $this->index->getName()
]
);
$event = $stopwatch->stop('populate');
printf("Indexation finished in %s min (Mem. %s Mo)", ($event->getDuration()/1000/60), bcdiv($event->getMemory(), 1048576, 2));
return [
'duration' => $event->getDuration(),
'memory' => $event->getMemory()
];
}
public function migrateMappingForDatabox($databox)

View File

@@ -12,6 +12,7 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegate;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
@@ -24,6 +25,7 @@ use PDO;
class Fetcher
{
private $databox;
private $options;
private $connection;
private $statement;
private $delegate;
@@ -36,9 +38,10 @@ class Fetcher
private $postFetch;
private $onDrain;
public function __construct(databox $databox, array $hydrators, FetcherDelegateInterface $delegate = null)
public function __construct(databox $databox,ElasticsearchOptions $options, array $hydrators, FetcherDelegateInterface $delegate = null)
{
$this->databox = $databox;
$this->options = $options;
$this->connection = $databox->get_connection();;
$this->hydrators = $hydrators;
$this->delegate = $delegate ?: new FetcherDelegate();
@@ -136,7 +139,7 @@ class Fetcher
. " FROM (record r INNER JOIN coll c ON (c.coll_id = r.coll_id))"
. " LEFT JOIN subdef ON subdef.record_id=r.record_id AND subdef.name='document'"
. " -- WHERE"
. " ORDER BY r.record_id DESC"
. " ORDER BY " . $this->options->getPopulateOrderAsSQL() . " " . $this->options->getPopulateDirectionAsSQL()
. " LIMIT :offset, :limit";
$where = $this->delegate->buildWhereClause();