Add bulk indexing

This commit is contained in:
Mathieu Darse
2014-08-26 19:22:28 +02:00
parent 7db59052d6
commit 2d4decbf65
2 changed files with 91 additions and 3 deletions

View File

@@ -0,0 +1,83 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Alchemy\Phrasea\SearchEngine\Elastic;
use Elasticsearch\Client;
use igorw;
/**
 * Buffers index operations and sends them to Elasticsearch in batches
 * through the client's bulk() method.
 *
 * Each queued operation occupies two consecutive entries in the internal
 * stack: an action header (array('index' => <metadata>)) followed by the
 * document body. The number of pending operations is therefore half the
 * stack size.
 */
class BulkOperation
{
    /** @var Client Client whose bulk() method receives the batches */
    private $client;

    /** @var array Alternating action headers and document bodies awaiting flush */
    private $stack = array();

    /** @var string|null Default index sent with every bulk request, if set */
    private $index;

    /** @var string|null Default type sent with every bulk request, if set */
    private $type;

    /** @var int Number of pending operations that triggers an automatic flush */
    private $flushLimit = 1000;

    /**
     * @param Client $client Client used to send the bulk requests
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }

    /**
     * Sets the index passed along with every bulk request.
     *
     * @param string $index Index name (cast to string)
     */
    public function setDefaultIndex($index)
    {
        $this->index = (string) $index;
    }

    /**
     * Sets the type passed along with every bulk request.
     *
     * A default type is only sent together with a default index, so the
     * index has to be configured first.
     *
     * @param string $type Type name (cast to string)
     *
     * @throws \RuntimeException When no default index has been set yet
     */
    public function setDefaultType($type)
    {
        // Explicit comparison instead of truthiness so that the name "0"
        // is not mistaken for "not set".
        if (null === $this->index || '' === $this->index) {
            throw new \RuntimeException('You must provide a default index first');
        }

        $this->type = (string) $type;
    }

    /**
     * Sets how many pending operations trigger an automatic flush.
     *
     * A limit of zero or less never triggers an automatic flush; flush()
     * must then be called explicitly.
     *
     * @param int $limit Number of operations (cast to int)
     */
    public function setAutoFlushLimit($limit)
    {
        $this->flushLimit = (int) $limit;
    }

    /**
     * Queues one index operation.
     *
     * The 'body' entry of $params is the document; every other entry is
     * used as the metadata of the action header. The queue is flushed
     * automatically once the number of pending operations reaches the
     * auto flush limit.
     *
     * @param array $params Operation parameters, including a non-null 'body'
     *
     * @throws \InvalidArgumentException When $params has no 'body'
     */
    public function index(array $params)
    {
        // A missing document would put a null line in the bulk payload;
        // reject it here, before anything is queued.
        if (!isset($params['body'])) {
            throw new \InvalidArgumentException('Bulk index operation requires a "body" parameter');
        }

        $body = $params['body'];
        unset($params['body']);

        $this->stack[] = array('index' => $params);
        $this->stack[] = $body;

        // ">=" rather than "===": if the limit was lowered while operations
        // were already pending, the count may have jumped past it and an
        // equality test would never match again.
        if ($this->flushLimit > 0 && count($this->stack) / 2 >= $this->flushLimit) {
            $this->flush();
        }
    }

    /**
     * Sends every pending operation in a single bulk request and empties
     * the queue. Does nothing when the queue is empty.
     *
     * The queue is only emptied after bulk() returns, so the pending
     * operations are kept if the client throws.
     */
    public function flush()
    {
        // Do not try to flush an empty stack
        if (count($this->stack) === 0) {
            return;
        }

        $params = array();
        if (null !== $this->index && '' !== $this->index) {
            $params['index'] = $this->index;
            if (null !== $this->type && '' !== $this->type) {
                $params['type'] = $this->type;
            }
        }
        $params['body'] = $this->stack;

        // NOTE(review): progress output written straight to stdout; kept
        // as-is because callers may rely on it.
        printf("ES Bulk query with %d items\n", count($this->stack) / 2);

        $this->client->bulk($params);
        $this->stack = array();
    }
}

View File

@@ -78,19 +78,24 @@ class Indexer
$this->disableShardRefreshing();
try {
// Prepare the bulk operation
$bulk = new BulkOperation($this->client);
$bulk->setDefaultIndex($this->options['index']);
$bulk->setDefaultType(self::RECORD_TYPE);
$bulk->setAutoFlushLimit(1000);
foreach ($this->appbox->get_databoxes() as $databox) {
$fetcher = new RecordFetcher($databox);
$fetcher->setBatchSize(200);
while ($record = $fetcher->fetch()) {
$params = array();
$params['index'] = $this->options['index'];
$params['type'] = self::RECORD_TYPE;
$params['id'] = $record['id'];
$params['body'] = $record;
$response = $this->client->index($params);
$bulk->index($params);
}
}
$bulk->flush();
// Optimize index
$params = array('index' => $this->options['index']);
$this->client->indices()->optimize($params);