Extract record index and term index from respective indexers

This commit is contained in:
Thibaud Fabre
2016-10-19 09:54:50 +02:00
parent f2cfe93f8c
commit 6e88b97c55
11 changed files with 459 additions and 393 deletions

View File

@@ -13,7 +13,10 @@ namespace Alchemy\Phrasea\Core\Provider;
use Alchemy\Phrasea\Controller\LazyLocator;
use Alchemy\Phrasea\Core\Event\Subscriber\Thesaurus\ReindexRequiredEventSubscriber;
use Alchemy\Phrasea\SearchEngine\Elastic\DataboxFetcherFactory;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\SearchEngine\Elastic\Index;
use Alchemy\Phrasea\SearchEngine\Elastic\IndexLocator;
use Alchemy\Phrasea\SearchEngine\Elastic\Search\QueryVisitor;
use Alchemy\Phrasea\SearchEngine\SearchEngineLogger;
use Alchemy\Phrasea\Exception\InvalidArgumentException;
@@ -83,11 +86,26 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
/* Indexer related services */
$app['elasticsearch.index'] = $app->share(function ($app) {
return new Index($app['elasticsearch.options'], $app['elasticsearch.index.locator']);
});
$app['elasticsearch.index.record'] = $app->share(function ($app) {
return new Indexer\RecordIndex($app['search_engine.structure'], array_keys($app['locales.available']));
});
$app['elasticsearch.index.term'] = $app->share(function ($app) {
return new Indexer\TermIndex(array_keys($app['locales.available']));
});
$app['elasticsearch.index.locator'] = $app->share(function ($app) {
return new IndexLocator($app, 'elasticsearch.index.record', 'elasticsearch.index.term');
});
$app['elasticsearch.indexer'] = $app->share(function ($app) {
return new Indexer(
$app['elasticsearch.client'],
$app['elasticsearch.options'],
$app['elasticsearch.index'],
$app['elasticsearch.indexer.term_indexer'],
$app['elasticsearch.indexer.record_indexer'],
$app['phraseanet.appbox'],
@@ -103,17 +121,25 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
);
});
$app['elasticsearch.indexer.databox_fetcher_factory'] = $app->share(function ($app) {
return new DataboxFetcherFactory(
$app['elasticsearch.record_helper'],
$app,
'search_engine.structure',
'thesaurus'
);
});
$app['elasticsearch.indexer.record_indexer'] = $app->share(function ($app) {
// TODO Use upcoming monolog factory
$logger = new Logger('indexer');
$logger->pushHandler(new ErrorLogHandler());
return new RecordIndexer(
$app['search_engine.structure'],
$app['elasticsearch.indexer.databox_fetcher_factory'],
$app['elasticsearch.record_helper'],
$app['thesaurus'],
array_keys($app['locales.available']),
$app['monolog'],
$app['dispatcher']
$app['dispatcher'],
$app['monolog']
);
});

View File

@@ -0,0 +1,94 @@
<?php
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Fetcher;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\CoreHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\FlagHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\MetadataHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\SubDefinitionHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\ThesaurusHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\TitleHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Structure\Structure;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\CandidateTerms;
/**
 * Creates record fetchers for a databox, each one wired with the complete
 * chain of record hydrators.
 *
 * The structure and the thesaurus are not injected directly: they are read
 * from the container, by key, whenever a fetcher is created.
 */
class DataboxFetcherFactory
{
    /**
     * @var RecordHelper
     */
    private $recordHelper;

    /**
     * @var \ArrayAccess Container holding the structure and thesaurus entries
     */
    private $container;

    /**
     * @var string Container key of the structure
     */
    private $structureKey;

    /**
     * @var string Container key of the thesaurus
     */
    private $thesaurusKey;

    /**
     * @param RecordHelper $recordHelper
     * @param \ArrayAccess $container
     * @param string $structureKey
     * @param string $thesaurusKey
     */
    public function __construct(RecordHelper $recordHelper, \ArrayAccess $container, $structureKey, $thesaurusKey)
    {
        $this->container = $container;
        $this->structureKey = $structureKey;
        $this->thesaurusKey = $thesaurusKey;
        $this->recordHelper = $recordHelper;
    }

    /**
     * Builds a fetcher for the given databox.
     *
     * The fetcher uses a batch size of 200 and saves the collected candidate
     * terms once it is drained.
     *
     * @param \databox $databox
     * @param FetcherDelegateInterface $fetcherDelegate Optional delegate handed over to the fetcher
     * @return Fetcher
     */
    public function createFetcher(\databox $databox, FetcherDelegateInterface $fetcherDelegate = null)
    {
        $dbConnection = $databox->get_connection();
        $candidates = new CandidateTerms($databox);

        // Hydrators are listed in the order they are handed to the fetcher
        $hydrators = [
            new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->recordHelper),
            new TitleHydrator($dbConnection),
            new MetadataHydrator($dbConnection, $this->getStructure(), $this->recordHelper),
            new FlagHydrator($this->getStructure(), $databox),
            new ThesaurusHydrator($this->getStructure(), $this->getThesaurus(), $candidates),
            new SubDefinitionHydrator($dbConnection),
        ];

        $fetcher = new Fetcher($databox, $hydrators, $fetcherDelegate);
        $fetcher->setBatchSize(200);
        $fetcher->onDrain(function () use ($candidates) {
            $candidates->save();
        });

        return $fetcher;
    }

    /**
     * Reads the structure from the container.
     *
     * @return Structure
     */
    private function getStructure()
    {
        $key = $this->structureKey;

        return $this->container[$key];
    }

    /**
     * Reads the thesaurus from the container.
     *
     * @return Thesaurus
     */
    private function getThesaurus()
    {
        $key = $this->thesaurusKey;

        return $this->container[$key];
    }
}

View File

@@ -2,17 +2,12 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\RecordIndexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\TermIndexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\RecordIndex;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\TermIndex;
class Index
{
/**
* @var string
*/
private $name;
/**
* @var array
*/
@@ -24,41 +19,32 @@ class Index
private $options;
/**
* @var RecordIndexer
* @var IndexLocator
*/
private $recordIndexer;
private $indexLocator;
/**
* @var TermIndexer
*/
private $termIndexer;
/**
* @param string $name
* @param ElasticsearchOptions $options
* @param RecordIndexer $recordIndexer
* @param TermIndexer $termIndexer
* @param IndexLocator $indexLocator
*/
public function __construct(
$name,
ElasticsearchOptions $options,
RecordIndexer $recordIndexer,
TermIndexer $termIndexer
IndexLocator $indexLocator
) {
$this->name = $name;
$this->options = $options;
$this->recordIndexer = $recordIndexer;
$this->termIndexer = $termIndexer;
$this->indexLocator = $indexLocator;
$this->buildDefaultAnalysis();
}
/**
* Returns the index name (this is same value as defined in ElasticsearchOptions)
*
* @return string
*/
public function getName()
{
return $this->name;
return $this->options->getIndexName();
}
/**
@@ -78,19 +64,19 @@ class Index
}
/**
* @return RecordIndexer
* @return RecordIndex
*/
public function getRecordIndexer()
public function getRecordIndex()
{
return $this->recordIndexer;
return $this->indexLocator->getRecordIndex();
}
/**
* @return TermIndexer
* @return TermIndex
*/
public function getTermIndexer()
public function getTermIndex()
{
return $this->termIndexer;
return $this->indexLocator->getTermIndex();
}
private function buildDefaultAnalysis()

View File

@@ -0,0 +1,50 @@
<?php
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\RecordIndex;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\TermIndex;
/**
 * Looks up the record index and the term index in a container, by key,
 * each time they are requested.
 */
class IndexLocator
{
    /**
     * @var \ArrayAccess Container the indexes are read from
     */
    private $container;

    /**
     * @var string Container key of the record index
     */
    private $recordIndexKey;

    /**
     * @var string Container key of the term index
     */
    private $termIndexKey;

    /**
     * @param \ArrayAccess $container
     * @param string $recordIndexKey
     * @param string $termIndexKey
     */
    public function __construct(\ArrayAccess $container, $recordIndexKey, $termIndexKey)
    {
        $this->termIndexKey = $termIndexKey;
        $this->recordIndexKey = $recordIndexKey;
        $this->container = $container;
    }

    /**
     * @return RecordIndex
     */
    public function getRecordIndex()
    {
        return $this->container->offsetGet($this->recordIndexKey);
    }

    /**
     * @return TermIndex
     */
    public function getTermIndex()
    {
        return $this->container->offsetGet($this->termIndexKey);
    }
}

View File

@@ -31,11 +31,6 @@ class Indexer
const THESAURUS = 1;
const RECORDS = 2;
/**
* @var Index
*/
private $index;
/**
* @var \Elasticsearch\Client
*/
@@ -61,14 +56,37 @@ class Indexer
*/
private $deleteQueue;
public function __construct(Client $client, ElasticsearchOptions $options, TermIndexer $termIndexer, RecordIndexer $recordIndexer, appbox $appbox, LoggerInterface $logger = null)
/**
* @var RecordIndexer
*/
private $recordIndexer;
/**
* @var TermIndexer
*/
private $termIndexer;
/**
* @var Index
*/
private $index;
public function __construct(
Client $client,
Index $index,
TermIndexer $termIndexer,
RecordIndexer $recordIndexer,
appbox $appbox,
LoggerInterface $logger = null
)
{
$this->client = $client;
$this->appbox = $appbox;
$this->index = $index;
$this->recordIndexer = $recordIndexer;
$this->termIndexer = $termIndexer;
$this->logger = $logger ?: new NullLogger();
$this->index = new Index($options->getIndexName(), $options, $recordIndexer, $termIndexer);
$this->indexQueue = new SplObjectStorage();
$this->deleteQueue = new SplObjectStorage();
}
@@ -82,8 +100,8 @@ class Indexer
$params['body']['settings']['analysis'] = $this->index->getAnalysis();
if ($withMapping) {
$params['body']['mappings'][RecordIndexer::TYPE_NAME] = $this->index->getRecordIndexer()->getMapping();
$params['body']['mappings'][TermIndexer::TYPE_NAME] = $this->index->getTermIndexer()->getMapping();
$params['body']['mappings'][RecordIndexer::TYPE_NAME] = $this->index->getRecordIndex()->getMapping();
$params['body']['mappings'][TermIndexer::TYPE_NAME] = $this->index->getTermIndex()->getMapping();
}
$this->client->indices()->create($params);
@@ -92,10 +110,10 @@ class Indexer
public function updateMapping()
{
$params = array();
$params['index'] = $this->index->getOptions()->getIndexName();
$params['index'] = $this->index->getName();
$params['type'] = RecordIndexer::TYPE_NAME;
$params['body'][RecordIndexer::TYPE_NAME] = $this->index->getRecordIndexer()->getMapping();
$params['body'][TermIndexer::TYPE_NAME] = $this->index->getTermIndexer()->getMapping();
$params['body'][RecordIndexer::TYPE_NAME] = $this->index->getRecordIndex()->getMapping();
$params['body'][TermIndexer::TYPE_NAME] = $this->index->getTermIndex()->getMapping();
// @todo This must throw a new indexation if a mapping is edited
$this->client->indices()->putMapping($params);
@@ -103,31 +121,25 @@ class Indexer
public function deleteIndex()
{
$params = array('index' => $this->index->getOptions()->getIndexName());
$params = array('index' => $this->index->getName());
$this->client->indices()->delete($params);
}
public function indexExists()
{
$params = array('index' => $this->index->getOptions()->getIndexName());
$params = array('index' => $this->index->getName());
return $this->client->indices()->exists($params);
}
public function populateIndex($what, array $databoxes_id = [])
public function populateIndex($what, \databox $databox)
{
$stopwatch = new Stopwatch();
$stopwatch->start('populate');
if ($databoxes_id) {
// If databoxes are given, only use those
$databoxes = array_map(array($this->appbox, 'get_databox'), $databoxes_id);
} else {
$databoxes = $this->appbox->get_databoxes();
}
$this->apply(function (BulkOperation $bulk) use ($what, $databox) {
if ($what & self::THESAURUS) {
$this->index->getTermIndexer()->populateIndex($bulk, $databoxes);
$this->termIndexer->populateIndex($bulk, $databox);
// Record indexing depends on indexed terms so we need to make
// everything ready to search
@@ -136,16 +148,16 @@ class Indexer
}
if ($what & self::RECORDS) {
$this->index->getRecordIndexer()->populateIndex($bulk, $databoxes);
$this->recordIndexer->populateIndex($bulk, $databox);
// Final flush
$bulk->flush();
}
}, $this->index);
// Optimize index
$params = array('index' => $this->index->getOptions()->getIndexName());
$params = array('index' => $this->index->getName());
$this->client->indices()->optimize($params);
});
$event = $stopwatch->stop('populate');
printf("Indexation finished in %s min (Mem. %s Mo)", ($event->getDuration()/1000/60), bcdiv($event->getMemory(), 1048576, 2));
@@ -191,9 +203,9 @@ class Indexer
*/
public function indexScheduledRecords(\databox $databox)
{
$this->apply(function(BulkOperation $bulk) use($databoxes) {
$this->index->getRecordIndexer()->indexScheduled($bulk, $databoxes);
});
$this->apply(function(BulkOperation $bulk) use($databox) {
$this->recordIndexer->indexScheduled($bulk, $databox);
}, $this->index);
}
public function flushQueue()
@@ -207,23 +219,25 @@ class Indexer
}
$this->apply(function(BulkOperation $bulk) {
$this->index->getRecordIndexer()->index($bulk, $this->indexQueue);
$this->index->getRecordIndexer()->delete($bulk, $this->deleteQueue);
$this->recordIndexer->index($bulk, $this->indexQueue);
$this->recordIndexer->delete($bulk, $this->deleteQueue);
$bulk->flush();
});
}, $this->index);
$this->indexQueue = new SplObjectStorage();
$this->deleteQueue = new SplObjectStorage();
}
private function apply(Closure $work)
private function apply(Closure $work, Index $index)
{
// Prepare the bulk operation
$bulk = new BulkOperation($this->client, $this->logger);
$bulk->setDefaultIndex($this->index->getOptions()->getIndexName());
$bulk->setDefaultIndex($index->getName());
$bulk->setAutoFlushLimit(1000);
// Do the work
$work($bulk);
$work($bulk, $index);
// Flush just in case, it's a noop when already done
$bulk->flush();
}

View File

@@ -11,11 +11,13 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\FieldMapping;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping;
use Alchemy\Phrasea\SearchEngine\Elastic\Structure\Field;
use Alchemy\Phrasea\SearchEngine\Elastic\MappingBuilder;
use Alchemy\Phrasea\SearchEngine\Elastic\MappingProvider;
use Alchemy\Phrasea\SearchEngine\Elastic\Structure\Structure;
class RecordIndex
class RecordIndex implements MappingProvider
{
/**
* @var Structure
@@ -37,129 +39,120 @@ class RecordIndex
$this->locales = $locales;
}
/**
* @return Mapping
*/
public function getMapping()
{
$mapping = new Mapping();
$mapping
// Identifiers
->add('record_id', 'integer') // Compound primary key
->add('databox_id', 'integer') // Compound primary key
->add('databox_name', 'string')->notAnalyzed() // database name (still indexed for facets)
->add('base_id', 'integer') // Unique collection ID
->add('collection_id', 'integer')->notIndexed() // Useless collection ID (local to databox)
->add('collection_name', 'string')->notAnalyzed() // Collection name (still indexed for facets)
->add('uuid', 'string')->notIndexed()
->add('sha256', 'string')->notIndexed()
// Mandatory metadata
->add('original_name', 'string')->notIndexed()
->add('mime', 'string')->notAnalyzed() // Indexed for Kibana only
->add('type', 'string')->notAnalyzed()
->add('record_type', 'string')->notAnalyzed() // record or story
// Dates
->add('created_on', 'date')->format(Mapping::DATE_FORMAT_MYSQL_OR_CAPTION)
->add('updated_on', 'date')->format(Mapping::DATE_FORMAT_MYSQL_OR_CAPTION)
// Thesaurus
->add('concept_path', $this->getThesaurusPathMapping())
// EXIF
->add('metadata_tags', $this->getMetadataTagMapping())
// Status
->add('flags', $this->getFlagsMapping())
->add('flags_bitfield', 'integer')->notIndexed()
// Keep some fields around for display purposes
->add('subdefs', Mapping::disabledMapping())
->add('title', Mapping::disabledMapping());
$mapping = new MappingBuilder();
// Compound primary key
$mapping->addField('record_id', FieldMapping::TYPE_INTEGER);
$mapping->addField('databox_id', FieldMapping::TYPE_INTEGER);
// Database name (still indexed for facets)
$mapping->addStringField('databox_name')->disableAnalysis();
// Unique collection ID
$mapping->addIntegerField('base_id');
// Useless collection ID (local to databox)
$mapping->addIntegerField('collection_id')->disableIndexing();
// Collection name (still indexed for facets)
$mapping->addStringField('collection_name')->disableAnalysis();
$mapping->addStringField('uuid')->disableIndexing();
$mapping->addStringField('sha256')->disableIndexing();
$mapping->addStringField('original_name')->disableIndexing();
$mapping->addStringField('mime')->disableAnalysis();
$mapping->addStringField('type')->disableAnalysis();
$mapping->addStringField('record_type')->disableAnalysis();
$mapping->addDateField('created_on', FieldMapping::DATE_FORMAT_MYSQL_OR_CAPTION);
$mapping->addDateField('updated_on', FieldMapping::DATE_FORMAT_MYSQL_OR_CAPTION);
$mapping->add($this->buildThesaurusPathMapping('concept_path'));
$mapping->add($this->buildMetadataTagMapping('metadata_tags'));
$mapping->add($this->buildFlagMapping('flags'));
$mapping->addIntegerField('flags_bitfield')->disableIndexing();
$mapping->addObjectField('subdefs')->disableMapping();
$mapping->addObjectField('title')->disableMapping();
// Caption mapping
$this->buildCaptionMapping($this->structure->getUnrestrictedFields(), $mapping, 'caption');
$this->buildCaptionMapping($this->structure->getPrivateFields(), $mapping, 'private_caption');
$this->buildCaptionMapping($mapping, 'caption', $this->structure->getUnrestrictedFields());
$this->buildCaptionMapping($mapping, 'private_caption', $this->structure->getPrivateFields());
return $mapping->export();
return $mapping->getMapping();
}
private function buildCaptionMapping(array $fields, Mapping $root, $section)
private function buildCaptionMapping(MappingBuilder $parent, $name, array $fields)
{
$mapping = new Mapping();
$fieldConverter = new Mapping\FieldToFieldMappingConverter();
$captionMapping = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
$captionMapping->useAsPropertyContainer();
foreach ($fields as $field) {
$this->addFieldToMapping($field, $mapping);
$captionMapping->addChild($fieldConverter->convertField($field, $this->locales));
}
$root->add($section, $mapping);
$root
->add(sprintf('%s_all', $section), 'string')
->addLocalizedSubfields($this->locales)
->addRawVersion();
$parent->add($captionMapping);
$localizedCaptionMapping = new Mapping\StringFieldMapping(sprintf('%s_all', $name));
$localizedCaptionMapping
->addLocalizedChildren($this->locales)
->addChild((new Mapping\StringFieldMapping('raw'))->enableRawIndexing());
$parent->add($localizedCaptionMapping);
return $captionMapping;
}
private function addFieldToMapping(Field $field, Mapping $mapping)
private function buildThesaurusPathMapping($name)
{
$type = $field->getType();
$mapping->add($field->getName(), $type);
if ($type === Mapping::TYPE_DATE) {
$mapping->format(Mapping::DATE_FORMAT_CAPTION);
}
if ($type === Mapping::TYPE_STRING) {
$searchable = $field->isSearchable();
$facet = $field->isFacet();
if (!$searchable && !$facet) {
$mapping->notIndexed();
} else {
$mapping->addRawVersion();
$mapping->addAnalyzedVersion($this->locales);
$mapping->enableTermVectors(true);
}
}
}
private function getThesaurusPathMapping()
{
$mapping = new Mapping();
$thesaurusMapping = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
foreach (array_keys($this->structure->getThesaurusEnabledFields()) as $name) {
$mapping
->add($name, 'string')
->analyzer('thesaurus_path', 'indexing')
->analyzer('keyword', 'searching')
->addRawVersion()
;
$child = new Mapping\StringFieldMapping($name);
$child->setAnalyzer('thesaurus_path', 'indexing');
$child->setAnalyzer('keyword', 'searching');
$child->addChild((new Mapping\StringFieldMapping('raw'))->enableRawIndexing());
$thesaurusMapping->addChild($thesaurusMapping);
}
return $mapping;
return $thesaurusMapping;
}
private function getMetadataTagMapping()
private function buildMetadataTagMapping($name)
{
$mapping = new Mapping();
$tagConverter = new Mapping\MetadataTagToFieldMappingConverter();
$metadataMapping = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
$metadataMapping->useAsPropertyContainer();
foreach ($this->structure->getMetadataTags() as $tag) {
$type = $tag->getType();
$mapping->add($tag->getName(), $type);
if ($type === Mapping::TYPE_STRING) {
if ($tag->isAnalyzable()) {
$mapping->addRawVersion();
} else {
$mapping->notAnalyzed();
}
}
$metadataMapping->addChild($tagConverter->convertTag($tag));
}
return $mapping;
return $metadataMapping;
}
private function getFlagsMapping()
private function buildFlagMapping($name)
{
$mapping = new Mapping();
$index = 0;
$flagMapping = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
foreach ($this->structure->getAllFlags() as $name => $_) {
$mapping->add($name, 'boolean');
$flagMapping->useAsPropertyContainer();
foreach ($this->structure->getAllFlags() as $childName => $_) {
if (trim($childName) == '') {
$childName = 'flag_' . $index++;
}
return $mapping;
$flagMapping->addChild(new FieldMapping($childName, FieldMapping::TYPE_BOOLEAN));
}
return $flagMapping;
}
}

View File

@@ -13,27 +13,18 @@ namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\Core\Event\Thesaurus\ReindexRequiredEvent;
use Alchemy\Phrasea\Core\Event\Thesaurus\ThesaurusEvents;
use Alchemy\Phrasea\SearchEngine\Elastic\DataboxFetcherFactory;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\RecordListFetcherDelegate;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\ScheduledFetcherDelegate;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Fetcher;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\CoreHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\FlagHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\MetadataHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\SubDefinitionHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\ThesaurusHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Hydrator\TitleHydrator;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping;
use Alchemy\Phrasea\SearchEngine\Elastic\MappingBuilder;
use Alchemy\Phrasea\SearchEngine\Elastic\RecordHelper;
use Alchemy\Phrasea\SearchEngine\Elastic\Structure\Field;
use Alchemy\Phrasea\SearchEngine\Elastic\Structure\Structure;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus;
use Alchemy\Phrasea\SearchEngine\Elastic\Thesaurus\CandidateTerms;
use databox;
use Iterator;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use record_adapter;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
@@ -41,26 +32,11 @@ class RecordIndexer
{
const TYPE_NAME = 'record';
/**
* @var Structure
*/
private $structure;
/**
* @var RecordHelper
*/
private $helper;
/**
* @var Thesaurus
*/
private $thesaurus;
/**
* @var array
*/
private $locales;
/**
* @var LoggerInterface
*/
@@ -71,34 +47,27 @@ class RecordIndexer
*/
private $eventDispatcher;
private function getUniqueOperationId($record_key)
{
$_key = dechex(mt_rand());
return $_key . '_' . $record_key;
}
/**
* @var DataboxFetcherFactory
*/
private $fetcherFactory;
/**
* @param Structure $structure
* @param DataboxFetcherFactory $fetcherFactory
* @param RecordHelper $helper
* @param Thesaurus $thesaurus
* @param array $locales
* @param LoggerInterface $logger
* @param EventDispatcherInterface $eventDispatcher
*/
public function __construct(
Structure $structure,
DataboxFetcherFactory $fetcherFactory,
RecordHelper $helper,
Thesaurus $thesaurus,
array $locales,
LoggerInterface $logger,
EventDispatcherInterface $eventDispatcher
)
{
$this->structure = $structure;
$this->helper = $helper;
$this->thesaurus = $thesaurus;
$this->locales = $locales;
$this->logger = $logger;
EventDispatcherInterface $eventDispatcher,
LoggerInterface $logger = null
) {
$this->eventDispatcher = $eventDispatcher;
$this->fetcherFactory = $fetcherFactory;
$this->helper = $helper;
$this->logger = $logger ?: new NullLogger();
}
/**
@@ -108,6 +77,7 @@ class RecordIndexer
private function getUniqueOperationId($record_key)
{
    // Prefix the record key with a random hexadecimal token
    return sprintf('%s_%s', dechex(mt_rand()), $record_key);
}
@@ -149,11 +119,11 @@ class RecordIndexer
*/
public function populateIndex(BulkOperation $bulk, databox $databox)
{
foreach ($databoxes as $databox) {
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
$submitted_records = [];
$fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records
// No delegate, scan all records
$fetcher = $this->fetcherFactory->createFetcher($databox);
// post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
@@ -185,7 +155,7 @@ class RecordIndexer
// Make fetcher
$delegate = new ScheduledFetcherDelegate();
$fetcher = $this->createFetcherForDatabox($databox, $delegate);
$fetcher = $this->fetcherFactory->createFetcher($databox, $delegate);
// post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
@@ -261,34 +231,12 @@ class RecordIndexer
$databox = $group['databox'];
$delegate = new RecordListFetcherDelegate($group['records']);
$fetchers[] = $this->createFetcherForDatabox($databox, $delegate);
$fetchers[] = $this->fetcherFactory->createFetcher($databox, $delegate);
}
return $fetchers;
}
/**
 * Builds a fetcher for the given databox, wired with the full hydrator chain.
 *
 * The fetcher uses a batch size of 200 and saves the collected candidate
 * terms once it is drained.
 *
 * @param databox $databox
 * @param FetcherDelegateInterface $delegate Optional delegate handed over to the fetcher
 * @return Fetcher
 */
private function createFetcherForDatabox(databox $databox, FetcherDelegateInterface $delegate = null)
{
    $dbConnection = $databox->get_connection();
    $candidates = new CandidateTerms($databox);

    // Hydrators are listed in the order they are handed to the fetcher
    $hydrators = [
        new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper),
        new TitleHydrator($dbConnection),
        new MetadataHydrator($dbConnection, $this->structure, $this->helper),
        new FlagHydrator($this->structure, $databox),
        new ThesaurusHydrator($this->structure, $this->thesaurus, $candidates),
        new SubDefinitionHydrator($dbConnection),
    ];

    $fetcher = new Fetcher($databox, $hydrators, $delegate);
    $fetcher->setBatchSize(200);
    $fetcher->onDrain(function () use ($candidates) {
        $candidates->save();
    });

    return $fetcher;
}
private function groupRecordsByDatabox(Iterator $records)
{
$databoxes = array();
@@ -344,119 +292,4 @@ class RecordIndexer
$bulk->index($params, $op_identifier);
}
}
/**
 * Builds the mapping of the record type.
 *
 * @return mixed Result of exporting the built mapping
 */
public function getMapping()
{
    $mapping = new MappingBuilder();

    // Compound primary key
    $mapping->addField('record_id', FieldMapping::TYPE_INTEGER);
    $mapping->addField('databox_id', FieldMapping::TYPE_INTEGER);

    // Database name (still indexed for facets)
    $mapping->addStringField('databox_name')->disableAnalysis();
    // Unique collection ID
    $mapping->addField('base_id', FieldMapping::TYPE_INTEGER);
    // Useless collection ID (local to databox)
    $mapping->addField('collection_id', FieldMapping::TYPE_INTEGER)->disableIndexing();
    // Collection name (still indexed for facets)
    $mapping->addStringField('collection_name')->disableAnalysis();

    $mapping->addStringField('uuid')->disableIndexing();
    $mapping->addStringField('sha256')->disableIndexing();
    $mapping->addStringField('original_name')->disableIndexing();
    $mapping->addStringField('mime')->disableAnalysis();
    $mapping->addStringField('type')->disableAnalysis();
    $mapping->addStringField('record_type')->disableAnalysis();

    $mapping->addDateField('created_on', FieldMapping::DATE_FORMAT_MYSQL_OR_CAPTION);
    $mapping->addDateField('updated_on', FieldMapping::DATE_FORMAT_MYSQL_OR_CAPTION);

    $mapping->add($this->buildThesaurusPathMapping('concept_path'));
    $mapping->add($this->buildMetadataTagMapping('metadata_tags'));
    $mapping->add($this->buildFlagMapping('flags'));

    $mapping->addField('flags_bitfield', FieldMapping::TYPE_INTEGER)->disableIndexing();
    $mapping->addField('subdefs', FieldMapping::TYPE_OBJECT)->disableMapping();
    $mapping->addField('title', FieldMapping::TYPE_OBJECT)->disableMapping();

    // Caption mapping
    $this->buildCaptionMapping($mapping, 'caption', $this->structure->getUnrestrictedFields());
    $this->buildCaptionMapping($mapping, 'private_caption', $this->structure->getPrivateFields());

    // Hand the exported mapping back to the caller; the previous code dumped
    // it to the output and terminated the process (leftover debug statement).
    return $mapping->getMapping()->export();
}
/**
 * Adds two mappings to the parent builder: an object mapping called $name
 * holding one converted child per field, and a string mapping called
 * "<name>_all" with localized children and a "raw" child.
 *
 * @param MappingBuilder $parent Builder receiving both mappings
 * @param string $name Name of the caption section
 * @param array $fields Fields converted into children of the object mapping
 * @return Mapping\ComplexFieldMapping The object mapping called $name
 */
private function buildCaptionMapping(MappingBuilder $parent, $name, array $fields)
{
    $section = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
    $section->useAsPropertyContainer();

    $converter = new Mapping\FieldToFieldMappingConverter();

    foreach ($fields as $structureField) {
        $converted = $converter->convertField($structureField, $this->locales);
        $section->addChild($converted);
    }

    $parent->add($section);

    $allValues = new Mapping\StringFieldMapping($name . '_all');
    $rawChild = (new Mapping\StringFieldMapping('raw'))->enableRawIndexing();
    $allValues->addLocalizedChildren($this->locales)->addChild($rawChild);

    $parent->add($allValues);

    return $section;
}
/**
 * Builds the object mapping that holds one string child per
 * thesaurus-enabled field of the structure.
 *
 * Each child is analyzed with "thesaurus_path" when indexing and "keyword"
 * when searching, and carries a "raw" sub-field.
 *
 * @param string $name Name of the object mapping
 * @return Mapping\ComplexFieldMapping
 */
private function buildThesaurusPathMapping($name)
{
    $thesaurusMapping = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);

    // Dedicated loop variable: the $name parameter must not be overwritten
    foreach (array_keys($this->structure->getThesaurusEnabledFields()) as $fieldName) {
        $child = new Mapping\StringFieldMapping($fieldName);

        $child->setAnalyzer('thesaurus_path', 'indexing');
        $child->setAnalyzer('keyword', 'searching');
        $child->addChild((new Mapping\StringFieldMapping('raw'))->enableRawIndexing());

        // Register the field mapping; the container was previously added to
        // itself, so no thesaurus field ever reached the mapping.
        $thesaurusMapping->addChild($child);
    }

    return $thesaurusMapping;
}
/**
 * Builds the object mapping holding one converted child per metadata tag
 * of the structure.
 *
 * @param string $name Name of the object mapping
 * @return Mapping\ComplexFieldMapping
 */
private function buildMetadataTagMapping($name)
{
    $container = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
    $container->useAsPropertyContainer();

    $converter = new Mapping\MetadataTagToFieldMappingConverter();

    foreach ($this->structure->getMetadataTags() as $metadataTag) {
        $converted = $converter->convertTag($metadataTag);
        $container->addChild($converted);
    }

    return $container;
}
/**
 * Builds the object mapping holding one boolean child per flag of the
 * structure.
 *
 * Flags whose name is blank are registered as "flag_<n>", where <n> counts
 * the blank names met so far, starting at 0.
 *
 * @param string $name Name of the object mapping
 * @return Mapping\ComplexFieldMapping
 */
private function buildFlagMapping($name)
{
    $flags = new Mapping\ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);
    $flags->useAsPropertyContainer();

    $blankCount = 0;

    foreach ($this->structure->getAllFlags() as $flagName => $_) {
        $fieldName = trim($flagName) === '' ? 'flag_' . $blankCount++ : $flagName;

        $flags->addChild(new FieldMapping($fieldName, FieldMapping::TYPE_BOOLEAN));
    }

    return $flags;
}
}

View File

@@ -0,0 +1,52 @@
<?php
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Alchemy\Phrasea\SearchEngine\Elastic\FieldMapping;
use Alchemy\Phrasea\SearchEngine\Elastic\MappingBuilder;
use Alchemy\Phrasea\SearchEngine\Elastic\MappingProvider;
/**
 * Describes the mapping of the term type for the configured locales.
 */
class TermIndex implements MappingProvider
{
    /**
     * @var string[] Locales used for the localized sub-fields
     */
    private $locales;

    /**
     * @param string[] $locales
     */
    public function __construct(array $locales)
    {
        $this->locales = $locales;
    }

    /**
     * Builds the term mapping: raw value, value, context, path, language and
     * databox identifier.
     *
     * @return \Alchemy\Phrasea\SearchEngine\Elastic\Mapping
     */
    public function getMapping()
    {
        $builder = new MappingBuilder();

        $builder->addStringField('raw_value')->disableAnalysis();

        // "value" and "context" share the exact same analysis configuration
        foreach (array('value', 'context') as $textField) {
            $builder->addStringField($textField)
                ->setAnalyzer('general_light')
                ->addAnalyzedChild('strict', 'thesaurus_term_strict')
                ->addLocalizedChildren($this->locales);
        }

        $builder->addStringField('path')
            ->setAnalyzer('thesaurus_path', 'indexing')
            ->setAnalyzer('keyword', 'searching')
            ->addRawChild();

        $builder->addStringField('lang')->disableAnalysis();
        $builder->addIntegerField('databox_id');

        return $builder->getMapping();
    }
}

View File

@@ -29,18 +29,32 @@ class TermIndexer
*/
private $appbox;
/**
* @var Navigator
*/
private $navigator;
private $locales;
/**
* @var LoggerInterface
*/
private $logger;
public function __construct(\appbox $appbox, array $locales, LoggerInterface $logger)
/**
* @param \appbox $appbox
* @param LoggerInterface $logger
*/
public function __construct(\appbox $appbox, LoggerInterface $logger)
{
$this->appbox = $appbox;
$this->navigator = new Navigator();
$this->locales = $locales;
$this->logger = $logger;
}
/**
* @param BulkOperation $bulk
* @param databox $databox
* @throws \Doctrine\DBAL\DBALException
*/
public function populateIndex(BulkOperation $bulk, databox $databox)
{
$databoxId = $databox->get_sbas_id();
@@ -80,30 +94,4 @@ class TermIndexer
[$indexDate, $indexDate]
);
}
/**
 * Builds the mapping of the term type: raw value, value, context, path,
 * language and databox identifier.
 *
 * @return mixed Result of exporting the built mapping
 */
public function getMapping()
{
    $mapping = new MappingBuilder();

    $mapping->addStringField('raw_value')->disableAnalysis();

    $mapping->addStringField('value')
        ->setAnalyzer('general_light')
        ->addAnalyzedChild('strict', 'thesaurus_term_strict')
        ->addLocalizedChildren($this->locales);

    $mapping->addStringField('context')
        ->setAnalyzer('general_light')
        ->addAnalyzedChild('strict', 'thesaurus_term_strict')
        ->addLocalizedChildren($this->locales);

    $mapping->addStringField('path')
        ->setAnalyzer('thesaurus_path', 'indexing')
        ->setAnalyzer('keyword', 'searching')
        ->addRawChild();

    $mapping->addStringField('lang')->disableAnalysis();

    // Map the databox identifier as an integer, consistently with the
    // databox_id field of the record mapping (it was declared as a string).
    $mapping->addField('databox_id', FieldMapping::TYPE_INTEGER);

    return $mapping->getMapping()->export();
}
}

View File

@@ -11,6 +11,7 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping\ComplexFieldMapping;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping\DateFieldMapping;
use Alchemy\Phrasea\SearchEngine\Elastic\Mapping\StringFieldMapping;
@@ -27,7 +28,7 @@ class MappingBuilder
}
/**
* @param string $name;
* @param string $name
* @return StringFieldMapping
*/
public function addStringField($name)
@@ -35,6 +36,24 @@ class MappingBuilder
return $this->mapping->addField(new StringFieldMapping($name));
}
/**
 * Adds an integer field called $name to the mapping being built.
 *
 * @param string $name
 * @return FieldMapping
 */
public function addIntegerField($name)
{
    $field = new FieldMapping($name, FieldMapping::TYPE_INTEGER);

    return $this->mapping->addField($field);
}
/**
 * Adds an object field called $name, created as a ComplexFieldMapping, to
 * the mapping being built.
 *
 * @param string $name
 * @return FieldMapping
 */
public function addObjectField($name)
{
    $field = new ComplexFieldMapping($name, FieldMapping::TYPE_OBJECT);

    return $this->mapping->addField($field);
}
/**
* @param string $name
* @param string $format

View File

@@ -0,0 +1,11 @@
<?php
namespace Alchemy\Phrasea\SearchEngine\Elastic;
/**
 * Contract for objects able to provide a mapping.
 */
interface MappingProvider
{
/**
* Returns the mapping provided by this object.
*
* @return Mapping
*/
public function getMapping();
}