diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php index 60cf21cd8d..488eb9f9c4 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php @@ -23,7 +23,7 @@ class BulkOperation private $logger; private $stack = array(); - private $opData = []; + private $operationIdentifiers = []; private $index; private $type; private $flushLimit = 1000; @@ -58,27 +58,27 @@ class BulkOperation $this->flushCallbacks[] = $callback; } - public function index(array $params, $_data) + public function index(array $params, $operationIdentifier) { $header = $this->buildHeader('index', $params); $body = igorw\get_in($params, ['body']); - $this->push($header, $body, $_data); + $this->push($header, $body, $operationIdentifier); } - public function delete(array $params, $_data) + public function delete(array $params, $operationIdentifier) { - $this->push($this->buildHeader('delete', $params), null, $_data); + $this->push($this->buildHeader('delete', $params), null, $operationIdentifier); } - private function push($header, $body, $_data) + private function push($header, $body, $operationIdentifier) { $this->stack[] = $header; if ($body) { $this->stack[] = $body; } - $this->opData[] = $_data; + $this->operationIdentifiers[] = $operationIdentifier; - if (count($this->opData) === $this->flushLimit) { + if (count($this->operationIdentifiers) === $this->flushLimit) { $this->flush(); } } @@ -99,22 +99,32 @@ class BulkOperation } $params['body'] = $this->stack; - $this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => count($this->opData)]); + $this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => count($this->operationIdentifiers)]); $response = $this->client->bulk($params); $this->stack = array(); - if (igorw\get_in($response, ['errors'], true)) { - foreach ($response['items'] as $key => $item) { - if ($item['index']['status'] >= 400) { // 4xx or 5xx error - throw new Exception(sprintf('%d: %s', $key, $item['index']['error'])); + $callbackData = []; // key: operationIdentifier passed when command was pushed on this bulk + // value: json result from es for the command + // nb: results (items) are returned IN THE SAME ORDER as commands were pushed in the stack + // so the items[X] match the operationIdentifiers[X] + foreach ($response['items'] as $key => $item) { + foreach($item as $command=>$result) { // command may be "index" or "delete" + if($response['errors'] && $result['status'] >= 400) { // 4xx or 5xx error + $err = array_key_exists('error', $result) ? $result['error'] : ($command . " error " . $result['status']); + throw new Exception(sprintf('%d: %s', $key, $err)); } } + + $operationIdentifier = $this->operationIdentifiers[$key]; + if(is_string($operationIdentifier) || is_int($operationIdentifier)) { // dont include null keys + $callbackData[$operationIdentifier] = $response['items'][$key]; + } } - foreach($this->flushCallbacks as $flushCallback) { - $flushCallback($this->opData); + foreach($this->flushCallbacks as $iCallBack=>$flushCallback) { + $flushCallback($callbackData); } - $this->opData = []; + $this->operationIdentifiers = []; } private function buildHeader($key, array $params) diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php index f7c61e25d8..1b4229dbc6 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php @@ -60,17 +60,10 @@ class RecordIndexer private $logger; - private $submited_records = []; - - private function getUniqueOperationId() + private function getUniqueOperationId($record_key) { - static $_key = null; - static $_n = 0; - if($_key == null) { - mt_srand(); - $_key = dechex(mt_rand()); - } - return $_key . '_' . ($_n++); + $_key = dechex(mt_rand()); + return $_key . '_' . $record_key; } public function __construct(Structure $structure, RecordHelper $helper, Thesaurus $thesaurus, \appbox $appbox, array $locales, LoggerInterface $logger) @@ -83,10 +76,45 @@ class RecordIndexer $this->logger = $logger; } + /** + * ES made a bulk op, check our (index) operations to drop the "indexing" & "to_index" jetons + * + * @param databox $databox + * @param array $operation_identifiers key:op_identifier ; value:operation result (json from es) + * @param array $submited_records records indexed, key:op_identifier + */ + private function onBulkFlush(databox $databox, array $operation_identifiers, array &$submited_records) + { + // nb: because the same bulk could be used by many "clients", this (each) callback may receive + // operation_identifiers that does not belong to it. + // flag only records that the fetcher worked on + $records = array_intersect_key( + $submited_records, // this is OUR records list + $operation_identifiers // reduce to the records indexed by this bulk (should be the same...) + ); + if(count($records) === 0) { + return; + } + // Commit and remove "indexing" flag + RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); + foreach (array_keys($records) as $id) { + unset($submited_records[$id]); + } + } + + /** + * index whole databox(es), don't test actual "jetons" + * + * @param BulkOperation $bulk + * @param databox[] $databoxes + */ public function populateIndex(BulkOperation $bulk, array $databoxes) { foreach ($databoxes as $databox) { + $submited_records = []; + $this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname())); + $fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records // post fetch : flag records as "indexing" @@ -96,27 +124,22 @@ class RecordIndexer }); // bulk flush : flag records as "indexed" - $bulk->onFlush(function($operation_identifiers) use ($databox) { - // nb: because the same bulk could be used by many "clients", this (each) callback may receive - // operation_identifiers that does not belong to us. - // flag only records that our fetcher worked on - $ops = array_flip($operation_identifiers); // now the key is the op_identifiers - $records = array_intersect_key( - $this->submited_records, // this is OUR records list - $ops // reduce to the records indexed by this bulk (should be the same...) - ); - // Commit and remove "indexing" flag - RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); - foreach (array_keys($records) as $id) { - unset($this->submited_records[$id]); - } + $bulk->onFlush(function($operation_identifiers) use ($databox, &$submited_records) { + $this->onBulkFlush($databox, $operation_identifiers, $submited_records); }); - $this->indexFromFetcher($bulk, $fetcher); + // Perform indexing + $this->indexFromFetcher($bulk, $fetcher, $submited_records); + $this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname())); } } + /** + * Index the records flagged as "to_index" on all databoxes + * + * @param BulkOperation $bulk + */ public function indexScheduled(BulkOperation $bulk) { foreach ($this->appbox->get_databoxes() as $databox) { @@ -126,6 +149,8 @@ class RecordIndexer private function indexScheduledInDatabox(BulkOperation $bulk, databox $databox) { + $submited_records = []; + // Make fetcher $delegate = new ScheduledFetcherDelegate(); $fetcher = $this->createFetcherForDatabox($databox, $delegate); @@ -139,64 +164,55 @@ class RecordIndexer }); // bulk flush : flag records as "indexed" - $bulk->onFlush(function($operation_identifiers) use ($databox) { - // nb: because the same bulk could be used by many "clients", this (each) callback may receive - // operation_identifiers that does not belong to us. - // flag only records that our fetcher worked on - $ops = array_flip($operation_identifiers); // now the key is the op_identifiers - $records = array_intersect_key( - $this->submited_records, // this is OUR records list - $ops // reduce to the records indexed by this bulk (should be the same...) - ); - // Commit and remove "indexing" flag - RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); - foreach (array_keys($records) as $id) { - unset($this->submited_records[$id]); - } + $bulk->onFlush(function($operation_identifiers) use ($databox, &$submited_records) { + $this->onBulkFlush($databox, $operation_identifiers, $submited_records); }); // Perform indexing - $this->indexFromFetcher($bulk, $fetcher); + $this->indexFromFetcher($bulk, $fetcher, $submited_records); } + /** + * Index a list of records + * + * @param BulkOperation $bulk + * @param Iterator $records + */ public function index(BulkOperation $bulk, Iterator $records) { foreach ($this->createFetchersForRecords($records) as $fetcher) { + $submited_records = []; $databox = $fetcher->getDatabox(); // post fetch : flag records as "indexing" $fetcher->setPostFetch(function(array $records) use ($fetcher, $databox) { RecordQueuer::didStartIndexingRecords($records, $databox); + // do not restart the fetcher since it has no clause on jetons }); // bulk flush : flag records as "indexed" $bulk->onFlush(function($operation_identifiers) use ($databox) { - // nb: because the same bulk could be used by many "clients", this (each) callback may receive - // operation_identifiers that does not belong to us. - // flag only records that our fetcher worked on - $ops = array_flip($operation_identifiers); // now the key is the op_identifiers - $records = array_intersect_key( - $this->submited_records, // this is OUR records list - $ops // reduce to the records indexed by this bulk (should be the same...) - ); - // Commit and remove "indexing" flag - RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); - foreach (array_keys($records) as $id) { - unset($this->submited_records[$id]); - } + $this->onBulkFlush($databox, $operation_identifiers, $submited_records); }); - $this->indexFromFetcher($bulk, $fetcher); + // Perform indexing + $this->indexFromFetcher($bulk, $fetcher, $submited_records); } } + /** + * Deleta a list of records + * + * @param BulkOperation $bulk + * @param Iterator $records + */ public function delete(BulkOperation $bulk, Iterator $records) { foreach ($records as $record) { $params = array(); $params['id'] = $record->getId(); $params['type'] = self::TYPE_NAME; - $bulk->delete($params, null); // no _data is related to a delete op + $bulk->delete($params, null); // no operationIdentifier is related to a delete op } } @@ -248,20 +264,21 @@ class RecordIndexer return array_values($databoxes); } - private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher) + private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher, array &$submited_records) { /** @var RecordInterface $record */ while ($record = $fetcher->fetch()) { + $op_identifier = $this->getUniqueOperationId($record['id']); + $params = array(); $params['id'] = $record['id']; unset($record['id']); $params['type'] = self::TYPE_NAME; $params['body'] = $this->transform($record); - $opIdentifier = $this->getUniqueOperationId(); - $this->submited_records[$opIdentifier] = $record; + $submited_records[$op_identifier] = $record; - $bulk->index($params, $opIdentifier); + $bulk->index($params, $op_identifier); } } diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php index 6b7f3622bc..7251988fe2 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php @@ -65,15 +65,14 @@ class RecordQueuer * nb: changing the jeton may affect a fetcher if his "where" clause (delegate) depends on jeton. * in this case the client of the fetcher must set a "postFetch" callback and restart the fetcher */ - public static function didFinishIndexingRecords(array $records, $databox) + public static function didFinishIndexingRecords(array $records, databox $databox) { $connection = $databox->get_connection(); $sql = "UPDATE record SET jeton = (jeton & ~ :flag) WHERE record_id IN (:record_ids)"; - self::executeFlagQuery($connection, $sql, Flag::TO_INDEX | Flag::INDEXING, $records); } - private static function executeFlagQuery($connection, $sql, $flag, array $records) + private static function executeFlagQuery(Connection $connection, $sql, $flag, array $records) { return $connection->executeQuery($sql, array( ':flag' => $flag, diff --git a/templates/web/admin/databox/databox.html.twig b/templates/web/admin/databox/databox.html.twig index e3f8fb9cbd..ed9971fab2 100644 --- a/templates/web/admin/databox/databox.html.twig +++ b/templates/web/admin/databox/databox.html.twig @@ -405,14 +405,6 @@ } }); - // start the refresh of the page content (progress bar etc...) - try { - clearTimeout(document.refreshDatabaseInformations_timer); - } - catch(err) { - - } - displayDatabaseInformations(200); // wait 200ms });