diff --git a/lib/Alchemy/Phrasea/Controller/Admin/DataboxController.php b/lib/Alchemy/Phrasea/Controller/Admin/DataboxController.php index d1f6e8789b..11c901159e 100644 --- a/lib/Alchemy/Phrasea/Controller/Admin/DataboxController.php +++ b/lib/Alchemy/Phrasea/Controller/Admin/DataboxController.php @@ -577,29 +577,23 @@ class DataboxController extends Controller $ret = [ 'success' => false, - 'msg' => $this->app->trans('An error occured'), 'sbas_id' => null, + 'msg' => $this->app->trans('An error occured'), 'indexable' => false, - 'records' => 0, - 'xml_indexed' => 0, - 'thesaurus_indexed' => 0, 'viewname' => null, 'printLogoURL' => null, + 'counts' => null, ]; try { $databox = $this->findDataboxById($databox_id); - $data = $databox->get_indexed_record_amount(); + $ret['sbas_id'] = $databox_id; $ret['indexable'] = $appbox->is_databox_indexable($databox); $ret['viewname'] = (($databox->get_dbname() == $databox->get_viewname()) ? $this->app->trans('admin::base: aucun alias') : $databox->get_viewname()); - $ret['records'] = $databox->get_record_amount(); - $ret['sbas_id'] = $databox_id; - $ret['xml_indexed'] = $data['xml_indexed']; - $ret['thesaurus_indexed'] = $data['thesaurus_indexed']; - $ret['jeton_subdef'] = $data['jeton_subdef']; + $ret['counts'] = $databox->get_counts(); if ($this->app['filesystem']->exists($this->app['root.path'] . '/config/minilogos/logopdf_' . $databox_id . '.jpg')) { $ret['printLogoURL'] = '/custom/minilogos/logopdf_' . $databox_id . '.jpg'; } diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php index 32585829e5..921e5738e4 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer.php @@ -41,7 +41,7 @@ class Indexer private $recordIndexer; private $termIndexer; - private $indexQueue; + private $indexQueue; // contains RecordInterface(s) private $deleteQueue; private $previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL; diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php index 9d1e85b283..60cf21cd8d 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/BulkOperation.php @@ -23,10 +23,11 @@ class BulkOperation private $logger; private $stack = array(); - private $opCount = 0; + private $opData = []; private $index; private $type; private $flushLimit = 1000; + private $flushCallbacks = []; public function __construct(Client $client, LoggerInterface $logger) { @@ -52,27 +53,32 @@ class BulkOperation $this->flushLimit = (int) $limit; } - public function index(array $params) + public function onFlush(\Closure $callback) + { + $this->flushCallbacks[] = $callback; + } + + public function index(array $params, $_data) { $header = $this->buildHeader('index', $params); $body = igorw\get_in($params, ['body']); - $this->push($header, $body); + $this->push($header, $body, $_data); } - public function delete(array $params) + public function delete(array $params, $_data) { - $this->push($this->buildHeader('delete', $params)); + $this->push($this->buildHeader('delete', $params), null, $_data); } - private function push($header, $body = null) + private function push($header, $body, $_data) { $this->stack[] = $header; if ($body) { $this->stack[] = $body; } - $this->opCount++; + $this->opData[] = $_data; - if ($this->flushLimit === $this->opCount) { + if (count($this->opData) === $this->flushLimit) { $this->flush(); } } @@ -93,11 +99,10 @@ class BulkOperation } $params['body'] = $this->stack; - $this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => $this->opCount]); + $this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => count($this->opData)]); $response = $this->client->bulk($params); $this->stack = array(); - $this->opCount = 0; if (igorw\get_in($response, ['errors'], true)) { foreach ($response['items'] as $key => $item) { @@ -106,6 +111,10 @@ class BulkOperation } } } + foreach($this->flushCallbacks as $flushCallback) { + $flushCallback($this->opData); + } + $this->opData = []; } private function buildHeader($key, array $params) diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/Record/Fetcher.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/Record/Fetcher.php index 0b9e57e023..187809c573 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/Record/Fetcher.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/Record/Fetcher.php @@ -16,12 +16,14 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegate; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface; use Closure; +use databox; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Driver\Connection as ConnectionInterface; use PDO; class Fetcher { + private $databox; private $connection; private $statement; private $delegate; @@ -34,13 +36,19 @@ class Fetcher private $postFetch; private $onDrain; - public function __construct(ConnectionInterface $connection, array $hydrators, FetcherDelegateInterface $delegate = null) + public function __construct(databox $databox, array $hydrators, FetcherDelegateInterface $delegate = null) { - $this->connection = $connection; + $this->databox = $databox; + $this->connection = $databox->get_connection();; $this->hydrators = $hydrators; $this->delegate = $delegate ?: new FetcherDelegate(); } + public function getDatabox() + { + return $this->databox; + } + public function fetch() { if (empty($this->buffer)) { @@ -64,7 +72,6 @@ class Fetcher $records[$record['record_id']] = $record; $this->offset++; } - if (empty($records)) { $this->onDrain->__invoke(); return; @@ -87,6 +94,12 @@ class Fetcher return $records; } + public function restart() + { + $this->buffer = array(); + $this->offset = 0; + } + public function setBatchSize($size) { if ($size < 1) { @@ -105,28 +118,24 @@ class Fetcher $this->onDrain = $onDrain; } + /** + * @return \Doctrine\DBAL\Driver\Statement + */ private function getExecutedStatement() { if (!$this->statement) { - $sql = <<delegate->buildWhereClause(); $sql = str_replace('-- WHERE', $where, $sql); diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php index 6650a68a53..f7c61e25d8 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordIndexer.php @@ -11,6 +11,7 @@ namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer; +use Alchemy\Phrasea\Model\RecordInterface; use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception; use Alchemy\Phrasea\SearchEngine\Elastic\Exception\MergeException; use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\BulkOperation; @@ -59,6 +60,19 @@ class RecordIndexer private $logger; + private $submited_records = []; + + private function getUniqueOperationId() + { + static $_key = null; + static $_n = 0; + if($_key == null) { + mt_srand(); + $_key = dechex(mt_rand()); + } + return $_key . '_' . ($_n++); + } + public function __construct(Structure $structure, RecordHelper $helper, Thesaurus $thesaurus, \appbox $appbox, array $locales, LoggerInterface $logger) { $this->structure = $structure; @@ -73,7 +87,31 @@ class RecordIndexer { foreach ($databoxes as $databox) { $this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname())); - $fetcher = $this->createFetcherForDatabox($databox); + $fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records + + // post fetch : flag records as "indexing" + $fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) { + RecordQueuer::didStartIndexingRecords($records, $databox); + // do not restart the fetcher since it has no clause on jetons + }); + + // bulk flush : flag records as "indexed" + $bulk->onFlush(function($operation_identifiers) use ($databox) { + // nb: because the same bulk could be used by many "clients", this (each) callback may receive + // operation_identifiers that does not belong to us. + // flag only records that our fetcher worked on + $ops = array_flip($operation_identifiers); // now the key is the op_identifiers + $records = array_intersect_key( + $this->submited_records, // this is OUR records list + $ops // reduce to the records indexed by this bulk (should be the same...) + ); + // Commit and remove "indexing" flag + RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); + foreach (array_keys($records) as $id) { + unset($this->submited_records[$id]); + } + }); + $this->indexFromFetcher($bulk, $fetcher); $this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname())); } @@ -91,23 +129,63 @@ class RecordIndexer // Make fetcher $delegate = new ScheduledFetcherDelegate(); $fetcher = $this->createFetcherForDatabox($databox, $delegate); - // Keep track of fetched records, flag them as "indexing" - $fetched = array(); - $fetcher->setPostFetch(function(array $records) use ($databox, &$fetched) { - // TODO Do not keep all indexed records in memory... - $fetched += $records; + + // post fetch : flag records as "indexing" + $fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) { RecordQueuer::didStartIndexingRecords($records, $databox); + // because changing the flag on the records affects the "where" clause of the fetcher, + // restart it each time + $fetcher->restart(); }); + + // bulk flush : flag records as "indexed" + $bulk->onFlush(function($operation_identifiers) use ($databox) { + // nb: because the same bulk could be used by many "clients", this (each) callback may receive + // operation_identifiers that does not belong to us. + // flag only records that our fetcher worked on + $ops = array_flip($operation_identifiers); // now the key is the op_identifiers + $records = array_intersect_key( + $this->submited_records, // this is OUR records list + $ops // reduce to the records indexed by this bulk (should be the same...) + ); + // Commit and remove "indexing" flag + RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); + foreach (array_keys($records) as $id) { + unset($this->submited_records[$id]); + } + }); + // Perform indexing $this->indexFromFetcher($bulk, $fetcher); - // Commit and remove "indexing" flag - $bulk->flush(); - RecordQueuer::didFinishIndexingRecords($fetched, $databox); } public function index(BulkOperation $bulk, Iterator $records) { foreach ($this->createFetchersForRecords($records) as $fetcher) { + $databox = $fetcher->getDatabox(); + + // post fetch : flag records as "indexing" + $fetcher->setPostFetch(function(array $records) use ($fetcher, $databox) { + RecordQueuer::didStartIndexingRecords($records, $databox); + }); + + // bulk flush : flag records as "indexed" + $bulk->onFlush(function($operation_identifiers) use ($databox) { + // nb: because the same bulk could be used by many "clients", this (each) callback may receive + // operation_identifiers that does not belong to us. + // flag only records that our fetcher worked on + $ops = array_flip($operation_identifiers); // now the key is the op_identifiers + $records = array_intersect_key( + $this->submited_records, // this is OUR records list + $ops // reduce to the records indexed by this bulk (should be the same...) + ); + // Commit and remove "indexing" flag + RecordQueuer::didFinishIndexingRecords(array_values($records), $databox); + foreach (array_keys($records) as $id) { + unset($this->submited_records[$id]); + } + }); + $this->indexFromFetcher($bulk, $fetcher); } } @@ -118,16 +196,19 @@ class RecordIndexer $params = array(); $params['id'] = $record->getId(); $params['type'] = self::TYPE_NAME; - $bulk->delete($params); + $bulk->delete($params, null); // no _data is related to a delete op } } + /** + * @param Iterator $records + * @return Fetcher[] + */ private function createFetchersForRecords(Iterator $records) { $fetchers = array(); foreach ($this->groupRecordsByDatabox($records) as $group) { $databox = $group['databox']; - $connection = $databox->get_connection(); $delegate = new RecordListFetcherDelegate($group['records']); $fetchers[] = $this->createFetcherForDatabox($databox, $delegate); } @@ -139,7 +220,7 @@ class RecordIndexer { $connection = $databox->get_connection(); $candidateTerms = new CandidateTerms($databox); - $fetcher = new Fetcher($connection, array( + $fetcher = new Fetcher($databox, array( new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper), new TitleHydrator($connection), new MetadataHydrator($connection, $this->structure, $this->helper), @@ -169,13 +250,18 @@ class RecordIndexer private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher) { + /** @var RecordInterface $record */ while ($record = $fetcher->fetch()) { $params = array(); $params['id'] = $record['id']; unset($record['id']); $params['type'] = self::TYPE_NAME; $params['body'] = $this->transform($record); - $bulk->index($params); + + $opIdentifier = $this->getUniqueOperationId(); + $this->submited_records[$opIdentifier] = $record; + + $bulk->index($params, $opIdentifier); } } diff --git a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php index 6bed8699f1..6b7f3622bc 100644 --- a/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php +++ b/lib/Alchemy/Phrasea/SearchEngine/Elastic/Indexer/RecordQueuer.php @@ -35,38 +35,42 @@ class RecordQueuer $connection = $collection->get_connection(); // Set TO_INDEX flag on all records from this collection - $sql = <<prepare($sql); $stmt->bindValue(':token', Flag::TO_INDEX, PDO::PARAM_INT); $stmt->bindValue(':coll_id', $collection->get_coll_id(), PDO::PARAM_INT); $stmt->execute(); } + /** + * @param array $records + * @param $databox + * + * nb: changing the jeton may affect a fetcher if his "where" clause (delegate) depends on jeton. + * in this case the client of the fetcher must set a "postFetch" callback and restart the fetcher + */ public static function didStartIndexingRecords(array $records, $databox) { $connection = $databox->get_connection(); - $sql = <<get_connection(); - $sql = <<id; } - public function get_unique_keywords() - { - $sql = "SELECT COUNT(kword_id) AS n FROM kword"; - - $stmt = $this->get_connection()->prepare($sql); - $stmt->execute(); - $rowbas = $stmt->fetch(PDO::FETCH_ASSOC); - $stmt->closeCursor(); - - return ($rowbas ? $rowbas['n'] : null); - } - - public function get_index_amount() - { - $sql = "SELECT COUNT(idx_id) AS n FROM idx"; - - $stmt = $this->get_connection()->prepare($sql); - $stmt->execute(); - $rowbas = $stmt->fetch(PDO::FETCH_ASSOC); - $stmt->closeCursor(); - - return ($rowbas ? $rowbas['n'] : null); - } - - public function get_thesaurus_hits() - { - $sql = "SELECT COUNT(thit_id) AS n FROM thit"; - - $stmt = $this->get_connection()->prepare($sql); - $stmt->execute(); - $rowbas = $stmt->fetch(PDO::FETCH_ASSOC); - $stmt->closeCursor(); - - return ($rowbas ? $rowbas['n'] : null); - } - public function get_record_details($sort) { $sql = "SELECT record.coll_id, ISNULL(coll.coll_id) AS lostcoll, @@ -390,36 +354,46 @@ class databox extends base implements ThumbnailedElement return $amount; } - public function get_indexed_record_amount() + public function get_counts() { - $sql = "SELECT status & 3 AS status, SUM(1) AS n FROM record GROUP BY(status & 3)"; + $mask = PhraseaTokens::MAKE_SUBDEF | PhraseaTokens::TO_INDEX | PhraseaTokens::INDEXING; // we only care about those "jetons" + $sql = "SELECT type, jeton & (".$mask.") AS status, SUM(1) AS n FROM record GROUP BY type, (jeton & ".$mask.")"; $stmt = $this->get_connection()->prepare($sql); $stmt->execute(); $rs = $stmt->fetchAll(PDO::FETCH_ASSOC); $stmt->closeCursor(); $ret = array( - 'xml_indexed' => 0, - 'thesaurus_indexed' => 0, - 'jeton_subdef' => array() + 'records' => 0, + 'records_indexed' => 0, // jetons = 0;0 + 'records_to_index' => 0, // jetons = 0;1 + 'records_not_indexed' => 0, // jetons = 1;0 + 'records_indexing' => 0, // jetons = 1;1 + 'subdefs_todo' => array() // by type "image", "video", ... ); - foreach ($rs as $row) { + $ret['records'] += ($n = (int)($row['n'])); $status = $row['status']; - if ($status & 1) - $ret['xml_indexed'] += $row['n']; - if ($status & 2) - $ret['thesaurus_indexed'] += $row['n']; - } - - $sql = "SELECT type, COUNT(record_id) AS n FROM record WHERE jeton & ".PhraseaTokens::MAKE_SUBDEF." GROUP BY type"; - $stmt = $this->get_connection()->prepare($sql); - $stmt->execute(); - $rs = $stmt->fetchAll(PDO::FETCH_ASSOC); - $stmt->closeCursor(); - - foreach ($rs as $row) { - $ret['jeton_subdef'][$row['type']] = (int)$row['n']; + switch($status & (PhraseaTokens::TO_INDEX | PhraseaTokens::INDEXING)) { + case 0: + $ret['records_indexed'] += $n; + break; + case PhraseaTokens::TO_INDEX: + $ret['records_to_index'] += $n; + break; + case PhraseaTokens::INDEXING: + $ret['records_not_indexed'] += $n; + break; + case PhraseaTokens::INDEXING | PhraseaTokens::TO_INDEX: + $ret['records_indexing'] += $n; + break; + } + if($status & PhraseaTokens::MAKE_SUBDEF) { + if(!array_key_exists($row['type'], $ret['subdefs_todo'])) { + $ret['subdefs_todo'][$row['type']] = 0; + } + $ret['subdefs_todo'][$row['type']] += $n; + } } return $ret; @@ -1057,6 +1031,12 @@ class databox extends base implements ThumbnailedElement { $this->get_connection()->update('pref', ['updated_on' => '0000-00-00 00:00:00'], ['prop' => 'indexes']); + // Set TO_INDEX flag on all records + $sql = "UPDATE record SET jeton = (jeton | :token)"; + $stmt = $this->connection->prepare($sql); + $stmt->bindValue(':token', PhraseaTokens::TO_INDEX, PDO::PARAM_INT); + $stmt->execute(); + return $this; } diff --git a/templates/web/admin/databox/databox.html.twig b/templates/web/admin/databox/databox.html.twig index f0437643cc..e3f8fb9cbd 100644 --- a/templates/web/admin/databox/databox.html.twig +++ b/templates/web/admin/databox/databox.html.twig @@ -45,7 +45,7 @@
  • {{ 'admin::base: nombre d\'enregistrements sur la base :' | trans }} - {{ databox.get_record_amount() }} + ({{ 'phraseanet:: details' | trans }})
  • @@ -54,41 +54,13 @@ {{ 'admin::base: subdefs to be created :' | trans }} - - {% if showDetail %} -
  • - {{ 'admin::base: nombre de mots uniques sur la base :' | trans }} - {{ databox.get_unique_keywords() }} -
  • -
  • - {{ 'admin::base: nombre de mots indexes sur la base' | trans }} - {{ databox.get_index_amount() }} -
  • - {% if app['conf'].get(['registry', 'modules', 'thesaurus']) %} -
  • - {{ 'admin::base: nombre de termes de Thesaurus indexes :' | trans }} - {{ databox.get_thesaurus_hits() }} -
  • - {% endif %} - {% endif %} -
    -
    -

    - {{ "admin::base: document indexes en utilisant la fiche xml" | trans }} : - -

    -
    -
    -
    -
    -

    - {{ "admin::base: document indexes en utilisant le thesaurus" | trans }} : - -

    -
    -
    +
    +
    +
    ...
    +
    +
    @@ -248,65 +220,91 @@