12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402 |
- <?php
- /**
- * Database load balancing manager
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- */
- namespace Wikimedia\Rdbms;
- use Psr\Log\LoggerInterface;
- use Psr\Log\NullLogger;
- use Wikimedia\ScopedCallback;
- use BagOStuff;
- use EmptyBagOStuff;
- use WANObjectCache;
- use ArrayUtils;
- use LogicException;
- use UnexpectedValueException;
- use InvalidArgumentException;
- use RuntimeException;
- use Exception;
- /**
- * Database connection, tracking, load balancing, and transaction manager for a cluster
- *
- * @ingroup Database
- */
- class LoadBalancer implements ILoadBalancer {
- /** @var ILoadMonitor */
- private $loadMonitor;
- /** @var callable|null Callback to run before the first connection attempt */
- private $chronologyCallback;
- /** @var BagOStuff */
- private $srvCache;
- /** @var WANObjectCache */
- private $wanCache;
- /** @var mixed Class name or object With profileIn/profileOut methods */
- private $profiler;
- /** @var TransactionProfiler */
- private $trxProfiler;
- /** @var LoggerInterface */
- private $replLogger;
- /** @var LoggerInterface */
- private $connLogger;
- /** @var LoggerInterface */
- private $queryLogger;
- /** @var LoggerInterface */
- private $perfLogger;
- /** @var callable Exception logger */
- private $errorLogger;
- /** @var callable Deprecation logger */
- private $deprecationLogger;
- /** @var DatabaseDomain Local DB domain ID and default for selectDB() calls */
- private $localDomain;
- /**
- * @var IDatabase[][][]|Database[][][] Map of (connection category => server index => IDatabase[])
- */
- private $conns;
- /** @var array[] Map of (server index => server config array) */
- private $servers;
- /** @var array[] Map of (group => server index => weight) */
- private $groupLoads;
- /** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
- private $allowLagged;
- /** @var int Seconds to spend waiting on replica DB lag to resolve */
- private $waitTimeout;
- /** @var array The LoadMonitor configuration */
- private $loadMonitorConfig;
- /** @var string Alternate local DB domain instead of DatabaseDomain::getId() */
- private $localDomainIdAlias;
- /** @var int Amount of replication lag, in seconds, that is considered "high" */
- private $maxLag;
- /** @var string|null Default query group to use with getConnection() */
- private $defaultGroup;
- /** @var string Current server name */
- private $hostname;
- /** @var bool Whether this PHP instance is for a CLI script */
- private $cliMode;
- /** @var string Agent name for query profiling */
- private $agent;
- /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
- private $tableAliases = [];
- /** @var string[] Map of (index alias => index) */
- private $indexAliases = [];
- /** @var callable[] Map of (name => callable) */
- private $trxRecurringCallbacks = [];
- /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */
- private $tempTablesOnlyMode = [];
- /** @var string|bool Explicit DBO_TRX transaction round active or false if none */
- private $trxRoundId = false;
- /** @var string Stage of the current transaction round in the transaction round life-cycle */
- private $trxRoundStage = self::ROUND_CURSORY;
- /** @var Database Connection handle that caused a problem */
- private $errorConnection;
- /** @var int[] The group replica server indexes keyed by group */
- private $readIndexByGroup = [];
- /** @var bool|DBMasterPos Replication sync position or false if not set */
- private $waitForPos;
- /** @var bool Whether the generic reader fell back to a lagged replica DB */
- private $laggedReplicaMode = false;
- /** @var string The last DB selection or connection error */
- private $lastError = 'Unknown error';
- /** @var string|bool Reason this instance is read-only or false if not */
- private $readOnlyReason = false;
- /** @var int Total number of new connections ever made with this instance */
- private $connectionCounter = 0;
- /** @var bool */
- private $disabled = false;
- /** @var bool Whether any connection has been attempted yet */
- private $connectionAttempted = false;
- /** @var int An identifier for this class instance */
- private $id;
- /** @var int|null Integer ID of the managing LBFactory instance or null if none */
- private $ownerId;
- /** @var int Warn when this many connections are held */
- const CONN_HELD_WARN_THRESHOLD = 10;
- /** @var int Default 'maxLag' when unspecified */
- const MAX_LAG_DEFAULT = 6;
- /** @var int Default 'waitTimeout' when unspecified */
- const MAX_WAIT_DEFAULT = 10;
- /** @var int Seconds to cache master DB server read-only status */
- const TTL_CACHE_READONLY = 5;
- const KEY_LOCAL = 'local';
- const KEY_FOREIGN_FREE = 'foreignFree';
- const KEY_FOREIGN_INUSE = 'foreignInUse';
- const KEY_LOCAL_NOROUND = 'localAutoCommit';
- const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
- const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
- /** @var string Transaction round, explicit or implicit, has not finished writing */
- const ROUND_CURSORY = 'cursory';
- /** @var string Transaction round writes are complete and ready for pre-commit checks */
- const ROUND_FINALIZED = 'finalized';
- /** @var string Transaction round passed final pre-commit checks */
- const ROUND_APPROVED = 'approved';
- /** @var string Transaction round was committed and post-commit callbacks must be run */
- const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
- /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
- const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
- /** @var string Transaction round encountered an error */
- const ROUND_ERROR = 'error';
/**
 * Build the load balancer from a configuration map.
 *
 * Keys read from $params by this constructor:
 *  - servers: Required, non-empty list (keys 0..n-1) of server config arrays. Index 0 is
 *     flagged as the master, all others as replicas. Each entry provides 'load' and may
 *     provide 'groupLoads' (map of query group => weight).
 *  - localDomain: Optional local DB domain ID; an unspecified domain is used otherwise.
 *  - waitTimeout: Optional seconds to wait on replica lag (default MAX_WAIT_DEFAULT).
 *  - readOnlyReason: Optional string reason for read-only mode.
 *  - maxLag: Optional "high lag" threshold in seconds (default MAX_LAG_DEFAULT).
 *  - loadMonitor: Optional LoadMonitor configuration array (default LoadMonitorNull).
 *  - srvCache, wanCache, profiler, trxProfiler: Optional collaborators.
 *  - errorLogger, deprecationLogger: Optional callables; defaults use trigger_error().
 *  - replLogger, connLogger, queryLogger, perfLogger: Optional loggers (default NullLogger).
 *  - hostname, cliMode, agent: Optional environment descriptors.
 *  - chronologyCallback: Optional callable stored for later use.
 *  - roundStage: Optional STAGE_POSTCOMMIT_CALLBACKS/STAGE_POSTROLLBACK_CALLBACKS value.
 *  - defaultGroup: Optional default query group; ignored unless a server has load for it.
 *  - ownerId: Optional integer ID of the managing LBFactory instance.
 *
 * @param array $params Configuration map (see above)
 * @throws InvalidArgumentException If "servers" is missing or empty
 * @throws UnexpectedValueException If "servers" is not a list
 */
public function __construct( array $params ) {
	if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
		throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
	}

	$listKey = -1;
	$this->servers = [];
	$this->groupLoads = [ self::GROUP_GENERIC => [] ];
	foreach ( $params['servers'] as $i => $server ) {
		// Server indexes must be exactly 0, 1, 2, ... in iteration order
		if ( ++$listKey !== $i ) {
			throw new UnexpectedValueException( 'List expected for "servers" parameter' );
		}
		if ( $i == 0 ) {
			$server['master'] = true;
		} else {
			$server['replica'] = true;
		}
		$this->servers[$i] = $server;
		foreach ( ( $server['groupLoads'] ?? [] ) as $group => $ratio ) {
			$this->groupLoads[$group][$i] = $ratio;
		}
		$this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
	}

	$localDomain = isset( $params['localDomain'] )
		? DatabaseDomain::newFromId( $params['localDomain'] )
		: DatabaseDomain::newUnspecified();
	$this->setLocalDomain( $localDomain );

	$this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;

	$this->conns = self::newTrackedConnectionsArray();
	$this->waitForPos = false;
	$this->allowLagged = false;

	if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
		$this->readOnlyReason = $params['readOnlyReason'];
	}

	$this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;

	$this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
	// Only fills in 'lagWarnThreshold' when the caller did not provide one
	$this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];

	$this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
	$this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
	$this->profiler = $params['profiler'] ?? null;
	$this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();

	$this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
	$this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
		trigger_error( $msg, E_USER_DEPRECATED );
	};
	foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
		$this->$key = $params[$key] ?? new NullLogger();
	}

	$this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
	$this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
	$this->agent = $params['agent'] ?? '';

	if ( isset( $params['chronologyCallback'] ) ) {
		$this->chronologyCallback = $params['chronologyCallback'];
	}

	if ( isset( $params['roundStage'] ) ) {
		if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
		} elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
			$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
		}
	}

	// A default group without any configured load falls back to the generic group
	$group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
	$this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;

	// Seed the per-process counter randomly, then advance it for each new instance.
	// The previous "$nextId = ... $nextId++ ..." form assigned the pre-increment value
	// back to the counter, so it never advanced and all instances shared one ID.
	static $nextId;
	$nextId = is_int( $nextId ) ? $nextId + 1 : mt_rand();
	$this->id = $nextId;
	$this->ownerId = $params['ownerId'] ?? null;
}
/**
 * Build the empty connection-tracking structure.
 *
 * @return array[] Map of (connection category => empty array), one entry per KEY_* category
 */
private static function newTrackedConnectionsArray() {
	$categories = [
		// Connections where transaction rounds may be applied
		self::KEY_LOCAL,
		self::KEY_FOREIGN_INUSE,
		self::KEY_FOREIGN_FREE,
		// Auto-committing counterpart connections that ignore transaction rounds
		self::KEY_LOCAL_NOROUND,
		self::KEY_FOREIGN_INUSE_NOROUND,
		self::KEY_FOREIGN_FREE_NOROUND
	];

	return array_fill_keys( $categories, [] );
}
/**
 * Get the ID of the local DB domain of this load balancer.
 *
 * @return string Value of getId() on the local DatabaseDomain
 */
public function getLocalDomainID() {
	$localDomain = $this->localDomain;

	return $localDomain->getId();
}
/**
 * Normalize a domain specifier into a domain ID string.
 *
 * @param string|bool $domain Domain ID, the local domain alias, or false for the local domain
 * @return string The local domain ID for false/alias input, otherwise $domain cast to string
 */
public function resolveDomainID( $domain ) {
	// Either an explicit "local" request (false) or a backwards-compatibility alias
	$wantsLocal = ( $domain === false || $domain === $this->localDomainIdAlias );

	return $wantsLocal ? $this->getLocalDomainID() : (string)$domain;
}
/**
 * Resolve $groups into a list of query groups defined as having database servers
 *
 * The default group of this load balancer is always the last-resort entry of the result.
 * A string group without any configured load resolves to just the default group.
 *
 * @param string[]|string|bool $groups Query group(s) in preference order, [], or false
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @return string[] Non-empty group list in preference order with the default group appended
 * @throws LogicException If query groups are given along with a specific replica index
 */
private function resolveGroups( $groups, $i ) {
	// If a specific replica server was specified, then $groups makes no sense
	if ( $i > 0 && $groups !== [] && $groups !== false ) {
		$list = implode( ', ', (array)$groups );
		throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
	}

	if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
		$resolvedGroups = [ $this->defaultGroup ]; // common case
	} elseif ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
		$resolvedGroups = [ $groups, $this->defaultGroup ];
	} elseif ( is_array( $groups ) ) {
		// De-duplicate while keeping preference order, then append the default group
		// unless it is already listed. This previously appended GROUP_GENERIC, which
		// disagreed with the string branch whenever a non-generic default group is set.
		$resolvedGroups = array_keys( array_flip( $groups ) + [ $this->defaultGroup => 1 ] );
	} else {
		$resolvedGroups = [ $this->defaultGroup ];
	}

	return $resolvedGroups;
}
/**
 * Adjust connection flags so they fit the requested server and domain.
 *
 * CONN_INTENT_WRITABLE is added when the master is requested. CONN_TRX_AUTOCOMMIT is
 * removed when a separate auto-commit connection cannot or should not be used.
 *
 * @param int $flags Bitfield of class CONN_* constants
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string $domain Database domain
 * @return int Sanitized bitfield
 */
private function sanitizeConnectionFlags( $flags, $i, $domain ) {
	// An outside caller is explicitly asking for the master database server
	if ( $i === self::DB_MASTER || $i === $this->getWriterIndex() ) {
		$flags |= self::CONN_INTENT_WRITABLE;
	}

	if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) != self::CONN_TRX_AUTOCOMMIT ) {
		// Nothing further to sanitize without CONN_TRX_AUTOCOMMIT
		return $flags;
	}

	// CONN_TRX_AUTOCOMMIT lets callers bypass REPEATABLE-READ staleness without row
	// locks (e.g. FOR UPDATE), or make small out-of-band commits during larger
	// transactions, which helps to avoid lock contention.

	// Attributes of the master DB server (should match those of the replica DB servers)
	$masterAttributes = $this->getServerAttributes( $this->getWriterIndex() );
	if ( $masterAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
		// No concurrent writes are possible with this RDBMS (e.g. SQLite), so separate
		// connections would only self-deadlock. REPEATABLE-READ staleness is moot here
		// since DB-level locking makes transactions Strict Serializable anyway.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
		$type = $this->getServerType( $this->getWriterIndex() );
		$this->connLogger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );

		return $flags;
	}

	if ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
		// T202116: integration tests are active and all queries should be using
		// temporary clone tables (via prefix). Such tables are not visible across
		// different connections, nor can there be REPEATABLE-READ snapshot staleness,
		// so the same connection is used for everything.
		$flags &= ~self::CONN_TRX_AUTOCOMMIT;
	}

	return $flags;
}
/**
 * Apply the CONN_TRX_AUTOCOMMIT requirement to a connection handle.
 *
 * @param IDatabase $conn Handle to adjust
 * @param int $flags Bitfield of class CONN_* constants
 * @throws DBUnexpectedError If auto-commit mode is requested while a transaction is open
 */
private function enforceConnectionFlags( IDatabase $conn, $flags ) {
	$wantsAutoCommit =
		( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
	if ( !$wantsAutoCommit ) {
		return;
	}

	// Sanity check: an auto-commit handle must not be inside a transaction
	if ( $conn->trxLevel() ) {
		throw new DBUnexpectedError(
			$conn,
			'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
		);
	}

	// Switch the handle to auto-commit mode
	$conn->clearFlag( $conn::DBO_TRX );
}
/**
 * Get the LoadMonitor instance, creating it on first use
 *
 * The class comes from the 'class' key of the load monitor configuration; the short
 * legacy names are mapped to their namespaced classes.
 *
 * @return ILoadMonitor
 */
private function getLoadMonitor() {
	if ( isset( $this->loadMonitor ) ) {
		return $this->loadMonitor;
	}

	// Un-namespaced class names accepted for backwards-compatibility
	$legacyClasses = [
		'LoadMonitor' => LoadMonitor::class,
		'LoadMonitorNull' => LoadMonitorNull::class,
		'LoadMonitorMySQL' => LoadMonitorMySQL::class,
	];

	$configured = $this->loadMonitorConfig['class'];
	$class = $legacyClasses[$configured] ?? $configured;

	$this->loadMonitor = new $class(
		$this,
		$this->srvCache,
		$this->wanCache,
		$this->loadMonitorConfig
	);
	$this->loadMonitor->setLogger( $this->replLogger );

	return $this->loadMonitor;
}
/**
 * Pick a random server, by weight, among those that are not excessively lagged
 *
 * Servers listed by getLagTimes() (other than the writer) are dropped from $loads when
 * their lag is unknown (false) while a finite limit applies, or when their lag exceeds
 * the limit. The limit is the lower of the server's 'max lag' setting (falling back to
 * the balancer-wide maximum) and $maxLag.
 *
 * @param array $loads Map of (server index => weight)
 * @param bool|string $domain Domain to get non-lagged for
 * @param int $maxLag Restrict the maximum allowed lag to this many seconds
 * @return bool|int|string Key of $loads picked via ArrayUtils::pickRandom(), or false if
 *  the remaining servers have no load between them
 */
private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
	$lags = $this->getLagTimes( $domain );

	# Unset excessively lagged servers
	foreach ( $lags as $i => $lag ) {
		if ( $i !== $this->getWriterIndex() ) {
			# How much lag this server nominally is allowed to have
			$maxServerLag = $this->servers[$i]['max lag'] ?? $this->maxLag; // default
			# Constrain that further by $maxLag argument
			$maxServerLag = min( $maxServerLag, $maxLag );

			$host = $this->getServerName( $i );
			if ( $lag === false && !is_infinite( $maxServerLag ) ) {
				$this->replLogger->debug(
					__METHOD__ .
					": server {host} is not replicating?", [ 'host' => $host ] );
				unset( $loads[$i] );
			} elseif ( $lag > $maxServerLag ) {
				$this->replLogger->debug(
					__METHOD__ .
					": server {host} has {lag} seconds of lag (>= {maxlag})",
					[ 'host' => $host, 'lag' => $lag, 'maxlag' => $maxServerLag ]
				);
				unset( $loads[$i] );
			}
		}
	}

	# Find out if all the replica DBs with non-zero load are lagged.
	# An empty list also sums to zero, so no separate emptiness check is needed.
	if ( array_sum( $loads ) == 0 ) {
		# No appropriate DB servers except maybe the master and some replica DBs with zero load
		# Do NOT use the master
		# Instead, this function will return false, triggering read-only mode,
		# and a lagged replica DB will be used instead.
		return false;
	}

	# Return a random representative of the remainder
	return ArrayUtils::pickRandom( $loads );
}
/**
 * Get the server index to use for a specified server index and query group list
 *
 * DB_MASTER maps to the writer index. DB_REPLICA maps to the first reader index that
 * getReaderIndex() yields while walking $groups in order. Any other value must be a
 * configured server index and is returned as-is.
 *
 * @param int $i Specific server index or DB_MASTER/DB_REPLICA
 * @param string[] $groups Non-empty query group list in preference order
 * @param string|bool $domain
 * @return int A specific server index (replica DBs are checked for connectivity)
 * @throws UnexpectedValueException If $i is not a configured server index
 */
private function getConnectionIndex( $i, array $groups, $domain ) {
	if ( $i === self::DB_MASTER ) {
		$i = $this->getWriterIndex();
	} elseif ( $i === self::DB_REPLICA ) {
		// Reset the error string *before* trying any server, so that whatever error is
		// recorded during the attempts below is still available for the failure report.
		// It used to be reset right before being reported, so the report could only
		// ever read "Unknown error".
		$this->lastError = 'Unknown error';
		foreach ( $groups as $group ) {
			$groupIndex = $this->getReaderIndex( $group, $domain );
			if ( $groupIndex !== false ) {
				$i = $groupIndex; // group connection succeeded
				break;
			}
		}
	} elseif ( !isset( $this->servers[$i] ) ) {
		throw new UnexpectedValueException( "Invalid server index index #$i" );
	}

	if ( $i === self::DB_REPLICA ) {
		// None of the groups yielded a usable reader index
		$this->lastError = 'No working replica DB server: ' . $this->lastError;
		$this->reportConnectionError();
		return null; // unreachable due to exception
	}

	return $i;
}
/**
 * Get the server index to use for reads of the given query group.
 *
 * The choice is remembered per group, so later calls for that group return the same
 * index without picking again.
 *
 * @param string|bool $group Query group; non-string values mean the generic group
 * @param string|bool $domain Domain passed to load scaling and server picking
 * @return int|bool Server index, or false if the group has no loads or no server was usable
 */
public function getReaderIndex( $group = false, $domain = false ) {
	if ( $this->getServerCount() == 1 ) {
		// Nothing to balance with a single server
		return $this->getWriterIndex();
	}

	if ( !is_string( $group ) ) {
		$group = self::GROUP_GENERIC;
	}

	$existingIndex = $this->getExistingReaderIndex( $group );
	if ( $existingIndex >= 0 ) {
		// Already selected earlier; "waitForPos" was handled back then
		return $existingIndex;
	}

	// The server weight array for this load group is the starting point
	if ( !isset( $this->groupLoads[$group] ) ) {
		$this->connLogger->info( __METHOD__ . ": no loads for group $group" );

		return false;
	}
	$loads = $this->groupLoads[$group];

	// Let the load monitor scale the configured ratios by server load and state
	$this->getLoadMonitor()->scaleLoads( $loads, $domain );

	// Choose a server, accounting for weights, load, lag, and "waitForPos"
	$this->lazyLoadReplicationPositions(); // optimizes server candidate selection
	list( $i, $laggedReplicaMode ) = $this->pickReaderIndex( $loads, $domain );
	if ( $i === false ) {
		// No DB connection could be established
		return false;
	}

	// When queries are expected to see the transactions committed as of (or after) a
	// given replication position, wait for this DB to apply those changes
	if ( $this->waitForPos && $i !== $this->getWriterIndex() && !$this->doWait( $i ) ) {
		// The data will be older than what was expected
		$laggedReplicaMode = true;
	}

	// Remember the reader index for future DB_REPLICA handles
	$this->setExistingReaderIndex( $group, $i );

	// Track whether the generic reader index is in "lagged replica DB" mode
	if ( $laggedReplicaMode && $group === self::GROUP_GENERIC ) {
		$this->laggedReplicaMode = true;
	}

	$serverName = $this->getServerName( $i );
	$this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

	return $i;
}
/**
 * Look up the server index already chosen for reads of the given query group
 *
 * @param string $group Query group
 * @return int Server index or -1 if none was chosen
 */
protected function getExistingReaderIndex( $group ) {
	if ( isset( $this->readIndexByGroup[$group] ) ) {
		return $this->readIndexByGroup[$group];
	}

	return -1;
}
/**
 * Remember the server index chosen for reads of the given query group
 *
 * @param string $group Query group
 * @param int $index The index of a specific server
 * @throws UnexpectedValueException If $index is negative
 */
private function setExistingReaderIndex( $group, $index ) {
	$isNegative = ( $index < 0 );
	if ( $isNegative ) {
		throw new UnexpectedValueException( "Cannot set a negative read server index" );
	}

	$this->readIndexByGroup[$group] = $index;
}
/**
 * Pick a server from a weighted list and verify that a connection to it can be obtained
 *
 * Servers that cannot be connected to are dropped from the candidates and another pick
 * is made, until a connection succeeds or no candidates remain.
 *
 * @param array $loads List of server weights
 * @param string|bool $domain
 * @return array (reader index, lagged replica mode) or (false, false) on failure
 * @throws InvalidArgumentException If $loads is empty
 */
private function pickReaderIndex( array $loads, $domain = false ) {
	if ( $loads === [] ) {
		throw new InvalidArgumentException( "Server configuration array is empty" );
	}

	/** @var int|bool $i Index of selected server */
	$i = false;
	/** @var bool $laggedReplicaMode Whether server is considered lagged */
	$laggedReplicaMode = false;

	// Walk through the remaining candidates until one meets the criteria and connects
	$candidates = $loads;
	while ( $candidates !== [] ) {
		if ( $this->allowLagged || $laggedReplicaMode ) {
			// Lag is not a selection criterion (any more)
			$i = ArrayUtils::pickRandom( $candidates );
		} else {
			$i = false;
			if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
				$this->replLogger->debug( __METHOD__ . ": replication positions detected" );
				// "chronologyCallback" sets "waitForPos" for session consistency.
				// Since doWait() runs after connecting, lagged servers are best avoided
				// up front in order to keep the delay in that method small.
				$elapsed = microtime( true ) - $this->waitForPos->asOfTime();
				// Aim for <= 1 second of waiting (being too picky can backfire)
				$i = $this->getRandomNonLagged( $candidates, $domain, $elapsed + 1 );
			}
			if ( $i === false ) {
				// Any server with less lag than its 'max lag' param is preferable
				$i = $this->getRandomNonLagged( $candidates, $domain );
			}
			if ( $i === false && $candidates !== [] ) {
				// All replica DBs lagged. Switch to read-only mode
				$this->replLogger->error(
					__METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
				$i = ArrayUtils::pickRandom( $candidates );
				$laggedReplicaMode = true;
			}
		}

		if ( $i === false ) {
			// pickRandom() returned false.
			// This is permanent and means the configuration or the load monitor
			// wants us to return false.
			$this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );

			return [ false, false ];
		}

		$serverName = $this->getServerName( $i );
		$this->connLogger->debug( __METHOD__ . ": Using reader #$i: $serverName..." );

		// Connect to this server only, without triggering other server connections
		$conn = $this->getServerConnection( $i, $domain, self::CONN_SILENCE_ERRORS );
		if ( $conn ) {
			// Done with this handle for now: decrement the reference counter.
			// It will be incremented again for the caller later.
			if ( $domain !== false ) {
				$this->reuseConnection( $conn );
			}

			// This is the server to return
			break;
		}

		$this->connLogger->warning( __METHOD__ . ": Failed connecting to $i/$domain" );
		unset( $candidates[$i] ); // avoid this server next iteration
		$i = false;
	}

	// Every candidate turned out to be unreachable
	if ( $candidates === [] ) {
		$this->connLogger->error( __METHOD__ . ": all servers down" );
	}

	return [ $i, $laggedReplicaMode ];
}
/**
 * Set the replication position that reads are expected to reflect.
 *
 * If a generic replica reader was already chosen, this waits on it right away and
 * enters lagged replica mode when the wait does not succeed.
 *
 * @param DBMasterPos|bool $pos Replication position
 */
public function waitFor( $pos ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		// With a generic reader connection already chosen, the wait happens now;
		// otherwise it is deferred until getReaderIndex() picks a server
		$readerIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
		if ( $readerIndex > 0 ) {
			$caughtUp = $this->doWait( $readerIndex );
			if ( !$caughtUp ) {
				$this->laggedReplicaMode = true;
			}
		}
	} finally {
		// This is used for lag-protection, so go back to the older position if it was higher
		$this->setWaitForPositionIfHigher( $previousPos );
	}
}
/**
 * Wait for a single generic replica DB to reach the given position.
 *
 * The previously set wait position is restored afterwards.
 *
 * @param DBMasterPos|bool $pos Replication position
 * @param int|null $timeout Timeout handed to doWait()
 * @return bool Result of doWait(), or true if there was no applicable replica DB
 */
public function waitForOne( $pos, $timeout = null ) {
	$previousPos = $this->waitForPos;
	try {
		$this->waitForPos = $pos;

		$index = $this->getExistingReaderIndex( self::GROUP_GENERIC );
		if ( $index <= 0 ) {
			// No generic replica DB was chosen yet, so pick one now:
			// replica DBs only, and only those with non-zero load
			$replicaLoads = $this->groupLoads[self::GROUP_GENERIC];
			unset( $replicaLoads[$this->getWriterIndex()] );
			$replicaLoads = array_filter( $replicaLoads );
			$index = ArrayUtils::pickRandom( $replicaLoads );
		}

		// Without a usable replica index there are no applicable loads to wait on
		$ok = ( $index > 0 ) ? $this->doWait( $index, true, $timeout ) : true;
	} finally {
		// This is used for throttling, not lag-protection, so restore the old position
		$this->waitForPos = $previousPos;
	}

	return $ok;
}
- /**
-  * Wait for every replica DB that has load in any group to reach the given position.
-  *
-  * Servers are visited from index 1 upwards. The remaining timeout is reduced by the whole
-  * seconds spent on each wait and the loop stops once it reaches zero. The previous
-  * "waitForPos" value is always restored before returning.
-  *
-  * @param DBMasterPos|bool $pos Position to wait for
-  * @param int|null $timeout Max seconds to wait in total; falls back to "waitTimeout"
-  * @return bool Whether every wait that was attempted succeeded
-  */
- public function waitForAll( $pos, $timeout = null ) {
-     $timeout = $timeout ?: $this->waitTimeout;
-     $previousPos = $this->waitForPos;
-     try {
-         $this->waitForPos = $pos;
-         $ok = true;
-         $serverCount = $this->getServerCount();
-         for ( $index = 1; $index < $serverCount; $index++ ) {
-             if ( !$this->serverHasLoadInAnyGroup( $index ) ) {
-                 continue; // nothing reads from this server
-             }
-             $startedAt = microtime( true );
-             $reached = $this->doWait( $index, true, $timeout );
-             $ok = $reached && $ok;
-             // Only whole elapsed seconds are charged against the budget
-             $timeout -= intval( microtime( true ) - $startedAt );
-             if ( $timeout <= 0 ) {
-                 break; // timeout reached
-             }
-         }
-     } finally {
-         // This wait is for throttling rather than lag-protection, so put the old position back
-         $this->waitForPos = $previousPos;
-     }
-     return $ok;
- }
- /**
-  * Check whether a server has a load greater than zero in at least one query group.
-  *
-  * A server that has no entry in a group counts as having zero load there.
-  *
-  * @param int $i Specific server index
-  * @return bool
-  */
- private function serverHasLoadInAnyGroup( $i ) {
-     foreach ( $this->groupLoads as $groupLoads ) {
-         $load = $groupLoads[$i] ?? 0;
-         if ( $load > 0 ) {
-             return true;
-         }
-     }
-     return false;
- }
- /**
-  * Replace "waitForPos" with the given position when none is set or the given one has reached it.
-  *
-  * A falsy $pos leaves the current value untouched.
-  *
-  * @param DBMasterPos|bool $pos
-  */
- private function setWaitForPositionIfHigher( $pos ) {
-     if ( $pos && ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) ) {
-         $this->waitForPos = $pos;
-     }
- }
- /**
-  * Get any already-open connection handle for a server, without opening a new one.
-  *
-  * Every tracked connection pool is inspected in turn. DB_MASTER is mapped to the writer
-  * index, and DB_REPLICA matches the handles of any server index present in a pool.
-  *
-  * @param int $i Specific server index or DB_MASTER/DB_REPLICA
-  * @param int $flags Bitfield of class CONN_* constants
-  * @return IDatabase|false An open connection, or false if none was found
-  */
- public function getAnyOpenConnection( $i, $flags = 0 ) {
-     $i = ( $i === self::DB_MASTER ) ? $this->getWriterIndex() : $i;
-     // Connection handles required to be in auto-commit mode use a separate connection
-     // pool since the main pool is affected by implicit and explicit transaction rounds
-     $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
-     $conn = false;
-     foreach ( $this->conns as $connsByServer ) {
-         // Get the connection array server indexes to inspect
-         if ( $i === self::DB_REPLICA ) {
-             $indexes = array_keys( $connsByServer );
-         } else {
-             $indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
-         }
-         foreach ( $indexes as $index ) {
-             $conn = $this->pickAnyOpenConnection( $connsByServer[$index], $autocommit );
-             if ( $conn ) {
-                 // Leave both loops: continuing with the remaining pools could overwrite
-                 // the handle that was just found with false from a later pool
-                 break 2;
-             }
-         }
-     }
-     if ( $conn ) {
-         $this->enforceConnectionFlags( $conn, $flags );
-     }
-     return $conn;
- }
- /**
-  * Choose an open handle from a list of candidate connections.
-  *
-  * All candidates are examined and the last acceptable one is returned. When $autocommit is
-  * set, a candidate is only acceptable if it carries the 'autoCommitOnly' LB info flag and
-  * has no transaction level.
-  *
-  * @param IDatabase[] $candidateConns
-  * @param bool $autocommit Whether to only look for auto-commit connections
-  * @return IDatabase|false An appropriate open connection or false if none found
-  */
- private function pickAnyOpenConnection( $candidateConns, $autocommit ) {
-     $picked = false;
-     foreach ( $candidateConns as $candidate ) {
-         if ( !$candidate->isOpen() ) {
-             continue; // some sort of error occurred?
-         }
-         if ( $autocommit ) {
-             if ( !$candidate->getLBInfo( 'autoCommitOnly' ) ) {
-                 continue; // connection is transaction round aware
-             }
-             if ( $candidate->trxLevel() ) {
-                 continue; // some sort of error left a transaction open?
-             }
-         }
-         $picked = $candidate;
-     }
-     return $picked;
- }
- /**
-  * Wait for a given replica DB to catch up to the master pos stored in "waitForPos"
-  *
-  * The last position a server is known to have reached is kept in the server cache, so a
-  * wait can be skipped entirely when that cached position has already reached "waitForPos".
-  * A connection opened only for this check is closed again before returning.
-  *
-  * @param int $index Specific server index
-  * @param bool $open Check the server even if a new connection has to be made
-  * @param int|null $timeout Max seconds to wait; default is "waitTimeout" (minimum of 1)
-  * @return bool True if the position is known to be reached; false on timeout, on error,
-  *  or when no connection was available
-  */
- protected function doWait( $index, $open = false, $timeout = null ) {
-     $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );
-     // Check if we already know that the DB has reached this point
-     $server = $this->getServerName( $index );
-     $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
-     /** @var DBMasterPos $knownReachedPos */
-     $knownReachedPos = $this->srvCache->get( $key );
-     if (
-         $knownReachedPos instanceof DBMasterPos &&
-         $knownReachedPos->hasReached( $this->waitForPos )
-     ) {
-         // The position is passed as a placeholder value, as in the other log calls below;
-         // a variable name inside a single-quoted string would be logged literally
-         $this->replLogger->debug(
-             __METHOD__ .
-             ': replica DB {dbserver} known to be caught up (pos >= {pos}).',
-             [ 'dbserver' => $server, 'pos' => $knownReachedPos ]
-         );
-         return true;
-     }
-     // Find a connection to wait on, creating one if needed and allowed
-     $close = false; // close the connection afterwards
-     $flags = self::CONN_SILENCE_ERRORS;
-     $conn = $this->getAnyOpenConnection( $index, $flags );
-     if ( !$conn ) {
-         if ( !$open ) {
-             $this->replLogger->debug(
-                 __METHOD__ . ': no connection open for {dbserver}',
-                 [ 'dbserver' => $server ]
-             );
-             return false;
-         }
-         // Get a connection to this server without triggering other server connections
-         $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
-         if ( !$conn ) {
-             $this->replLogger->warning(
-                 __METHOD__ . ': failed to connect to {dbserver}',
-                 [ 'dbserver' => $server ]
-             );
-             return false;
-         }
-         // Avoid connection spam in waitForAll() when connections
-         // are made just for the sake of doing this lag check.
-         $close = true;
-     }
-     $this->replLogger->info(
-         __METHOD__ .
-         ': waiting for replica DB {dbserver} to catch up...',
-         [ 'dbserver' => $server ]
-     );
-     $result = $conn->masterPosWait( $this->waitForPos, $timeout );
-     if ( $result === null ) {
-         $this->replLogger->warning(
-             __METHOD__ . ': Errored out waiting on {host} pos {pos}',
-             [
-                 'host' => $server,
-                 'pos' => $this->waitForPos,
-                 'trace' => ( new RuntimeException() )->getTraceAsString()
-             ]
-         );
-         $ok = false;
-     } elseif ( $result == -1 ) {
-         $this->replLogger->warning(
-             __METHOD__ . ': Timed out waiting on {host} pos {pos}',
-             [
-                 'host' => $server,
-                 'pos' => $this->waitForPos,
-                 'trace' => ( new RuntimeException() )->getTraceAsString()
-             ]
-         );
-         $ok = false;
-     } else {
-         $this->replLogger->debug( __METHOD__ . ": done waiting" );
-         $ok = true;
-         // Remember that the DB reached this point
-         $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
-     }
-     if ( $close ) {
-         $this->closeConnection( $conn );
-     }
-     return $ok;
- }
- /**
-  * Get a connection handle for a server index or for DB_MASTER/DB_REPLICA.
-  *
-  * The domain, groups and flags are normalized first, then the index is resolved to a
-  * specific server and a handle is obtained through getServerConnection(). Writer handles
-  * are additionally marked read-only while the load balancer is in lagged-replica mode.
-  *
-  * @param int $i Specific server index or DB_MASTER/DB_REPLICA
-  * @param string[]|string $groups Query group(s); normalized by resolveGroups()
-  * @param string|bool $domain DB domain ID; normalized by resolveDomainID()
-  * @param int $flags Bitfield of class CONN_* constants
-  * @return IDatabase|bool The handle, or false when getServerConnection() returned false
-  */
- public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
-     $domain = $this->resolveDomainID( $domain );
-     $groups = $this->resolveGroups( $groups, $i );
-     $flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
-     // If given DB_MASTER/DB_REPLICA, resolve it to a specific server index. Resolving
-     // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
-     // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
-     // The use of getServerConnection() instead of getConnection() avoids infinite loops.
-     $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
-     // Get an open connection to that server (might trigger a new connection)
-     $conn = $this->getServerConnection( $serverIndex, $domain, $flags );
-     // Set master DB handles as read-only if there is high replication lag.
-     // getServerConnection() returns false on silenced errors or a lost connection,
-     // so make sure there is a handle before calling methods on it.
-     if (
-         $conn &&
-         $serverIndex === $this->getWriterIndex() &&
-         $this->getLaggedReplicaMode( $domain ) &&
-         !is_string( $conn->getLBInfo( 'readOnlyReason' ) )
-     ) {
-         $reason = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 )
-             ? 'The database is read-only until replication lag decreases.'
-             : 'The database is read-only until replica database servers becomes reachable.';
-         $conn->setLBInfo( 'readOnlyReason', $reason );
-     }
-     return $conn;
- }
- /**
-  * Get a handle to one specific server for an already resolved DB domain.
-  *
-  * The local-domain pool is used when $domain equals the local domain, otherwise the
-  * foreign-domain pool. On failure, false is returned when CONN_SILENCE_ERRORS is set and
-  * reportConnectionError() is called otherwise. A handle that exists but is no longer open
-  * is remembered in "errorConnection" and false is returned for it as well.
-  *
-  * @param int $i Specific server index
-  * @param string $domain Resolved DB domain
-  * @param int $flags Bitfield of class CONN_* constants
-  * @return IDatabase|bool
-  * @throws InvalidArgumentException When the server index is invalid
-  */
- public function getServerConnection( $i, $domain, $flags = 0 ) {
-     // Remember the connection count so that newly made connections can be detected below
-     $connectionsBefore = $this->connectionCounter;
-     // Get an open connection to this server (might trigger a new connection)
-     if ( $this->localDomain->equals( $domain ) ) {
-         $conn = $this->getLocalConnection( $i, $flags );
-     } else {
-         $conn = $this->getForeignConnection( $i, $domain, $flags );
-     }
-     if ( !( $conn instanceof IDatabase ) ) {
-         // The attempt failed: report it unless the caller asked for silence
-         $silenced = ( ( $flags & self::CONN_SILENCE_ERRORS ) == self::CONN_SILENCE_ERRORS );
-         if ( !$silenced ) {
-             $this->reportConnectionError();
-         }
-         return false;
-     }
-     if ( $this->connectionCounter > $connectionsBefore ) {
-         // Profile any new connections caused by this method
-         $writeIntent = ( ( $flags & self::CONN_INTENT_WRITABLE ) == self::CONN_INTENT_WRITABLE );
-         $this->trxProfiler->recordConnection(
-             $conn->getServer(),
-             $conn->getDBname(),
-             $writeIntent
-         );
-     }
-     if ( !$conn->isOpen() ) {
-         // The connection was made but later unrecoverably lost. Do not hand out a handle
-         // that will just throw exceptions on use; callers such as getReaderIndex() can
-         // then try another server.
-         $this->errorConnection = $conn;
-         return false;
-     }
-     // Make sure that flags like CONN_TRX_AUTOCOMMIT are respected by this handle
-     $this->enforceConnectionFlags( $conn, $flags );
-     // Writer handles are read-only when the load balancer is configured as read-only or
-     // when the master server runs in server-side read-only mode. Read-only mode due to
-     // replication lag is deliberately not handled here, to avoid recursion.
-     if ( $i === $this->getWriterIndex() ) {
-         $reason = false;
-         if ( $this->readOnlyReason !== false ) {
-             $reason = $this->readOnlyReason;
-         } elseif ( $this->isMasterConnectionReadOnly( $conn, $flags ) ) {
-             $reason = 'The master database server is running in read-only mode.';
-         }
-         $conn->setLBInfo( 'readOnlyReason', $reason );
-     }
-     return $conn;
- }
- /**
-  * Release one reference to a foreign-domain handle so that it can be reused.
-  *
-  * Handles without 'serverIndex' or 'foreignPoolRefCount' LB info are ignored, as are all
-  * calls made after this load balancer was disabled. Passing a DBConnRef only logs an
-  * error. When the reference count drops to zero or below, the handle is moved from the
-  * in-use pool to the matching free pool.
-  *
-  * @param IDatabase $conn
-  * @throws InvalidArgumentException When the handle is not the one tracked as in-use
-  */
- public function reuseConnection( IDatabase $conn ) {
-     $serverIndex = $conn->getLBInfo( 'serverIndex' );
-     $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
-     if ( $serverIndex === null || $refCount === null ) {
-         return; // non-foreign connection; no domain-use tracking to update
-     }
-     if ( $conn instanceof DBConnRef ) {
-         // DBConnRef already handles calling reuseConnection() and only passes the live
-         // Database instance to this method. Any caller passing in a DBConnRef is broken.
-         $trace = ( new LogicException() )->getTraceAsString();
-         $this->connLogger->error( __METHOD__ . ": got DBConnRef instance.\n" . $trace );
-         return;
-     }
-     if ( $this->disabled ) {
-         return; // DBConnRef handle probably survived longer than the LoadBalancer
-     }
-     // Auto-commit-only handles live in their own pair of pools
-     if ( $conn->getLBInfo( 'autoCommitOnly' ) ) {
-         $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
-         $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
-     } else {
-         $connFreeKey = self::KEY_FOREIGN_FREE;
-         $connInUseKey = self::KEY_FOREIGN_INUSE;
-     }
-     $domain = $conn->getDomainID();
-     $tracked = $this->conns[$connInUseKey][$serverIndex][$domain] ?? null;
-     if ( $tracked === null ) {
-         throw new InvalidArgumentException(
-             "Connection $serverIndex/$domain not found; it may have already been freed" );
-     }
-     if ( $tracked !== $conn ) {
-         throw new InvalidArgumentException(
-             "Connection $serverIndex/$domain mismatched; it may have already been freed" );
-     }
-     $refCount--;
-     $conn->setLBInfo( 'foreignPoolRefCount', $refCount );
-     if ( $refCount > 0 ) {
-         $this->connLogger->debug( __METHOD__ .
-             ": reference count for $serverIndex/$domain reduced to $refCount" );
-         return;
-     }
-     // No references remain: move the handle over to the free pool
-     $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
-     unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
-     if ( !$this->conns[$connInUseKey][$serverIndex] ) {
-         unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
-     }
-     $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
- }
- /**
-  * Get a connection via getConnection() and wrap it in a DBConnRef.
-  *
-  * @param int $i Specific server index or DB_MASTER/DB_REPLICA
-  * @param string[]|string $groups Query group(s), passed to getConnection()
-  * @param string|bool $domain DB domain ID; normalized by resolveDomainID()
-  * @param int $flags Bitfield of class CONN_* constants
-  * @return DBConnRef
-  */
- public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
-     $resolvedDomain = $this->resolveDomainID( $domain );
-     $role = $this->getRoleFromIndex( $i );
-     $conn = $this->getConnection( $i, $groups, $resolvedDomain, $flags );
-     return new DBConnRef( $this, $conn, $role );
- }
- /**
-  * Build a DBConnRef from connection parameters instead of a live handle.
-  *
-  * No connection is requested here; the DBConnRef is given the parameter list only.
-  *
-  * @param int $i Specific server index or DB_MASTER/DB_REPLICA
-  * @param string[]|string $groups Query group(s)
-  * @param string|bool $domain DB domain ID; normalized by resolveDomainID()
-  * @param int $flags Bitfield of class CONN_* constants
-  * @return DBConnRef
-  */
- public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
-     $resolvedDomain = $this->resolveDomainID( $domain );
-     $role = $this->getRoleFromIndex( $i );
-     $connParams = [ $i, $groups, $resolvedDomain, $flags ];
-     return new DBConnRef( $this, $connParams, $role );
- }
- /**
-  * Get a connection via getConnection() and wrap it in a MaintainableDBConnRef.
-  *
-  * @param int $i Specific server index or DB_MASTER/DB_REPLICA
-  * @param string[]|string $groups Query group(s), passed to getConnection()
-  * @param string|bool $domain DB domain ID; normalized by resolveDomainID()
-  * @param int $flags Bitfield of class CONN_* constants
-  * @return MaintainableDBConnRef
-  */
- public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
-     $resolvedDomain = $this->resolveDomainID( $domain );
-     $role = $this->getRoleFromIndex( $i );
-     $conn = $this->getConnection( $i, $groups, $resolvedDomain, $flags );
-     return new MaintainableDBConnRef( $this, $conn, $role );
- }
- /**
-  * Map a server index to the role that a handle for it plays.
-  *
-  * Both DB_MASTER and the writer index count as the master role; anything else is a replica.
-  *
-  * @param int $i Server index or DB_MASTER/DB_REPLICA
-  * @return int One of DB_MASTER/DB_REPLICA
-  */
- private function getRoleFromIndex( $i ) {
-     if ( $i === self::DB_MASTER ) {
-         return self::DB_MASTER;
-     }
-     if ( $i === $this->getWriterIndex() ) {
-         return self::DB_MASTER;
-     }
-     return self::DB_REPLICA;
- }
- /**
-  * Get a connection with errors silenced; a thin wrapper around getConnection().
-  *
-  * CONN_SILENCE_ERRORS is always added to the given flags and no query groups are used.
-  *
-  * @param int $i
-  * @param string|bool $domain
-  * @param int $flags
-  * @return Database|bool Live database handle or false on failure
-  * @deprecated Since 1.34 Use getConnection() instead
-  */
- public function openConnection( $i, $domain = false, $flags = 0 ) {
-     $silencedFlags = $flags | self::CONN_SILENCE_ERRORS;
-     return $this->getConnection( $i, [], $domain, $silencedFlags );
- }
- /**
-  * Open a connection to a local DB, or return one if it is already open.
-  *
-  * Handles are cached per server index in the local pool, with a separate pool for
-  * auto-commit-only handles. On error, returns false, and the connection which caused
-  * the error will be available via $this->errorConnection.
-  *
-  * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
-  *
-  * @param int $i Server index
-  * @param int $flags Class CONN_* constant bitfield
-  * @return Database|bool Returns false on connection error
-  * @throws InvalidArgumentException When the server index is invalid
-  * @throws UnexpectedValueException When the DB domain of the connection is corrupted
-  */
- private function getLocalConnection( $i, $flags = 0 ) {
-     // Connection handles required to be in auto-commit mode use a separate connection
-     // pool since the main pool is affected by implicit and explicit transaction rounds
-     $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
-     $poolKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
-     $conn = $this->conns[$poolKey][$i][0] ?? null;
-     if ( $conn === null ) {
-         // Nothing cached for this server yet, so open a new connection
-         $serverInfo = $this->getServerInfoStrict( $i );
-         $serverInfo['serverIndex'] = $i;
-         $serverInfo['autoCommitOnly'] = $autoCommit;
-         $conn = $this->reallyOpenConnection( $serverInfo, $this->localDomain );
-         $host = $this->getServerName( $i );
-         if ( !$conn->isOpen() ) {
-             $this->connLogger->warning(
-                 __METHOD__ . ": failed to connect to database $i at '$host'." );
-             $this->errorConnection = $conn;
-             $conn = false;
-         } else {
-             $this->connLogger->debug(
-                 __METHOD__ . ": connected to database $i at '$host'." );
-             $this->conns[$poolKey][$i][0] = $conn;
-         }
-     }
-     // Final sanity check to make sure the right domain is selected
-     if ( $conn instanceof IDatabase ) {
-         if ( !$this->localDomain->isCompatible( $conn->getDomainID() ) ) {
-             throw new UnexpectedValueException(
-                 "Got connection to '{$conn->getDomainID()}', " .
-                 "but expected local domain ('{$this->localDomain}')" );
-         }
-     }
-     return $conn;
- }
- /**
-  * Open a connection to a foreign DB, or return one if it is already open.
-  *
-  * Increments a reference count on the returned connection which locks the
-  * connection to the requested domain. This reference count can be
-  * decremented by calling reuseConnection().
-  *
-  * If a connection is open to the appropriate server already, but with the wrong
-  * database, it will be switched to the right database and returned, as long as
-  * it has been freed first with reuseConnection(). Free handles that would need a
-  * new connection to switch databases are left alone; if no free handle is usable,
-  * a new connection is opened.
-  *
-  * On error, returns false, and the connection which caused the
-  * error will be available via $this->errorConnection.
-  *
-  * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
-  *
-  * @param int $i Server index
-  * @param string $domain Domain ID to open
-  * @param int $flags Class CONN_* constant bitfield
-  * @return Database|bool Returns false on connection error
-  * @throws DBError When database selection fails
-  * @throws InvalidArgumentException When the server index is invalid
-  * @throws UnexpectedValueException When the DB domain of the connection is corrupted
-  */
- private function getForeignConnection( $i, $domain, $flags = 0 ) {
-     $domainInstance = DatabaseDomain::newFromId( $domain );
-     // Connection handles required to be in auto-commit mode use a separate connection
-     // pool since the main pool is affected by implicit and explicit transaction rounds
-     $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
-     if ( $autoCommit ) {
-         $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
-         $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
-     } else {
-         $connFreeKey = self::KEY_FOREIGN_FREE;
-         $connInUseKey = self::KEY_FOREIGN_INUSE;
-     }
-     /** @var Database $conn */
-     $conn = null;
-     if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
-         // Reuse an in-use connection for the same domain
-         $conn = $this->conns[$connInUseKey][$i][$domain];
-         $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
-     } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
-         // Reuse a free connection for the same domain
-         $conn = $this->conns[$connFreeKey][$i][$domain];
-         unset( $this->conns[$connFreeKey][$i][$domain] );
-         $this->conns[$connInUseKey][$i][$domain] = $conn;
-         $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
-     } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
-         // Reuse a free connection from another domain if possible. A separate loop
-         // variable is used so that $conn stays unset when every candidate is skipped,
-         // which lets the "open a new connection" branch below run in that case.
-         foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $candidateConn ) {
-             if ( $domainInstance->getDatabase() !== null ) {
-                 // Check if changing the database will require a new connection.
-                 // In that case, leave the connection handle alone and keep looking.
-                 // This prevents connections from being closed mid-transaction and can
-                 // also avoid overhead if the same database will later be requested.
-                 if (
-                     $candidateConn->databasesAreIndependent() &&
-                     $candidateConn->getDBname() !== $domainInstance->getDatabase()
-                 ) {
-                     continue;
-                 }
-                 // Select the new database, schema, and prefix
-                 $candidateConn->selectDomain( $domainInstance );
-             } else {
-                 // Stay on the current database, but update the schema/prefix
-                 $candidateConn->dbSchema( $domainInstance->getSchema() );
-                 $candidateConn->tablePrefix( $domainInstance->getTablePrefix() );
-             }
-             $conn = $candidateConn;
-             unset( $this->conns[$connFreeKey][$i][$oldDomain] );
-             // Note that if $domain is an empty string, getDomainID() might not match it
-             $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
-             $this->connLogger->debug( __METHOD__ .
-                 ": reusing free connection from $oldDomain for $domain" );
-             break;
-         }
-     }
-     if ( !$conn ) {
-         // Open a new connection
-         $server = $this->getServerInfoStrict( $i );
-         $server['serverIndex'] = $i;
-         $server['foreignPoolRefCount'] = 0;
-         $server['foreign'] = true;
-         $server['autoCommitOnly'] = $autoCommit;
-         $conn = $this->reallyOpenConnection( $server, $domainInstance );
-         if ( !$conn->isOpen() ) {
-             $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
-             $this->errorConnection = $conn;
-             $conn = false;
-         } else {
-             // Note that if $domain is an empty string, getDomainID() might not match it
-             $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
-             $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
-         }
-     }
-     if ( $conn instanceof IDatabase ) {
-         // Final sanity check to make sure the right domain is selected
-         if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
-             throw new UnexpectedValueException(
-                 "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
-         }
-         // Increment reference count
-         $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
-         $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
-     }
-     return $conn;
- }
- /**
-  * Get the database attributes for a server, derived from its type and optional driver.
-  *
-  * @param int $i Specific server index
-  * @return mixed Whatever Database::attributesFromType() returns for the server
-  */
- public function getServerAttributes( $i ) {
-     $type = $this->getServerType( $i );
-     $driver = $this->servers[$i]['driver'] ?? null;
-     return Database::attributesFromType( $type, $driver );
- }
- /**
-  * Test whether any open connection exists for the specified server index
-  *
-  * @param int $index Server index
-  * @return bool
-  */
- private function isOpen( $index ) {
-     $conn = $this->getAnyOpenConnection( $index );
-     return $conn ? true : false;
- }
- /**
- * Open a new network connection to a server (uncached)
- *
- * Returns a Database object whether or not the connection was successful.
- *
- * The $server array is completed with the DB name, schema and table prefix of $domain,
- * the master server name, the server cache, loggers, profilers and the owner ID of this
- * load balancer before being handed to Database::factory(). The handle is created
- * unconnected and then connected with initConnection(); a DBConnectionError from that
- * call is swallowed here and "connectionCounter" is only incremented on success.
- * Handles for the writer index also get the current transaction round flags (if a round
- * is active) and all recurring transaction listeners.
- *
- * @param array $server Server configuration; the 'type' and 'serverIndex' keys are read here
- * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
- * @return Database
- * @throws DBAccessError If this load balancer was disabled
- * @throws InvalidArgumentException
- */
- protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
- if ( $this->disabled ) {
- throw new DBAccessError();
- }
- if ( $domain->getDatabase() === null ) {
- // The database domain does not specify a DB name and some database systems require a
- // valid DB specified on connection. The $server configuration array contains a default
- // DB name to use for connections in such cases.
- if ( $server['type'] === 'mysql' ) {
- // For MySQL, DATABASE and SCHEMA are synonyms, connections need not specify a DB,
- // and the DB name in $server might not exist due to legacy reasons (the default
- // domain used to ignore the local LB domain, even when mismatched).
- $server['dbname'] = null;
- }
- } else {
- $server['dbname'] = $domain->getDatabase();
- }
- if ( $domain->getSchema() !== null ) {
- $server['schema'] = $domain->getSchema();
- }
- // It is always possible to connect with any prefix, even the empty string
- $server['tablePrefix'] = $domain->getTablePrefix();
- // Let the handle know what the cluster master is (e.g. "db1052")
- $masterName = $this->getServerName( $this->getWriterIndex() );
- $server['clusterMasterHost'] = $masterName;
- $server['srvCache'] = $this->srvCache;
- // Set loggers and profilers
- $server['connLogger'] = $this->connLogger;
- $server['queryLogger'] = $this->queryLogger;
- $server['errorLogger'] = $this->errorLogger;
- $server['deprecationLogger'] = $this->deprecationLogger;
- $server['profiler'] = $this->profiler;
- $server['trxProfiler'] = $this->trxProfiler;
- // Use the same agent and PHP mode for all DB handles
- $server['cliMode'] = $this->cliMode;
- $server['agent'] = $this->agent;
- // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
- // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
- $server['flags'] = $server['flags'] ?? IDatabase::DBO_DEFAULT;
- $server['ownerId'] = $this->id;
- // Create a live connection object
- $conn = Database::factory( $server['type'], $server, Database::NEW_UNCONNECTED );
- $conn->setLBInfo( $server ); // the whole completed $server array becomes the handle's LB info
- $conn->setLazyMasterHandle(
- $this->getLazyConnectionRef( self::DB_MASTER, [], $conn->getDomainID() )
- );
- $conn->setTableAliases( $this->tableAliases );
- $conn->setIndexAliases( $this->indexAliases );
- try {
- $conn->initConnection();
- ++$this->connectionCounter; // not reached when initConnection() throws
- } catch ( DBConnectionError $e ) {
- // ignore; let the DB handle the logging
- }
- if ( $server['serverIndex'] === $this->getWriterIndex() ) {
- if ( $this->trxRoundId !== false ) {
- $this->applyTransactionRoundFlags( $conn );
- }
- foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
- $conn->setTransactionListener( $name, $callback );
- }
- }
- $this->lazyLoadReplicationPositions(); // session consistency
- // Log when many connection are made on requests
- $count = $this->getCurrentConnectionCount();
- if ( $count >= self::CONN_HELD_WARN_THRESHOLD ) {
- $this->perfLogger->warning(
- __METHOD__ . ": {connections}+ connections made (master={masterdb})",
- [
- 'connections' => $count,
- 'dbserver' => $conn->getServer(),
- 'masterdb' => $conn->getLBInfo( 'clusterMasterHost' )
- ]
- );
- }
- return $conn;
- }
- /**
-  * Make sure that any "waitForPos" positions are loaded and available to doWait()
-  *
-  * The chronology callback, if one is set, is invoked at most once: the first call marks
-  * "connectionAttempted" before running it.
-  */
- private function lazyLoadReplicationPositions() {
-     if ( $this->connectionAttempted || !$this->chronologyCallback ) {
-         return;
-     }
-     $this->connectionAttempted = true;
-     $callback = $this->chronologyCallback;
-     $callback( $this ); // generally calls waitFor()
-     $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
- }
- /**
-  * Log the most recent connection failure and raise it as an exception.
-  *
-  * The exception carries the handle stored in "errorConnection" when there is one;
-  * otherwise it is created without a connection and uses "lastError" alone.
-  *
-  * @throws DBConnectionError Always
-  */
- private function reportConnectionError() {
-     $conn = $this->errorConnection; // the connection which caused the error
-     $context = [
-         'method' => __METHOD__,
-         'last_error' => $this->lastError,
-     ];
-     if ( !( $conn instanceof IDatabase ) ) {
-         // No last connection, probably due to all servers being too busy
-         $this->connLogger->error(
-             __METHOD__ .
-             ": LB failure with no last connection. Connection error: {last_error}",
-             $context
-         );
-         // If all servers were busy, "lastError" will contain something sensible
-         throw new DBConnectionError( null, $this->lastError );
-     }
-     $context['db_server'] = $conn->getServer();
-     $this->connLogger->warning(
-         __METHOD__ . ": connection error: {last_error} ({db_server})",
-         $context
-     );
-     throw new DBConnectionError( $conn, "{$this->lastError} ({$context['db_server']})" );
- }
- /**
-  * Get the server index of the writer (master) server.
-  *
-  * @return int Always 0 in this implementation
-  */
- public function getWriterIndex() {
-     $writerIndex = 0;
-     return $writerIndex;
- }
- /**
-  * Check whether a server is configured under the given index
-  *
-  * @param int $i
-  * @return bool True if $i is a key of the server list
-  * @deprecated Since 1.34
-  */
- public function haveIndex( $i ) {
-     $isConfigured = array_key_exists( $i, $this->servers );
-     return $isConfigured;
- }
- /**
-  * Returns true if the specified index is valid and has non-zero load
-  *
-  * Only the load in the generic group is considered. A server without an entry in that
-  * group is treated as having zero load, matching serverHasLoadInAnyGroup().
-  *
-  * @param int $i
-  * @return bool
-  * @deprecated Since 1.34
-  */
- public function isNonZeroLoad( $i ) {
-     if ( !isset( $this->servers[$i] ) ) {
-         return false;
-     }
-     // Default to 0 so that a missing load entry does not raise an undefined index notice
-     return ( $this->groupLoads[self::GROUP_GENERIC][$i] ?? 0 ) > 0;
- }
- /**
-  * Get the number of configured servers, the writer included.
-  *
-  * @return int
-  */
- public function getServerCount() {
-     $total = count( $this->servers );
-     return $total;
- }
- /**
-  * Check whether more than one server is configured.
-  *
-  * @return bool
-  */
- public function hasReplicaServers() {
-     $serverCount = $this->getServerCount();
-     return $serverCount > 1;
- }
- /**
-  * Check whether any server other than the writer lacks a truthy 'is static' setting.
-  *
-  * @return bool
-  */
- public function hasStreamingReplicaServers() {
-     foreach ( $this->servers as $index => $info ) {
-         if ( $index === $this->getWriterIndex() ) {
-             continue; // the writer itself does not count
-         }
-         if ( !empty( $info['is static'] ) ) {
-             continue; // flagged as static
-         }
-         return true;
-     }
-     return false;
- }
- /**
-  * Get a readable name for a server.
-  *
-  * The 'hostName' setting is preferred over 'host'; 'localhost' is used if both are
-  * missing or empty.
-  *
-  * @param int $i Specific server index
-  * @return string
-  */
- public function getServerName( $i ) {
-     $name = $this->servers[$i]['hostName'] ?? $this->servers[$i]['host'] ?? '';
-     if ( $name == '' ) {
-         return 'localhost';
-     }
-     return $name;
- }
- /**
-  * Get the configuration stored for a server.
-  *
-  * @param int $i Specific server index
-  * @return array|bool The stored server entry, or false if there is none
-  */
- public function getServerInfo( $i ) {
-     if ( isset( $this->servers[$i] ) ) {
-         return $this->servers[$i];
-     }
-     return false;
- }
- /**
-  * Get the 'type' setting of a server.
-  *
-  * @param int $i Specific server index
-  * @return string The configured type, or 'unknown' if none is set
-  */
- public function getServerType( $i ) {
-     if ( isset( $this->servers[$i]['type'] ) ) {
-         return $this->servers[$i]['type'];
-     }
-     return 'unknown';
- }
- /**
-  * Get the current replication position of the master server.
-  *
-  * An already-open writer connection is used when there is one. Otherwise a connection
-  * is opened just for this call and closed again afterwards.
-  *
-  * @return mixed The value of getMasterPos() on the master connection
-  * @throws DBConnectionError Via reportConnectionError() if no connection could be made
-  */
- public function getMasterPos() {
-     $index = $this->getWriterIndex();
-     $conn = $this->getAnyOpenConnection( $index );
-     if ( $conn ) {
-         return $conn->getMasterPos();
-     }
-     // Connect to the writer only, with errors silenced so they can be reported below.
-     // The flag has to go in the flags argument; the second argument of getConnection()
-     // is the query group list, so doWait()'s getServerConnection() call form is used.
-     $conn = $this->getServerConnection(
-         $index,
-         self::DOMAIN_ANY,
-         self::CONN_SILENCE_ERRORS
-     );
-     if ( !$conn ) {
-         $this->reportConnectionError();
-         return null; // unreachable due to exception
-     }
-     try {
-         $pos = $conn->getMasterPos();
-     } finally {
-         // The connection was made only for this lookup
-         $this->closeConnection( $conn );
-     }
-     return $pos;
- }
- /**
-  * Get a replication position from the connections that are already open.
-  *
-  * If a writer connection is open, its master position is returned. Otherwise the open
-  * connections of servers 1..N that are not flagged 'is static' are compared and the
-  * highest replica position among them is returned.
-  *
-  * @return DBMasterPos|bool The position, or false when no position could be obtained
-  */
- public function getReplicaResumePos() {
-     // Prefer the position of any existing master server connection
-     $masterConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
-     if ( $masterConn ) {
-         return $masterConn->getMasterPos();
-     }
-     // Otherwise, find the highest position of any existing replica server connection
-     $highestPos = false;
-     $serverCount = $this->getServerCount();
-     for ( $index = 1; $index < $serverCount; $index++ ) {
-         if ( !empty( $this->servers[$index]['is static'] ) ) {
-             continue; // server does not use replication
-         }
-         $replicaConn = $this->getAnyOpenConnection( $index );
-         if ( !$replicaConn ) {
-             continue; // no open connection
-         }
-         $pos = $replicaConn->getReplicaPos();
-         if ( !$pos ) {
-             continue; // could not get position
-         }
-         if ( !$highestPos ) {
-             $highestPos = $pos;
-         }
-         if ( $pos->hasReached( $highestPos ) ) {
-             $highestPos = $pos;
-         }
-     }
-     return $highestPos;
- }
- /**
-  * Close all connections and mark this load balancer as disabled.
-  *
-  * @param string $fname Caller name, passed on to assertOwnership() and closeAll()
-  * @param int|null $owner Owner ID, passed on to assertOwnership() and closeAll()
-  */
- public function disable( $fname = __METHOD__, $owner = null ) {
-     // Ownership is verified before anything is closed
-     $this->assertOwnership( $fname, $owner );
-     $this->closeAll( $fname, $owner );
-     // From here on, reallyOpenConnection() refuses to open connections
-     $this->disabled = true;
- }
- /**
-  * Close every open connection and reset the tracked connection pools.
-  *
-  * @param string $fname Caller name, used in log messages and passed to close()
-  * @param int|null $owner Owner ID, passed on to assertOwnership()
-  */
- public function closeAll( $fname = __METHOD__, $owner = null ) {
-     $this->assertOwnership( $fname, $owner );
-     if ( $this->ownerId === null ) {
-         // The scope object is only held in a local for the duration of this method
-         /** @noinspection PhpUnusedLocalVariableInspection */
-         $scope = ScopedCallback::newScopedIgnoreUserAbort();
-     }
-     $closeHandle = function ( IDatabase $conn ) use ( $fname ) {
-         $host = $conn->getServer();
-         $this->connLogger->debug( "$fname: closing connection to database '$host'." );
-         $conn->close( $fname, $this->id );
-     };
-     $this->forEachOpenConnection( $closeHandle );
-     // Start over with empty pools
-     $this->conns = self::newTrackedConnectionsArray();
- }
- /**
-  * Close a connection handle and stop tracking it in the connection pools.
-  *
-  * The handle is looked up under its 'serverIndex' LB info in every pool and removed
-  * from the first pool entry that holds it. It is closed whether or not it was tracked.
-  *
-  * @param IDatabase $conn Live handle; DBConnRef instances are rejected
-  * @throws RuntimeException When given a DBConnRef
-  */
- public function closeConnection( IDatabase $conn ) {
-     if ( $conn instanceof DBConnRef ) {
-         // Avoid calling close() but still leaving the handle in the pool
-         throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
-     }
-     $serverIndex = $conn->getLBInfo( 'serverIndex' );
-     foreach ( $this->conns as $type => $connsByServer ) {
-         if ( !isset( $connsByServer[$serverIndex] ) ) {
-             continue;
-         }
-         // The keys at this level identify the handle within the server's pool entry
-         // (they are not server indexes), so the server index is used for the name lookup
-         foreach ( $connsByServer[$serverIndex] as $poolKey => $trackedConn ) {
-             if ( $conn === $trackedConn ) {
-                 $host = $this->getServerName( $serverIndex );
-                 $this->connLogger->debug(
-                     __METHOD__ . ": closing connection to database $serverIndex at '$host'." );
-                 unset( $this->conns[$type][$serverIndex][$poolKey] );
-                 break 2;
-             }
-         }
-     }
-     $conn->close( __METHOD__ );
- }
- /**
-  * Commit master changes, then flush the master and replica snapshots.
-  *
-  * @param string $fname Caller name, passed on to each step
-  * @param int|null $owner Owner ID, passed on to each step
-  */
- public function commitAll( $fname = __METHOD__, $owner = null ) {
-     // Writes are committed first
-     $this->commitMasterChanges( $fname, $owner );
-     // Then the snapshots are flushed, master before replicas
-     $this->flushMasterSnapshots( $fname, $owner );
-     $this->flushReplicaSnapshots( $fname, $owner );
- }
- /**
-  * Run the pre-commit callbacks of all open master connections and finalize the round.
-  *
-  * Callbacks are run repeatedly until a full pass over the connections runs none, since
-  * callbacks may add further callbacks on other connections. Afterwards, transaction-end
-  * callbacks are suppressed on every open master connection. The round stage is
-  * ROUND_ERROR while this runs and ROUND_FINALIZED once it succeeds.
-  *
-  * @param string $fname Caller name, passed on to assertOwnership()
-  * @param int|null $owner Owner ID, passed on to assertOwnership()
-  * @return int Total number of callbacks reported as run
-  */
- public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
-     $this->assertOwnership( $fname, $owner );
-     $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
-     if ( $this->ownerId === null ) {
-         /** @noinspection PhpUnusedLocalVariableInspection */
-         $scope = ScopedCallback::newScopedIgnoreUserAbort();
-     }
-     $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
-     $passCount = 0; // callback execution attempts within one pass
-     $runPreCommit = function ( Database $conn ) use ( &$passCount ) {
-         // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
-         // Any error should cause all (peer) transactions to be rolled back together.
-         $passCount += $conn->runOnTransactionPreCommitCallbacks();
-     };
-     // Loop until callbacks stop adding callbacks on other connections
-     $total = 0;
-     do {
-         $passCount = 0;
-         $this->forEachOpenMasterConnection( $runPreCommit );
-         $total += $passCount;
-     } while ( $passCount > 0 );
-     // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
-     $suppressTrxEnd = function ( Database $conn ) {
-         $conn->setTrxEndCallbackSuppression( true );
-     };
-     $this->forEachOpenMasterConnection( $suppressTrxEnd );
-     $this->trxRoundStage = self::ROUND_FINALIZED;
-     return $total;
- }
- /**
-  * Check that every open master connection is fit to be committed.
-  *
-  * Each connection must have no open transactions, must not exceed the
-  * 'maxWriteDuration' option (when greater than zero), and must still answer a ping if it
-  * has writes or callbacks pending. The round stage is ROUND_ERROR while this runs and
-  * ROUND_APPROVED once every check passed.
-  *
-  * @param array $options Supports 'maxWriteDuration' (seconds; 0 or missing means no limit)
-  * @param string $fname Caller name, passed on to assertOwnership()
-  * @param int|null $owner Owner ID, passed on to assertOwnership()
-  * @throws DBTransactionSizeError When the pending write duration exceeds the limit
-  * @throws DBTransactionError When a connection with pending work was lost
-  */
- public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
-     $this->assertOwnership( $fname, $owner );
-     $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
-     if ( $this->ownerId === null ) {
-         /** @noinspection PhpUnusedLocalVariableInspection */
-         $scope = ScopedCallback::newScopedIgnoreUserAbort();
-     }
-     $limit = $options['maxWriteDuration'] ?? 0;
-     $checkConnection = function ( IDatabase $conn ) use ( $limit ) {
-         // If atomic sections or explicit transactions are still open, some caller must
-         // have caught an exception but failed to properly roll back any changes. Detect
-         // that and throw an error (causing rollback).
-         $conn->assertNoOpenTransactions();
-         // Assert that the time to replicate the transaction will be sane.
-         // If this fails, then all DB transactions will be rolled back together.
-         $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
-         $overLimit = ( $limit > 0 && $time > $limit );
-         if ( $overLimit ) {
-             throw new DBTransactionSizeError(
-                 $conn,
-                 "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
-                 [ $time, $limit ]
-             );
-         }
-         // If a connection sits idle while slow queries execute on another, that
-         // connection may end up dropped before the commit round is reached. Ping
-         // servers to detect this.
-         if ( !$conn->writesOrCallbacksPending() ) {
-             return;
-         }
-         if ( !$conn->ping() ) {
-             throw new DBTransactionError(
-                 $conn,
-                 "A connection to the {$conn->getDBname()} database was lost before commit"
-             );
-         }
-     };
-     $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
-     $this->forEachOpenMasterConnection( $checkConnection );
-     $this->trxRoundStage = self::ROUND_APPROVED;
- }
/**
 * Start an explicit transaction round whose ID is the caller name
 *
 * Fails if a round is already active or if the stage is not ROUND_CURSORY. Snapshots on
 * the master handles are flushed first, then each open master handle gets the
 * transaction round flags applied. The stage is ROUND_ERROR while that happens and is
 * set back to ROUND_CURSORY on success.
 *
 * @param string $fname Caller name; becomes the round ID
 * @param int|null $owner Owner ID of the caller
 * @throws DBTransactionError
 */
public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$roundAlreadyActive = ( $this->trxRoundId !== false );
	if ( $roundAlreadyActive ) {
		throw new DBTransactionError(
			null,
			"$fname: Transaction round '{$this->trxRoundId}' already started"
		);
	}
	$this->assertTransactionRoundStage( self::ROUND_CURSORY );

	// Only un-owned instances take the guard; it stays referenced until this method returns
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ( $this->ownerId === null )
		? ScopedCallback::newScopedIgnoreUserAbort()
		: null;

	// Drop any empty transactions (no writes/callbacks) left from the implicit round
	$this->flushMasterSnapshots( $fname, $owner );

	$this->trxRoundId = $fname;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Enlist the applicable handles in this explicit round. Writes and callbacks on each
	// such handle get tied to a single transaction, and the (peer) handles reject
	// begin()/commit() calls that are not part of an en masse commit or rollback.
	$enlist = function ( Database $conn ) {
		$this->applyTransactionRoundFlags( $conn );
	};
	$this->forEachOpenMasterConnection( $enlist );

	$this->trxRoundStage = self::ROUND_CURSORY;
}
/**
 * Issue COMMIT on every open master connection and end the transaction round
 *
 * The round must be in the ROUND_APPROVED stage. A DBError raised by one handle is passed
 * to the error logger and remembered so that the remaining handles are still attempted;
 * a single DBTransactionError listing the failures is thrown afterwards. On success the
 * stage becomes ROUND_COMMIT_CALLBACKS.
 *
 * @param string $fname Caller name
 * @param int|null $owner Owner ID of the caller
 * @throws DBTransactionError
 */
public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	$this->assertTransactionRoundStage( self::ROUND_APPROVED );

	// Only un-owned instances take the guard; it stays referenced until this method returns
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ( $this->ownerId === null )
		? ScopedCallback::newScopedIgnoreUserAbort()
		: null;

	// Remember whether an explicit round was active before clearing its ID
	$restore = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
	// Callbacks should already be suppressed at this point due to finalizeMasterChanges().
	$failures = [];
	$commitHandle = function ( IDatabase $conn ) use ( $fname, &$failures ) {
		try {
			$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
		} catch ( DBError $e ) {
			$logError = $this->errorLogger;
			$logError( $e );
			$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
		}
	};
	$this->forEachOpenMasterConnection( $commitHandle );

	if ( $failures ) {
		$failureList = implode( "\n", array_unique( $failures ) );
		throw new DBTransactionError(
			null,
			"$fname: Commit failed on server(s) " . $failureList
		);
	}

	if ( $restore ) {
		// The handles no longer participate in an explicit transaction round
		$release = function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		};
		$this->forEachOpenMasterConnection( $release );
	}

	$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
}
/**
 * Run the pending "transaction idle" callbacks on every open master connection
 *
 * Only valid in the ROUND_COMMIT_CALLBACKS or ROUND_ROLLBACK_CALLBACKS stage, which
 * selects the trigger type handed to the callbacks. Exceptions thrown by callbacks or by
 * the follow-up commit() calls are caught; only the first one is kept and returned. The
 * stage is ROUND_ERROR while this runs and is restored to its prior value at the end.
 *
 * @param string $fname Caller name (only used for the ownership check)
 * @param int|null $owner Owner ID of the caller
 * @return Exception|null The first exception caught, if any
 * @throws DBTransactionError If the round is not in a callback stage
 */
public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );
	if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
		$type = IDatabase::TRIGGER_COMMIT;
	} elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
		$type = IDatabase::TRIGGER_ROLLBACK;
	} else {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	if ( $this->ownerId === null ) {
		// The guard stays referenced until this method returns
		/** @noinspection PhpUnusedLocalVariableInspection */
		$scope = ScopedCallback::newScopedIgnoreUserAbort();
	}

	$oldStage = $this->trxRoundStage;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	// Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
	$this->forEachOpenMasterConnection( function ( Database $conn ) {
		$conn->setTrxEndCallbackSuppression( false );
	} );

	$e = null; // first exception
	// From here on, log messages and commit() calls are attributed to this method
	// rather than to the caller-supplied name
	$fname = __METHOD__;
	// Loop until callbacks stop adding callbacks on other connections
	do {
		// Run any pending callbacks for each connection...
		$count = 0; // callback execution attempts
		$this->forEachOpenMasterConnection(
			function ( Database $conn ) use ( $type, &$e, &$count ) {
				if ( $conn->trxLevel() ) {
					return; // retry in the next iteration, after commit() is called
				}
				try {
					$count += $conn->runOnTransactionIdleCallbacks( $type );
				} catch ( Exception $ex ) {
					$e = $e ?: $ex;
				}
			}
		);
		// Clear out any active transactions left over from callbacks...
		$this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$e, $fname ) {
			if ( $conn->writesPending() ) {
				// A callback from another handle wrote to this one and DBO_TRX is set.
				// Log once, including the callers and the server/DB context.
				$fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
				$this->queryLogger->warning(
					"$fname: found writes pending ($fnames).",
					[
						'db_server' => $conn->getServer(),
						'db_name' => $conn->getDBname()
					]
				);
			} elseif ( $conn->trxLevel() ) {
				// A callback from another handle read from this one and DBO_TRX is set,
				// which can easily happen if there is only one DB (no replicas)
				$this->queryLogger->debug( "$fname: found empty transaction." );
			}
			try {
				$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
			} catch ( Exception $ex ) {
				$e = $e ?: $ex;
			}
		} );
	} while ( $count > 0 );

	$this->trxRoundStage = $oldStage;

	return $e;
}
/**
 * Run the transaction listener callbacks on every open master connection
 *
 * Only valid in the ROUND_COMMIT_CALLBACKS or ROUND_ROLLBACK_CALLBACKS stage, which
 * selects the trigger type handed to the listeners. Exceptions thrown by listeners are
 * caught; only the first one is kept and returned. The stage is ROUND_ERROR while the
 * listeners run and ROUND_CURSORY afterwards.
 *
 * @param string $fname Caller name
 * @param int|null $owner Owner ID of the caller
 * @return Exception|null The first exception caught, if any
 * @throws DBTransactionError If the round is not in a callback stage
 */
public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$inCommitStage = ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS );
	$inRollbackStage = ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS );
	if ( !$inCommitStage && !$inRollbackStage ) {
		throw new DBTransactionError(
			null,
			"Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
		);
	}
	$type = $inCommitStage ? IDatabase::TRIGGER_COMMIT : IDatabase::TRIGGER_ROLLBACK;

	// Only un-owned instances take the guard; it stays referenced until this method returns
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ( $this->ownerId === null )
		? ScopedCallback::newScopedIgnoreUserAbort()
		: null;

	$e = null; // first exception
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$notify = static function ( Database $conn ) use ( $type, &$e ) {
		try {
			$conn->runTransactionListenerCallbacks( $type );
		} catch ( Exception $ex ) {
			if ( !$e ) {
				$e = $ex;
			}
		}
	};
	$this->forEachOpenMasterConnection( $notify );

	$this->trxRoundStage = self::ROUND_CURSORY;

	return $e;
}
/**
 * Issue ROLLBACK on every open master connection and end the transaction round
 *
 * No particular round stage is required. The stage is ROUND_ERROR while the rollbacks run
 * and becomes ROUND_ROLLBACK_CALLBACKS afterwards.
 *
 * @param string $fname Caller name
 * @param int|null $owner Owner ID of the caller
 * @throws DBTransactionError If the ownership check fails
 */
public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	// Only un-owned instances take the guard; it stays referenced until this method returns
	/** @noinspection PhpUnusedLocalVariableInspection */
	$scope = ( $this->ownerId === null )
		? ScopedCallback::newScopedIgnoreUserAbort()
		: null;

	// Remember whether an explicit round was active before clearing its ID
	$hadExplicitRound = ( $this->trxRoundId !== false );
	$this->trxRoundId = false;
	$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

	$rollbackHandle = static function ( IDatabase $conn ) use ( $fname ) {
		$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
	};
	$this->forEachOpenMasterConnection( $rollbackHandle );

	if ( $hadExplicitRound ) {
		// The handles no longer participate in an explicit transaction round
		$release = function ( Database $conn ) {
			$this->undoTransactionRoundFlags( $conn );
		};
		$this->forEachOpenMasterConnection( $release );
	}

	$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
/**
 * Require the current transaction round stage to be one of the given stages
 *
 * The comparison is strict. On mismatch, the error message lists the accepted stages
 * (quoted, separated by "/") together with the actual stage.
 *
 * @param string|string[] $stage One accepted stage or a list of them
 * @throws DBTransactionError
 */
private function assertTransactionRoundStage( $stage ) {
	$stages = (array)$stage;
	if ( in_array( $this->trxRoundStage, $stages, true ) ) {
		return;
	}

	$quotedStages = [];
	foreach ( $stages as $v ) {
		$quotedStages[] = "'$v'";
	}
	$stageList = implode( '/', $quotedStages );

	throw new DBTransactionError(
		null,
		"Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
	);
}
/**
 * Ensure that, if this instance is owned, the caller is either the owner or is internal
 *
 * When an LBFactory owns the LoadBalancer, certain methods should only be called through
 * that LBFactory to avoid broken contracts. Without an owner, those methods can be called
 * publicly by anything. Internal calls from the LoadBalancer itself, which pass its own
 * ID, are always allowed.
 *
 * @param string $fname Caller name, used in the error message
 * @param int|null $owner Owner ID of the caller
 * @throws DBTransactionError
 */
private function assertOwnership( $fname, $owner ) {
	if ( $this->ownerId === null ) {
		return; // not owned; anything may call
	}
	if ( $owner === $this->ownerId || $owner === $this->id ) {
		return; // called by the owner or by this instance itself
	}

	throw new DBTransactionError(
		null,
		"$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
	);
}
/**
 * Make a connection with DBO_DEFAULT/DBO_TRX set join the current transaction round
 *
 * A server may have neither flag enabled, in which case it opts out of transaction
 * rounds and stays in auto-commit mode. That can be desirable when a DB server is used
 * for something like simple key/value storage. Connections whose 'autoCommitOnly' LB
 * info is set are skipped entirely.
 *
 * @param Database $conn
 */
private function applyTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$usesDefaultFlag = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaultFlag ) {
		// With DBO_DEFAULT, DBO_TRX is controlled entirely by CLI mode presence.
		// Force DBO_TRX even in CLI mode since a commit round is expected soon.
		$conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
	}

	// Checked only after the step above, which may have just turned DBO_TRX on
	$usesTrxFlag = $conn->getFlag( $conn::DBO_TRX );
	if ( $usesTrxFlag ) {
		$conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
	}
}
/**
 * Remove a connection from the transaction round it was enlisted in
 *
 * Clears the 'trxRoundId' LB info on DBO_TRX connections and then restores the prior
 * flags on DBO_DEFAULT connections. Connections whose 'autoCommitOnly' LB info is set
 * are skipped entirely.
 *
 * @param Database $conn
 */
private function undoTransactionRoundFlags( Database $conn ) {
	$autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
	if ( $autoCommitOnly ) {
		return; // transaction rounds do not apply to these connections
	}

	$usesTrxFlag = $conn->getFlag( $conn::DBO_TRX );
	if ( $usesTrxFlag ) {
		$conn->setLBInfo( 'trxRoundId', null ); // remove the round ID
	}

	$usesDefaultFlag = $conn->getFlag( $conn::DBO_DEFAULT );
	if ( $usesDefaultFlag ) {
		$conn->restoreFlags( $conn::RESTORE_PRIOR );
	}
}
/**
 * Call flushSnapshot() on every open replica connection
 *
 * @param string $fname Caller name
 * @param int|null $owner Owner ID of the caller
 * @throws DBTransactionError If the ownership check fails
 */
public function flushReplicaSnapshots( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$flush = static function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	};
	$this->forEachOpenReplicaConnection( $flush );
}
/**
 * Call flushSnapshot() on every open master connection
 *
 * @param string $fname Caller name
 * @param int|null $owner Owner ID of the caller
 * @throws DBTransactionError If the ownership check fails
 */
public function flushMasterSnapshots( $fname = __METHOD__, $owner = null ) {
	$this->assertOwnership( $fname, $owner );

	$flush = static function ( IDatabase $conn ) use ( $fname ) {
		$conn->flushSnapshot( $fname );
	};
	$this->forEachOpenMasterConnection( $flush );
}
/**
 * Get the stage that the transaction round is currently in
 *
 * @return string The current value of the round stage field (one of the ROUND_* stages)
 * @since 1.32
 */
public function getTransactionRoundStage() {
	return $this->trxRoundStage;
}
/**
 * Check whether a connection to the writer (master) server index is open
 *
 * @return bool|mixed Whatever isOpen() reports for the writer index
 */
public function hasMasterConnection() {
	$writerIndex = $this->getWriterIndex();

	return $this->isOpen( $writerIndex );
}
/**
 * Check whether any open master connection has writes or callbacks pending
 *
 * Every open master connection is queried; there is no early exit.
 *
 * @return bool
 */
public function hasMasterChanges() {
	$anyPending = false;

	$check = static function ( IDatabase $conn ) use ( &$anyPending ) {
		if ( $conn->writesOrCallbacksPending() ) {
			$anyPending = true;
		}
	};
	$this->forEachOpenMasterConnection( $check );

	return $anyPending;
}
/**
 * Get the largest lastDoneWrites() value among the open master connections
 *
 * @return float|bool|mixed The max() of the per-connection values, starting from false;
 *   false if there are no open master connections
 */
public function lastMasterChangeTimestamp() {
	$latest = false;

	$track = static function ( IDatabase $conn ) use ( &$latest ) {
		$latest = max( $latest, $conn->lastDoneWrites() );
	};
	$this->forEachOpenMasterConnection( $track );

	return $latest;
}
/**
 * Check whether master changes are pending or were made within the last $age seconds
 *
 * @param float|null $age How far back to look, in seconds; null uses the wait timeout
 * @return bool
 */
public function hasOrMadeRecentMasterChanges( $age = null ) {
	$age = $age ?? $this->waitTimeout;

	if ( $this->hasMasterChanges() ) {
		return true;
	}

	$lastWrite = $this->lastMasterChangeTimestamp();

	return ( $lastWrite > microtime( true ) - $age );
}
/**
 * Collect the pendingWriteCallers() results of all open master connections
 *
 * @return array The per-connection lists merged, in connection order, with array_merge()
 */
public function pendingMasterChangeCallers() {
	$callerLists = [];

	$collect = static function ( IDatabase $conn ) use ( &$callerLists ) {
		$callerLists[] = $conn->pendingWriteCallers();
	};
	$this->forEachOpenMasterConnection( $collect );

	// The leading [] keeps the call valid when there are no open master connections
	return array_merge( [], ...$callerLists );
}
/**
 * Check whether this load balancer is in "lagged replica mode"
 *
 * Once the mode is on, it stays on. Otherwise, if there are streaming replica servers, a
 * generic reader index is picked first, since doing so sets the mode as needed.
 *
 * @param string|bool $domain Domain passed to getReaderIndex(); defaults to false
 * @return bool|mixed True if the mode was already on, else the current mode field
 */
public function getLaggedReplicaMode( $domain = false ) {
	if ( $this->laggedReplicaMode ) {
		return true; // stay in lagged replica mode
	}

	if ( !$this->hasStreamingReplicaServers() ) {
		return $this->laggedReplicaMode;
	}

	// This will set "laggedReplicaMode" as needed
	$this->getReaderIndex( self::GROUP_GENERIC, $domain );

	return $this->laggedReplicaMode;
}
/**
 * Get the current "lagged replica mode" field without triggering any server selection
 *
 * @return bool|mixed
 */
public function laggedReplicaUsed() {
	return $this->laggedReplicaMode;
}
/**
 * Get a human-readable reason why the database should be treated as read-only, if any
 *
 * Checks, in order: the configured read-only reason, whether the master server runs in
 * read-only mode, and whether lagged replica mode is active.
 *
 * @param string|bool $domain Domain ID; defaults to false
 * @return string|bool The reason, or false if there is none
 */
public function getReadOnlyReason( $domain = false ) {
	// Resolved up front, even when the configured reason ends up being returned
	$domainInstance = DatabaseDomain::newFromId( $this->resolveDomainID( $domain ) );

	if ( $this->readOnlyReason !== false ) {
		return $this->readOnlyReason;
	}

	if ( $this->isMasterRunningReadOnly( $domainInstance ) ) {
		return 'The master database server is running in read-only mode.';
	}

	if ( !$this->getLaggedReplicaMode( $domain ) ) {
		return false;
	}

	// Lagged replica mode: the wording depends on whether a generic reader index exists
	if ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 ) {
		return 'The database is read-only until replication lag decreases.';
	}

	return 'The database is read-only until a replica database server becomes reachable.';
}
/**
 * Check whether a master connection reports read-only mode, using the server cache
 *
 * With CONN_REFRESH_READ_ONLY in $flags the server is always queried and the cache entry
 * is overwritten; otherwise the cached value is used and only computed on a miss. A
 * DBError while querying the server counts as "not read-only".
 *
 * @param IDatabase $conn Master connection
 * @param int $flags Bitfield of class CONN_* constants
 * @return bool Whether the entire server or currently selected DB/schema is read-only
 */
private function isMasterConnectionReadOnly( IDatabase $conn, $flags = 0 ) {
	// Note that table prefixes are not related to server-side read-only mode
	$key = $this->srvCache->makeGlobalKey(
		'rdbms-server-readonly',
		$conn->getServer(),
		$conn->getDBname(),
		$conn->dbSchema()
	);

	// Asks the server directly; shared by the refresh path and the cache-miss path
	$queryServer = function () use ( $conn ) {
		try {
			return (int)$conn->serverIsReadOnly();
		} catch ( DBError $e ) {
			return 0;
		}
	};

	$refresh = ( ( $flags & self::CONN_REFRESH_READ_ONLY ) == self::CONN_REFRESH_READ_ONLY );
	if ( !$refresh ) {
		return (bool)$this->srvCache->getWithSetCallback(
			$key,
			BagOStuff::TTL_PROC_SHORT,
			$queryServer
		);
	}

	$readOnly = $queryServer();
	$this->srvCache->set( $key, $readOnly, BagOStuff::TTL_PROC_SHORT );

	return (bool)$readOnly;
}
/**
 * Check whether the master server is running in read-only mode, using the WAN cache
 *
 * On a cache miss, a connection to the writer index is obtained with
 * CONN_REFRESH_READ_ONLY and the result of isMasterConnectionReadOnly() is stored. A
 * DBError during that lookup counts as "not read-only". The transaction profiler is
 * silenced for the duration of the lookup and its previous state is always restored,
 * even if an unexpected exception propagates.
 *
 * @param DatabaseDomain $domain
 * @return bool Whether the entire master server or the local domain DB is read-only
 */
private function isMasterRunningReadOnly( DatabaseDomain $domain ) {
	// Context will often be HTTP GET/HEAD; heavily cache the results
	return (bool)$this->wanCache->getWithSetCallback(
		// Note that table prefixes are not related to server-side read-only mode
		$this->wanCache->makeGlobalKey(
			'rdbms-server-readonly',
			$this->getMasterServerName(),
			$domain->getDatabase(),
			$domain->getSchema()
		),
		self::TTL_CACHE_READONLY,
		function () use ( $domain ) {
			$old = $this->trxProfiler->setSilenced( true );
			try {
				$index = $this->getWriterIndex();
				// Reset the cache for isMasterConnectionReadOnly()
				$flags = self::CONN_REFRESH_READ_ONLY;
				$conn = $this->getServerConnection( $index, $domain->getId(), $flags );
				// Reuse the process cache set above
				$readOnly = (int)$this->isMasterConnectionReadOnly( $conn );
				$this->reuseConnection( $conn );
			} catch ( DBError $e ) {
				$readOnly = 0;
			} finally {
				// Restore the profiler state on every exit path, not just the expected ones
				$this->trxProfiler->setSilenced( $old );
			}

			return $readOnly;
		},
		[ 'pcTTL' => WANObjectCache::TTL_PROC_LONG, 'lockTSE' => 10, 'busyValue' => 0 ]
	);
}
/**
 * Get, and optionally set, the "allow lagged" setting
 *
 * @param bool|null $mode New value to store, or null to leave the setting unchanged
 * @return bool|mixed The setting as it stands after the call
 */
public function allowLagged( $mode = null ) {
	if ( $mode !== null ) {
		$this->allowLagged = $mode;
	}

	return $this->allowLagged;
}
/**
 * Ping every open connection
 *
 * All connections are pinged even after one of them fails.
 *
 * @return bool True if no ping failed (including when there are no open connections)
 */
public function pingAll() {
	$failedPings = 0;

	$ping = static function ( IDatabase $conn ) use ( &$failedPings ) {
		if ( !$conn->ping() ) {
			++$failedPings;
		}
	};
	$this->forEachOpenConnection( $ping );

	return ( $failedPings === 0 );
}
/**
 * Invoke a callback for every connection handle tracked by this load balancer
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection
 */
public function forEachOpenConnection( $callback, array $params = [] ) {
	foreach ( $this->conns as $byServerIndex ) {
		foreach ( $byServerIndex as $handles ) {
			foreach ( $handles as $handle ) {
				$callback( $handle, ...$params );
			}
		}
	}
}
/**
 * Invoke a callback for every tracked connection handle of the writer (master) index
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection
 */
public function forEachOpenMasterConnection( $callback, array $params = [] ) {
	$writerIndex = $this->getWriterIndex();

	foreach ( $this->conns as $byServerIndex ) {
		// Groups without any handle for the writer index contribute nothing
		$masterHandles = $byServerIndex[$writerIndex] ?? [];
		/** @var IDatabase $handle */
		foreach ( $masterHandles as $handle ) {
			$callback( $handle, ...$params );
		}
	}
}
/**
 * Invoke a callback for every tracked connection handle of a non-writer server index
 *
 * The writer index is looked up once before iterating, as forEachOpenMasterConnection()
 * does, instead of once per server index.
 *
 * @param callable $callback Invoked as $callback( $conn, ...$params )
 * @param array $params Extra arguments passed after the connection
 */
public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
	$masterIndex = $this->getWriterIndex();

	foreach ( $this->conns as $connsByServer ) {
		foreach ( $connsByServer as $i => $serverConns ) {
			if ( $i === $masterIndex ) {
				continue; // skip master
			}
			foreach ( $serverConns as $conn ) {
				$callback( $conn, ...$params );
			}
		}
	}
}
/**
 * Count the connection handles currently tracked by this load balancer
 *
 * @return int
 */
private function getCurrentConnectionCount() {
	$total = 0;
	foreach ( $this->conns as $byServerIndex ) {
		// Add up the sizes of the per-server handle lists in this group
		$total += array_sum( array_map( 'count', $byServerIndex ) );
	}

	return $total;
}
/**
 * Find the server with the highest lag among those with a positive generic group load
 *
 * @param string|bool $domain Domain passed to getLagTimes(); defaults to false
 * @return array Triple of (host name, lag, server index); [ '', -1, 0 ] if there are no
 *   replica servers or no server qualified
 */
public function getMaxLag( $domain = false ) {
	$worstHost = '';
	$worstLag = -1;
	$worstIndex = 0;

	if ( !$this->hasReplicaServers() ) {
		return [ $worstHost, $worstLag, $worstIndex ];
	}

	foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
		// Only servers that take generic read load are considered
		if ( $this->groupLoads[self::GROUP_GENERIC][$index] > 0 && $lag > $worstLag ) {
			$worstLag = $lag;
			$worstHost = $this->getServerInfoStrict( $index, 'host' );
			$worstIndex = $index;
		}
	}

	return [ $worstHost, $worstLag, $worstIndex ];
}
/**
 * Get the lag of each server, keyed by server index
 *
 * Without replica servers only the writer index is reported, with a lag of 0. Servers
 * flagged 'is static' are reported as 0 without consulting the load monitor; the load
 * monitor is asked about all the others.
 *
 * @param string|bool $domain Domain passed to the load monitor; defaults to false
 * @return array Map of (server index => lag)
 */
public function getLagTimes( $domain = false ) {
	if ( !$this->hasReplicaServers() ) {
		return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
	}

	$staticLagTimes = []; // map of (server index => 0 seconds)
	$indexesToProbe = [];
	foreach ( $this->servers as $index => $info ) {
		if ( !empty( $info['is static'] ) ) {
			// DB server is a non-replicating and read-only archive
			$staticLagTimes[$index] = 0;
			continue;
		}
		$indexesToProbe[] = $index; // DB server might have replication lag
	}

	$probedLagTimes = $this->getLoadMonitor()->getLagTimes( $indexesToProbe, $domain );

	// Array union: entries reported by the load monitor win on key collisions
	return $probedLagTimes + $staticLagTimes;
}
/**
 * Get the lag in seconds for a given connection, or zero if the connection is to a
 * static dataset or to the master
 *
 * This should be preferred over Database::getLag() where replication may not be in use:
 * a connection cannot tell whether replication is in use without running potentially
 * restricted queries such as SHOW SLAVE STATUS, which causes a fatal error on many
 * installations.
 *
 * @param IDatabase $conn
 * @return int|bool Returns false on error
 * @deprecated Since 1.34 Use IDatabase::getLag() instead
 */
public function safeGetLag( IDatabase $conn ) {
	$lagFree = $conn->getLBInfo( 'is static' ) // static dataset
		|| $conn->getLBInfo( 'serverIndex' ) == $this->getWriterIndex(); // the master

	if ( $lagFree ) {
		return 0;
	}

	return $conn->getLag();
}
/**
 * Wait for a replica DB connection to reach a given master position
 *
 * If $pos is not given, the current master position is read from an already open
 * master connection, or from one opened and closed again just for that purpose.
 *
 * @param IDatabase $conn Replica DB connection
 * @param DBMasterPos|bool $pos Master position; default: current position
 * @param int|null $timeout Timeout in seconds; falls back to the wait timeout when
 *   empty and is raised to at least 1
 * @return bool True if the position was reached or $conn is not a replica; false on
 *   timeout or if no usable position could be determined
 * @throws DBReplicationWaitError If no master connection could be obtained
 */
public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	$timeout = max( 1, $timeout ?: $this->waitTimeout );

	$isReplica = ( $this->getServerCount() > 1 && $conn->getLBInfo( 'replica' ) );
	if ( !$isReplica ) {
		return true; // server is not a replica DB
	}

	if ( !$pos ) {
		// Get the current master position, opening a connection if needed
		$index = $this->getWriterIndex();
		$flags = self::CONN_SILENCE_ERRORS;
		$masterConn = $this->getAnyOpenConnection( $index, $flags );
		$openedHere = !$masterConn;
		if ( $openedHere ) {
			$masterConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
			if ( !$masterConn ) {
				throw new DBReplicationWaitError(
					null,
					"Could not obtain a master database connection to get the position"
				);
			}
		}
		$pos = $masterConn->getMasterPos();
		if ( $openedHere ) {
			// Only close the connection that was opened just for this lookup
			$this->closeConnection( $masterConn );
		}
	}

	if ( !( $pos instanceof DBMasterPos ) ) {
		// something is misconfigured
		$this->replLogger->error(
			__METHOD__ . ': could not get master pos for {host}',
			[
				'host' => $conn->getServer(),
				'trace' => ( new RuntimeException() )->getTraceAsString()
			]
		);

		return false;
	}

	$start = microtime( true );
	$result = $conn->masterPosWait( $pos, $timeout );
	$seconds = max( microtime( true ) - $start, 0 );

	$timedOut = ( is_null( $result ) || $result == -1 );
	if ( !$timedOut ) {
		$this->replLogger->debug( __METHOD__ . ': done waiting' );

		return true;
	}

	$this->replLogger->warning(
		__METHOD__ . ': timed out waiting on {host} pos {pos} [{seconds}s]',
		[
			'host' => $conn->getServer(),
			'pos' => $pos,
			'seconds' => round( $seconds, 6 ),
			'trace' => ( new RuntimeException() )->getTraceAsString()
		]
	);

	return false;
}
/**
 * Wait for a replica DB to reach a specified master position
 *
 * Thin wrapper that forwards all arguments to waitForMasterPos(), which connects to the
 * master to get an accurate position if $pos is not given.
 *
 * @param IDatabase $conn Replica DB
 * @param DBMasterPos|bool $pos Master position; default: current position
 * @param int $timeout Timeout in seconds [optional]
 * @return bool Success
 * @since 1.28
 * @deprecated Since 1.34 Use waitForMasterPos() instead
 */
public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
	$reached = $this->waitForMasterPos( $conn, $pos, $timeout );

	return $reached;
}
/**
 * Register or remove a named transaction listener
 *
 * The listener is recorded (or forgotten) on this load balancer and the same change is
 * applied to every open master connection.
 *
 * @param string $name Listener name
 * @param callable|null $callback Listener to register, or null to remove the listener
 */
public function setTransactionListener( $name, callable $callback = null ) {
	if ( !$callback ) {
		unset( $this->trxRecurringCallbacks[$name] );
	} else {
		$this->trxRecurringCallbacks[$name] = $callback;
	}

	$propagate = static function ( IDatabase $conn ) use ( $name, $callback ) {
		$conn->setTransactionListener( $name, $callback );
	};
	$this->forEachOpenMasterConnection( $propagate );
}
/**
 * Replace the stored table aliases
 *
 * Only the field on this load balancer is updated here.
 *
 * @param array $aliases
 */
public function setTableAliases( array $aliases ) {
	$this->tableAliases = $aliases;
}
/**
 * Replace the stored index aliases
 *
 * Only the field on this load balancer is updated here.
 *
 * @param array $aliases
 */
public function setIndexAliases( array $aliases ) {
	$this->indexAliases = $aliases;
}
/**
 * Change the table prefix of the local domain and of all open local connections
 *
 * Refuses to do so while any connection handle has a positive 'foreignPoolRefCount'.
 *
 * @param string $prefix New table prefix
 * @throws DBUnexpectedError If foreign domain connections are still in use
 */
public function setLocalDomainPrefix( $prefix ) {
	// Find connections to explicit foreign domains still marked as in-use...
	$domainsInUse = [];
	$collectInUse = static function ( IDatabase $conn ) use ( &$domainsInUse ) {
		// The reference count of a handle drops from 1 to 0 once reuseConnection() is
		// called on it. Until then, the caller still uses it (explicitly or via
		// DBConnRef scope).
		if ( $conn->getLBInfo( 'foreignPoolRefCount' ) > 0 ) {
			$domainsInUse[] = $conn->getDomainID();
		}
	};
	$this->forEachOpenConnection( $collectInUse );

	// Do not switch connections to explicit foreign domains unless marked as safe
	if ( $domainsInUse ) {
		$domains = implode( ', ', $domainsInUse );
		throw new DBUnexpectedError(
			null,
			"Foreign domain connections are still in use ($domains)"
		);
	}

	// Same database and schema as before; only the prefix changes
	$newDomain = new DatabaseDomain(
		$this->localDomain->getDatabase(),
		$this->localDomain->getSchema(),
		$prefix
	);
	$this->setLocalDomain( $newDomain );

	// Update the prefix for all local connections...
	$applyPrefix = static function ( IDatabase $conn ) use ( $prefix ) {
		if ( $conn->getLBInfo( 'foreign' ) ) {
			return;
		}
		$conn->tablePrefix( $prefix );
	};
	$this->forEachOpenConnection( $applyPrefix );
}
/**
 * Close all connections and switch the local domain to the given one
 *
 * closeAll() is called with this instance's own ID as the owner argument.
 *
 * @param string $domain Domain ID, parsed with DatabaseDomain::newFromId()
 */
public function redefineLocalDomain( $domain ) {
	$this->closeAll( __METHOD__, $this->id );

	$newDomain = DatabaseDomain::newFromId( $domain );
	$this->setLocalDomain( $newDomain );
}
/**
 * Turn the "temp tables only" mode on or off for a domain
 *
 * @param bool $value Truthy to turn the mode on; falsy to clear it
 * @param string $domain Key of the per-domain mode map
 * @return bool The previous setting for the domain (false if none was stored)
 */
public function setTempTablesOnlyMode( $value, $domain ) {
	$previous = $this->tempTablesOnlyMode[$domain] ?? false;

	if ( !$value ) {
		unset( $this->tempTablesOnlyMode[$domain] );
	} else {
		$this->tempTablesOnlyMode[$domain] = true;
	}

	return $previous;
}
/**
 * Store the local domain and derive its "<db>-<prefix>" alias
 *
 * @param DatabaseDomain $domain
 */
private function setLocalDomain( DatabaseDomain $domain ) {
	$this->localDomain = $domain;

	// Callers may assume the domain ID is simply <db>-<prefix>, which is almost always
	// true. Keep an alias in that form so that a caller that fails to account for
	// escaping is still handled gracefully.
	$database = $this->localDomain->getDatabase();
	$prefix = $this->localDomain->getTablePrefix();

	$this->localDomainIdAlias = ( $prefix != '' )
		? $database . '-' . $prefix
		: $database;
}
/**
 * Get the configuration array of a server, or one field of it, failing loudly if absent
 *
 * @param int $i Server index
 * @param string|null $field Server index field [optional]
 * @return array|mixed The whole server array, or the value of $field when given
 * @throws InvalidArgumentException If the server index or the field does not exist
 */
private function getServerInfoStrict( $i, $field = null ) {
	$serverKnown = isset( $this->servers[$i] ) && is_array( $this->servers[$i] );
	if ( !$serverKnown ) {
		throw new InvalidArgumentException( "No server with index '$i'" );
	}

	$info = $this->servers[$i];
	if ( $field === null ) {
		return $info;
	}

	// array_key_exists() rather than isset(): a field holding null still counts as present
	if ( !array_key_exists( $field, $info ) ) {
		throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
	}

	return $info[$field];
}
/**
 * Get the server name of the writer (master) server index
 *
 * @return string
 */
private function getMasterServerName() {
	$writerIndex = $this->getWriterIndex();

	return $this->getServerName( $writerIndex );
}
/**
 * Disable this load balancer when the object is destroyed, to avoid connection leaks
 *
 * disable() is called with the stored owner ID as the owner argument.
 */
public function __destruct() {
	$this->disable( __METHOD__, $this->ownerId );
}
- }
/**
 * Register 'LoadBalancer' as an alias name for this class
 *
 * @deprecated since 1.29
 */
class_alias( LoadBalancer::class, 'LoadBalancer' );
|