From d187bfe80d3d6475ba5ec5bd53e4803fc9d4604f Mon Sep 17 00:00:00 2001 From: Mathieu Darse Date: Tue, 26 Aug 2014 16:42:48 +0200 Subject: [PATCH] WIP Indexer --- bin/console | 8 +- .../SearchEngine/IndexCreateCommand.php | 40 +++ .../Command/SearchEngine/IndexDropCommand.php | 47 ++++ ...IndexFull.php => IndexPopulateCommand.php} | 13 +- .../Provider/SearchEngineServiceProvider.php | 30 ++- .../Phrasea/SearchEngine/Elastic/Indexer.php | 227 ++++++++++-------- .../Phrasea/SearchEngine/Elastic/Mapping.php | 63 +++++ .../SearchEngine/Elastic/RecordFetcher.php | 114 +++++++++ 8 files changed, 438 insertions(+), 104 deletions(-) create mode 100644 lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php create mode 100644 lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php rename lib/Alchemy/Phrasea/Command/SearchEngine/{IndexFull.php => IndexPopulateCommand.php} (64%) create mode 100644 lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php create mode 100644 lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php diff --git a/bin/console b/bin/console index 9a3d2317e7..b3ce257b10 100755 --- a/bin/console +++ b/bin/console @@ -14,7 +14,9 @@ namespace KonsoleKommander; use Alchemy\Phrasea\Command\Plugin\ListPlugin; use Alchemy\Phrasea\Command\Setup\H264ConfigurationDumper; use Alchemy\Phrasea\Command\Setup\H264MappingGenerator; -use Alchemy\Phrasea\Command\SearchEngine\IndexFull; +use Alchemy\Phrasea\Command\SearchEngine\IndexCreateCommand; +use Alchemy\Phrasea\Command\SearchEngine\IndexDropCommand; +use Alchemy\Phrasea\Command\SearchEngine\IndexPopulateCommand; use Alchemy\Phrasea\Command\WebsocketServer; use Alchemy\Phrasea\Core\Version; use Alchemy\Phrasea\Command\BuildMissingSubdefs; @@ -116,7 +118,9 @@ $cli->command(new XSendFileConfigurationDumper()); $cli->command(new XSendFileMappingGenerator()); if ($cli['phraseanet.SE']->getName() === 'ElasticSearch') { - $cli->command(new IndexFull('searchengine:index')); + $cli->command(new 
IndexCreateCommand()); + $cli->command(new IndexDropCommand()); + $cli->command(new IndexPopulateCommand()); } $cli->command(new WebsocketServer('ws-server:run')); diff --git a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php new file mode 100644 index 0000000000..6247962b08 --- /dev/null +++ b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php @@ -0,0 +1,40 @@ +setName('searchengine:index:create') + ->setDescription('Creates search index') + ; + } + + protected function doExecute(InputInterface $input, OutputInterface $output) + { + $indexer = $this->container['elasticsearch.indexer']; + + if ($indexer->indexExists()) { + $output->writeln('The search index already exists.'); + } else { + $indexer->createIndex(); + $output->writeln('Search index was created'); + } + } +} diff --git a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php new file mode 100644 index 0000000000..ac642f14ef --- /dev/null +++ b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php @@ -0,0 +1,47 @@ +setName('searchengine:index:drop') + ->setDescription('Deletes the search index') + ; + } + + protected function doExecute(InputInterface $input, OutputInterface $output) + { + + $question = 'You are about to delete the index and all contained data. Are you sure you wish to continue? 
(y/n)'; + $confirmation = $this->getHelper('dialog')->askConfirmation($output, $question, false); + + if ($confirmation) { + $indexer = $this->container['elasticsearch.indexer']; + if ($indexer->indexExists()) { + $indexer->deleteIndex(); + $output->writeln('Search index was dropped'); + } else { + $output->writeln('The index was not dropped because it does not exists'); + } + } else { + $output->writeln('Canceled.'); + } + } +} diff --git a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexFull.php b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexPopulateCommand.php similarity index 64% rename from lib/Alchemy/Phrasea/Command/SearchEngine/IndexFull.php rename to lib/Alchemy/Phrasea/Command/SearchEngine/IndexPopulateCommand.php index db61f28ad6..91856dd05b 100644 --- a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexFull.php +++ b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexPopulateCommand.php @@ -16,11 +16,18 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class IndexFull extends Command +class IndexPopulateCommand extends Command { + protected function configure() + { + $this + ->setName('searchengine:index:populate') + ->setDescription('Populate search index with record data') + ; + } + protected function doExecute(InputInterface $input, OutputInterface $output) { - $this->container['ES.indexer']->createIndex(); - $this->container['ES.indexer']->reindexAll(); + $this->container['elasticsearch.indexer']->populateIndex(); } } diff --git a/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php b/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php index d31b51316f..2a22c1932c 100644 --- a/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php +++ b/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php @@ -15,6 +15,7 @@ use Alchemy\Phrasea\SearchEngine\SearchEngineLogger; use 
Alchemy\Phrasea\Exception\InvalidArgumentException; use Alchemy\Phrasea\SearchEngine\SearchEngineInterface; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer; +use Elasticsearch\Client; use Silex\Application; use Silex\ServiceProviderInterface; @@ -46,8 +47,33 @@ class SearchEngineServiceProvider implements ServiceProviderInterface return $app['phraseanet.SE.engine-class']::createSubscriber($app); }); - $app['ES.indexer'] = $app->share(function ($app) { - return new Indexer($app['phraseanet.SE'], $app['monolog'], $app['phraseanet.appbox']); + $app['elasticsearch.indexer'] = $app->share(function ($app) { + return new Indexer( + $app['elasticsearch.client'], + $app['elasticsearch.options'], + $app['monolog'], + $app['phraseanet.appbox'] + ); + }); + + $app['elasticsearch.client'] = $app->share(function($app) { + $options = $app['elasticsearch.options']; + $host = sprintf('%s:%s', $options['host'], $options['port']); + + return new Client(array('hosts' => array($host))); + }); + + $app['elasticsearch.options'] = $app->share(function($app) { + $options = $app['conf']->get(['main', 'search-engine', 'options']); + $defaults = [ + 'host' => '127.0.0.1', + 'port' => 9200, + 'index' => 'phraseanet', + 'shards' => 3, + 'replicas' => 0 + ]; + + return array_replace($defaults, $options); }); } diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php index 8cfa2a4299..369256c5f9 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php @@ -11,57 +11,158 @@ namespace Alchemy\Phrasea\SearchEngine\Elastic; +use Elasticsearch\Client; use Psr\Log\LoggerInterface; +use igorw; class Indexer { + /** @var Elasticsearch\Client */ + private $client; + private $options; private $engine; private $logger; private $appbox; - public function __construct(ElasticSearchEngine $engine, LoggerInterface $logger, \appbox $appbox) + private $previousRefreshInterval = 
self::DEFAULT_REFRESH_INTERVAL; + + const DEFAULT_REFRESH_INTERVAL = '1s'; + const REFRESH_INTERVAL_KEY = 'index.refresh_interval'; + + const RECORD_TYPE = 'record'; + + public function __construct(Client $client, array $options, LoggerInterface $logger, \appbox $appbox) { - $this->engine = $engine; + $this->client = $client; + $this->options = $options; $this->logger = $logger; $this->appbox = $appbox; } - public function createIndex() + public function createIndex($withMapping = true) { - $indexParams['index'] = $this->engine->getIndexName(); + $params = array(); + $params['index'] = $this->options['index']; + $params['body']['settings']['number_of_shards'] = $this->options['shards']; + $params['body']['settings']['number_of_replicas'] = $this->options['replicas']; + if ($withMapping) { + $params['body']['mappings'][self::RECORD_TYPE] = $this->getRecordMapping(); + } + $this->client->indices()->create($params); + } - // Index Settings - $indexParams['body']['settings']['number_of_shards'] = 3; - $indexParams['body']['settings']['number_of_replicas'] = 0; + public function updateMapping() + { + $params = array(); + $params['index'] = $this->options['index']; + $params['type'] = self::RECORD_TYPE; + $params['body'][self::RECORD_TYPE] = $this->getRecordMapping(); + $this->client->indices()->putMapping($params); + } - $captionFields = []; - $businessFields = []; + public function deleteIndex() + { + $params = array('index' => $this->options['index']); + $this->client->indices()->delete($params); + } - foreach ($this->appbox->get_databoxes() as $databox) { - foreach ($databox->get_meta_structure() as $dbField) { - $type = 'string'; - if (\databox_field::TYPE_DATE === $dbField->get_type()) { - $type = 'date'; - } - if (isset($captionFields[$dbField->get_name()]) && $type !== $captionFields[$dbField->get_name()]['type']) { - $type = 'string'; - } + public function indexExists() + { + $params = array('index' => $this->options['index']); - 
$captionFields[$dbField->get_name()] = [ - 'type' => $type, - 'include_in_all' => !$dbField->isBusiness(), - 'analyzer' => 'french', - ]; + return $this->client->indices()->exists($params); + } - if ($dbField->isBusiness()) { - $businessFields[$dbField->get_name()] = [ - 'type' => $type, - 'include_in_all' => false, - 'analyzer' => 'french', - ]; + public function populateIndex() + { + $this->disableShardRefreshing(); + + try { + foreach ($this->appbox->get_databoxes() as $databox) { + $fetcher = new RecordFetcher($databox); + $fetcher->setBatchSize(200); + while ($record = $fetcher->fetch()) { + $params = array(); + $params['index'] = $this->options['index']; + $params['type'] = self::RECORD_TYPE; + $params['id'] = $record['id']; + $params['body'] = $record; + $response = $this->client->index($params); } } + + // Optimize index + $params = array('index' => $this->options['index']); + $this->client->indices()->optimize($params); + + } catch (Exception $e) { + $this->restoreShardRefreshing(); + throw $e; } + } + + private function disableShardRefreshing() + { + $refreshInterval = $this->getSetting(self::REFRESH_INTERVAL_KEY); + if (null !== $refreshInterval) { + $this->previousRefreshInterval = $refreshInterval; + } + $this->setSetting(self::REFRESH_INTERVAL_KEY, -1); + } + + private function restoreShardRefreshing() + { + $this->setSetting(self::REFRESH_INTERVAL_KEY, $this->previousRefreshInterval); + $this->previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL; + } + + private function getSetting($name) + { + $index = $this->options['index']; + $params = array(); + $params['index'] = $index; + $params['name'] = $name; + $params['flat_settings'] = true; + $response = $this->client->indices()->getSettings($params); + + return igorw\get_in($response, [$index, 'settings', $name]); + } + + private function setSetting($name, $value) + { + $index = $this->options['index']; + $params = array(); + $params['index'] = $index; + $params['body'][$name] = $value; + $response 
= $this->client->indices()->putSettings($params); + + return igorw\get_in($response, ['acknowledged']); + } + + private function getRecordMapping() + { + $mapping = new Mapping(); + $mapping + // Identifiers + ->add('record_id', 'integer') // Compound primary key + ->add('databox_id', 'integer') // Compound primary key + ->add('base_id', 'integer') // Unique collection ID + ->add('collection_id', 'integer') // Useless collection ID (local to databox) + ->add('uuid', 'string')->notAnalyzed() + ->add('sha256', 'string')->notAnalyzed() + // Mandatory metadata + ->add('original_name', 'string')->notAnalyzed() + ->add('mime', 'string')->notAnalyzed() + ->add('type', 'string')->notAnalyzed() + // Dates + ->add('created_at', 'date')->format('yyyy-MM-dd HH:mm:ss') + ->add('updated_at', 'date')->format('yyyy-MM-dd HH:mm:ss') + ; + + return $mapping->export(); + + + // TODO Migrate code below this line $status = []; for ($i = 0; $i <= 32; $i ++) { @@ -237,73 +338,5 @@ class Indexer 'properties' => $businessFields ]; } - - $indexParams['body']['mappings']['record'] = $recordTypeMapping; - - if ($this->engine->getClient()->indices()->exists(['index' => $this->engine->getIndexName()])) { - $this->engine->getClient()->indices()->delete(['index' => $this->engine->getIndexName()]); - } - - $ret = $this->engine->getClient()->indices()->create($indexParams); - - if (!$this->isResultOk($ret)) { - throw new \RuntimeException('Unable to create index'); - } - } - - private function isResultOk(array $ret) - { - if (isset($ret['acknowledged']) && $ret['acknowledged']) { - return true; - } - if (isset($ret['ok']) && $ret['ok']) { - return true; - } - - return false; - } - - public function reindexAll() - { - $qty = 10; - - $params['index'] = $this->engine->getIndexName(); - $params['body']['index']['refresh_interval'] = 300; - - $ret = $this->engine->getClient()->indices()->putSettings($params); - - if (!$this->isResultOk($ret)) { - $this->logger->error('Unable to set the refresh interval 
to 300 s. .'); - } - - foreach ($this->appbox->get_databoxes() as $databox) { - $offset = 0; - do { - $sql = 'SELECT record_id FROM record - WHERE parent_record_id = 0 - ORDER BY record_id ASC LIMIT '.$offset.', '.$qty; - $stmt = $databox->get_connection()->prepare($sql); - $stmt->execute(); - $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC); - $stmt->closeCursor(); - - foreach ($rows as $row) { - $record = $databox->get_record($row['record_id']); - $this->engine->addRecord($record); - } - - gc_collect_cycles(); - $offset += $qty; - } while (count($rows) > 0); - } - - $params['index'] = $this->engine->getIndexName(); - $params['body']['index']['refresh_interval'] = 1; - - $ret = $this->engine->getClient()->indices()->putSettings($params); - - if (!$this->isResultOk($ret)) { - throw new \RuntimeException('Unable to set the refresh interval to 1 s. .'); - } } } diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php new file mode 100644 index 0000000000..182470afe4 --- /dev/null +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php @@ -0,0 +1,63 @@ +fields[$name] = array('type' => $type); + $this->current = $name; + + return $this; + } + + public function export() + { + return ['properties' => $this->fields]; + } + + public function notAnalyzed() + { + $field =& $this->currentField(); + if ($field['type'] !== 'string') { + throw new \LogicException('Only string fields can be not analyzed'); + } + $field['index'] = 'not_analyzed'; + + return $this; + } + + public function format($format) + { + $field =& $this->currentField(); + if ($field['type'] !== 'date') { + throw new \LogicException('Only date fields can have a format'); + } + $field['format'] = $format; + + return $this; + } + + protected function &currentField() + { + if (null === $this->current) { + throw new \LogicException('You must add a field first'); + } + + return $this->fields[$this->current]; + } +} diff --git 
a/lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php new file mode 100644 index 0000000000..6f3920e91e --- /dev/null +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php @@ -0,0 +1,114 @@ +connection = $databox->get_connection(); + $this->databoxId = $databox->get_sbas_id(); + } + + public function fetch() + { + $statement = $this->statement(); + + // Start of a batch + if ($this->needsFetch) { + $statement->execute(); + $this->needsFetch = false; + printf("Query %d/%d -> %d results\n", $this->offset, $this->batchSize, $statement->rowCount()); + } + + if ($record = $statement->fetch()) { + // printf("Record found (#%d)\n", $record['id']); + $record = $this->hydrate($record); + $this->offset++; + } else { + printf("End of records\n"); + } + + // If we exausted the last result set + if ($this->offset % $this->batchSize === 0 || !$record) { + $statement->closeCursor(); + $this->needsFetch = true; + } + + return $record; + } + + public function setBatchSize($size) + { + if ($size < 1) { + throw new LogicException("Batch size must be greater than or equal to 1"); + } + $this->batchSize = (int) $size; + } + + private function hydrate(array $record) + { + // Some casting + $record['record_id'] = (int) $record['record_id']; + $record['collection_id'] = (int) $record['collection_id']; + // Some identifiers + $record['databox_id'] = $this->databoxId; + $record['base_id'] = null; // TODO + $record['id'] = self::uniqueIdentifier($record); + + return $record; + } + + private static function uniqueIdentifier($record) + { + return sprintf('%d_%d', $record['databox_id'], $record['record_id']); + } + + private function statement() + { + if (!$this->statement) { + $sql = 'SELECT + record_id, + coll_id as collection_id, + uuid, + sha256, + originalname as original_name, + mime, + type, + credate as created_at, + moddate as updated_at + FROM record + WHERE parent_record_id = 0 -- Only records, not stories 
+ ORDER BY record_id ASC + LIMIT :offset, :limit;'; + $statement = $this->connection->prepare($sql); + $statement->bindParam(':offset', $this->offset, PDO::PARAM_INT); + $statement->bindParam(':limit', $this->batchSize, PDO::PARAM_INT); + $this->statement = $statement; + } + + return $this->statement; + } +}