PHRAS-3023_slow-query-while-indexing_4.1

- fix bound limit depending on order/direction options
- add index on dbox:record.moddate
This commit is contained in:
Jean-Yves Gaulier
2020-04-09 19:11:08 +02:00
parent 3d5a6efcc7
commit cb6a781b02
5 changed files with 177 additions and 43 deletions

View File

@@ -17,7 +17,7 @@ class Version
* @var string
*/
private $number = '4.1.0-alpha.25a';
private $number = '4.1.0-alpha.26a';
/**
* @var string

View File

@@ -38,7 +38,7 @@ class ElasticsearchOptions
/** @var string */
private $populateDirection;
/** @var int[] */
/** @var int[][] */
private $_customValues = [];
private $activeTab;
@@ -437,15 +437,9 @@ class ElasticsearchOptions
/**
* @return string
*/
public function getPopulateOrderAsSQL()
public function getPopulateOrder()
{
static $orderAsColumn = [
self::POPULATE_ORDER_RID => "`record_id`",
self::POPULATE_ORDER_MODDATE => "`moddate`",
];
// populateOrder IS one of the keys (ensured by setPopulateOrder)
return $orderAsColumn[$this->populateOrder];
return $this->populateOrder;
}
/**
@@ -465,6 +459,14 @@ class ElasticsearchOptions
}
/**
* Returns the populate direction stored in these options.
*
* The raw option value is returned (no SQL translation is done here); the record
* fetcher compares it against the POPULATE_DIRECTION_ASC / POPULATE_DIRECTION_DESC constants.
*
* @return string
*/
public function getPopulateDirection()
{
return $this->populateDirection;
}
/**
* @return string
*/

View File

@@ -11,7 +11,6 @@
namespace Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record;
use Alchemy\Phrasea\Core\PhraseaTokens;
use Alchemy\Phrasea\SearchEngine\Elastic\ElasticsearchOptions;
use Alchemy\Phrasea\SearchEngine\Elastic\Exception\Exception;
use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegate;
@@ -19,7 +18,9 @@ use Alchemy\Phrasea\SearchEngine\Elastic\Indexer\Record\Delegate\FetcherDelegate
use Closure;
use databox;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver\Connection as ConnectionInterface;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Statement;
use LogicException;
use PDO;
class Fetcher
@@ -30,8 +31,16 @@ class Fetcher
private $statement;
private $delegate;
// since we fetch records dy DESC, this will be the HIGHEST record_id fetched during last batch
private $upper_rid = PHP_INT_MAX;
// Records may be fetched by different orders/directions, so the sql limit (bound) is set up accordingly.
/** @var int|string start-up bound: 0 when populating ASC, PHP_INT_MAX when populating DESC (assigned in the constructor) */
private $boundLimit; // may be the highest or lowest int or date, used as a start-up condition for the sql and for the fetch loop
/** @var int|string */
private $lastLimit; // the last value fetched: bound to :bound in the query, reset to $boundLimit once a query has run, then recomputed from the fetched rows
/** @var Closure */
private $updateLastLimitDelegate; // must update $lastLimit from the 'limit_value' of the current record while fetching (max() when ASC, min() when DESC)
/** @var string */
private $sqlLimitColumn; // the sql expression (column) used to compare: record_id, or moddate rendered by DATE_FORMAT as '%Y%m%d%H%i%s'
// NOTE(review): the WHERE clause compares this expression to $lastLimit with a strict "<" / ">", so when ordering by
// moddate, rows sharing the last fetched value but left out of the batch by LIMIT look like they would be skipped - confirm.
private $batchSize = 1;
private $buffer = array();
@@ -46,6 +55,32 @@ class Fetcher
$this->connection = $databox->get_connection();;
$this->hydrators = $hydrators;
$this->delegate = $delegate ?: new FetcherDelegate();
// set the boundLimit and updateDelegate, depends on populate-order and populate-direction
// the bound limit value is used on first run, but also as initial value on fetch loop
$this->sqlLimitColumn = ($options->getPopulateOrder() === $options::POPULATE_ORDER_RID) ?
'record_id'
:
'DATE_FORMAT(moddate, \'%Y%m%d%H%i%s\')'; // handles "0000-00..." better than timestamp
//
// too bad we cannot assign to a variable a builtin function ("min" or "max") as a closure (= vector)
// we need to encapsulate the builtin function into a closure in php.
//
if($options->getPopulateDirection() === $options::POPULATE_DIRECTION_ASC) {
$this->boundLimit = 0;
$this->updateLastLimitDelegate = function($record) {
$this->lastLimit = max($this->lastLimit, (int)($record['limit_value']));
};
}
else {
$this->boundLimit = PHP_INT_MAX;
$this->updateLastLimitDelegate = function($record) {
$this->lastLimit = min($this->lastLimit, (int)($record['limit_value']));
};
}
// limit for first run
$this->lastLimit = $this->boundLimit;
}
public function getDatabox()
@@ -53,6 +88,11 @@ class Fetcher
return $this->databox;
}
/**
* @return mixed
* @throws DBALException
* @throws Exception
*/
public function fetch()
{
if (empty($this->buffer)) {
@@ -65,24 +105,29 @@ class Fetcher
return array_pop($this->buffer);
}
/**
* @return array
* @throws DBALException
* @throws Exception
*/
private function fetchBatch()
{
// Fetch records rows
$statement = $this->getExecutedStatement();
// printf("Query %d(%d) -> %d rows\n", $this->upper_rid, $this->batchSize, $statement->rowCount());
// printf("Query %d(%d) -> %d rows\n", $this->lastLimit, $this->batchSize, $statement->rowCount());
$records = [];
$this->upper_rid = PHP_INT_MAX;
$this->lastLimit = $this->boundLimit; // initial low or high value
while ($record = $statement->fetch()) {
$records[$record['record_id']] = $record;
$rid = (int)($record['record_id']);
if($rid < $this->upper_rid) {
$this->upper_rid = (int)($record['record_id']);
}
// compare/update limit
// call_user_func($this->updateLastLimitDelegate, $records);
($this->updateLastLimitDelegate)($record);
}
if (empty($records)) {
/** @noinspection PhpUndefinedMethodInspection */
$this->onDrain->__invoke();
return;
return [];
}
// Hydrate records
@@ -96,6 +141,7 @@ class Fetcher
}
if ($this->postFetch) {
/** @noinspection PhpUndefinedMethodInspection */
$this->postFetch->__invoke($records);
}
@@ -105,13 +151,13 @@ class Fetcher
public function restart()
{
$this->buffer = array();
$this->upper_rid = PHP_INT_MAX;
$this->lastLimit = $this->boundLimit;
}
public function setBatchSize($size)
{
if ($size < 1) {
throw new \LogicException("Batch size must be greater than or equal to 1");
throw new LogicException("Batch size must be greater than or equal to 1");
}
$this->batchSize = (int) $size;
}
@@ -127,28 +173,34 @@ class Fetcher
}
/**
* @return \Doctrine\DBAL\Driver\Statement
* @return Statement
* @throws DBALException
*/
private function getExecutedStatement()
{
if (!$this->statement) {
$sql = "SELECT r.*, c.asciiname AS collection_name, subdef.width, subdef.height, subdef.size\n"
. " FROM ((\n"
. " SELECT r.record_id, r.coll_id AS collection_id, r.uuid, r.status AS flags_bitfield, r.sha256,\n"
. " r.originalname AS original_name, r.mime, r.type, r.parent_record_id,\n"
. " r.credate AS created_on, r.moddate AS updated_on, r.coll_id\n"
. " FROM record r\n"
. " SELECT record_id, coll_id AS collection_id, uuid, status AS flags_bitfield, sha256,\n"
. " originalname AS original_name, mime, type, parent_record_id,\n"
. " credate AS created_on, moddate AS updated_on, coll_id,\n"
. " " . $this->sqlLimitColumn . " AS limit_value\n"
. " FROM record\n"
. " WHERE -- WHERE\n"
. " ORDER BY " . $this->options->getPopulateOrderAsSQL() . " " . $this->options->getPopulateDirectionAsSQL() . "\n"
. " ORDER BY " . ($this->options->getPopulateOrder() === $this->options::POPULATE_ORDER_RID ? 'record_id':'moddate')
. " " . $this->options->getPopulateDirectionAsSQL() . "\n"
. " LIMIT :limit\n"
. " ) AS r\n"
. " INNER JOIN coll c ON (c.coll_id = r.coll_id)\n"
. " )\n"
. " LEFT JOIN\n"
. " subdef ON subdef.record_id=r.record_id AND subdef.name='document'\n"
. " ORDER BY " . $this->options->getPopulateOrderAsSQL() . " " . $this->options->getPopulateDirectionAsSQL() . "";
. " ORDER BY " . ($this->options->getPopulateOrder() === $this->options::POPULATE_ORDER_RID ? 'record_id':'updated_on')
. " " . $this->options->getPopulateDirectionAsSQL() . "";
$where = 'record_id < :upper_rid';
$where = $this->sqlLimitColumn .
($this->options->getPopulateDirection() === $this->options::POPULATE_DIRECTION_DESC ? ' < ' : ' > ') .
':bound';
if( ($w = $this->delegate->buildWhereClause()) != '') {
$where = '(' . $where . ') AND (' . $w . ')';
}
@@ -175,12 +227,12 @@ class Fetcher
}
}
// Reference bound parameters
$statement->bindParam(':upper_rid', $this->upper_rid, PDO::PARAM_INT);
$statement->bindParam(':bound', $this->lastLimit, PDO::PARAM_INT);
$statement->bindParam(':limit', $this->batchSize, PDO::PARAM_INT);
$this->statement = $statement;
} else {
// Inject own query parameters
$params[':upper_rid'] = $this->upper_rid;
$params[':bound'] = $this->lastLimit;
$params[':limit'] = $this->batchSize;
$types[':offset'] = $types[':limit'] = PDO::PARAM_INT;

View File

@@ -0,0 +1,73 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2019 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
use Alchemy\Phrasea\Application;
class patch_410alpha26a implements patchInterface
{
    /** @var string release this patch is attached to */
    private $release = '4.1.0-alpha.26a';

    /** @var array kinds of bases this patch is declared for */
    private $concern = [base::DATA_BOX];

    /**
     * Gives the release version this patch is attached to.
     *
     * @return string
     */
    public function get_release()
    {
        return $this->release;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the list of concerned bases declared by this patch.
     */
    public function concern()
    {
        return $this->concern;
    }

    /**
     * {@inheritdoc}
     *
     * This patch always answers false.
     */
    public function require_all_upgrades()
    {
        return false;
    }

    /**
     * {@inheritdoc}
     *
     * This patch declares no Doctrine migration (empty list).
     */
    public function getDoctrineMigrations()
    {
        return [];
    }

    /**
     * {@inheritdoc}
     *
     * Adds an index named `moddate` on the `moddate` column of the `record` table.
     * The statement is run on a best-effort basis: any exception raised while
     * preparing or executing it is discarded, and the method reports success anyway.
     *
     * @param base        $databox base whose connection is used to run the statement
     * @param Application $app     not used by this patch
     *
     * @return bool always true
     */
    public function apply(base $databox, Application $app)
    {
        $addIndexSql = "ALTER TABLE `record` ADD INDEX `moddate` (`moddate`);";

        try {
            $statement = $databox->get_connection()->prepare($addIndexSql);
            $statement->execute();
            $statement->closeCursor();
        } catch (\Exception $ignored) {
            // Deliberately ignored: presumably the index already exists (any other failure is silenced too).
        }

        return true;
    }
}

View File

@@ -3284,6 +3284,13 @@
<field>parent_record_id</field>
</fields>
</index>
<!-- Non-unique index on the moddate column; presumably meant to speed up queries ordered by modification date (TODO confirm). -->
<index>
<name>moddate</name>
<type>INDEX</type>
<fields>
<field>moddate</field>
</fields>
</index>
</indexes>
<engine>InnoDB</engine>
</table>