PHRAS-716 #time 1d

fix : operation list by databox
This commit is contained in:
Jean-Yves Gaulier
2015-09-21 15:58:43 +02:00
parent 5146881076
commit a02b3961bf
4 changed files with 104 additions and 86 deletions

View File

@@ -23,7 +23,7 @@ class BulkOperation
private $logger;
private $stack = array();
private $opData = [];
private $operationIdentifiers = [];
private $index;
private $type;
private $flushLimit = 1000;
@@ -58,27 +58,27 @@ class BulkOperation
$this->flushCallbacks[] = $callback;
}
public function index(array $params, $_data)
public function index(array $params, $operationIdentifier)
{
$header = $this->buildHeader('index', $params);
$body = igorw\get_in($params, ['body']);
$this->push($header, $body, $_data);
$this->push($header, $body, $operationIdentifier);
}
public function delete(array $params, $_data)
public function delete(array $params, $operationIdentifier)
{
$this->push($this->buildHeader('delete', $params), null, $_data);
$this->push($this->buildHeader('delete', $params), null, $operationIdentifier);
}
private function push($header, $body, $_data)
private function push($header, $body, $operationIdentifier)
{
$this->stack[] = $header;
if ($body) {
$this->stack[] = $body;
}
$this->opData[] = $_data;
$this->operationIdentifiers[] = $operationIdentifier;
if (count($this->opData) === $this->flushLimit) {
if (count($this->operationIdentifiers) === $this->flushLimit) {
$this->flush();
}
}
@@ -99,22 +99,32 @@ class BulkOperation
}
$params['body'] = $this->stack;
$this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => count($this->opData)]);
$this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => count($this->operationIdentifiers)]);
$response = $this->client->bulk($params);
$this->stack = array();
if (igorw\get_in($response, ['errors'], true)) {
foreach ($response['items'] as $key => $item) {
if ($item['index']['status'] >= 400) { // 4xx or 5xx error
throw new Exception(sprintf('%d: %s', $key, $item['index']['error']));
$callbackData = []; // key: operationIdentifier passed when command was pushed on this bulk
// value: json result from es for the command
// nb: results (items) are returned IN THE SAME ORDER as commands were pushed in the stack
// so the items[X] match the operationIdentifiers[X]
foreach ($response['items'] as $key => $item) {
foreach($item as $command=>$result) { // command may be "index" or "delete"
if($response['errors'] && $result['status'] >= 400) { // 4xx or 5xx error
$err = array_key_exists('error', $result) ? $result['error'] : ($command . " error " . $result['status']);
throw new Exception(sprintf('%d: %s', $key, $err));
}
}
$operationIdentifier = $this->operationIdentifiers[$key];
if(is_string($operationIdentifier) || is_int($operationIdentifier)) { // dont include null keys
$callbackData[$operationIdentifier] = $response['items'][$key];
}
}
foreach($this->flushCallbacks as $flushCallback) {
$flushCallback($this->opData);
foreach($this->flushCallbacks as $iCallBack=>$flushCallback) {
$flushCallback($callbackData);
}
$this->opData = [];
$this->operationIdentifiers = [];
}
private function buildHeader($key, array $params)

View File

@@ -60,17 +60,10 @@ class RecordIndexer
private $logger;
private $submited_records = [];
private function getUniqueOperationId()
private function getUniqueOperationId($record_key)
{
static $_key = null;
static $_n = 0;
if($_key == null) {
mt_srand();
$_key = dechex(mt_rand());
}
return $_key . '_' . ($_n++);
$_key = dechex(mt_rand());
return $_key . '_' . $record_key;
}
public function __construct(Structure $structure, RecordHelper $helper, Thesaurus $thesaurus, \appbox $appbox, array $locales, LoggerInterface $logger)
@@ -83,10 +76,45 @@ class RecordIndexer
$this->logger = $logger;
}
/**
* ES made a bulk op, check our (index) operations to drop the "indexing" & "to_index" jetons
*
* @param databox $databox
* @param array $operation_identifiers key:op_identifier ; value:operation result (json from es)
* @param array $submited_records records indexed, key:op_identifier
*/
private function onBulkFlush(databox $databox, array $operation_identifiers, array &$submited_records)
{
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
// operation_identifiers that does not belong to it.
// flag only records that the fetcher worked on
$records = array_intersect_key(
$submited_records, // this is OUR records list
$operation_identifiers // reduce to the records indexed by this bulk (should be the same...)
);
if(count($records) === 0) {
return;
}
// Commit and remove "indexing" flag
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
foreach (array_keys($records) as $id) {
unset($submited_records[$id]);
}
}
/**
* index whole databox(es), don't test actual "jetons"
*
* @param BulkOperation $bulk
* @param databox[] $databoxes
*/
public function populateIndex(BulkOperation $bulk, array $databoxes)
{
foreach ($databoxes as $databox) {
$submited_records = [];
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
$fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records
// post fetch : flag records as "indexing"
@@ -96,27 +124,22 @@ class RecordIndexer
});
// bulk flush : flag records as "indexed"
$bulk->onFlush(function($operation_identifiers) use ($databox) {
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
// operation_identifiers that does not belong to us.
// flag only records that our fetcher worked on
$ops = array_flip($operation_identifiers); // now the key is the op_identifiers
$records = array_intersect_key(
$this->submited_records, // this is OUR records list
$ops // reduce to the records indexed by this bulk (should be the same...)
);
// Commit and remove "indexing" flag
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
foreach (array_keys($records) as $id) {
unset($this->submited_records[$id]);
}
$bulk->onFlush(function($operation_identifiers) use ($databox, &$submited_records) {
$this->onBulkFlush($databox, $operation_identifiers, $submited_records);
});
$this->indexFromFetcher($bulk, $fetcher);
// Perform indexing
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
}
}
/**
* Index the records flagged as "to_index" on all databoxes
*
* @param BulkOperation $bulk
*/
public function indexScheduled(BulkOperation $bulk)
{
foreach ($this->appbox->get_databoxes() as $databox) {
@@ -126,6 +149,8 @@ class RecordIndexer
private function indexScheduledInDatabox(BulkOperation $bulk, databox $databox)
{
$submited_records = [];
// Make fetcher
$delegate = new ScheduledFetcherDelegate();
$fetcher = $this->createFetcherForDatabox($databox, $delegate);
@@ -139,64 +164,55 @@ class RecordIndexer
});
// bulk flush : flag records as "indexed"
$bulk->onFlush(function($operation_identifiers) use ($databox) {
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
// operation_identifiers that does not belong to us.
// flag only records that our fetcher worked on
$ops = array_flip($operation_identifiers); // now the key is the op_identifiers
$records = array_intersect_key(
$this->submited_records, // this is OUR records list
$ops // reduce to the records indexed by this bulk (should be the same...)
);
// Commit and remove "indexing" flag
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
foreach (array_keys($records) as $id) {
unset($this->submited_records[$id]);
}
$bulk->onFlush(function($operation_identifiers) use ($databox, &$submited_records) {
$this->onBulkFlush($databox, $operation_identifiers, $submited_records);
});
// Perform indexing
$this->indexFromFetcher($bulk, $fetcher);
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
}
/**
* Index a list of records
*
* @param BulkOperation $bulk
* @param Iterator $records
*/
public function index(BulkOperation $bulk, Iterator $records)
{
foreach ($this->createFetchersForRecords($records) as $fetcher) {
$submited_records = [];
$databox = $fetcher->getDatabox();
// post fetch : flag records as "indexing"
$fetcher->setPostFetch(function(array $records) use ($fetcher, $databox) {
RecordQueuer::didStartIndexingRecords($records, $databox);
// do not restart the fetcher since it has no clause on jetons
});
// bulk flush : flag records as "indexed"
$bulk->onFlush(function($operation_identifiers) use ($databox) {
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
// operation_identifiers that does not belong to us.
// flag only records that our fetcher worked on
$ops = array_flip($operation_identifiers); // now the key is the op_identifiers
$records = array_intersect_key(
$this->submited_records, // this is OUR records list
$ops // reduce to the records indexed by this bulk (should be the same...)
);
// Commit and remove "indexing" flag
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
foreach (array_keys($records) as $id) {
unset($this->submited_records[$id]);
}
$this->onBulkFlush($databox, $operation_identifiers, $submited_records);
});
$this->indexFromFetcher($bulk, $fetcher);
// Perform indexing
$this->indexFromFetcher($bulk, $fetcher, $submited_records);
}
}
/**
* Deleta a list of records
*
* @param BulkOperation $bulk
* @param Iterator $records
*/
public function delete(BulkOperation $bulk, Iterator $records)
{
foreach ($records as $record) {
$params = array();
$params['id'] = $record->getId();
$params['type'] = self::TYPE_NAME;
$bulk->delete($params, null); // no _data is related to a delete op
$bulk->delete($params, null); // no operationIdentifier is related to a delete op
}
}
@@ -248,20 +264,21 @@ class RecordIndexer
return array_values($databoxes);
}
private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher)
private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher, array &$submited_records)
{
/** @var RecordInterface $record */
while ($record = $fetcher->fetch()) {
$op_identifier = $this->getUniqueOperationId($record['id']);
$params = array();
$params['id'] = $record['id'];
unset($record['id']);
$params['type'] = self::TYPE_NAME;
$params['body'] = $this->transform($record);
$opIdentifier = $this->getUniqueOperationId();
$this->submited_records[$opIdentifier] = $record;
$submited_records[$op_identifier] = $record;
$bulk->index($params, $opIdentifier);
$bulk->index($params, $op_identifier);
}
}

View File

@@ -65,15 +65,14 @@ class RecordQueuer
* nb: changing the jeton may affect a fetcher if his "where" clause (delegate) depends on jeton.
* in this case the client of the fetcher must set a "postFetch" callback and restart the fetcher
*/
public static function didFinishIndexingRecords(array $records, $databox)
public static function didFinishIndexingRecords(array $records, databox $databox)
{
$connection = $databox->get_connection();
$sql = "UPDATE record SET jeton = (jeton & ~ :flag) WHERE record_id IN (:record_ids)";
self::executeFlagQuery($connection, $sql, Flag::TO_INDEX | Flag::INDEXING, $records);
}
private static function executeFlagQuery($connection, $sql, $flag, array $records)
private static function executeFlagQuery(Connection $connection, $sql, $flag, array $records)
{
return $connection->executeQuery($sql, array(
':flag' => $flag,

View File

@@ -405,14 +405,6 @@
}
});
// start the refresh of the page content (progress bar etc...)
try {
clearTimeout(document.refreshDatabaseInformations_timer);
}
catch(err) {
}
displayDatabaseInformations(200); // wait 200ms
});