From 2d4decbf6532c288d3ea464b82bb0fe67eceeb00 Mon Sep 17 00:00:00 2001
From: Mathieu Darse
Date: Tue, 26 Aug 2014 19:22:28 +0200
Subject: [PATCH] Add bulk indexing

---
 .../SearchEngine/Elastic/BulkOperation.php    | 121 +++++++++++++++++++
 .../Phrasea/SearchEngine/Elastic/Indexer.php  |  11 ++-
 2 files changed, 129 insertions(+), 3 deletions(-)
 create mode 100644 lib/Alchemy/Phrasea/SearchEngine/Elastic/BulkOperation.php

diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/BulkOperation.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/BulkOperation.php
new file mode 100644
index 0000000000..26f0432af4
--- /dev/null
+++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/BulkOperation.php
@@ -0,0 +1,121 @@
+client = $client;
+    }
+
+    /**
+     * Sets the index sent along with every bulk request.
+     *
+     * @param string $index Index name; cast to string
+     */
+    public function setDefaultIndex($index)
+    {
+        $this->index = (string) $index;
+    }
+
+    /**
+     * Sets the type sent along with every bulk request.
+     *
+     * @param string $type Type name; cast to string
+     *
+     * @throws \RuntimeException When no default index has been set yet
+     */
+    public function setDefaultType($type)
+    {
+        if (!$this->index) {
+            throw new \RuntimeException('You must provide a default index first');
+        }
+        $this->type = (string) $type;
+    }
+
+    /**
+     * Sets how many queued actions trigger an automatic flush.
+     *
+     * @param int $limit Number of actions; zero or less never triggers an automatic flush
+     */
+    public function setAutoFlushLimit($limit)
+    {
+        $this->flushLimit = (int) $limit;
+    }
+
+    /**
+     * Queues an "index" action and flushes once the auto-flush limit is reached.
+     *
+     * @param array $params Action metadata (e.g. "id") plus the document under "body"
+     */
+    public function index(array $params)
+    {
+        $body = igorw\get_in($params, ['body']);
+        unset($params['body']);
+
+        // Each action takes two consecutive stack entries: metadata header, then document
+        $this->stack[] = array('index' => $params);
+        $this->stack[] = $body;
+
+        // ">=" rather than "===": a limit lowered below the current queue size
+        // must still trigger a flush instead of never matching again
+        if ($this->flushLimit > 0 && count($this->stack) / 2 >= $this->flushLimit) {
+            $this->flush();
+        }
+    }
+
+    /**
+     * Sends every queued action in one bulk request and empties the queue.
+     *
+     * @throws \RuntimeException When the response reports that some actions failed
+     */
+    public function flush()
+    {
+        // Do not try to flush an empty stack
+        if (count($this->stack) === 0) {
+            return;
+        }
+
+        $params = array();
+        if ($this->index) {
+            $params['index'] = $this->index;
+            if ($this->type) {
+                $params['type'] = $this->type;
+            }
+        }
+        $params['body'] = $this->stack;
+        $count = count($this->stack) / 2;
+        // NOTE(review): progress goes straight to stdout; swap for a logger before non-CLI use
+        printf("ES Bulk query with %d items\n", $count);
+        $response = $this->client->bulk($params);
+        // The request has been sent: drop the queue first so that a failure
+        // reported below does not make the next flush resend the same actions
+        $this->stack = array();
+
+        // A bulk call succeeds as a whole even when single actions are rejected;
+        // the response then carries a truthy "errors" flag
+        if (is_array($response) && !empty($response['errors'])) {
+            throw new \RuntimeException(sprintf('ES Bulk query failed for some of the %d items', $count));
+        }
+    }
+}
diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php
index 369256c5f9..39c55dd8f9 100644
--- 
a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php @@ -78,19 +78,24 @@ class Indexer $this->disableShardRefreshing(); try { + // Prepare the bulk operation + $bulk = new BulkOperation($this->client); + $bulk->setDefaultIndex($this->options['index']); + $bulk->setDefaultType(self::RECORD_TYPE); + $bulk->setAutoFlushLimit(1000); foreach ($this->appbox->get_databoxes() as $databox) { $fetcher = new RecordFetcher($databox); $fetcher->setBatchSize(200); while ($record = $fetcher->fetch()) { $params = array(); - $params['index'] = $this->options['index']; - $params['type'] = self::RECORD_TYPE; $params['id'] = $record['id']; $params['body'] = $record; - $response = $this->client->index($params); + $bulk->index($params); } } + $bulk->flush(); + // Optimize index $params = array('index' => $this->options['index']); $this->client->indices()->optimize($params);