Add reconnectable statement

This commit is contained in:
Nicolas Le Goff
2014-04-01 22:24:43 +02:00
parent cc276db6bd
commit 6f66f1de19
12 changed files with 314 additions and 293 deletions

View File

@@ -12,174 +12,66 @@
use Alchemy\Phrasea\Application;
/**
*
* @license http://opensource.org/licenses/gpl-3.0 GPLv3
* @link www.phraseanet.com
* Pool of PDO connections to phraseanet databoxes and appbox;
*/
class connection
{
/**
*
* @var Array
*/
private static $_PDO_instance = array();
private static $_PDO_instances = array();
private static $_self;
/**
*
* @var boolean
*/
private static $_selfinstance;
/**
*
* @var Array
*/
public static $log = array();
protected $app;
public function __construct(Application $app)
{
$this->app = $app;
}
/**
*
*/
public function __destruct()
{
self::printLog($this->app);
return;
}
/**
*
* @return Void
*/
public static function printLog(Application $app)
{
if (!$app['debug']) {
return;
}
$totalTime = 0;
foreach (self::$log as $entry) {
$query = $entry['query'];
do {
$query = str_replace(array("\n", " "), " ", $query);
} while ($query != str_replace(array("\n", " "), " ", $query));
$totalTime += $entry['time'];
$string = $entry['time'] . "\t" . ' - ' . $query . ' - ' . "\n";
file_put_contents(__DIR__ . '/../../logs/mysql_log.log', $string, FILE_APPEND);
}
$string = count(self::$log) . ' queries - ' . $totalTime
. "\nEND OF QUERY " . $_SERVER['PHP_SELF']
. "?";
foreach ($_GET as $key => $value) {
$string .= $key . ' = ' . $value . ' & ';
}
$string .= "\nPOST datas :\n ";
foreach ($_POST as $key => $value) {
$string .= "\t\t" . $key . ' = ' . (is_scalar($value) ? $value : 'non-scalar value') . "\n";
}
$string .= "\n\n\n\n";
file_put_contents(__DIR__ . '/../../logs/mysql_log.log', $string, FILE_APPEND);
return;
}
/**
*
* @return type
*/
protected static function instantiate(Application $app)
{
if (!self::$_selfinstance)
self::$_selfinstance = new self($app);
return;
}
/**
*
* @param Application $app
* @param string $name
*
* @return connection_pdo
*/
public static function getPDOConnection(Application $app, $name = null)
{
self::instantiate($app);
if (trim($name) == '') {
$name = 'appbox';
} elseif (is_int((int) $name)) {
$name = (int) $name;
} else {
if (!self::$_self) {
self::$_self = new self();
}
if (null !== $name && !is_numeric($name)) {
return false;
}
if (!isset(self::$_PDO_instance[$name])) {
$hostname = $port = $user = $password = $dbname = false;
$connection_params = array();
if (trim($name) !== 'appbox') {
$connection_params = phrasea::sbas_params($app);
} else {
$connexion = $app['phraseanet.configuration']['main']['database'];
$hostname = $connexion['host'];
$port = $connexion['port'];
$user = $connexion['user'];
$password = $connexion['password'];
$dbname = $connexion['dbname'];
}
if (isset($connection_params[$name])) {
$hostname = $connection_params[$name]['host'];
$port = $connection_params[$name]['port'];
$user = $connection_params[$name]['user'];
$password = $connection_params[$name]['pwd'];
$dbname = $connection_params[$name]['dbname'];
}
try {
self::$_PDO_instance[$name] = new connection_pdo($name, $hostname, $port, $user, $password, $dbname, array(), $app['debug']);
} catch (\Exception $e) {
throw new Exception('Connection not available');
}
}
if (array_key_exists($name, self::$_PDO_instance)) {
return self::$_PDO_instance[$name];
if (null === $name) {
$name = 'appbox';
} else {
$name = (int) $name;
}
throw new Exception('Connection not available');
if (isset(self::$_PDO_instances[$name]) && self::$_PDO_instances[$name] instanceof \connection_pdo) {
return self::$_PDO_instances[$name];
}
$hostname = $port = $user = $password = $databaseName = false;
if (trim($name) !== 'appbox') {
$params = phrasea::sbas_params($app);
if (!isset($params[$name])) {
throw new \Exception(sprintf('Unknown connection parameters for databox "%s"', $name));
}
$hostname = $params[$name]['host'];
$port = $params[$name]['port'];
$user = $params[$name]['user'];
$password = $params[$name]['pwd'];
$databaseName = $params[$name]['dbname'];
} else {
$params = $app['phraseanet.configuration']['main']['database'];
$hostname = $params['host'];
$port = $params['port'];
$user = $params['user'];
$password = $params['password'];
$databaseName = $params['dbname'];
}
try {
return self::$_PDO_instances[$name] = new \connection_pdo($name, $hostname, $port, $user, $password, $databaseName, array(), $app['debug']);
} catch (\Exception $e) {
throw new \Exception('Connection not available', 0, $e);
}
}
/**
*
* @param type $name
* @return type
*/
public static function close_PDO_connection($name)
public function __destruct()
{
if (isset(self::$_PDO_instance[$name])) {
self::$_PDO_instance[$name]->disconnect();
self::$_PDO_instance[$name] = null;
unset(self::$_PDO_instance[$name]);
}
return;
}
public static function close_connections()
{
foreach (self::$_PDO_instance as $name => $conn) {
self::close_PDO_connection($name);
foreach (self::$_PDO_instances as $conn) {
$conn->close();
}
self::$_PDO_instances = array();
return;
}

View File

@@ -9,18 +9,16 @@
* file that was distributed with this source code.
*/
/**
*
*
* @license http://opensource.org/licenses/gpl-3.0 GPLv3
* @link www.phraseanet.com
*/
abstract class connection_abstract
{
protected $name;
protected $name = null;
protected $dsn = null;
protected $credentials = array();
protected $multi_db = true;
protected $connection;
protected $connection = null;
abstract public function close();
abstract public function connect();
public function get_credentials()
{
@@ -32,10 +30,6 @@ abstract class connection_abstract
return $this->multi_db;
}
/**
*
* @return string
*/
public function get_name()
{
return $this->name;
@@ -43,29 +37,35 @@ abstract class connection_abstract
public function ping()
{
if (null === $this->connection) {
$this->initConn();
if (false === $this->is_connected()) {
return false;
}
try {
$this->connection->query('SELECT 1');
} catch (PDOException $e) {
} catch (\PDOException $e) {
return false;
}
return true;
}
/**
*
* @return string
*/
public function server_info()
{
if (null === $this->connection) {
$this->initConn();
if (false === $this->ping()) {
throw new \Exception('Mysql server is not reachable');
}
return $this->connection->getAttribute(constant("PDO::ATTR_SERVER_VERSION"));
}
public function __destruct()
{
$this->close();
}
protected function is_connected()
{
return $this->connection !== null;
}
}

View File

@@ -9,12 +9,6 @@
* file that was distributed with this source code.
*/
/**
*
*
* @license http://opensource.org/licenses/gpl-3.0 GPLv3
* @link www.phraseanet.com
*/
interface connection_interface
{
public function ping();

View File

@@ -9,134 +9,163 @@
* file that was distributed with this source code.
*/
/**
*
* @package connection
* @license http://opensource.org/licenses/gpl-3.0 GPLv3
* @link www.phraseanet.com
*/
class connection_pdo extends connection_abstract implements connection_interface
{
protected $debug;
protected $retryFrequency = 2;
protected $retryNumber = 1;
/**
*
* @param string $name
* @param string $hostname
* @param int $port
* @param string $user
* @param string $passwd
* @param string $dbname
* @param array $options
* @param Boolean $debug
*
* @return connection_pdo
*/
public function __construct($name, $hostname, $port, $user, $passwd, $dbname = false, $options = array(), $debug = false)
public function __construct($name, $hostname, $port, $user, $password, $databaseName = false, $options = array(), $debug = false)
{
$this->debug = $debug;
$this->name = $name;
$this->credentials['hostname'] = $hostname;
$this->credentials['port'] = $port;
$this->credentials['user'] = $user;
$this->credentials['password'] = $passwd;
$this->credentials['password'] = $password;
if ($databaseName) {
$this->credentials['dbname'] = $databaseName;
}
if ($dbname)
$this->credentials['dbname'] = $dbname;
$this->dsn = $this->buildDataSourceName($hostname, $port, $databaseName);
$this->initConn();
$this->connect();
return $this;
}
protected function initConn()
public function setRetryNumber($retryNumber)
{
$this->connection = null;
if (isset($this->credentials['dbname']))
$dsn = 'mysql:dbname=' . $this->credentials['dbname'] . ';host=' . $this->credentials['hostname'] . ';port=' . $this->credentials['port'] . ';';
else
$dsn = 'mysql:host=' . $this->credentials['hostname'] . ';port=' . $this->credentials['port'] . ';';
$this->connection = new \PDO($dsn, $this->credentials['user'], $this->credentials['password'], array());
$this->connection->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$this->connection->exec("
SET character_set_results = 'utf8', character_set_client = 'utf8',
character_set_connection = 'utf8', character_set_database = 'utf8',
character_set_server = 'utf8'");
$this->retryNumber = $retryNumber;
}
/**
*
* @return void
*/
public function disconnect()
public function setRetryFrequency($retryFrequency)
{
$this->connection = null;
$this->retryFrequency = $retryFrequency;
}
public function connect()
{
// already connected do not reconnect
if ($this->ping()) {
return;
}
// if disconnected close connection
$this->close();
$tries = $this->retryNumber;
$infiniteMode = $this->retryNumber <= 0;
$lastException = null;
do {
if (!$infiniteMode) {
$tries--;
}
try {
$this->init();
} catch (\PDOException $e) {
$this->connection = null;
$lastException = $e;
// wait only if there is at least one try remaining or in infinite mode
// && connection has not been initialized
if ($infiniteMode || (!$infiniteMode && $tries !== 0)) {
sleep($this->retryFrequency);
}
}
} while (!$this->is_connected() && ($infiniteMode || (!$infiniteMode && $tries > 0)));
if (!$this->is_connected()) {
throw new Exception(sprintf('Failed to connect to "%s" database', $this->dsn), 0, $lastException);
}
}
/**
*
* @return void
*/
public function close()
{
connection::close_PDO_connection($this->name);
$this->connection = null;
}
public function __call($method, $args)
{
if (null === $this->connection) {
$this->initConn();
if (false === $this->is_connected()) {
throw new \LogicException('No connection available');
}
if (!method_exists($this->connection, $method)) {
throw new \BadMethodCallException(sprintf('Method %s does not exist', $method));
}
$tries = 0;
do {
$tries++;
try {
set_error_handler(function ($errno, $errstr) {
if (false !== strpos($errstr, 'Error while sending QUERY packet')) {
throw new \Exception('MySQL server has gone away');
}
throw new \Exception($errstr);
});
if ('prepare' === $method && $this->debug) {
$ret = new connection_pdoStatementDebugger(call_user_func_array(array($this->connection, $method), $args));
} else {
$ret = call_user_func_array(array($this->connection, $method), $args);
try {
set_error_handler(function ($errno, $errstr) {
if (false !== strpos($errstr, 'Error while sending QUERY packet')) {
throw new \Exception('MySQL server has gone away');
}
restore_error_handler();
throw new \Exception($errstr);
});
return $ret;
} catch (\Exception $e) {
restore_error_handler();
$returnValue = $this->doMethodCall($method, $args);
$found = (false !== strpos($e->getMessage(), 'MySQL server has gone away')) || (false !== strpos($e->getMessage(), 'errno=32 Broken pipe'));
if ($tries >= 2 || !$found) {
throw $e;
}
$this->initConn();
restore_error_handler();
return $returnValue;
} catch (\Exception $e) {
restore_error_handler();
$unreachable = (false !== strpos($e->getMessage(), 'MySQL server has gone away')) || (false !== strpos($e->getMessage(), 'errno=32 Broken pipe'));
if (!$unreachable) {
throw $e;
}
} while (true);
$this->connect();
}
return $this->doMethodCall($method, $args);
}
/**
*
* @param string $message
* @return connection_pdo
*/
protected function log($message)
protected function init()
{
file_put_contents(__DIR__ . '/../../../logs/sql_log.log', $message . "\n", FILE_APPEND);
if ($this->is_connected()) {
return;
}
return $this;
$this->connection = new \PDO($this->dsn, $this->credentials['user'], $this->credentials['password'], array());
$this->connection->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$this->connection->exec("
SET character_set_results = 'utf8', character_set_client = 'utf8',
character_set_connection = 'utf8', character_set_database = 'utf8',
character_set_server = 'utf8'
");
}
private function doMethodCall($method, $args)
{
if ('prepare' === $method) {
$pdoStatement = call_user_func_array(array($this->connection, $method), $args);
$statement = new connection_pdoStatement($pdoStatement);
// decorate statement with reconnectable statement
$statement = new connection_pdoStatementReconnectable($statement, $this);
if ($this->debug) {
// decorate reconnectable statement with debugger one
$statement = new connection_pdoStatementDebugger($statement);
}
return $statement;
}
return call_user_func_array(array($this->connection, $method), $args);
}
private function buildDataSourceName($host, $port, $databaseName = null)
{
if (isset($databaseName)) {
return sprintf('mysql:dbname=%s;host=%s;port=%s;', $databaseName, $host, $port);
}
return sprintf('host=%s;port=%s;', $host, $port);
}
}

View File

@@ -0,0 +1,37 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
class connection_pdoStatement implements \connection_statement
{
protected $statement;
public function __construct(\PDOStatement $statement)
{
$this->statement = $statement;
return $this;
}
public function getQueryString()
{
return $this->statement->queryString;
}
public function execute($params = array())
{
return $this->statement->execute($params);
}
public function __call($function_name, $parameters)
{
return call_user_func_array(array($this->statement, $function_name), $parameters);
}
}

View File

@@ -9,26 +9,27 @@
* file that was distributed with this source code.
*/
/**
*
* @license http://opensource.org/licenses/gpl-3.0 GPLv3
* @link www.phraseanet.com
*/
class connection_pdoStatementDebugger
{
/**
*
* @var PDOStatement
*/
protected $statement;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
public function __construct(PDOStatement $statement)
class connection_pdoStatementDebugger implements connection_statement
{
protected $statement;
protected $logger;
public function __construct(\connection_statement $statement, Logger $logger = null)
{
$this->statement = $statement;
$this->logger = $logger ?: new Logger('sql-query', [new StreamHandler(__DIR__ . '/../../../logs/mysql_log.log')]);
return $this;
}
public function getQueryString()
{
return $this->statement->getQueryString();
}
public function execute($params = array())
{
$start = microtime(true);
@@ -38,13 +39,17 @@ class connection_pdoStatementDebugger
} catch (\Exception $e) {
$exception = $e;
}
$time = microtime(true) - $start;
connection::$log[] = array(
'query' => '' . str_replace(array_keys($params), array_values($params), $this->statement->queryString),
'time' => $time
);
if ($exception instanceof Exception)
$this->logger->addInfo(sprintf(
'%s sec - %s - %s',
round(microtime(true) - $start, 4),
$exception !== null ? 'ERROR QUERY' : 'OK QUERY',
str_replace(array_keys($params), array_values($params), $this->getQueryString())
));
if ($exception instanceof \Exception) {
throw $exception;
}
return $result;
}

View File

@@ -0,0 +1,52 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
class connection_pdoStatementReconnectable implements \connection_statement
{
protected $queryString;
protected $statement;
protected $conn;
public function __construct(\connection_statement $statement, \connection_pdo $conn)
{
$this->statement = $statement;
$this->conn = $conn;
return $this;
}
public function getQueryString()
{
return $this->statement->getQueryString();
}
public function execute($params = array())
{
try {
return $this->statement->execute($params);
} catch (\Exception $e) {
$unreachable = ($e->getCode() === 2006) || (false !== strpos($e->getMessage(), 'MySQL server has gone away')) || (false !== strpos($e->getMessage(), 'errno=32 Broken pipe'));
if (!$unreachable) {
throw $e;
}
$this->conn->connect();
}
// retry query with update statement
$this->statement = $this->conn->prepare($this->getQueryString());
return $this->statement->execute($params);
}
public function __call($function_name, $parameters)
{
return call_user_func_array(array($this->statement, $function_name), $parameters);
}
}

View File

@@ -0,0 +1,16 @@
<?php
/*
* This file is part of Phraseanet
*
* (c) 2005-2014 Alchemy
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
interface connection_statement
{
public function execute($params = array());
public function getQueryString();
}

View File

@@ -1652,6 +1652,7 @@ class record_adapter implements record_Interface, cache_cacheableInterface
$pathdest = $this->generateSubdefPathname($subdef, $app['filesystem'], $pathdest);
$app['task-manager.logger']->addInfo(sprintf('Generating subdef %s to %s', $subdefname, $pathdest));
$this->generate_subdef($app['media-alchemyst'], $subdef, $pathdest, $app['task-manager.logger']);
if (file_exists($pathdest)) {

View File

@@ -15,19 +15,16 @@ class InstallerTest extends \PHPUnit_Framework_TestCase
public function setUp()
{
parent::setUp();
\connection::close_connections();
}
public function tearDown()
{
\connection::close_connections();
parent::tearDown();
}
public static function tearDownAfterClass()
{
$app = new Application('test');
\connection::close_connections();
\phrasea::reset_sbasDatas($app['phraseanet.appbox']);
\phrasea::reset_baseDatas($app['phraseanet.appbox']);
parent::tearDownAfterClass();

View File

@@ -106,8 +106,6 @@ abstract class PhraseanetPHPUnitAbstract extends WebTestCase
parent::setUp();
connection::close_connections();
error_reporting(-1);
\PHPUnit_Framework_Error_Warning::$enabled = true;

View File

@@ -8,6 +8,6 @@ class connectionTest extends \PhraseanetPHPUnitAbstract
$conn->exec('SET @@local.wait_timeout= 1');
usleep(1200000);
$conn->exec('SHOW DATABASES');
connection::close_connections();
$conn->close();
}
}