WIP Indexer

This commit is contained in:
Mathieu Darse
2014-08-26 16:42:48 +02:00
parent 6630d67d95
commit d187bfe80d
8 changed files with 438 additions and 104 deletions

View File

@@ -14,7 +14,9 @@ namespace KonsoleKommander;
use Alchemy\Phrasea\Command\Plugin\ListPlugin;
use Alchemy\Phrasea\Command\Setup\H264ConfigurationDumper;
use Alchemy\Phrasea\Command\Setup\H264MappingGenerator;
use Alchemy\Phrasea\Command\SearchEngine\IndexFull;
use Alchemy\Phrasea\Command\SearchEngine\IndexCreateCommand;
use Alchemy\Phrasea\Command\SearchEngine\IndexDropCommand;
use Alchemy\Phrasea\Command\SearchEngine\IndexPopulateCommand;
use Alchemy\Phrasea\Command\WebsocketServer;
use Alchemy\Phrasea\Core\Version;
use Alchemy\Phrasea\Command\BuildMissingSubdefs;
@@ -116,7 +118,9 @@ $cli->command(new XSendFileConfigurationDumper());
$cli->command(new XSendFileMappingGenerator());
if ($cli['phraseanet.SE']->getName() === 'ElasticSearch') {
$cli->command(new IndexFull('searchengine:index'));
$cli->command(new IndexCreateCommand());
$cli->command(new IndexDropCommand());
$cli->command(new IndexPopulateCommand());
}
$cli->command(new WebsocketServer('ws-server:run'));

View File

@@ -0,0 +1,40 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Command\SearchEngine;
use Alchemy\Phrasea\Command\Command;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class IndexCreateCommand extends Command
{
protected function configure()
{
$this
->setName('searchengine:index:create')
->setDescription('Creates search index')
;
}
protected function doExecute(InputInterface $input, OutputInterface $output)
{
$indexer = $this->container['elasticsearch.indexer'];
if ($indexer->indexExists()) {
$output->writeln('<error>The search index already exists.</error>');
} else {
$indexer->createIndex();
$output->writeln('Search index was created');
}
}
}

View File

@@ -0,0 +1,47 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\Command\SearchEngine;
use Alchemy\Phrasea\Command\Command;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class IndexDropCommand extends Command
{
protected function configure()
{
$this
->setName('searchengine:index:drop')
->setDescription('Deletes the search index')
;
}
protected function doExecute(InputInterface $input, OutputInterface $output)
{
$question = '<question>You are about to delete the index and all contained data. Are you sure you wish to continue? (y/n)</question>';
$confirmation = $this->getHelper('dialog')->askConfirmation($output, $question, false);
if ($confirmation) {
$indexer = $this->container['elasticsearch.indexer'];
if ($indexer->indexExists()) {
$indexer->deleteIndex();
$output->writeln('Search index was dropped');
} else {
$output->writeln('<error>The index was not dropped because it does not exists</error>');
}
} else {
$output->writeln('Canceled.');
}
}
}

View File

@@ -16,11 +16,18 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class IndexFull extends Command
class IndexPopulateCommand extends Command
{
protected function configure()
{
$this
->setName('searchengine:index:populate')
->setDescription('Populate search index with record data')
;
}
protected function doExecute(InputInterface $input, OutputInterface $output)
{
$this->container['ES.indexer']->createIndex();
$this->container['ES.indexer']->reindexAll();
$this->container['elasticsearch.indexer']->populateIndex();
}
}

View File

@@ -15,6 +15,7 @@ use Alchemy\Phrasea\SearchEngine\SearchEngineLogger;
use Alchemy\Phrasea\Exception\InvalidArgumentException;
use Alchemy\Phrasea\SearchEngine\SearchEngineInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Elasticsearch\Client;
use Silex\Application;
use Silex\ServiceProviderInterface;
@@ -46,8 +47,33 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
return $app['phraseanet.SE.engine-class']::createSubscriber($app);
});
$app['ES.indexer'] = $app->share(function ($app) {
return new Indexer($app['phraseanet.SE'], $app['monolog'], $app['phraseanet.appbox']);
$app['elasticsearch.indexer'] = $app->share(function ($app) {
return new Indexer(
$app['elasticsearch.client'],
$app['elasticsearch.options'],
$app['monolog'],
$app['phraseanet.appbox']
);
});
$app['elasticsearch.client'] = $app->share(function($app) {
$options = $app['elasticsearch.options'];
$host = sprintf('%s:%s', $options['host'], $options['port']);
return new Client(array('hosts' => array($host)));
});
$app['elasticsearch.options'] = $app->share(function($app) {
$options = $app['conf']->get(['main', 'search-engine', 'options']);
$defaults = [
'host' => '127.0.0.1',
'port' => 9200,
'index' => 'phraseanet',
'shards' => 3,
'replicas' => 0
];
return array_replace($defaults, $options);
});
}

View File

@@ -11,57 +11,158 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use Elasticsearch\Client;
use Psr\Log\LoggerInterface;
use igorw;
class Indexer
{
/** @var Elasticsearch\Client */
private $client;
private $options;
private $engine;
private $logger;
private $appbox;
public function __construct(ElasticSearchEngine $engine, LoggerInterface $logger, \appbox $appbox)
private $previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL;
const DEFAULT_REFRESH_INTERVAL = '1s';
const REFRESH_INTERVAL_KEY = 'index.refresh_interval';
const RECORD_TYPE = 'record';
public function __construct(Client $client, array $options, LoggerInterface $logger, \appbox $appbox)
{
$this->engine = $engine;
$this->client = $client;
$this->options = $options;
$this->logger = $logger;
$this->appbox = $appbox;
}
public function createIndex()
public function createIndex($withMapping = true)
{
$indexParams['index'] = $this->engine->getIndexName();
$params = array();
$params['index'] = $this->options['index'];
$params['body']['settings']['number_of_shards'] = $this->options['shards'];
$params['body']['settings']['number_of_replicas'] = $this->options['replicas'];
if ($withMapping) {
$params['body']['mappings'][self::RECORD_TYPE] = $this->getRecordMapping();
}
$this->client->indices()->create($params);
}
// Index Settings
$indexParams['body']['settings']['number_of_shards'] = 3;
$indexParams['body']['settings']['number_of_replicas'] = 0;
public function updateMapping()
{
$params = array();
$params['index'] = $this->options['index'];
$params['type'] = self::RECORD_TYPE;
$params['body'][self::RECORD_TYPE] = $this->getRecordMapping();
$this->client->indices()->putMapping($params);
}
$captionFields = [];
$businessFields = [];
public function deleteIndex()
{
$params = array('index' => $this->options['index']);
$this->client->indices()->delete($params);
}
public function indexExists()
{
$params = array('index' => $this->options['index']);
return $this->client->indices()->exists($params);
}
public function populateIndex()
{
$this->disableShardRefreshing();
try {
foreach ($this->appbox->get_databoxes() as $databox) {
foreach ($databox->get_meta_structure() as $dbField) {
$type = 'string';
if (\databox_field::TYPE_DATE === $dbField->get_type()) {
$type = 'date';
$fetcher = new RecordFetcher($databox);
$fetcher->setBatchSize(200);
while ($record = $fetcher->fetch()) {
$params = array();
$params['index'] = $this->options['index'];
$params['type'] = self::RECORD_TYPE;
$params['id'] = $record['id'];
$params['body'] = $record;
$response = $this->client->index($params);
}
if (isset($captionFields[$dbField->get_name()]) && $type !== $captionFields[$dbField->get_name()]['type']) {
$type = 'string';
}
$captionFields[$dbField->get_name()] = [
'type' => $type,
'include_in_all' => !$dbField->isBusiness(),
'analyzer' => 'french',
];
// Optimize index
$params = array('index' => $this->options['index']);
$this->client->indices()->optimize($params);
if ($dbField->isBusiness()) {
$businessFields[$dbField->get_name()] = [
'type' => $type,
'include_in_all' => false,
'analyzer' => 'french',
];
} catch (Exception $e) {
$this->restoreShardRefreshing();
throw $e;
}
}
private function disableShardRefreshing()
{
$refreshInterval = $this->getSetting(self::REFRESH_INTERVAL_KEY);
if (null !== $refreshInterval) {
$this->previousRefreshInterval = $refreshInterval;
}
$this->setSetting(self::REFRESH_INTERVAL_KEY, -1);
}
private function restoreShardRefreshing()
{
$this->setSetting(self::REFRESH_INTERVAL_KEY, $this->previousRefreshInterval);
$this->previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL;
}
private function getSetting($name)
{
$index = $this->options['index'];
$params = array();
$params['index'] = $index;
$params['name'] = $name;
$params['flat_settings'] = true;
$response = $this->client->indices()->getSettings($params);
return igorw\get_in($response, [$index, 'settings', $name]);
}
private function setSetting($name, $value)
{
$index = $this->options['index'];
$params = array();
$params['index'] = $index;
$params['body'][$name] = $value;
$response = $this->client->indices()->putSettings($params);
return igorw\get_in($response, ['acknowledged']);
}
private function getRecordMapping()
{
$mapping = new Mapping();
$mapping
// Identifiers
->add('record_id', 'integer') // Compound primary key
->add('databox_id', 'integer') // Compound primary key
->add('base_id', 'integer') // Unique collection ID
->add('collection_id', 'integer') // Useless collection ID (local to databox)
->add('uuid', 'string')->notAnalyzed()
->add('sha256', 'string')->notAnalyzed()
// Mandatory metadata
->add('original_name', 'string')->notAnalyzed()
->add('mime', 'string')->notAnalyzed()
->add('type', 'string')->notAnalyzed()
// Dates
->add('created_at', 'date')->format('yyyy-MM-dd HH:mm:ss')
->add('updated_at', 'date')->format('yyyy-MM-dd HH:mm:ss')
;
return $mapping->export();
// TODO Migrate code below this line
$status = [];
for ($i = 0; $i <= 32; $i ++) {
@@ -237,73 +338,5 @@ class Indexer
'properties' => $businessFields
];
}
$indexParams['body']['mappings']['record'] = $recordTypeMapping;
if ($this->engine->getClient()->indices()->exists(['index' => $this->engine->getIndexName()])) {
$this->engine->getClient()->indices()->delete(['index' => $this->engine->getIndexName()]);
}
$ret = $this->engine->getClient()->indices()->create($indexParams);
if (!$this->isResultOk($ret)) {
throw new \RuntimeException('Unable to create index');
}
}
private function isResultOk(array $ret)
{
if (isset($ret['acknowledged']) && $ret['acknowledged']) {
return true;
}
if (isset($ret['ok']) && $ret['ok']) {
return true;
}
return false;
}
public function reindexAll()
{
$qty = 10;
$params['index'] = $this->engine->getIndexName();
$params['body']['index']['refresh_interval'] = 300;
$ret = $this->engine->getClient()->indices()->putSettings($params);
if (!$this->isResultOk($ret)) {
$this->logger->error('Unable to set the refresh interval to 300 s. .');
}
foreach ($this->appbox->get_databoxes() as $databox) {
$offset = 0;
do {
$sql = 'SELECT record_id FROM record
WHERE parent_record_id = 0
ORDER BY record_id ASC LIMIT '.$offset.', '.$qty;
$stmt = $databox->get_connection()->prepare($sql);
$stmt->execute();
$rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);
$stmt->closeCursor();
foreach ($rows as $row) {
$record = $databox->get_record($row['record_id']);
$this->engine->addRecord($record);
}
gc_collect_cycles();
$offset += $qty;
} while (count($rows) > 0);
}
$params['index'] = $this->engine->getIndexName();
$params['body']['index']['refresh_interval'] = 1;
$ret = $this->engine->getClient()->indices()->putSettings($params);
if (!$this->isResultOk($ret)) {
throw new \RuntimeException('Unable to set the refresh interval to 1 s. .');
}
}
}

View File

@@ -0,0 +1,63 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\SearchEngine\Elastic;
class Mapping
{
private $fields = array();
private $current;
public function add($name, $type)
{
// TODO Check input
$this->fields[$name] = array('type' => $type);
$this->current = $name;
return $this;
}
public function export()
{
return ['properties' => $this->fields];
}
public function notAnalyzed()
{
$field =& $this->currentField();
if ($field['type'] !== 'string') {
throw new \LogicException('Only string fields can be not analyzed');
}
$field['index'] = 'not_analyzed';
return $this;
}
public function format($format)
{
$field =& $this->currentField();
if ($field['type'] !== 'date') {
throw new \LogicException('Only date fields can have a format');
}
$field['format'] = $format;
return $this;
}
protected function &currentField()
{
if (null === $this->current) {
throw new \LogicException('You must add a field first');
}
return $this->fields[$this->current];
}
}

View File

@@ -0,0 +1,114 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use databox;
use Doctrine\DBAL\Driver\Connection;
use PDO;
class RecordFetcher
{
private $connection;
private $statement;
private $offset = 0;
private $batchSize = 1;
private $needsFetch = true;
private $databoxId;
public function __construct(databox $databox)
{
$this->connection = $databox->get_connection();
$this->databoxId = $databox->get_sbas_id();
}
public function fetch()
{
$statement = $this->statement();
// Start of a batch
if ($this->needsFetch) {
$statement->execute();
$this->needsFetch = false;
printf("Query %d/%d -> %d results\n", $this->offset, $this->batchSize, $statement->rowCount());
}
if ($record = $statement->fetch()) {
// printf("Record found (#%d)\n", $record['id']);
$record = $this->hydrate($record);
$this->offset++;
} else {
printf("End of records\n");
}
// If we exausted the last result set
if ($this->offset % $this->batchSize === 0 || !$record) {
$statement->closeCursor();
$this->needsFetch = true;
}
return $record;
}
public function setBatchSize($size)
{
if ($size < 1) {
throw new LogicException("Batch size must be greater than or equal to 1");
}
$this->batchSize = (int) $size;
}
private function hydrate(array $record)
{
// Some casting
$record['record_id'] = (int) $record['record_id'];
$record['collection_id'] = (int) $record['collection_id'];
// Some identifiers
$record['databox_id'] = $this->databoxId;
$record['base_id'] = null; // TODO
$record['id'] = self::uniqueIdentifier($record);
return $record;
}
private static function uniqueIdentifier($record)
{
return sprintf('%d_%d', $record['databox_id'], $record['record_id']);
}
private function statement()
{
if (!$this->statement) {
$sql = 'SELECT
record_id,
coll_id as collection_id,
uuid,
sha256,
originalname as original_name,
mime,
type,
credate as created_at,
moddate as updated_at
FROM record
WHERE parent_record_id = 0 -- Only records, not stories
ORDER BY record_id ASC
LIMIT :offset, :limit;';
$statement = $this->connection->prepare($sql);
$statement->bindParam(':offset', $this->offset, PDO::PARAM_INT);
$statement->bindParam(':limit', $this->batchSize, PDO::PARAM_INT);
$this->statement = $statement;
}
return $this->statement;
}
}