diff --git a/bin/console b/bin/console
index 9a3d2317e7..b3ce257b10 100755
--- a/bin/console
+++ b/bin/console
@@ -14,7 +14,9 @@ namespace KonsoleKommander;
use Alchemy\Phrasea\Command\Plugin\ListPlugin;
use Alchemy\Phrasea\Command\Setup\H264ConfigurationDumper;
use Alchemy\Phrasea\Command\Setup\H264MappingGenerator;
-use Alchemy\Phrasea\Command\SearchEngine\IndexFull;
+use Alchemy\Phrasea\Command\SearchEngine\IndexCreateCommand;
+use Alchemy\Phrasea\Command\SearchEngine\IndexDropCommand;
+use Alchemy\Phrasea\Command\SearchEngine\IndexPopulateCommand;
use Alchemy\Phrasea\Command\WebsocketServer;
use Alchemy\Phrasea\Core\Version;
use Alchemy\Phrasea\Command\BuildMissingSubdefs;
@@ -116,7 +118,9 @@ $cli->command(new XSendFileConfigurationDumper());
$cli->command(new XSendFileMappingGenerator());
if ($cli['phraseanet.SE']->getName() === 'ElasticSearch') {
- $cli->command(new IndexFull('searchengine:index'));
+ $cli->command(new IndexCreateCommand());
+ $cli->command(new IndexDropCommand());
+ $cli->command(new IndexPopulateCommand());
}
$cli->command(new WebsocketServer('ws-server:run'));
diff --git a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php
new file mode 100644
index 0000000000..6247962b08
--- /dev/null
+++ b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexCreateCommand.php
@@ -0,0 +1,40 @@
+setName('searchengine:index:create')
+ ->setDescription('Creates search index')
+ ;
+ }
+
+    /**
+     * Creates the search index unless the indexer reports that it already exists.
+     *
+     * @param InputInterface  $input  Console input (not read here)
+     * @param OutputInterface $output Console output receiving the status line
+     */
+    protected function doExecute(InputInterface $input, OutputInterface $output)
+    {
+        $searchIndexer = $this->container['elasticsearch.indexer'];
+
+        // Nothing to create when the index is already present.
+        if ($searchIndexer->indexExists()) {
+            $output->writeln('The search index already exists.');
+
+            return;
+        }
+
+        $searchIndexer->createIndex();
+        $output->writeln('Search index was created');
+    }
+}
diff --git a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php
new file mode 100644
index 0000000000..ac642f14ef
--- /dev/null
+++ b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexDropCommand.php
@@ -0,0 +1,47 @@
+setName('searchengine:index:drop')
+ ->setDescription('Deletes the search index')
+ ;
+ }
+
+    /**
+     * Asks for confirmation, then deletes the search index if it exists.
+     *
+     * @param InputInterface  $input  Console input (not read here)
+     * @param OutputInterface $output Console output used for the prompt and the status line
+     */
+    protected function doExecute(InputInterface $input, OutputInterface $output)
+    {
+        $prompt = 'You are about to delete the index and all contained data. Are you sure you wish to continue? (y/n)';
+        $dialog = $this->getHelper('dialog');
+
+        // The confirmation defaults to "no".
+        if (!$dialog->askConfirmation($output, $prompt, false)) {
+            $output->writeln('Canceled.');
+
+            return;
+        }
+
+        $searchIndexer = $this->container['elasticsearch.indexer'];
+        if (!$searchIndexer->indexExists()) {
+            $output->writeln('The index was not dropped because it does not exists');
+
+            return;
+        }
+
+        $searchIndexer->deleteIndex();
+        $output->writeln('Search index was dropped');
+    }
+}
diff --git a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexFull.php b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexPopulateCommand.php
similarity index 64%
rename from lib/Alchemy/Phrasea/Command/SearchEngine/IndexFull.php
rename to lib/Alchemy/Phrasea/Command/SearchEngine/IndexPopulateCommand.php
index db61f28ad6..91856dd05b 100644
--- a/lib/Alchemy/Phrasea/Command/SearchEngine/IndexFull.php
+++ b/lib/Alchemy/Phrasea/Command/SearchEngine/IndexPopulateCommand.php
@@ -16,11 +16,18 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
-class IndexFull extends Command
+class IndexPopulateCommand extends Command
{
+ protected function configure()
+ {
+ $this
+ ->setName('searchengine:index:populate')
+ ->setDescription('Populate search index with record data')
+ ;
+ }
+
protected function doExecute(InputInterface $input, OutputInterface $output)
{
- $this->container['ES.indexer']->createIndex();
- $this->container['ES.indexer']->reindexAll();
+ $this->container['elasticsearch.indexer']->populateIndex();
}
}
diff --git a/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php b/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php
index d31b51316f..2a22c1932c 100644
--- a/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php
+++ b/lib/Alchemy/Phrasea/Core/Provider/SearchEngineServiceProvider.php
@@ -15,6 +15,7 @@ use Alchemy\Phrasea\SearchEngine\SearchEngineLogger;
use Alchemy\Phrasea\Exception\InvalidArgumentException;
use Alchemy\Phrasea\SearchEngine\SearchEngineInterface;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
+use Elasticsearch\Client;
use Silex\Application;
use Silex\ServiceProviderInterface;
@@ -46,8 +47,33 @@ class SearchEngineServiceProvider implements ServiceProviderInterface
return $app['phraseanet.SE.engine-class']::createSubscriber($app);
});
- $app['ES.indexer'] = $app->share(function ($app) {
- return new Indexer($app['phraseanet.SE'], $app['monolog'], $app['phraseanet.appbox']);
+ $app['elasticsearch.indexer'] = $app->share(function ($app) {
+ return new Indexer(
+ $app['elasticsearch.client'],
+ $app['elasticsearch.options'],
+ $app['monolog'],
+ $app['phraseanet.appbox']
+ );
+ });
+
+ $app['elasticsearch.client'] = $app->share(function($app) {
+ $options = $app['elasticsearch.options'];
+ $host = sprintf('%s:%s', $options['host'], $options['port']);
+
+ return new Client(array('hosts' => array($host)));
+ });
+
+ $app['elasticsearch.options'] = $app->share(function($app) {
+ $options = $app['conf']->get(['main', 'search-engine', 'options']);
+ $defaults = [
+ 'host' => '127.0.0.1',
+ 'port' => 9200,
+ 'index' => 'phraseanet',
+ 'shards' => 3,
+ 'replicas' => 0
+ ];
+
+ return array_replace($defaults, $options);
});
}
diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php
index 8cfa2a4299..369256c5f9 100644
--- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php
+++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php
@@ -11,57 +11,158 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic;
+use Elasticsearch\Client;
use Psr\Log\LoggerInterface;
+use igorw;
class Indexer
{
+    /** @var Client Elasticsearch client used for every index operation */
+ private $client;
+ private $options;
private $engine;
private $logger;
private $appbox;
- public function __construct(ElasticSearchEngine $engine, LoggerInterface $logger, \appbox $appbox)
+ private $previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL;
+
+ const DEFAULT_REFRESH_INTERVAL = '1s';
+ const REFRESH_INTERVAL_KEY = 'index.refresh_interval';
+
+ const RECORD_TYPE = 'record';
+
+ public function __construct(Client $client, array $options, LoggerInterface $logger, \appbox $appbox)
{
- $this->engine = $engine;
+ $this->client = $client;
+ $this->options = $options;
$this->logger = $logger;
$this->appbox = $appbox;
}
- public function createIndex()
+ public function createIndex($withMapping = true)
{
- $indexParams['index'] = $this->engine->getIndexName();
+ $params = array();
+ $params['index'] = $this->options['index'];
+ $params['body']['settings']['number_of_shards'] = $this->options['shards'];
+ $params['body']['settings']['number_of_replicas'] = $this->options['replicas'];
+ if ($withMapping) {
+ $params['body']['mappings'][self::RECORD_TYPE] = $this->getRecordMapping();
+ }
+ $this->client->indices()->create($params);
+ }
- // Index Settings
- $indexParams['body']['settings']['number_of_shards'] = 3;
- $indexParams['body']['settings']['number_of_replicas'] = 0;
+    /**
+     * Sends the record type mapping to the configured index.
+     */
+    public function updateMapping()
+    {
+        $request = array(
+            'index' => $this->options['index'],
+            'type'  => self::RECORD_TYPE,
+            'body'  => array(self::RECORD_TYPE => $this->getRecordMapping()),
+        );
+
+        $this->client->indices()->putMapping($request);
+    }
- $captionFields = [];
- $businessFields = [];
+    /**
+     * Deletes the configured index through the Elasticsearch client.
+     */
+    public function deleteIndex()
+    {
+        $this->client->indices()->delete(array(
+            'index' => $this->options['index'],
+        ));
+    }
- foreach ($this->appbox->get_databoxes() as $databox) {
- foreach ($databox->get_meta_structure() as $dbField) {
- $type = 'string';
- if (\databox_field::TYPE_DATE === $dbField->get_type()) {
- $type = 'date';
- }
- if (isset($captionFields[$dbField->get_name()]) && $type !== $captionFields[$dbField->get_name()]['type']) {
- $type = 'string';
- }
+ public function indexExists()
+ {
+ $params = array('index' => $this->options['index']);
- $captionFields[$dbField->get_name()] = [
- 'type' => $type,
- 'include_in_all' => !$dbField->isBusiness(),
- 'analyzer' => 'french',
- ];
+ return $this->client->indices()->exists($params);
+ }
- if ($dbField->isBusiness()) {
- $businessFields[$dbField->get_name()] = [
- 'type' => $type,
- 'include_in_all' => false,
- 'analyzer' => 'french',
- ];
+    /**
+     * Indexes every record returned by a RecordFetcher for each databox,
+     * then asks Elasticsearch to optimize the index.
+     *
+     * Shard refreshing is switched off for the duration of the run and is
+     * restored afterwards, both when the run succeeds and when it fails.
+     *
+     * @throws \Exception Any exception raised while indexing is rethrown
+     *                    after the refresh interval has been restored
+     */
+    public function populateIndex()
+    {
+        $this->disableShardRefreshing();
+
+        try {
+            foreach ($this->appbox->get_databoxes() as $databox) {
+                $fetcher = new RecordFetcher($databox);
+                $fetcher->setBatchSize(200);
+                while ($record = $fetcher->fetch()) {
+                    $params = array();
+                    $params['index'] = $this->options['index'];
+                    $params['type'] = self::RECORD_TYPE;
+                    $params['id'] = $record['id'];
+                    $params['body'] = $record;
+                    $this->client->index($params);
+                }
             }
+
+            // Optimize index
+            $params = array('index' => $this->options['index']);
+            $this->client->indices()->optimize($params);
+        } catch (\Exception $e) {
+            // Fully qualified: an unqualified "Exception" would resolve inside
+            // this namespace and never match.
+            $this->restoreShardRefreshing();
+            throw $e;
         }
+
+        // Success path: put the refresh interval back as well, otherwise the
+        // index stays with refreshing disabled.
+        $this->restoreShardRefreshing();
+    }
+
+    /**
+     * Remembers the index's current refresh interval, then sets it to -1.
+     */
+    private function disableShardRefreshing()
+    {
+        $current = $this->getSetting(self::REFRESH_INTERVAL_KEY);
+
+        // Keep the previously stored value when getSetting() yields null.
+        if ($current !== null) {
+            $this->previousRefreshInterval = $current;
+        }
+
+        $this->setSetting(self::REFRESH_INTERVAL_KEY, -1);
+    }
+
+    /**
+     * Applies the remembered refresh interval to the index, then resets the
+     * remembered value to the default.
+     */
+    private function restoreShardRefreshing()
+    {
+        $interval = $this->previousRefreshInterval;
+        $this->setSetting(self::REFRESH_INTERVAL_KEY, $interval);
+
+        $this->previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL;
+    }
+
+    /**
+     * Reads one setting of the configured index.
+     *
+     * @param string $name Flat setting name, e.g. self::REFRESH_INTERVAL_KEY
+     *
+     * @return mixed Value found under [index, 'settings', $name] in the response
+     */
+    private function getSetting($name)
+    {
+        $indexName = $this->options['index'];
+        $settings = $this->client->indices()->getSettings(array(
+            'index'         => $indexName,
+            'name'          => $name,
+            'flat_settings' => true,
+        ));
+
+        return igorw\get_in($settings, array($indexName, 'settings', $name));
+    }
+
+    /**
+     * Writes one setting of the configured index.
+     *
+     * @param string $name  Setting name
+     * @param mixed  $value Setting value
+     *
+     * @return mixed The 'acknowledged' entry of the client response
+     */
+    private function setSetting($name, $value)
+    {
+        $request = array(
+            'index' => $this->options['index'],
+            'body'  => array($name => $value),
+        );
+        $result = $this->client->indices()->putSettings($request);
+
+        return igorw\get_in($result, array('acknowledged'));
+    }
+
+ private function getRecordMapping()
+ {
+ $mapping = new Mapping();
+ $mapping
+ // Identifiers
+ ->add('record_id', 'integer') // Compound primary key
+ ->add('databox_id', 'integer') // Compound primary key
+ ->add('base_id', 'integer') // Unique collection ID
+ ->add('collection_id', 'integer') // Useless collection ID (local to databox)
+ ->add('uuid', 'string')->notAnalyzed()
+ ->add('sha256', 'string')->notAnalyzed()
+ // Mandatory metadata
+ ->add('original_name', 'string')->notAnalyzed()
+ ->add('mime', 'string')->notAnalyzed()
+ ->add('type', 'string')->notAnalyzed()
+ // Dates
+ ->add('created_at', 'date')->format('yyyy-MM-dd HH:mm:ss')
+ ->add('updated_at', 'date')->format('yyyy-MM-dd HH:mm:ss')
+ ;
+
+ return $mapping->export();
+
+
+ // TODO Migrate code below this line
$status = [];
for ($i = 0; $i <= 32; $i ++) {
@@ -237,73 +338,5 @@ class Indexer
'properties' => $businessFields
];
}
-
- $indexParams['body']['mappings']['record'] = $recordTypeMapping;
-
- if ($this->engine->getClient()->indices()->exists(['index' => $this->engine->getIndexName()])) {
- $this->engine->getClient()->indices()->delete(['index' => $this->engine->getIndexName()]);
- }
-
- $ret = $this->engine->getClient()->indices()->create($indexParams);
-
- if (!$this->isResultOk($ret)) {
- throw new \RuntimeException('Unable to create index');
- }
- }
-
- private function isResultOk(array $ret)
- {
- if (isset($ret['acknowledged']) && $ret['acknowledged']) {
- return true;
- }
- if (isset($ret['ok']) && $ret['ok']) {
- return true;
- }
-
- return false;
- }
-
- public function reindexAll()
- {
- $qty = 10;
-
- $params['index'] = $this->engine->getIndexName();
- $params['body']['index']['refresh_interval'] = 300;
-
- $ret = $this->engine->getClient()->indices()->putSettings($params);
-
- if (!$this->isResultOk($ret)) {
- $this->logger->error('Unable to set the refresh interval to 300 s. .');
- }
-
- foreach ($this->appbox->get_databoxes() as $databox) {
- $offset = 0;
- do {
- $sql = 'SELECT record_id FROM record
- WHERE parent_record_id = 0
- ORDER BY record_id ASC LIMIT '.$offset.', '.$qty;
- $stmt = $databox->get_connection()->prepare($sql);
- $stmt->execute();
- $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);
- $stmt->closeCursor();
-
- foreach ($rows as $row) {
- $record = $databox->get_record($row['record_id']);
- $this->engine->addRecord($record);
- }
-
- gc_collect_cycles();
- $offset += $qty;
- } while (count($rows) > 0);
- }
-
- $params['index'] = $this->engine->getIndexName();
- $params['body']['index']['refresh_interval'] = 1;
-
- $ret = $this->engine->getClient()->indices()->putSettings($params);
-
- if (!$this->isResultOk($ret)) {
- throw new \RuntimeException('Unable to set the refresh interval to 1 s. .');
- }
}
}
diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php
new file mode 100644
index 0000000000..182470afe4
--- /dev/null
+++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Mapping.php
@@ -0,0 +1,63 @@
+fields[$name] = array('type' => $type);
+ $this->current = $name;
+
+ return $this;
+ }
+
+    /**
+     * Returns the collected field definitions wrapped under a 'properties' key.
+     *
+     * @return array
+     */
+    public function export()
+    {
+        $properties = $this->fields;
+
+        return array('properties' => $properties);
+    }
+
+    /**
+     * Flags the current field as 'not_analyzed'.
+     *
+     * @return $this
+     *
+     * @throws \LogicException When the current field is not of type 'string'
+     */
+    public function notAnalyzed()
+    {
+        // Taken by reference so the change lands in the stored definition.
+        $definition =& $this->currentField();
+        if ('string' !== $definition['type']) {
+            throw new \LogicException('Only string fields can be not analyzed');
+        }
+        $definition['index'] = 'not_analyzed';
+
+        return $this;
+    }
+
+    /**
+     * Sets the 'format' entry of the current field.
+     *
+     * @param string $format Format string stored on the field definition
+     *
+     * @return $this
+     *
+     * @throws \LogicException When the current field is not of type 'date'
+     */
+    public function format($format)
+    {
+        // Taken by reference so the change lands in the stored definition.
+        $definition =& $this->currentField();
+        if ('date' !== $definition['type']) {
+            throw new \LogicException('Only date fields can have a format');
+        }
+        $definition['format'] = $format;
+
+        return $this;
+    }
+
+    /**
+     * Returns, by reference, the definition of the field whose name is held
+     * in $this->current, so that callers can modify it in place.
+     *
+     * @return array
+     *
+     * @throws \LogicException When no current field is set
+     */
+    protected function &currentField()
+    {
+        if (null === $this->current) {
+            throw new \LogicException('You must add a field first');
+        }
+
+        return $this->fields[$this->current];
+    }
+}
diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php
new file mode 100644
index 0000000000..6f3920e91e
--- /dev/null
+++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/RecordFetcher.php
@@ -0,0 +1,114 @@
+connection = $databox->get_connection();
+ $this->databoxId = $databox->get_sbas_id();
+ }
+
+    /**
+     * Returns the next hydrated record, or the falsy result of the statement
+     * fetch once there are no more rows.
+     *
+     * Rows are read in batches: the query is executed again whenever the
+     * previous batch has been consumed.
+     *
+     * @return array|false
+     */
+    public function fetch()
+    {
+        $stmt = $this->statement();
+
+        // A new batch has to be queried before rows can be read
+        if ($this->needsFetch) {
+            $stmt->execute();
+            $this->needsFetch = false;
+            printf("Query %d/%d -> %d results\n", $this->offset, $this->batchSize, $stmt->rowCount());
+        }
+
+        $record = $stmt->fetch();
+        if ($record) {
+            $record = $this->hydrate($record);
+            $this->offset++;
+        } else {
+            printf("End of records\n");
+        }
+
+        // If we exhausted the last result set, the next call must query again
+        $batchDone = 0 === $this->offset % $this->batchSize || !$record;
+        if ($batchDone) {
+            $stmt->closeCursor();
+            $this->needsFetch = true;
+        }
+
+        return $record;
+    }
+
+    /**
+     * Sets how many rows are requested per query.
+     *
+     * @param int $size Number of rows per batch; stored as an integer
+     *
+     * @throws \LogicException When $size is lower than 1
+     */
+    public function setBatchSize($size)
+    {
+        if ($size < 1) {
+            // Fully qualified so the class resolves whatever the file imports.
+            throw new \LogicException("Batch size must be greater than or equal to 1");
+        }
+        $this->batchSize = (int) $size;
+    }
+
+    /**
+     * Casts the numeric identifiers of a fetched row and adds the
+     * 'databox_id', 'base_id' and 'id' entries.
+     *
+     * @param array $record Row as returned by the statement
+     *
+     * @return array
+     */
+    private function hydrate(array $record)
+    {
+        // Existing keys keep their position, new keys are appended in order
+        $hydrated = array_replace($record, array(
+            // Some casting
+            'record_id'     => (int) $record['record_id'],
+            'collection_id' => (int) $record['collection_id'],
+            // Some identifiers
+            'databox_id'    => $this->databoxId,
+            'base_id'       => null, // TODO
+        ));
+        $hydrated['id'] = self::uniqueIdentifier($hydrated);
+
+        return $hydrated;
+    }
+
+    /**
+     * Builds the "<databox_id>_<record_id>" identifier of a record.
+     *
+     * @param array $record Must contain 'databox_id' and 'record_id'
+     *
+     * @return string
+     */
+    private static function uniqueIdentifier($record)
+    {
+        $databoxId = $record['databox_id'];
+        $recordId = $record['record_id'];
+
+        return sprintf('%d_%d', $databoxId, $recordId);
+    }
+
+    /**
+     * Lazily prepares, and then reuses, the paginated record query.
+     *
+     * The offset and limit are bound with bindParam(), i.e. by reference, so
+     * each execute() picks up the current $this->offset and $this->batchSize.
+     *
+     * @return \PDOStatement|object Prepared statement (exact type depends on the connection — TODO confirm)
+     */
+    private function statement()
+    {
+        if (!$this->statement) {
+            $sql = 'SELECT
+ record_id,
+ coll_id as collection_id,
+ uuid,
+ sha256,
+ originalname as original_name,
+ mime,
+ type,
+ credate as created_at,
+ moddate as updated_at
+ FROM record
+ WHERE parent_record_id = 0 -- Only records, not stories
+ ORDER BY record_id ASC
+ LIMIT :offset, :limit;';
+            $statement = $this->connection->prepare($sql);
+            // Fully qualified so the constant resolves whatever the file imports.
+            $statement->bindParam(':offset', $this->offset, \PDO::PARAM_INT);
+            $statement->bindParam(':limit', $this->batchSize, \PDO::PARAM_INT);
+            $this->statement = $statement;
+        }
+
+        return $this->statement;
+    }
+}