mirror of
https://github.com/alchemy-fr/Phraseanet.git
synced 2025-10-24 02:13:15 +00:00
PHRAS-716 #time 3d
fix : admin/base progress bar (indexation) fix : reindex button fix : indexation does not skip records between fetches
This commit is contained in:
@@ -577,29 +577,23 @@ class DataboxController extends Controller
|
|||||||
|
|
||||||
$ret = [
|
$ret = [
|
||||||
'success' => false,
|
'success' => false,
|
||||||
'msg' => $this->app->trans('An error occured'),
|
|
||||||
'sbas_id' => null,
|
'sbas_id' => null,
|
||||||
|
'msg' => $this->app->trans('An error occured'),
|
||||||
'indexable' => false,
|
'indexable' => false,
|
||||||
'records' => 0,
|
|
||||||
'xml_indexed' => 0,
|
|
||||||
'thesaurus_indexed' => 0,
|
|
||||||
'viewname' => null,
|
'viewname' => null,
|
||||||
'printLogoURL' => null,
|
'printLogoURL' => null,
|
||||||
|
'counts' => null,
|
||||||
];
|
];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$databox = $this->findDataboxById($databox_id);
|
$databox = $this->findDataboxById($databox_id);
|
||||||
$data = $databox->get_indexed_record_amount();
|
|
||||||
|
|
||||||
|
$ret['sbas_id'] = $databox_id;
|
||||||
$ret['indexable'] = $appbox->is_databox_indexable($databox);
|
$ret['indexable'] = $appbox->is_databox_indexable($databox);
|
||||||
$ret['viewname'] = (($databox->get_dbname() == $databox->get_viewname())
|
$ret['viewname'] = (($databox->get_dbname() == $databox->get_viewname())
|
||||||
? $this->app->trans('admin::base: aucun alias')
|
? $this->app->trans('admin::base: aucun alias')
|
||||||
: $databox->get_viewname());
|
: $databox->get_viewname());
|
||||||
$ret['records'] = $databox->get_record_amount();
|
$ret['counts'] = $databox->get_counts();
|
||||||
$ret['sbas_id'] = $databox_id;
|
|
||||||
$ret['xml_indexed'] = $data['xml_indexed'];
|
|
||||||
$ret['thesaurus_indexed'] = $data['thesaurus_indexed'];
|
|
||||||
$ret['jeton_subdef'] = $data['jeton_subdef'];
|
|
||||||
if ($this->app['filesystem']->exists($this->app['root.path'] . '/config/minilogos/logopdf_' . $databox_id . '.jpg')) {
|
if ($this->app['filesystem']->exists($this->app['root.path'] . '/config/minilogos/logopdf_' . $databox_id . '.jpg')) {
|
||||||
$ret['printLogoURL'] = '/custom/minilogos/logopdf_' . $databox_id . '.jpg';
|
$ret['printLogoURL'] = '/custom/minilogos/logopdf_' . $databox_id . '.jpg';
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ class Indexer
|
|||||||
private $recordIndexer;
|
private $recordIndexer;
|
||||||
private $termIndexer;
|
private $termIndexer;
|
||||||
|
|
||||||
private $indexQueue;
|
private $indexQueue; // contains RecordInterface(s)
|
||||||
private $deleteQueue;
|
private $deleteQueue;
|
||||||
|
|
||||||
private $previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL;
|
private $previousRefreshInterval = self::DEFAULT_REFRESH_INTERVAL;
|
||||||
|
|||||||
@@ -23,10 +23,11 @@ class BulkOperation
|
|||||||
private $logger;
|
private $logger;
|
||||||
|
|
||||||
private $stack = array();
|
private $stack = array();
|
||||||
private $opCount = 0;
|
private $opData = [];
|
||||||
private $index;
|
private $index;
|
||||||
private $type;
|
private $type;
|
||||||
private $flushLimit = 1000;
|
private $flushLimit = 1000;
|
||||||
|
private $flushCallbacks = [];
|
||||||
|
|
||||||
public function __construct(Client $client, LoggerInterface $logger)
|
public function __construct(Client $client, LoggerInterface $logger)
|
||||||
{
|
{
|
||||||
@@ -52,27 +53,32 @@ class BulkOperation
|
|||||||
$this->flushLimit = (int) $limit;
|
$this->flushLimit = (int) $limit;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function index(array $params)
|
public function onFlush(\Closure $callback)
|
||||||
|
{
|
||||||
|
$this->flushCallbacks[] = $callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function index(array $params, $_data)
|
||||||
{
|
{
|
||||||
$header = $this->buildHeader('index', $params);
|
$header = $this->buildHeader('index', $params);
|
||||||
$body = igorw\get_in($params, ['body']);
|
$body = igorw\get_in($params, ['body']);
|
||||||
$this->push($header, $body);
|
$this->push($header, $body, $_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function delete(array $params)
|
public function delete(array $params, $_data)
|
||||||
{
|
{
|
||||||
$this->push($this->buildHeader('delete', $params));
|
$this->push($this->buildHeader('delete', $params), null, $_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function push($header, $body = null)
|
private function push($header, $body, $_data)
|
||||||
{
|
{
|
||||||
$this->stack[] = $header;
|
$this->stack[] = $header;
|
||||||
if ($body) {
|
if ($body) {
|
||||||
$this->stack[] = $body;
|
$this->stack[] = $body;
|
||||||
}
|
}
|
||||||
$this->opCount++;
|
$this->opData[] = $_data;
|
||||||
|
|
||||||
if ($this->flushLimit === $this->opCount) {
|
if (count($this->opData) === $this->flushLimit) {
|
||||||
$this->flush();
|
$this->flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -93,11 +99,10 @@ class BulkOperation
|
|||||||
}
|
}
|
||||||
$params['body'] = $this->stack;
|
$params['body'] = $this->stack;
|
||||||
|
|
||||||
$this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => $this->opCount]);
|
$this->logger->debug("ES Bulk query about to be performed\n", ['opCount' => count($this->opData)]);
|
||||||
|
|
||||||
$response = $this->client->bulk($params);
|
$response = $this->client->bulk($params);
|
||||||
$this->stack = array();
|
$this->stack = array();
|
||||||
$this->opCount = 0;
|
|
||||||
|
|
||||||
if (igorw\get_in($response, ['errors'], true)) {
|
if (igorw\get_in($response, ['errors'], true)) {
|
||||||
foreach ($response['items'] as $key => $item) {
|
foreach ($response['items'] as $key => $item) {
|
||||||
@@ -106,6 +111,10 @@ class BulkOperation
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
foreach($this->flushCallbacks as $flushCallback) {
|
||||||
|
$flushCallback($this->opData);
|
||||||
|
}
|
||||||
|
$this->opData = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
private function buildHeader($key, array $params)
|
private function buildHeader($key, array $params)
|
||||||
|
|||||||
@@ -16,12 +16,14 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception;
|
|||||||
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegate;
|
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegate;
|
||||||
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
|
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegateInterface;
|
||||||
use Closure;
|
use Closure;
|
||||||
|
use databox;
|
||||||
use Doctrine\DBAL\Connection;
|
use Doctrine\DBAL\Connection;
|
||||||
use Doctrine\DBAL\Driver\Connection as ConnectionInterface;
|
use Doctrine\DBAL\Driver\Connection as ConnectionInterface;
|
||||||
use PDO;
|
use PDO;
|
||||||
|
|
||||||
class Fetcher
|
class Fetcher
|
||||||
{
|
{
|
||||||
|
private $databox;
|
||||||
private $connection;
|
private $connection;
|
||||||
private $statement;
|
private $statement;
|
||||||
private $delegate;
|
private $delegate;
|
||||||
@@ -34,13 +36,19 @@ class Fetcher
|
|||||||
private $postFetch;
|
private $postFetch;
|
||||||
private $onDrain;
|
private $onDrain;
|
||||||
|
|
||||||
public function __construct(ConnectionInterface $connection, array $hydrators, FetcherDelegateInterface $delegate = null)
|
public function __construct(databox $databox, array $hydrators, FetcherDelegateInterface $delegate = null)
|
||||||
{
|
{
|
||||||
$this->connection = $connection;
|
$this->databox = $databox;
|
||||||
|
$this->connection = $databox->get_connection();;
|
||||||
$this->hydrators = $hydrators;
|
$this->hydrators = $hydrators;
|
||||||
$this->delegate = $delegate ?: new FetcherDelegate();
|
$this->delegate = $delegate ?: new FetcherDelegate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function getDatabox()
|
||||||
|
{
|
||||||
|
return $this->databox;
|
||||||
|
}
|
||||||
|
|
||||||
public function fetch()
|
public function fetch()
|
||||||
{
|
{
|
||||||
if (empty($this->buffer)) {
|
if (empty($this->buffer)) {
|
||||||
@@ -64,7 +72,6 @@ class Fetcher
|
|||||||
$records[$record['record_id']] = $record;
|
$records[$record['record_id']] = $record;
|
||||||
$this->offset++;
|
$this->offset++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (empty($records)) {
|
if (empty($records)) {
|
||||||
$this->onDrain->__invoke();
|
$this->onDrain->__invoke();
|
||||||
return;
|
return;
|
||||||
@@ -87,6 +94,12 @@ class Fetcher
|
|||||||
return $records;
|
return $records;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function restart()
|
||||||
|
{
|
||||||
|
$this->buffer = array();
|
||||||
|
$this->offset = 0;
|
||||||
|
}
|
||||||
|
|
||||||
public function setBatchSize($size)
|
public function setBatchSize($size)
|
||||||
{
|
{
|
||||||
if ($size < 1) {
|
if ($size < 1) {
|
||||||
@@ -105,28 +118,24 @@ class Fetcher
|
|||||||
$this->onDrain = $onDrain;
|
$this->onDrain = $onDrain;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return \Doctrine\DBAL\Driver\Statement
|
||||||
|
*/
|
||||||
private function getExecutedStatement()
|
private function getExecutedStatement()
|
||||||
{
|
{
|
||||||
if (!$this->statement) {
|
if (!$this->statement) {
|
||||||
$sql = <<<SQL
|
$sql = "SELECT r.record_id"
|
||||||
SELECT r.record_id
|
. ", r.coll_id AS collection_id"
|
||||||
, r.coll_id as collection_id
|
. ", c.asciiname AS collection_name"
|
||||||
, c.asciiname as collection_name
|
. ", r.uuid"
|
||||||
, r.uuid
|
. ", r.status AS flags_bitfield"
|
||||||
, r.status as flags_bitfield
|
. ", r.sha256" // -- TODO rename in "hash"
|
||||||
, r.sha256 -- TODO rename in "hash"
|
. ", r.originalname AS original_name"
|
||||||
, r.originalname as original_name
|
. ", r.mime, r.type, r.parent_record_id, r.credate AS created_on, r.moddate AS updated_on"
|
||||||
, r.mime
|
. " FROM record r INNER JOIN coll c ON (c.coll_id = r.coll_id)"
|
||||||
, r.type
|
. " -- WHERE"
|
||||||
, r.parent_record_id
|
. " ORDER BY r.record_id DESC"
|
||||||
, r.credate as created_on
|
. " LIMIT :offset, :limit";
|
||||||
, r.moddate as updated_on
|
|
||||||
FROM record r
|
|
||||||
INNER JOIN coll c ON (c.coll_id = r.coll_id)
|
|
||||||
-- WHERE
|
|
||||||
ORDER BY r.record_id DESC
|
|
||||||
LIMIT :offset, :limit
|
|
||||||
SQL;
|
|
||||||
|
|
||||||
$where = $this->delegate->buildWhereClause();
|
$where = $this->delegate->buildWhereClause();
|
||||||
$sql = str_replace('-- WHERE', $where, $sql);
|
$sql = str_replace('-- WHERE', $where, $sql);
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
|
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer;
|
||||||
|
|
||||||
|
use Alchemy\Phrasea\Model\RecordInterface;
|
||||||
use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception;
|
use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception;
|
||||||
use Alchemy\Phrasea\SearchEngine\Elastic\Exception\MergeException;
|
use Alchemy\Phrasea\SearchEngine\Elastic\Exception\MergeException;
|
||||||
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\BulkOperation;
|
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\BulkOperation;
|
||||||
@@ -59,6 +60,19 @@ class RecordIndexer
|
|||||||
|
|
||||||
private $logger;
|
private $logger;
|
||||||
|
|
||||||
|
private $submited_records = [];
|
||||||
|
|
||||||
|
private function getUniqueOperationId()
|
||||||
|
{
|
||||||
|
static $_key = null;
|
||||||
|
static $_n = 0;
|
||||||
|
if($_key == null) {
|
||||||
|
mt_srand();
|
||||||
|
$_key = dechex(mt_rand());
|
||||||
|
}
|
||||||
|
return $_key . '_' . ($_n++);
|
||||||
|
}
|
||||||
|
|
||||||
public function __construct(Structure $structure, RecordHelper $helper, Thesaurus $thesaurus, \appbox $appbox, array $locales, LoggerInterface $logger)
|
public function __construct(Structure $structure, RecordHelper $helper, Thesaurus $thesaurus, \appbox $appbox, array $locales, LoggerInterface $logger)
|
||||||
{
|
{
|
||||||
$this->structure = $structure;
|
$this->structure = $structure;
|
||||||
@@ -73,7 +87,31 @@ class RecordIndexer
|
|||||||
{
|
{
|
||||||
foreach ($databoxes as $databox) {
|
foreach ($databoxes as $databox) {
|
||||||
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
|
$this->logger->info(sprintf('Indexing database %s...', $databox->get_viewname()));
|
||||||
$fetcher = $this->createFetcherForDatabox($databox);
|
$fetcher = $this->createFetcherForDatabox($databox); // no delegate, scan the whole records
|
||||||
|
|
||||||
|
// post fetch : flag records as "indexing"
|
||||||
|
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
|
||||||
|
RecordQueuer::didStartIndexingRecords($records, $databox);
|
||||||
|
// do not restart the fetcher since it has no clause on jetons
|
||||||
|
});
|
||||||
|
|
||||||
|
// bulk flush : flag records as "indexed"
|
||||||
|
$bulk->onFlush(function($operation_identifiers) use ($databox) {
|
||||||
|
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
|
||||||
|
// operation_identifiers that does not belong to us.
|
||||||
|
// flag only records that our fetcher worked on
|
||||||
|
$ops = array_flip($operation_identifiers); // now the key is the op_identifiers
|
||||||
|
$records = array_intersect_key(
|
||||||
|
$this->submited_records, // this is OUR records list
|
||||||
|
$ops // reduce to the records indexed by this bulk (should be the same...)
|
||||||
|
);
|
||||||
|
// Commit and remove "indexing" flag
|
||||||
|
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
|
||||||
|
foreach (array_keys($records) as $id) {
|
||||||
|
unset($this->submited_records[$id]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
$this->indexFromFetcher($bulk, $fetcher);
|
$this->indexFromFetcher($bulk, $fetcher);
|
||||||
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
|
$this->logger->info(sprintf('Finished indexing %s', $databox->get_viewname()));
|
||||||
}
|
}
|
||||||
@@ -91,23 +129,63 @@ class RecordIndexer
|
|||||||
// Make fetcher
|
// Make fetcher
|
||||||
$delegate = new ScheduledFetcherDelegate();
|
$delegate = new ScheduledFetcherDelegate();
|
||||||
$fetcher = $this->createFetcherForDatabox($databox, $delegate);
|
$fetcher = $this->createFetcherForDatabox($databox, $delegate);
|
||||||
// Keep track of fetched records, flag them as "indexing"
|
|
||||||
$fetched = array();
|
// post fetch : flag records as "indexing"
|
||||||
$fetcher->setPostFetch(function(array $records) use ($databox, &$fetched) {
|
$fetcher->setPostFetch(function(array $records) use ($databox, $fetcher) {
|
||||||
// TODO Do not keep all indexed records in memory...
|
|
||||||
$fetched += $records;
|
|
||||||
RecordQueuer::didStartIndexingRecords($records, $databox);
|
RecordQueuer::didStartIndexingRecords($records, $databox);
|
||||||
|
// because changing the flag on the records affects the "where" clause of the fetcher,
|
||||||
|
// restart it each time
|
||||||
|
$fetcher->restart();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// bulk flush : flag records as "indexed"
|
||||||
|
$bulk->onFlush(function($operation_identifiers) use ($databox) {
|
||||||
|
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
|
||||||
|
// operation_identifiers that does not belong to us.
|
||||||
|
// flag only records that our fetcher worked on
|
||||||
|
$ops = array_flip($operation_identifiers); // now the key is the op_identifiers
|
||||||
|
$records = array_intersect_key(
|
||||||
|
$this->submited_records, // this is OUR records list
|
||||||
|
$ops // reduce to the records indexed by this bulk (should be the same...)
|
||||||
|
);
|
||||||
|
// Commit and remove "indexing" flag
|
||||||
|
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
|
||||||
|
foreach (array_keys($records) as $id) {
|
||||||
|
unset($this->submited_records[$id]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Perform indexing
|
// Perform indexing
|
||||||
$this->indexFromFetcher($bulk, $fetcher);
|
$this->indexFromFetcher($bulk, $fetcher);
|
||||||
// Commit and remove "indexing" flag
|
|
||||||
$bulk->flush();
|
|
||||||
RecordQueuer::didFinishIndexingRecords($fetched, $databox);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function index(BulkOperation $bulk, Iterator $records)
|
public function index(BulkOperation $bulk, Iterator $records)
|
||||||
{
|
{
|
||||||
foreach ($this->createFetchersForRecords($records) as $fetcher) {
|
foreach ($this->createFetchersForRecords($records) as $fetcher) {
|
||||||
|
$databox = $fetcher->getDatabox();
|
||||||
|
|
||||||
|
// post fetch : flag records as "indexing"
|
||||||
|
$fetcher->setPostFetch(function(array $records) use ($fetcher, $databox) {
|
||||||
|
RecordQueuer::didStartIndexingRecords($records, $databox);
|
||||||
|
});
|
||||||
|
|
||||||
|
// bulk flush : flag records as "indexed"
|
||||||
|
$bulk->onFlush(function($operation_identifiers) use ($databox) {
|
||||||
|
// nb: because the same bulk could be used by many "clients", this (each) callback may receive
|
||||||
|
// operation_identifiers that does not belong to us.
|
||||||
|
// flag only records that our fetcher worked on
|
||||||
|
$ops = array_flip($operation_identifiers); // now the key is the op_identifiers
|
||||||
|
$records = array_intersect_key(
|
||||||
|
$this->submited_records, // this is OUR records list
|
||||||
|
$ops // reduce to the records indexed by this bulk (should be the same...)
|
||||||
|
);
|
||||||
|
// Commit and remove "indexing" flag
|
||||||
|
RecordQueuer::didFinishIndexingRecords(array_values($records), $databox);
|
||||||
|
foreach (array_keys($records) as $id) {
|
||||||
|
unset($this->submited_records[$id]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
$this->indexFromFetcher($bulk, $fetcher);
|
$this->indexFromFetcher($bulk, $fetcher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -118,16 +196,19 @@ class RecordIndexer
|
|||||||
$params = array();
|
$params = array();
|
||||||
$params['id'] = $record->getId();
|
$params['id'] = $record->getId();
|
||||||
$params['type'] = self::TYPE_NAME;
|
$params['type'] = self::TYPE_NAME;
|
||||||
$bulk->delete($params);
|
$bulk->delete($params, null); // no _data is related to a delete op
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Iterator $records
|
||||||
|
* @return Fetcher[]
|
||||||
|
*/
|
||||||
private function createFetchersForRecords(Iterator $records)
|
private function createFetchersForRecords(Iterator $records)
|
||||||
{
|
{
|
||||||
$fetchers = array();
|
$fetchers = array();
|
||||||
foreach ($this->groupRecordsByDatabox($records) as $group) {
|
foreach ($this->groupRecordsByDatabox($records) as $group) {
|
||||||
$databox = $group['databox'];
|
$databox = $group['databox'];
|
||||||
$connection = $databox->get_connection();
|
|
||||||
$delegate = new RecordListFetcherDelegate($group['records']);
|
$delegate = new RecordListFetcherDelegate($group['records']);
|
||||||
$fetchers[] = $this->createFetcherForDatabox($databox, $delegate);
|
$fetchers[] = $this->createFetcherForDatabox($databox, $delegate);
|
||||||
}
|
}
|
||||||
@@ -139,7 +220,7 @@ class RecordIndexer
|
|||||||
{
|
{
|
||||||
$connection = $databox->get_connection();
|
$connection = $databox->get_connection();
|
||||||
$candidateTerms = new CandidateTerms($databox);
|
$candidateTerms = new CandidateTerms($databox);
|
||||||
$fetcher = new Fetcher($connection, array(
|
$fetcher = new Fetcher($databox, array(
|
||||||
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper),
|
new CoreHydrator($databox->get_sbas_id(), $databox->get_viewname(), $this->helper),
|
||||||
new TitleHydrator($connection),
|
new TitleHydrator($connection),
|
||||||
new MetadataHydrator($connection, $this->structure, $this->helper),
|
new MetadataHydrator($connection, $this->structure, $this->helper),
|
||||||
@@ -169,13 +250,18 @@ class RecordIndexer
|
|||||||
|
|
||||||
private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher)
|
private function indexFromFetcher(BulkOperation $bulk, Fetcher $fetcher)
|
||||||
{
|
{
|
||||||
|
/** @var RecordInterface $record */
|
||||||
while ($record = $fetcher->fetch()) {
|
while ($record = $fetcher->fetch()) {
|
||||||
$params = array();
|
$params = array();
|
||||||
$params['id'] = $record['id'];
|
$params['id'] = $record['id'];
|
||||||
unset($record['id']);
|
unset($record['id']);
|
||||||
$params['type'] = self::TYPE_NAME;
|
$params['type'] = self::TYPE_NAME;
|
||||||
$params['body'] = $this->transform($record);
|
$params['body'] = $this->transform($record);
|
||||||
$bulk->index($params);
|
|
||||||
|
$opIdentifier = $this->getUniqueOperationId();
|
||||||
|
$this->submited_records[$opIdentifier] = $record;
|
||||||
|
|
||||||
|
$bulk->index($params, $opIdentifier);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -35,38 +35,42 @@ class RecordQueuer
|
|||||||
$connection = $collection->get_connection();
|
$connection = $collection->get_connection();
|
||||||
|
|
||||||
// Set TO_INDEX flag on all records from this collection
|
// Set TO_INDEX flag on all records from this collection
|
||||||
$sql = <<<SQL
|
$sql = "UPDATE record SET jeton = (jeton | :token) WHERE coll_id = :coll_id";
|
||||||
UPDATE record
|
|
||||||
SET jeton = (jeton | :token)
|
|
||||||
WHERE coll_id = :coll_id
|
|
||||||
SQL;
|
|
||||||
$stmt = $connection->prepare($sql);
|
$stmt = $connection->prepare($sql);
|
||||||
$stmt->bindValue(':token', Flag::TO_INDEX, PDO::PARAM_INT);
|
$stmt->bindValue(':token', Flag::TO_INDEX, PDO::PARAM_INT);
|
||||||
$stmt->bindValue(':coll_id', $collection->get_coll_id(), PDO::PARAM_INT);
|
$stmt->bindValue(':coll_id', $collection->get_coll_id(), PDO::PARAM_INT);
|
||||||
$stmt->execute();
|
$stmt->execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param array $records
|
||||||
|
* @param $databox
|
||||||
|
*
|
||||||
|
* nb: changing the jeton may affect a fetcher if his "where" clause (delegate) depends on jeton.
|
||||||
|
* in this case the client of the fetcher must set a "postFetch" callback and restart the fetcher
|
||||||
|
*/
|
||||||
public static function didStartIndexingRecords(array $records, $databox)
|
public static function didStartIndexingRecords(array $records, $databox)
|
||||||
{
|
{
|
||||||
$connection = $databox->get_connection();
|
$connection = $databox->get_connection();
|
||||||
$sql = <<<SQL
|
$sql = "UPDATE record SET jeton = (jeton | :flag) WHERE record_id IN (:record_ids)";
|
||||||
UPDATE record
|
|
||||||
SET jeton = (jeton | :flag)
|
|
||||||
WHERE record_id IN (:record_ids)
|
|
||||||
SQL;
|
|
||||||
self::executeFlagQuery($connection, $sql, Flag::INDEXING, $records);
|
self::executeFlagQuery($connection, $sql, Flag::INDEXING, $records);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param array $records
|
||||||
|
* @param $databox
|
||||||
|
*
|
||||||
|
* nb: changing the jeton may affect a fetcher if his "where" clause (delegate) depends on jeton.
|
||||||
|
* in this case the client of the fetcher must set a "postFetch" callback and restart the fetcher
|
||||||
|
*/
|
||||||
public static function didFinishIndexingRecords(array $records, $databox)
|
public static function didFinishIndexingRecords(array $records, $databox)
|
||||||
{
|
{
|
||||||
$connection = $databox->get_connection();
|
$connection = $databox->get_connection();
|
||||||
$sql = <<<SQL
|
$sql = "UPDATE record SET jeton = (jeton & ~ :flag) WHERE record_id IN (:record_ids)";
|
||||||
UPDATE record
|
|
||||||
SET jeton = (jeton & ~ :flag)
|
self::executeFlagQuery($connection, $sql, Flag::TO_INDEX | Flag::INDEXING, $records);
|
||||||
WHERE record_id IN (:record_ids)
|
|
||||||
SQL;
|
|
||||||
$flag = Flag::TO_INDEX | Flag::INDEXING;
|
|
||||||
self::executeFlagQuery($connection, $sql, $flag, $records);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static function executeFlagQuery($connection, $sql, $flag, array $records)
|
private static function executeFlagQuery($connection, $sql, $flag, array $records)
|
||||||
|
|||||||
@@ -294,42 +294,6 @@ class databox extends base implements ThumbnailedElement
|
|||||||
return $this->id;
|
return $this->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function get_unique_keywords()
|
|
||||||
{
|
|
||||||
$sql = "SELECT COUNT(kword_id) AS n FROM kword";
|
|
||||||
|
|
||||||
$stmt = $this->get_connection()->prepare($sql);
|
|
||||||
$stmt->execute();
|
|
||||||
$rowbas = $stmt->fetch(PDO::FETCH_ASSOC);
|
|
||||||
$stmt->closeCursor();
|
|
||||||
|
|
||||||
return ($rowbas ? $rowbas['n'] : null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function get_index_amount()
|
|
||||||
{
|
|
||||||
$sql = "SELECT COUNT(idx_id) AS n FROM idx";
|
|
||||||
|
|
||||||
$stmt = $this->get_connection()->prepare($sql);
|
|
||||||
$stmt->execute();
|
|
||||||
$rowbas = $stmt->fetch(PDO::FETCH_ASSOC);
|
|
||||||
$stmt->closeCursor();
|
|
||||||
|
|
||||||
return ($rowbas ? $rowbas['n'] : null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function get_thesaurus_hits()
|
|
||||||
{
|
|
||||||
$sql = "SELECT COUNT(thit_id) AS n FROM thit";
|
|
||||||
|
|
||||||
$stmt = $this->get_connection()->prepare($sql);
|
|
||||||
$stmt->execute();
|
|
||||||
$rowbas = $stmt->fetch(PDO::FETCH_ASSOC);
|
|
||||||
$stmt->closeCursor();
|
|
||||||
|
|
||||||
return ($rowbas ? $rowbas['n'] : null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public function get_record_details($sort)
|
public function get_record_details($sort)
|
||||||
{
|
{
|
||||||
$sql = "SELECT record.coll_id, ISNULL(coll.coll_id) AS lostcoll,
|
$sql = "SELECT record.coll_id, ISNULL(coll.coll_id) AS lostcoll,
|
||||||
@@ -390,36 +354,46 @@ class databox extends base implements ThumbnailedElement
|
|||||||
return $amount;
|
return $amount;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function get_indexed_record_amount()
|
public function get_counts()
|
||||||
{
|
{
|
||||||
$sql = "SELECT status & 3 AS status, SUM(1) AS n FROM record GROUP BY(status & 3)";
|
$mask = PhraseaTokens::MAKE_SUBDEF | PhraseaTokens::TO_INDEX | PhraseaTokens::INDEXING; // we only care about those "jetons"
|
||||||
|
$sql = "SELECT type, jeton & (".$mask.") AS status, SUM(1) AS n FROM record GROUP BY type, (jeton & ".$mask.")";
|
||||||
$stmt = $this->get_connection()->prepare($sql);
|
$stmt = $this->get_connection()->prepare($sql);
|
||||||
$stmt->execute();
|
$stmt->execute();
|
||||||
$rs = $stmt->fetchAll(PDO::FETCH_ASSOC);
|
$rs = $stmt->fetchAll(PDO::FETCH_ASSOC);
|
||||||
$stmt->closeCursor();
|
$stmt->closeCursor();
|
||||||
|
|
||||||
$ret = array(
|
$ret = array(
|
||||||
'xml_indexed' => 0,
|
'records' => 0,
|
||||||
'thesaurus_indexed' => 0,
|
'records_indexed' => 0, // jetons = 0;0
|
||||||
'jeton_subdef' => array()
|
'records_to_index' => 0, // jetons = 0;1
|
||||||
|
'records_not_indexed' => 0, // jetons = 1;0
|
||||||
|
'records_indexing' => 0, // jetons = 1;1
|
||||||
|
'subdefs_todo' => array() // by type "image", "video", ...
|
||||||
);
|
);
|
||||||
|
|
||||||
foreach ($rs as $row) {
|
foreach ($rs as $row) {
|
||||||
|
$ret['records'] += ($n = (int)($row['n']));
|
||||||
$status = $row['status'];
|
$status = $row['status'];
|
||||||
if ($status & 1)
|
switch($status & (PhraseaTokens::TO_INDEX | PhraseaTokens::INDEXING)) {
|
||||||
$ret['xml_indexed'] += $row['n'];
|
case 0:
|
||||||
if ($status & 2)
|
$ret['records_indexed'] += $n;
|
||||||
$ret['thesaurus_indexed'] += $row['n'];
|
break;
|
||||||
}
|
case PhraseaTokens::TO_INDEX:
|
||||||
|
$ret['records_to_index'] += $n;
|
||||||
$sql = "SELECT type, COUNT(record_id) AS n FROM record WHERE jeton & ".PhraseaTokens::MAKE_SUBDEF." GROUP BY type";
|
break;
|
||||||
$stmt = $this->get_connection()->prepare($sql);
|
case PhraseaTokens::INDEXING:
|
||||||
$stmt->execute();
|
$ret['records_not_indexed'] += $n;
|
||||||
$rs = $stmt->fetchAll(PDO::FETCH_ASSOC);
|
break;
|
||||||
$stmt->closeCursor();
|
case PhraseaTokens::INDEXING | PhraseaTokens::TO_INDEX:
|
||||||
|
$ret['records_indexing'] += $n;
|
||||||
foreach ($rs as $row) {
|
break;
|
||||||
$ret['jeton_subdef'][$row['type']] = (int)$row['n'];
|
}
|
||||||
|
if($status & PhraseaTokens::MAKE_SUBDEF) {
|
||||||
|
if(!array_key_exists($row['type'], $ret['subdefs_todo'])) {
|
||||||
|
$ret['subdefs_todo'][$row['type']] = 0;
|
||||||
|
}
|
||||||
|
$ret['subdefs_todo'][$row['type']] += $n;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $ret;
|
return $ret;
|
||||||
@@ -1057,6 +1031,12 @@ class databox extends base implements ThumbnailedElement
|
|||||||
{
|
{
|
||||||
$this->get_connection()->update('pref', ['updated_on' => '0000-00-00 00:00:00'], ['prop' => 'indexes']);
|
$this->get_connection()->update('pref', ['updated_on' => '0000-00-00 00:00:00'], ['prop' => 'indexes']);
|
||||||
|
|
||||||
|
// Set TO_INDEX flag on all records
|
||||||
|
$sql = "UPDATE record SET jeton = (jeton | :token)";
|
||||||
|
$stmt = $this->connection->prepare($sql);
|
||||||
|
$stmt->bindValue(':token', PhraseaTokens::TO_INDEX, PDO::PARAM_INT);
|
||||||
|
$stmt->execute();
|
||||||
|
|
||||||
return $this;
|
return $this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -45,7 +45,7 @@
|
|||||||
|
|
||||||
<li>
|
<li>
|
||||||
{{ 'admin::base: nombre d\'enregistrements sur la base :' | trans }}
|
{{ 'admin::base: nombre d\'enregistrements sur la base :' | trans }}
|
||||||
<span id="nrecords">{{ databox.get_record_amount() }}</span>
|
<span id="records"></span>
|
||||||
|
|
||||||
(<a href="{{ path('admin_database_display_document_details', {'databox_id': databox.get_sbas_id()}) }}" class="ajax" target="rights">{{ 'phraseanet:: details' | trans }}</a>)
|
(<a href="{{ path('admin_database_display_document_details', {'databox_id': databox.get_sbas_id()}) }}" class="ajax" target="rights">{{ 'phraseanet:: details' | trans }}</a>)
|
||||||
</li>
|
</li>
|
||||||
@@ -54,41 +54,13 @@
|
|||||||
{{ 'admin::base: subdefs to be created :' | trans }}
|
{{ 'admin::base: subdefs to be created :' | trans }}
|
||||||
<span id="subdefs_todo"></span>
|
<span id="subdefs_todo"></span>
|
||||||
</li>
|
</li>
|
||||||
|
|
||||||
{% if showDetail %}
|
|
||||||
<li>
|
|
||||||
{{ 'admin::base: nombre de mots uniques sur la base :' | trans }}
|
|
||||||
{{ databox.get_unique_keywords() }}
|
|
||||||
</li>
|
|
||||||
<li>
|
|
||||||
{{ 'admin::base: nombre de mots indexes sur la base' | trans }}
|
|
||||||
{{ databox.get_index_amount() }}
|
|
||||||
</li>
|
|
||||||
{% if app['conf'].get(['registry', 'modules', 'thesaurus']) %}
|
|
||||||
<li>
|
|
||||||
{{ 'admin::base: nombre de termes de Thesaurus indexes :' | trans }}
|
|
||||||
{{ databox.get_thesaurus_hits() }}
|
|
||||||
</li>
|
|
||||||
{% endif %}
|
|
||||||
{% endif %}
|
|
||||||
</ul>
|
</ul>
|
||||||
|
|
||||||
<div id="INDEX_P_BAR" style="margin-bottom:20px;">
|
<div id="INDEX_P_BAR" style="margin-bottom:20px; width:50%">
|
||||||
<div style="height: 35px;">
|
<div class="progress">
|
||||||
<p>
|
<div class="bar bar-success records_indexed" style="transition: none; width:0%;">...</div>
|
||||||
{{ "admin::base: document indexes en utilisant la fiche xml" | trans }} :
|
<div class="bar bar-warning records_indexing" style="transition:none; width:0%;"></div>
|
||||||
<span id="xml_indexed"></span>
|
<div class="bar bar-danger records_not_indexed" style="transition:none; width:0%;"></div>
|
||||||
</p>
|
|
||||||
<div id="xml_indexed_bar"></div>
|
|
||||||
<div id="xml_indexed_percent"></div>
|
|
||||||
</div>
|
|
||||||
<div style="height: 35px;">
|
|
||||||
<p>
|
|
||||||
{{ "admin::base: document indexes en utilisant le thesaurus" | trans }} :
|
|
||||||
<span id="thesaurus_indexed"></span>
|
|
||||||
</p>
|
|
||||||
<div id="thesaurus_indexed_bar"></div>
|
|
||||||
<div id="thesaurus_indexed_percent"></div>
|
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
@@ -248,65 +220,91 @@
|
|||||||
</div>
|
</div>
|
||||||
|
|
||||||
<script type="text/javascript">
|
<script type="text/javascript">
|
||||||
function refreshDatabaseInformations()
|
|
||||||
{
|
|
||||||
// stop the refresh if the page changed
|
|
||||||
if($("#thesaurus_indexed_bar").length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
function displayDatabaseInformations(delay)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
clearTimeout(document.refreshDatabaseInformations_timer);
|
||||||
|
}
|
||||||
|
catch(err) {
|
||||||
|
|
||||||
|
}
|
||||||
|
document.refreshDatabaseInformations_timer = setTimeout("_displayDatabaseInformations();", delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
function _displayDatabaseInformations()
|
||||||
|
{
|
||||||
|
var container = $("#INDEX_P_BAR");
|
||||||
|
if(!container || container.length == 0) {
|
||||||
|
return; // wrong page ?
|
||||||
|
}
|
||||||
$.ajax({
|
$.ajax({
|
||||||
type: "GET",
|
type: "GET",
|
||||||
url: "/admin/databox/{{ databox.get_sbas_id() }}/informations/documents/",
|
url: "/admin/databox/{{ databox.get_sbas_id() }}/informations/documents/",
|
||||||
dataType: 'json',
|
dataType: 'json',
|
||||||
data: {},
|
data: {},
|
||||||
success: function(data){
|
success: function (data) {
|
||||||
if(data.viewname === '') {
|
try {
|
||||||
$("#viewname").html("{{ 'admin::base: aucun alias' | trans }}");
|
if (data.viewname === '') {
|
||||||
} else {
|
$("#viewname").html("{{ 'admin::base: aucun alias' | trans }}");
|
||||||
$("#viewname").html(data.viewname);
|
} else {
|
||||||
}
|
$("#viewname").html(data.viewname);
|
||||||
|
|
||||||
$("#nrecords").text(data.records);
|
|
||||||
$("#is_indexable").attr('checked', data.indexable);
|
|
||||||
$("#xml_indexed").text(data.xml_indexed);
|
|
||||||
$("#thesaurus_indexed").text(data.thesaurus_indexed);
|
|
||||||
|
|
||||||
if(data.records > 0)
|
|
||||||
{
|
|
||||||
var p;
|
|
||||||
p = 100*data.xml_indexed/data.records;
|
|
||||||
$("#xml_indexed_bar").width(Math.round(2*p)); // 0..200px
|
|
||||||
$("#xml_indexed_percent").text((Math.round(p*100)/100)+" %");
|
|
||||||
p = 100*data.thesaurus_indexed/data.records;
|
|
||||||
$("#thesaurus_indexed_bar").width(Math.round(2*p));
|
|
||||||
$("#thesaurus_indexed_percent").text((Math.round(p*100)/100)+" %");
|
|
||||||
|
|
||||||
var t = "";
|
|
||||||
for(var i in data.jeton_subdef)
|
|
||||||
{
|
|
||||||
t += (t==""?"":" ; ") + i + ": " + data.jeton_subdef[i];
|
|
||||||
}
|
}
|
||||||
if(t == "") {
|
|
||||||
t = "0";
|
$("#is_indexable").attr('checked', data.indexable);
|
||||||
|
$("#records").text(data.counts.records);
|
||||||
|
|
||||||
|
if (data.counts.records > 0) {
|
||||||
|
var records_indexed = data.counts.records_indexed;
|
||||||
|
var records_not_indexed = data.counts.records_not_indexed; // flag indexing but NOT to_index ???
|
||||||
|
var records_indexing = data.counts.records_indexing;
|
||||||
|
var p;
|
||||||
|
|
||||||
|
p = 100 * records_indexed / data.counts.records;
|
||||||
|
$(".records_indexed", container).width(p + "%").text(records_indexed);
|
||||||
|
|
||||||
|
if (records_not_indexed > 0) {
|
||||||
|
p = 100 * records_not_indexed / data.counts.records;
|
||||||
|
$(".records_not_indexed", container).width(p + "%").text(records_not_indexed);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
$(".records_not_indexed", container).width(0).text("");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (records_indexing > 0) {
|
||||||
|
p = 100 * records_indexing / data.counts.records;
|
||||||
|
$(".records_indexing", container).width(p + "%").text(records_indexing);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
$(".records_indexing", container).width(0).text("");
|
||||||
|
}
|
||||||
|
|
||||||
|
var t = "";
|
||||||
|
for (var i in data.counts.subdefs_todo) {
|
||||||
|
t += (t == "" ? "" : " ; ") + i + ": " + data.counts.subdefs_todo[i];
|
||||||
|
}
|
||||||
|
if (t == "") {
|
||||||
|
t = "0";
|
||||||
|
}
|
||||||
|
$("#subdefs_todo").text(t);
|
||||||
}
|
}
|
||||||
$("#subdefs_todo").text(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
if(data.printLogoURL)
|
if (data.printLogoURL) {
|
||||||
{
|
$("#printLogo").attr("src", data.printLogoURL);
|
||||||
$("#printLogo").attr("src", data.printLogoURL);
|
$("#printLogoDIV_NONE").hide();
|
||||||
$("#printLogoDIV_NONE").hide();
|
$("#printLogoDIV_OK").show();
|
||||||
$("#printLogoDIV_OK").show();
|
}
|
||||||
}
|
else {
|
||||||
else
|
$("#printLogoDIV_OK").hide();
|
||||||
{
|
$("#printLogoDIV_NONE").show();
|
||||||
$("#printLogoDIV_OK").hide();
|
}
|
||||||
$("#printLogoDIV_NONE").show();
|
|
||||||
}
|
|
||||||
|
|
||||||
// refresh every 10 sec.
|
// refresh every 10 sec.
|
||||||
setTimeout("refreshDatabaseInformations();", 10000);
|
displayDatabaseInformations(10000);
|
||||||
|
}
|
||||||
|
catch(err) {
|
||||||
|
// wrong page ? don't refresh again
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -408,7 +406,14 @@
|
|||||||
});
|
});
|
||||||
|
|
||||||
// start the refresh of the page content (progress bar etc...)
|
// start the refresh of the page content (progress bar etc...)
|
||||||
setTimeout("refreshDatabaseInformations();", 2000);
|
try {
|
||||||
|
clearTimeout(document.refreshDatabaseInformations_timer);
|
||||||
|
}
|
||||||
|
catch(err) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
displayDatabaseInformations(200); // wait 200ms
|
||||||
});
|
});
|
||||||
|
|
||||||
</script>
|
</script>
|
||||||
|
|||||||
Reference in New Issue
Block a user