LoadBalancer.php 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402
  1. <?php
  2. /**
  3. * Database load balancing manager
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. */
  22. namespace Wikimedia\Rdbms;
  23. use Psr\Log\LoggerInterface;
  24. use Psr\Log\NullLogger;
  25. use Wikimedia\ScopedCallback;
  26. use BagOStuff;
  27. use EmptyBagOStuff;
  28. use WANObjectCache;
  29. use ArrayUtils;
  30. use LogicException;
  31. use UnexpectedValueException;
  32. use InvalidArgumentException;
  33. use RuntimeException;
  34. use Exception;
  35. /**
  36. * Database connection, tracking, load balancing, and transaction manager for a cluster
  37. *
  38. * @ingroup Database
  39. */
  40. class LoadBalancer implements ILoadBalancer {
  41. /** @var ILoadMonitor */
  42. private $loadMonitor;
  43. /** @var callable|null Callback to run before the first connection attempt */
  44. private $chronologyCallback;
  45. /** @var BagOStuff */
  46. private $srvCache;
  47. /** @var WANObjectCache */
  48. private $wanCache;
  49. /** @var mixed Class name or object With profileIn/profileOut methods */
  50. private $profiler;
  51. /** @var TransactionProfiler */
  52. private $trxProfiler;
  53. /** @var LoggerInterface */
  54. private $replLogger;
  55. /** @var LoggerInterface */
  56. private $connLogger;
  57. /** @var LoggerInterface */
  58. private $queryLogger;
  59. /** @var LoggerInterface */
  60. private $perfLogger;
  61. /** @var callable Exception logger */
  62. private $errorLogger;
  63. /** @var callable Deprecation logger */
  64. private $deprecationLogger;
  65. /** @var DatabaseDomain Local DB domain ID and default for selectDB() calls */
  66. private $localDomain;
  67. /**
  68. * @var IDatabase[][][]|Database[][][] Map of (connection category => server index => IDatabase[])
  69. */
  70. private $conns;
  71. /** @var array[] Map of (server index => server config array) */
  72. private $servers;
  73. /** @var array[] Map of (group => server index => weight) */
  74. private $groupLoads;
  75. /** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
  76. private $allowLagged;
  77. /** @var int Seconds to spend waiting on replica DB lag to resolve */
  78. private $waitTimeout;
  79. /** @var array The LoadMonitor configuration */
  80. private $loadMonitorConfig;
  81. /** @var string Alternate local DB domain instead of DatabaseDomain::getId() */
  82. private $localDomainIdAlias;
  83. /** @var int Amount of replication lag, in seconds, that is considered "high" */
  84. private $maxLag;
  85. /** @var string|null Default query group to use with getConnection() */
  86. private $defaultGroup;
  87. /** @var string Current server name */
  88. private $hostname;
  89. /** @var bool Whether this PHP instance is for a CLI script */
  90. private $cliMode;
  91. /** @var string Agent name for query profiling */
  92. private $agent;
  93. /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
  94. private $tableAliases = [];
  95. /** @var string[] Map of (index alias => index) */
  96. private $indexAliases = [];
  97. /** @var callable[] Map of (name => callable) */
  98. private $trxRecurringCallbacks = [];
  99. /** @var bool[] Map of (domain => whether to use "temp tables only" mode) */
  100. private $tempTablesOnlyMode = [];
  101. /** @var string|bool Explicit DBO_TRX transaction round active or false if none */
  102. private $trxRoundId = false;
  103. /** @var string Stage of the current transaction round in the transaction round life-cycle */
  104. private $trxRoundStage = self::ROUND_CURSORY;
  105. /** @var Database Connection handle that caused a problem */
  106. private $errorConnection;
  107. /** @var int[] The group replica server indexes keyed by group */
  108. private $readIndexByGroup = [];
  109. /** @var bool|DBMasterPos Replication sync position or false if not set */
  110. private $waitForPos;
  111. /** @var bool Whether the generic reader fell back to a lagged replica DB */
  112. private $laggedReplicaMode = false;
  113. /** @var string The last DB selection or connection error */
  114. private $lastError = 'Unknown error';
  115. /** @var string|bool Reason this instance is read-only or false if not */
  116. private $readOnlyReason = false;
  117. /** @var int Total number of new connections ever made with this instance */
  118. private $connectionCounter = 0;
  119. /** @var bool */
  120. private $disabled = false;
  121. /** @var bool Whether any connection has been attempted yet */
  122. private $connectionAttempted = false;
  123. /** @var int An identifier for this class instance */
  124. private $id;
  125. /** @var int|null Integer ID of the managing LBFactory instance or null if none */
  126. private $ownerId;
  127. /** @var int Warn when this many connections are held */
  128. const CONN_HELD_WARN_THRESHOLD = 10;
  129. /** @var int Default 'maxLag' when unspecified */
  130. const MAX_LAG_DEFAULT = 6;
  131. /** @var int Default 'waitTimeout' when unspecified */
  132. const MAX_WAIT_DEFAULT = 10;
  133. /** @var int Seconds to cache master DB server read-only status */
  134. const TTL_CACHE_READONLY = 5;
  135. const KEY_LOCAL = 'local';
  136. const KEY_FOREIGN_FREE = 'foreignFree';
  137. const KEY_FOREIGN_INUSE = 'foreignInUse';
  138. const KEY_LOCAL_NOROUND = 'localAutoCommit';
  139. const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
  140. const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
  141. /** @var string Transaction round, explicit or implicit, has not finished writing */
  142. const ROUND_CURSORY = 'cursory';
  143. /** @var string Transaction round writes are complete and ready for pre-commit checks */
  144. const ROUND_FINALIZED = 'finalized';
  145. /** @var string Transaction round passed final pre-commit checks */
  146. const ROUND_APPROVED = 'approved';
  147. /** @var string Transaction round was committed and post-commit callbacks must be run */
  148. const ROUND_COMMIT_CALLBACKS = 'commit-callbacks';
  149. /** @var string Transaction round was rolled back and post-rollback callbacks must be run */
  150. const ROUND_ROLLBACK_CALLBACKS = 'rollback-callbacks';
  151. /** @var string Transaction round encountered an error */
  152. const ROUND_ERROR = 'error';
  /**
   * Build a load balancer from a configuration map.
   *
   * Keys read from $params:
   *  - servers: non-empty, 0-based sequential list of server config arrays; index 0 is
   *    flagged as the master and every other index as a replica. Each entry supplies
   *    'load' and may supply 'groupLoads' (map of query group => weight).
   *  - localDomain, waitTimeout, readOnlyReason, maxLag, loadMonitor, srvCache, wanCache,
   *    profiler, trxProfiler, errorLogger, deprecationLogger, replLogger, connLogger,
   *    queryLogger, perfLogger, hostname, cliMode, agent, chronologyCallback, roundStage,
   *    defaultGroup, ownerId: all optional, with the defaults assigned below.
   *
   * @param array $params Configuration map described above
   * @throws InvalidArgumentException If "servers" is missing or empty
   * @throws UnexpectedValueException If "servers" is not keyed 0, 1, 2, ...
   */
  public function __construct( array $params ) {
      if ( !isset( $params['servers'] ) || !count( $params['servers'] ) ) {
          throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
      }

      $listKey = -1;
      $this->servers = [];
      $this->groupLoads = [ self::GROUP_GENERIC => [] ];
      foreach ( $params['servers'] as $i => $server ) {
          // Keys must count up from zero with no gaps, i.e. the array must be a list
          if ( ++$listKey !== $i ) {
              throw new UnexpectedValueException( 'List expected for "servers" parameter' );
          }
          // The first server is the master; all others are replicas
          if ( $i == 0 ) {
              $server['master'] = true;
          } else {
              $server['replica'] = true;
          }
          $this->servers[$i] = $server;
          foreach ( ( $server['groupLoads'] ?? [] ) as $group => $ratio ) {
              $this->groupLoads[$group][$i] = $ratio;
          }
          $this->groupLoads[self::GROUP_GENERIC][$i] = $server['load'];
      }

      $localDomain = isset( $params['localDomain'] )
          ? DatabaseDomain::newFromId( $params['localDomain'] )
          : DatabaseDomain::newUnspecified();
      $this->setLocalDomain( $localDomain );

      $this->waitTimeout = $params['waitTimeout'] ?? self::MAX_WAIT_DEFAULT;

      $this->conns = self::newTrackedConnectionsArray();
      $this->waitForPos = false;
      $this->allowLagged = false;

      // Only a string reason puts the instance into read-only mode
      if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
          $this->readOnlyReason = $params['readOnlyReason'];
      }

      $this->maxLag = $params['maxLag'] ?? self::MAX_LAG_DEFAULT;

      $this->loadMonitorConfig = $params['loadMonitor'] ?? [ 'class' => 'LoadMonitorNull' ];
      // Array union: an explicit 'lagWarnThreshold' in the config takes precedence
      $this->loadMonitorConfig += [ 'lagWarnThreshold' => $this->maxLag ];

      $this->srvCache = $params['srvCache'] ?? new EmptyBagOStuff();
      $this->wanCache = $params['wanCache'] ?? WANObjectCache::newEmpty();
      $this->profiler = $params['profiler'] ?? null;
      $this->trxProfiler = $params['trxProfiler'] ?? new TransactionProfiler();

      $this->errorLogger = $params['errorLogger'] ?? function ( Exception $e ) {
          trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
      };
      $this->deprecationLogger = $params['deprecationLogger'] ?? function ( $msg ) {
          trigger_error( $msg, E_USER_DEPRECATED );
      };
      foreach ( [ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ] as $key ) {
          $this->$key = $params[$key] ?? new NullLogger();
      }

      $this->hostname = $params['hostname'] ?? ( gethostname() ?: 'unknown' );
      $this->cliMode = $params['cliMode'] ?? ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' );
      $this->agent = $params['agent'] ?? '';

      if ( isset( $params['chronologyCallback'] ) ) {
          $this->chronologyCallback = $params['chronologyCallback'];
      }

      // Map the externally supplied round stage onto the internal round life-cycle stage
      if ( isset( $params['roundStage'] ) ) {
          if ( $params['roundStage'] === self::STAGE_POSTCOMMIT_CALLBACKS ) {
              $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
          } elseif ( $params['roundStage'] === self::STAGE_POSTROLLBACK_CALLBACKS ) {
              $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
          }
      }

      // Fall back to the generic group if the requested default group has no servers
      $group = $params['defaultGroup'] ?? self::GROUP_GENERIC;
      $this->defaultGroup = isset( $this->groupLoads[$group] ) ? $group : self::GROUP_GENERIC;

      // Seed the counter randomly once, then count up so each instance gets its own ID.
      // The increment is done before the assignment; assigning the result of "$nextId++"
      // back to $nextId would discard the increment and repeat the same ID forever.
      static $nextId;
      $nextId = is_int( $nextId ) ? $nextId + 1 : mt_rand();
      $this->id = $nextId;

      $this->ownerId = $params['ownerId'] ?? null;
  }
  /**
   * Create the empty connection-tracking structure, one empty list per category.
   *
   * @return array[] Map of (connection category => empty array)
   */
  private static function newTrackedConnectionsArray() {
      $categories = [
          // Connections to which transaction rounds may be applied
          self::KEY_LOCAL,
          self::KEY_FOREIGN_INUSE,
          self::KEY_FOREIGN_FREE,
          // Auto-committing counterpart connections that ignore transaction rounds
          self::KEY_LOCAL_NOROUND,
          self::KEY_FOREIGN_INUSE_NOROUND,
          self::KEY_FOREIGN_FREE_NOROUND,
      ];

      $tracked = [];
      foreach ( $categories as $category ) {
          $tracked[$category] = [];
      }

      return $tracked;
  }
  /**
   * @return string ID of the local database domain
   */
  public function getLocalDomainID() {
      $domainId = $this->localDomain->getId();

      return $domainId;
  }
  /**
   * Normalize a domain argument into a domain ID string.
   *
   * @param string|bool $domain Domain ID, the local domain alias, or false for local
   * @return string The local domain ID for false/alias input, else $domain as a string
   */
  public function resolveDomainID( $domain ) {
      // Local connection requested, possibly via a backwards-compatibility domain alias
      $wantsLocal = ( $domain === false || $domain === $this->localDomainIdAlias );

      return $wantsLocal ? $this->getLocalDomainID() : (string)$domain;
  }
  /**
   * Resolve $groups into a list of query groups to try, in preference order
   *
   * The configured default group is always part of the result, so callers always have
   * a final fallback. A string group without configured loads is ignored in favor of
   * the default group.
   *
   * @param string[]|string|bool $groups Query group(s) in preference order, [], or false
   * @param int $i Specific server index or DB_MASTER/DB_REPLICA
   * @return string[] Non-empty group list in preference order with the default group appended
   * @throws LogicException If query groups are combined with a specific replica index
   */
  private function resolveGroups( $groups, $i ) {
      // If a specific replica server was specified, then $groups makes no sense
      if ( $i > 0 && $groups !== [] && $groups !== false ) {
          $list = implode( ', ', (array)$groups );
          throw new LogicException( "Query group(s) ($list) given with server index (#$i)" );
      }

      if ( $groups === [] || $groups === false || $groups === $this->defaultGroup ) {
          return [ $this->defaultGroup ]; // common case
      }

      if ( is_string( $groups ) && isset( $this->groupLoads[$groups] ) ) {
          return [ $groups, $this->defaultGroup ];
      }

      if ( is_array( $groups ) ) {
          // De-duplicate while keeping order, then append the configured default group
          // unless it is already listed. The fallback must be the default group (not
          // unconditionally the generic one) to agree with the other branches.
          return array_keys( array_flip( $groups ) + [ $this->defaultGroup => 1 ] );
      }

      return [ $this->defaultGroup ];
  }
  /**
   * Adjust caller-supplied connection flags to what this cluster can honor
   *
   * Adds CONN_INTENT_WRITABLE when the master is requested, and strips
   * CONN_TRX_AUTOCOMMIT when a separate auto-commit connection must not be used.
   *
   * @param int $flags Bitfield of class CONN_* constants
   * @param int $i Specific server index or DB_MASTER/DB_REPLICA
   * @param string $domain Database domain
   * @return int Sanitized bitfield
   */
  private function sanitizeConnectionFlags( $flags, $i, $domain ) {
      // Whether an outside caller is explicitly requesting the master database server
      if ( $i === self::DB_MASTER || $i === $this->getWriterIndex() ) {
          $flags |= self::CONN_INTENT_WRITABLE;
      }

      $autoCommitRequested =
          ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
      if ( !$autoCommitRequested ) {
          return $flags;
      }

      // Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
      // resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
      // during larger transactions. This is useful for avoiding lock contention.

      // Master DB server attributes (should match those of the replica DB servers)
      $masterAttributes = $this->getServerAttributes( $this->getWriterIndex() );
      if ( $masterAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
          // The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
          // to use separate connections would just cause self-deadlocks. Note that
          // REPEATABLE-READ staleness is not an issue since DB-level locking means
          // that transactions are Strict Serializable anyway.
          $flags = $flags & ~self::CONN_TRX_AUTOCOMMIT;
          $type = $this->getServerType( $this->getWriterIndex() );
          $this->connLogger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );

          return $flags;
      }

      if ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
          // T202116: integration tests are active and queries should all be using
          // temporary clone tables (via prefix). Such tables are not visible across
          // different connections nor can there be REPEATABLE-READ snapshot staleness,
          // so use the same connection for everything.
          $flags = $flags & ~self::CONN_TRX_AUTOCOMMIT;
      }

      return $flags;
  }
  /**
   * Apply the requested connection flags to a handle
   *
   * Only CONN_TRX_AUTOCOMMIT has an effect here: the handle's DBO_TRX flag is cleared
   * so that it runs in auto-commit mode.
   *
   * @param IDatabase $conn
   * @param int $flags Bitfield of class CONN_* constants
   * @throws DBUnexpectedError If auto-commit is requested on a handle already in a transaction
   */
  private function enforceConnectionFlags( IDatabase $conn, $flags ) {
      if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) != self::CONN_TRX_AUTOCOMMIT ) {
          return; // nothing to enforce
      }

      if ( $conn->trxLevel() ) { // sanity
          $message = 'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction';
          throw new DBUnexpectedError( $conn, $message );
      }

      $conn->clearFlag( $conn::DBO_TRX ); // auto-commit mode
  }
  /**
   * Get the LoadMonitor instance, creating it from "loadMonitorConfig" on first use
   *
   * @return ILoadMonitor
   */
  private function getLoadMonitor() {
      if ( isset( $this->loadMonitor ) ) {
          return $this->loadMonitor;
      }

      // Un-namespaced class names accepted for backwards compatibility
      $legacyClassMap = [
          'LoadMonitor' => LoadMonitor::class,
          'LoadMonitorNull' => LoadMonitorNull::class,
          'LoadMonitorMySQL' => LoadMonitorMySQL::class,
      ];

      $configuredClass = $this->loadMonitorConfig['class'];
      $monitorClass = $legacyClassMap[$configuredClass] ?? $configuredClass;

      $this->loadMonitor = new $monitorClass(
          $this,
          $this->srvCache,
          $this->wanCache,
          $this->loadMonitorConfig
      );
      $this->loadMonitor->setLogger( $this->replLogger );

      return $this->loadMonitor;
  }
  /**
   * Pick a random server, by weight, among those that are not excessively lagged
   *
   * Servers other than the writer are dropped from $loads when their lag is unknown
   * (unless the allowed lag is infinite) or exceeds the allowed lag.
   *
   * @param array $loads Map of (server index => weight)
   * @param bool|string $domain Domain to get non-lagged for
   * @param int $maxLag Restrict the maximum allowed lag to this many seconds
   * @return bool|int|string Chosen server index, or false if the remaining weights sum to zero
   */
  private function getRandomNonLagged( array $loads, $domain = false, $maxLag = INF ) {
      $lagByServer = $this->getLagTimes( $domain );

      // Drop excessively lagged servers from the candidates
      foreach ( $lagByServer as $index => $lagSeconds ) {
          if ( $index === $this->getWriterIndex() ) {
              continue; // the writer is never filtered out by lag
          }

          // Nominal per-server limit, further constrained by the $maxLag argument
          $allowedLag = min( $this->servers[$index]['max lag'] ?? $this->maxLag, $maxLag );
          $host = $this->getServerName( $index );

          if ( $lagSeconds === false && !is_infinite( $allowedLag ) ) {
              $this->replLogger->debug(
                  __METHOD__ . ": server {host} is not replicating?",
                  [ 'host' => $host ]
              );
              unset( $loads[$index] );
          } elseif ( $lagSeconds > $allowedLag ) {
              $this->replLogger->debug(
                  __METHOD__ . ": server {host} has {lag} seconds of lag (>= {maxlag})",
                  [ 'host' => $host, 'lag' => $lagSeconds, 'maxlag' => $allowedLag ]
              );
              unset( $loads[$index] );
          }
      }

      // Find out if all the servers with non-zero load were filtered out
      $totalWeight = 0;
      foreach ( $loads as $weight ) {
          $totalWeight += $weight;
      }

      if ( $totalWeight == 0 ) {
          // No appropriate DB servers except maybe the master and some replica DBs with
          // zero load. Do NOT use the master. Returning false here triggers read-only
          // mode and a lagged replica DB will be used instead. An empty candidate list
          // also ends up here, since its total weight is zero.
          return false;
      }

      // Return a random representative of the remainder
      return ArrayUtils::pickRandom( $loads );
  }
  /**
   * Get the server index to use for a specified server index and query group list
   *
   * DB_MASTER resolves to the writer index. DB_REPLICA resolves to the reader index of
   * the first group in $groups for which one can be obtained. Any other value must be
   * an index of a configured server and is returned as-is.
   *
   * @param int $i Specific server index or DB_MASTER/DB_REPLICA
   * @param string[] $groups Non-empty query group list in preference order
   * @param string|bool $domain
   * @return int A specific server index (replica DBs are checked for connectivity)
   * @throws UnexpectedValueException If $i is not a configured server index
   */
  private function getConnectionIndex( $i, array $groups, $domain ) {
      if ( $i === self::DB_MASTER ) {
          $i = $this->getWriterIndex();
      } elseif ( $i === self::DB_REPLICA ) {
          // Reset before trying any server so that a failure report only reflects errors
          // from this attempt. Resetting after the attempts (right before building the
          // message) would always discard the real error.
          $this->lastError = 'Unknown error'; // set here in case of worse failure
          foreach ( $groups as $group ) {
              $groupIndex = $this->getReaderIndex( $group, $domain );
              if ( $groupIndex !== false ) {
                  $i = $groupIndex; // group connection succeeded
                  break;
              }
          }
      } elseif ( !isset( $this->servers[$i] ) ) {
          throw new UnexpectedValueException( "Invalid server index index #$i" );
      }

      // Still the DB_REPLICA placeholder: no group yielded a usable reader index
      if ( $i === self::DB_REPLICA ) {
          $this->lastError = 'No working replica DB server: ' . $this->lastError;
          $this->reportConnectionError();

          return null; // unreachable due to exception
      }

      return $i;
  }
  /**
   * Get the server index to use for reads in the given query group
   *
   * The index chosen for a group is remembered and returned again on later calls.
   *
   * @param string|bool $group Query group; non-string values mean the generic group
   * @param string|bool $domain
   * @return int|bool Server index, or false if the group has no loads or no server is usable
   */
  public function getReaderIndex( $group = false, $domain = false ) {
      // Skip the load balancing if there's only one server
      if ( $this->getServerCount() == 1 ) {
          return $this->getWriterIndex();
      }

      if ( !is_string( $group ) ) {
          $group = self::GROUP_GENERIC;
      }

      // A reader index may already be selected, in which case "waitForPos" was handled
      $cachedIndex = $this->getExistingReaderIndex( $group );
      if ( $cachedIndex >= 0 ) {
          return $cachedIndex;
      }

      // Use the server weight array for this load group
      if ( !isset( $this->groupLoads[$group] ) ) {
          $this->connLogger->info( __METHOD__ . ": no loads for group $group" );

          return false;
      }
      $loads = $this->groupLoads[$group];

      // Scale the configured load ratios according to each server's load and state
      $this->getLoadMonitor()->scaleLoads( $loads, $domain );

      // Pick a server to use, accounting for weights, load, lag, and "waitForPos"
      $this->lazyLoadReplicationPositions(); // optimizes server candidate selection
      list( $readerIndex, $isLagged ) = $this->pickReaderIndex( $loads, $domain );
      if ( $readerIndex === false ) {
          // DB connection unsuccessful
          return false;
      }

      // If data seen by queries is expected to reflect the transactions committed as of
      // or after a given replication position then wait for the DB to apply those changes
      if (
          $this->waitForPos &&
          $readerIndex !== $this->getWriterIndex() &&
          !$this->doWait( $readerIndex )
      ) {
          // Data will be outdated compared to what was expected
          $isLagged = true;
      }

      // Cache the reader index for future DB_REPLICA handles
      $this->setExistingReaderIndex( $group, $readerIndex );

      // Record whether the generic reader index is in "lagged replica DB" mode
      if ( $isLagged && $group === self::GROUP_GENERIC ) {
          $this->laggedReplicaMode = true;
      }

      $serverName = $this->getServerName( $readerIndex );
      $this->connLogger->debug( __METHOD__ . ": using server $serverName for group '$group'" );

      return $readerIndex;
  }
  /**
   * Look up the server index already chosen for reads in the given query group
   *
   * @param string $group Query group
   * @return int Server index or -1 if none was chosen
   */
  protected function getExistingReaderIndex( $group ) {
      if ( isset( $this->readIndexByGroup[$group] ) ) {
          return $this->readIndexByGroup[$group];
      }

      return -1;
  }
  /**
   * Remember the server index chosen for reads in the given query group
   *
   * @param string $group Query group
   * @param int $index The index of a specific server
   * @throws UnexpectedValueException If $index is negative
   */
  private function setExistingReaderIndex( $group, $index ) {
      $isNegative = ( $index < 0 );
      if ( $isNegative ) {
          throw new UnexpectedValueException( "Cannot set a negative read server index" );
      }

      $this->readIndexByGroup[$group] = $index;
  }
  /**
   * Pick a reader server from the given weights and check that it can be connected to
   *
   * A server is drawn from the candidates; if connecting to it fails, it is removed from
   * the candidates and another one is drawn, until a connection succeeds, the pick itself
   * returns false, or no candidates remain.
   *
   * @param array $loads List of server weights
   * @param string|bool $domain
   * @return array (reader index, lagged replica mode); the index is false on failure
   * @throws InvalidArgumentException When $loads is empty
   */
  private function pickReaderIndex( array $loads, $domain = false ) {
      if ( !$loads ) {
          throw new InvalidArgumentException( "Server configuration array is empty" );
      }

      /** @var int|bool $i Index of selected server */
      $i = false;
      /** @var bool $laggedReplicaMode Whether server is considered lagged */
      $laggedReplicaMode = false;

      // Servers that are still eligible; an entry is dropped when connecting to it fails
      $candidates = $loads;
      while ( $candidates !== [] ) {
          if ( $this->allowLagged || $laggedReplicaMode ) {
              // Lag is not a selection criterion in this mode
              $i = ArrayUtils::pickRandom( $candidates );
          } else {
              $i = false;
              if ( $this->waitForPos && $this->waitForPos->asOfTime() ) {
                  $this->replLogger->debug( __METHOD__ . ": replication positions detected" );
                  // "chronologyCallback" sets "waitForPos" for session consistency.
                  // This triggers doWait() after connect, so it's especially good to
                  // avoid lagged servers so as to avoid excessive delay in that method.
                  $posAge = microtime( true ) - $this->waitForPos->asOfTime();
                  // Aim for <= 1 second of waiting (being too picky can backfire)
                  $i = $this->getRandomNonLagged( $candidates, $domain, $posAge + 1 );
              }
              if ( $i === false ) {
                  // Any server with less lag than its 'max lag' param is preferable
                  $i = $this->getRandomNonLagged( $candidates, $domain );
                  if ( $i === false && count( $candidates ) ) {
                      // All replica DBs lagged. Switch to read-only mode
                      $this->replLogger->error(
                          __METHOD__ . ": all replica DBs lagged. Switch to read-only mode" );
                      $i = ArrayUtils::pickRandom( $candidates );
                      $laggedReplicaMode = true;
                  }
              }
          }

          if ( $i === false ) {
              // pickRandom() returned false.
              // This is permanent and means the configuration or the load monitor
              // wants us to return false.
              $this->connLogger->debug( __METHOD__ . ": pickRandom() returned false" );

              return [ false, false ];
          }

          $this->connLogger->debug(
              __METHOD__ . ': Using reader #' . $i . ': ' . $this->getServerName( $i ) . '...'
          );

          // Get a connection to this server without triggering other server connections
          $conn = $this->getServerConnection( $i, $domain, self::CONN_SILENCE_ERRORS );
          if ( $conn ) {
              // Decrement reference counter, we are finished with this connection.
              // It will be incremented for the caller later.
              if ( $domain !== false ) {
                  $this->reuseConnection( $conn );
              }

              // This server is the one to return
              break;
          }

          $this->connLogger->warning( __METHOD__ . ": Failed connecting to {$i}/{$domain}" );
          // Avoid this server on the next iteration
          unset( $candidates[$i] );
          $i = false;
      }

      // Every candidate was removed, which means that no server could be connected to
      if ( !$candidates ) {
          $this->connLogger->error( __METHOD__ . ": all servers down" );
      }

      return [ $i, $laggedReplicaMode ];
  }
  /**
   * Set the replication position to wait for
   *
   * If a generic reader index greater than zero was already chosen, that server is waited
   * on right away and lagged-replica mode is flagged when the wait fails.
   *
   * @param DBMasterPos|bool $pos Position stored in "waitForPos" (type presumed from
   *  setWaitForPositionIfHigher(); confirm against the interface)
   */
  public function waitFor( $pos ) {
      $previousPos = $this->waitForPos;
      try {
          $this->waitForPos = $pos;
          // If a generic reader connection was already established, then wait now
          $readerIndex = $this->getExistingReaderIndex( self::GROUP_GENERIC );
          if ( $readerIndex > 0 ) {
              $caughtUp = $this->doWait( $readerIndex );
              if ( !$caughtUp ) {
                  $this->laggedReplicaMode = true;
              }
          }
          // Otherwise, wait until a connection is established in getReaderIndex()
      } finally {
          // Restore the older position if it was higher since this is used for lag-protection
          $this->setWaitForPositionIfHigher( $previousPos );
      }
  }
  /**
   * Wait for a single replica DB to reach the given position
   *
   * Uses the generic reader index if one greater than zero is already known; otherwise a
   * server is picked at random among the generic-group entries that are not the writer and
   * have non-zero load. "waitForPos" is restored to its previous value afterwards.
   *
   * @param DBMasterPos|bool $pos Position to wait for (type presumed; confirm against the
   *  interface)
   * @param int|null $timeout Max seconds to wait, passed through to doWait()
   * @return bool Result of doWait(), or true when there was no applicable server
   */
  public function waitForOne( $pos, $timeout = null ) {
      $previousPos = $this->waitForPos;
      try {
          $this->waitForPos = $pos;

          $i = $this->getExistingReaderIndex( self::GROUP_GENERIC );
          if ( $i <= 0 ) {
              // Pick a generic replica DB if there isn't one yet
              $replicaLoads = $this->groupLoads[self::GROUP_GENERIC];
              // Replica DBs only
              unset( $replicaLoads[$this->getWriterIndex()] );
              // Keep only the entries with non-zero load
              $replicaLoads = array_filter( $replicaLoads );
              $i = ArrayUtils::pickRandom( $replicaLoads );
          }

          // No applicable loads counts as success
          $ok = ( $i > 0 ) ? $this->doWait( $i, true, $timeout ) : true;
      } finally {
          // Restore the old position; this is used for throttling, not lag-protection
          $this->waitForPos = $previousPos;
      }

      return $ok;
  }
  /**
   * Wait for every server with index >= 1 that has load in any group to reach a position
   *
   * The remaining timeout is reduced by the whole seconds spent on each server and the loop
   * stops once it reaches zero. "waitForPos" is restored to its previous value afterwards.
   *
   * @param DBMasterPos|bool $pos Position to wait for (type presumed; confirm against the
   *  interface)
   * @param int|null $timeout Max seconds to wait in total; falls back to "waitTimeout"
   * @return bool Whether every doWait() call that was made succeeded
   */
  public function waitForAll( $pos, $timeout = null ) {
      $timeout = $timeout ?: $this->waitTimeout;

      $previousPos = $this->waitForPos;
      try {
          $this->waitForPos = $pos;

          $ok = true;
          $total = $this->getServerCount();
          for ( $i = 1; $i < $total; ++$i ) {
              if ( !$this->serverHasLoadInAnyGroup( $i ) ) {
                  continue;
              }

              $startedAt = microtime( true );
              if ( !$this->doWait( $i, true, $timeout ) ) {
                  $ok = false;
              }
              // Only whole elapsed seconds are deducted from the budget
              $timeout -= (int)( microtime( true ) - $startedAt );
              if ( $timeout <= 0 ) {
                  break; // timeout reached
              }
          }
      } finally {
          // Restore the old position; this is used for throttling, not lag-protection
          $this->waitForPos = $previousPos;
      }

      return $ok;
  }
  /**
   * Check whether a server has a load greater than zero in at least one query group
   *
   * @param int $i Specific server index
   * @return bool
   */
  private function serverHasLoadInAnyGroup( $i ) {
      $hasLoad = false;
      foreach ( $this->groupLoads as $loadsByIndex ) {
          // Servers that are absent from a group count as having no load in it
          $load = $loadsByIndex[$i] ?? 0;
          if ( $load > 0 ) {
              $hasLoad = true;
              break;
          }
      }

      return $hasLoad;
  }
  /**
   * Store the given position in "waitForPos" when none is set yet or when the given
   * position has reached the one currently stored; falsy positions are ignored
   *
   * @param DBMasterPos|bool $pos
   */
  private function setWaitForPositionIfHigher( $pos ) {
      if ( $pos && ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) ) {
          $this->waitForPos = $pos;
      }
  }
  /**
   * Find an already-open connection handle for a server without opening a new one
   *
   * @param int $i Specific server index, or DB_MASTER/DB_REPLICA. DB_MASTER is mapped to
   *  the writer index; DB_REPLICA makes every server index of every pool a candidate.
   * @param int $flags Bitfield of class CONN_* constants
   * @return IDatabase|bool An open handle with $flags enforced, or false if none was found
   */
  public function getAnyOpenConnection( $i, $flags = 0 ) {
      $i = ( $i === self::DB_MASTER ) ? $this->getWriterIndex() : $i;
      // Connection handles required to be in auto-commit mode use a separate connection
      // pool since the main pool is affected by implicit and explicit transaction rounds
      $autocommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

      foreach ( $this->conns as $connsByServer ) {
          // Get the connection array server indexes to inspect
          if ( $i === self::DB_REPLICA ) {
              $indexes = array_keys( $connsByServer );
          } else {
              $indexes = isset( $connsByServer[$i] ) ? [ $i ] : [];
          }

          foreach ( $indexes as $index ) {
              $conn = $this->pickAnyOpenConnection( $connsByServer[$index], $autocommit );
              if ( $conn ) {
                  // Return as soon as a handle is found. Merely leaving the inner loop would
                  // let the scan of the next pool overwrite this handle, possibly with false.
                  $this->enforceConnectionFlags( $conn, $flags );

                  return $conn;
              }
          }
      }

      return false;
  }
  /**
   * Choose an open handle from a list of candidates
   *
   * The whole list is scanned, so when several candidates qualify the last one is returned.
   *
   * @param IDatabase[] $candidateConns
   * @param bool $autocommit Whether to only look for auto-commit connections
   * @return IDatabase|false An appropriate open connection or false if none found
   */
  private function pickAnyOpenConnection( $candidateConns, $autocommit ) {
      $picked = false;
      foreach ( $candidateConns as $handle ) {
          if ( !$handle->isOpen() ) {
              continue; // some sort of error occurred?
          }

          // In auto-commit mode, the handle must be flagged as auto-commit only (not
          // transaction round aware) and must not have a transaction left open by an error
          $usable = !$autocommit ||
              ( $handle->getLBInfo( 'autoCommitOnly' ) && !$handle->trxLevel() );
          if ( $usable ) {
              $picked = $handle;
          }
      }

      return $picked;
  }
  /**
   * Wait for a given replica DB to catch up to the master pos stored in "waitForPos"
   *
   * The last position each server is known to have reached is kept in "srvCache", so that
   * a wait can be skipped when the server is already known to be caught up.
   *
   * @param int $index Specific server index
   * @param bool $open Check the server even if a new connection has to be made
   * @param int|null $timeout Max seconds to wait; default is "waitTimeout" (at least 1)
   * @return bool Whether the position was reached; false on timeout, error, or when no
   *  connection was available
   */
  protected function doWait( $index, $open = false, $timeout = null ) {
      $timeout = max( 1, intval( $timeout ?: $this->waitTimeout ) );

      // Check if we already know that the DB has reached this point
      $server = $this->getServerName( $index );
      $key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server, 'v1' );
      /** @var DBMasterPos $knownReachedPos */
      $knownReachedPos = $this->srvCache->get( $key );
      if (
          $knownReachedPos instanceof DBMasterPos &&
          $knownReachedPos->hasReached( $this->waitForPos )
      ) {
          // The position is passed as a context placeholder; a variable name inside a
          // single-quoted string would be logged literally instead of being expanded
          $this->replLogger->debug(
              __METHOD__ .
              ': replica DB {dbserver} known to be caught up (pos >= {pos}).',
              [ 'dbserver' => $server, 'pos' => $knownReachedPos ]
          );

          return true;
      }

      // Find a connection to wait on, creating one if needed and allowed
      $close = false; // close the connection afterwards
      $flags = self::CONN_SILENCE_ERRORS;
      $conn = $this->getAnyOpenConnection( $index, $flags );
      if ( !$conn ) {
          if ( !$open ) {
              $this->replLogger->debug(
                  __METHOD__ . ': no connection open for {dbserver}',
                  [ 'dbserver' => $server ]
              );

              return false;
          }
          // Get a connection to this server without triggering other server connections
          $conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
          if ( !$conn ) {
              $this->replLogger->warning(
                  __METHOD__ . ': failed to connect to {dbserver}',
                  [ 'dbserver' => $server ]
              );

              return false;
          }
          // Avoid connection spam in waitForAll() when connections
          // are made just for the sake of doing this lag check.
          $close = true;
      }

      $this->replLogger->info(
          __METHOD__ .
          ': waiting for replica DB {dbserver} to catch up...',
          [ 'dbserver' => $server ]
      );

      try {
          $result = $conn->masterPosWait( $this->waitForPos, $timeout );
      } finally {
          // Release the temporary connection even if the wait itself throws
          if ( $close ) {
              $this->closeConnection( $conn );
          }
      }

      if ( $result === null ) {
          $this->replLogger->warning(
              __METHOD__ . ': Errored out waiting on {host} pos {pos}',
              [
                  'host' => $server,
                  'pos' => $this->waitForPos,
                  'trace' => ( new RuntimeException() )->getTraceAsString()
              ]
          );
          $ok = false;
      } elseif ( $result == -1 ) {
          $this->replLogger->warning(
              __METHOD__ . ': Timed out waiting on {host} pos {pos}',
              [
                  'host' => $server,
                  'pos' => $this->waitForPos,
                  'trace' => ( new RuntimeException() )->getTraceAsString()
              ]
          );
          $ok = false;
      } else {
          $this->replLogger->debug( __METHOD__ . ": done waiting" );
          $ok = true;
          // Remember that the DB reached this point
          $this->srvCache->set( $key, $this->waitForPos, BagOStuff::TTL_DAY );
      }

      return $ok;
  }
  /**
   * Get a connection handle for a server index or for DB_MASTER/DB_REPLICA
   *
   * @param int $i Server index or DB_MASTER/DB_REPLICA
   * @param array|string|bool $groups Query group(s), as accepted by resolveGroups()
   * @param string|bool $domain DB domain ID, as accepted by resolveDomainID()
   * @param int $flags Bitfield of class CONN_* constants
   * @return IDatabase|bool Handle, or false when connecting failed and $flags contains
   *  CONN_SILENCE_ERRORS
   */
  public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
      $domain = $this->resolveDomainID( $domain );
      $groups = $this->resolveGroups( $groups, $i );
      $flags = $this->sanitizeConnectionFlags( $flags, $i, $domain );
      // If given DB_MASTER/DB_REPLICA, resolve it to a specific server index. Resolving
      // DB_REPLICA might trigger getServerConnection() calls due to the getReaderIndex()
      // connectivity checks or LoadMonitor::scaleLoads() server state cache regeneration.
      // The use of getServerConnection() instead of getConnection() avoids infinite loops.
      $serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
      // Get an open connection to that server (might trigger a new connection)
      $conn = $this->getServerConnection( $serverIndex, $domain, $flags );
      // Set master DB handles as read-only if there is high replication lag.
      // getServerConnection() returns false on failure when errors are silenced, so make
      // sure that there is a handle before calling methods on it.
      if (
          $serverIndex === $this->getWriterIndex() &&
          $this->getLaggedReplicaMode( $domain ) &&
          $conn instanceof IDatabase &&
          !is_string( $conn->getLBInfo( 'readOnlyReason' ) )
      ) {
          $reason = ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 )
              ? 'The database is read-only until replication lag decreases.'
              : 'The database is read-only until replica database servers becomes reachable.';
          $conn->setLBInfo( 'readOnlyReason', $reason );
      }

      return $conn;
  }
  /**
   * Get a connection handle for one specific server, opening a connection if needed
   *
   * @param int $i Specific server index
   * @param string $domain Resolved DB domain
   * @param int $flags Bitfield of class CONN_* constants
   * @return IDatabase|bool Handle, or false if the connection failed or was lost
   * @throws InvalidArgumentException When the server index is invalid
   */
  public function getServerConnection( $i, $domain, $flags = 0 ) {
      // Number of connections made before getting the server index and handle
      $countBefore = $this->connectionCounter;

      // Get an open connection to this server (might trigger a new connection)
      if ( $this->localDomain->equals( $domain ) ) {
          $conn = $this->getLocalConnection( $i, $flags );
      } else {
          $conn = $this->getForeignConnection( $i, $domain, $flags );
      }

      // Throw an error or otherwise bail out if the connection attempt failed
      if ( !( $conn instanceof IDatabase ) ) {
          $silenced =
              ( ( $flags & self::CONN_SILENCE_ERRORS ) == self::CONN_SILENCE_ERRORS );
          if ( !$silenced ) {
              $this->reportConnectionError();
          }

          return false;
      }

      // Profile any new connections caused by this method
      if ( $this->connectionCounter > $countBefore ) {
          $writeIntent =
              ( ( $flags & self::CONN_INTENT_WRITABLE ) == self::CONN_INTENT_WRITABLE );
          $this->trxProfiler->recordConnection(
              $conn->getServer(),
              $conn->getDBname(),
              $writeIntent
          );
      }

      if ( !$conn->isOpen() ) {
          // Connection was made but later unrecoverably lost for some reason.
          // Do not return a handle that will just throw exceptions on use, but
          // let the calling code, e.g. getReaderIndex(), try another server.
          $this->errorConnection = $conn;

          return false;
      }

      // Make sure that flags like CONN_TRX_AUTOCOMMIT are respected by this handle
      $this->enforceConnectionFlags( $conn, $flags );

      // Set master DB handles as read-only if the load balancer is configured as read-only
      // or the master database server is running in server-side read-only mode. Note that
      // replica DB handles are always read-only via Database::assertIsWritableMaster().
      // Read-only mode due to replication lag is *avoided* here to avoid recursion.
      if ( $i === $this->getWriterIndex() ) {
          $reason = false;
          if ( $this->readOnlyReason !== false ) {
              $reason = $this->readOnlyReason;
          } elseif ( $this->isMasterConnectionReadOnly( $conn, $flags ) ) {
              $reason = 'The master database server is running in read-only mode.';
          }
          $conn->setLBInfo( 'readOnlyReason', $reason );
      }

      return $conn;
  }
  /**
   * Release one reference to a foreign-domain connection handle
   *
   * The handle's "foreignPoolRefCount" is decremented; once it drops to zero or below, the
   * handle is moved from the in-use pool to the free pool. Handles without foreign pool
   * tracking info are left alone.
   *
   * @param IDatabase $conn
   * @throws InvalidArgumentException When the handle is not the one tracked as in use
   */
  public function reuseConnection( IDatabase $conn ) {
      $serverIndex = $conn->getLBInfo( 'serverIndex' );
      $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
      if ( $serverIndex === null || $refCount === null ) {
          return; // non-foreign connection; no domain-use tracking to update
      }

      if ( $conn instanceof DBConnRef ) {
          // DBConnRef already handles calling reuseConnection() and only passes the live
          // Database instance to this method. Any caller passing in a DBConnRef is broken.
          $this->connLogger->error(
              __METHOD__ . ": got DBConnRef instance.\n" .
              ( new LogicException() )->getTraceAsString() );

          return;
      }

      if ( $this->disabled ) {
          return; // DBConnRef handle probably survived longer than the LoadBalancer
      }

      // Auto-commit handles are tracked in their own pair of pools
      $noRound = (bool)$conn->getLBInfo( 'autoCommitOnly' );
      $connFreeKey = $noRound ? self::KEY_FOREIGN_FREE_NOROUND : self::KEY_FOREIGN_FREE;
      $connInUseKey = $noRound ? self::KEY_FOREIGN_INUSE_NOROUND : self::KEY_FOREIGN_INUSE;

      $domain = $conn->getDomainID();
      $tracked = $this->conns[$connInUseKey][$serverIndex][$domain] ?? null;
      if ( $tracked === null ) {
          throw new InvalidArgumentException(
              "Connection $serverIndex/$domain not found; it may have already been freed" );
      }
      if ( $tracked !== $conn ) {
          throw new InvalidArgumentException(
              "Connection $serverIndex/$domain mismatched; it may have already been freed" );
      }

      --$refCount;
      $conn->setLBInfo( 'foreignPoolRefCount', $refCount );
      if ( $refCount > 0 ) {
          $this->connLogger->debug( __METHOD__ .
              ": reference count for $serverIndex/$domain reduced to $refCount" );

          return;
      }

      // No references left: move the handle to the free pool
      $this->conns[$connFreeKey][$serverIndex][$domain] = $conn;
      unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
      if ( !$this->conns[$connInUseKey][$serverIndex] ) {
          unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
      }
      $this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
  }
  /**
   * Get a connection via getConnection() and wrap it in a DBConnRef
   *
   * @param int $i Server index or DB_MASTER/DB_REPLICA
   * @param array|string|bool $groups Query group(s), passed through to getConnection()
   * @param string|bool $domain DB domain ID, as accepted by resolveDomainID()
   * @param int $flags Bitfield of class CONN_* constants
   * @return DBConnRef
   */
  public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
      $domain = $this->resolveDomainID( $domain );
      $role = $this->getRoleFromIndex( $i );
      $conn = $this->getConnection( $i, $groups, $domain, $flags );

      return new DBConnRef( $this, $conn, $role );
  }
  /**
   * Get a DBConnRef that is given the connection parameters instead of a live handle
   *
   * No connection is requested by this method itself.
   *
   * @param int $i Server index or DB_MASTER/DB_REPLICA
   * @param array|string|bool $groups Query group(s)
   * @param string|bool $domain DB domain ID, as accepted by resolveDomainID()
   * @param int $flags Bitfield of class CONN_* constants
   * @return DBConnRef
   */
  public function getLazyConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
      $params = [ $i, $groups, $this->resolveDomainID( $domain ), $flags ];

      return new DBConnRef( $this, $params, $this->getRoleFromIndex( $i ) );
  }
  /**
   * Get a connection via getConnection() and wrap it in a MaintainableDBConnRef
   *
   * @param int $i Server index or DB_MASTER/DB_REPLICA
   * @param array|string|bool $groups Query group(s), passed through to getConnection()
   * @param string|bool $domain DB domain ID, as accepted by resolveDomainID()
   * @param int $flags Bitfield of class CONN_* constants
   * @return MaintainableDBConnRef
   */
  public function getMaintenanceConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ) {
      $domain = $this->resolveDomainID( $domain );
      $role = $this->getRoleFromIndex( $i );
      $conn = $this->getConnection( $i, $groups, $domain, $flags );

      return new MaintainableDBConnRef( $this, $conn, $role );
  }
  /**
   * Map a server index to the role it stands for
   *
   * @param int $i Server index or DB_MASTER/DB_REPLICA
   * @return int DB_MASTER for DB_MASTER or the writer index; DB_REPLICA for anything else
   */
  private function getRoleFromIndex( $i ) {
      if ( $i === self::DB_MASTER || $i === $this->getWriterIndex() ) {
          return self::DB_MASTER;
      }

      return self::DB_REPLICA;
  }
  /**
   * Get a connection via getConnection() with no query groups and with
   * CONN_SILENCE_ERRORS added to the flags
   *
   * @param int $i
   * @param string|bool $domain
   * @param int $flags
   * @return Database|bool Live database handle or false on failure
   * @deprecated Since 1.34 Use getConnection() instead
   */
  public function openConnection( $i, $domain = false, $flags = 0 ) {
      $silentFlags = $flags | self::CONN_SILENCE_ERRORS;

      return $this->getConnection( $i, [], $domain, $silentFlags );
  }
  /**
   * Get the local-domain connection handle for a server, opening one if there is none yet
   *
   * When a new connection cannot be opened, false is returned and the handle that failed
   * is available via $this->errorConnection.
   *
   * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
   *
   * @param int $i Server index
   * @param int $flags Class CONN_* constant bitfield
   * @return Database|bool Handle, or false if a new connection could not be opened
   * @throws InvalidArgumentException When the server index is invalid
   * @throws UnexpectedValueException When the DB domain of the connection is corrupted
   */
  private function getLocalConnection( $i, $flags = 0 ) {
      // Connection handles required to be in auto-commit mode use a separate connection
      // pool since the main pool is affected by implicit and explicit transaction rounds
      $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
      $connKey = self::KEY_LOCAL;
      if ( $autoCommit ) {
          $connKey = self::KEY_LOCAL_NOROUND;
      }

      $conn = $this->conns[$connKey][$i][0] ?? null;
      if ( $conn === null ) {
          // Nothing pooled for this server yet; open a new connection
          $server = $this->getServerInfoStrict( $i );
          $server['serverIndex'] = $i;
          $server['autoCommitOnly'] = $autoCommit;
          $conn = $this->reallyOpenConnection( $server, $this->localDomain );
          $host = $this->getServerName( $i );
          if ( !$conn->isOpen() ) {
              $this->connLogger->warning(
                  __METHOD__ . ": failed to connect to database $i at '$host'." );
              $this->errorConnection = $conn;
              $conn = false;
          } else {
              $this->connLogger->debug(
                  __METHOD__ . ": connected to database $i at '$host'." );
              $this->conns[$connKey][$i][0] = $conn;
          }
      }

      // Final sanity check to make sure the right domain is selected
      $isHandle = ( $conn instanceof IDatabase );
      if ( $isHandle && !$this->localDomain->isCompatible( $conn->getDomainID() ) ) {
          throw new UnexpectedValueException(
              "Got connection to '{$conn->getDomainID()}', " .
              "but expected local domain ('{$this->localDomain}')" );
      }

      return $conn;
  }
  /**
   * Open a connection to a foreign DB, or return one if it is already open.
   *
   * Increments a reference count on the returned connection which locks the
   * connection to the requested domain. This reference count can be
   * decremented by calling reuseConnection().
   *
   * If a connection is open to the appropriate server already, but with the wrong
   * database, it will be switched to the right database and returned, as long as
   * it has been freed first with reuseConnection().
   *
   * On error, returns false, and the connection which caused the
   * error will be available via $this->errorConnection.
   *
   * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
   *
   * @param int $i Server index
   * @param string $domain Domain ID to open
   * @param int $flags Class CONN_* constant bitfield
   * @return Database|bool Returns false on connection error
   * @throws DBError When database selection fails
   * @throws InvalidArgumentException When the server index is invalid
   * @throws UnexpectedValueException When the DB domain of the connection is corrupted
   */
  private function getForeignConnection( $i, $domain, $flags = 0 ) {
      $domainInstance = DatabaseDomain::newFromId( $domain );
      // Connection handles required to be in auto-commit mode use a separate connection
      // pool since the main pool is affected by implicit and explicit transaction rounds
      $autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );

      if ( $autoCommit ) {
          $connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
          $connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
      } else {
          $connFreeKey = self::KEY_FOREIGN_FREE;
          $connInUseKey = self::KEY_FOREIGN_INUSE;
      }

      /** @var Database $conn */
      $conn = null;

      if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
          // Reuse an in-use connection for the same domain
          $conn = $this->conns[$connInUseKey][$i][$domain];
          $this->connLogger->debug( __METHOD__ . ": reusing connection $i/$domain" );
      } elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
          // Reuse a free connection for the same domain
          $conn = $this->conns[$connFreeKey][$i][$domain];
          unset( $this->conns[$connFreeKey][$i][$domain] );
          $this->conns[$connInUseKey][$i][$domain] = $conn;
          $this->connLogger->debug( __METHOD__ . ": reusing free connection $i/$domain" );
      } elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
          // Reuse a free connection from another domain if possible. The loop uses its own
          // variable so that $conn stays null unless a handle is actually adopted; otherwise
          // a skipped handle would be left in $conn and no new connection would be opened.
          foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $candidateConn ) {
              if ( $domainInstance->getDatabase() !== null ) {
                  // Check if changing the database will require a new connection.
                  // In that case, leave the connection handle alone and keep looking.
                  // This prevents connections from being closed mid-transaction and can
                  // also avoid overhead if the same database will later be requested.
                  if (
                      $candidateConn->databasesAreIndependent() &&
                      $candidateConn->getDBname() !== $domainInstance->getDatabase()
                  ) {
                      continue;
                  }
                  // Select the new database, schema, and prefix
                  $candidateConn->selectDomain( $domainInstance );
              } else {
                  // Stay on the current database, but update the schema/prefix
                  $candidateConn->dbSchema( $domainInstance->getSchema() );
                  $candidateConn->tablePrefix( $domainInstance->getTablePrefix() );
              }
              unset( $this->conns[$connFreeKey][$i][$oldDomain] );
              // Note that if $domain is an empty string, getDomainID() might not match it
              $this->conns[$connInUseKey][$i][$candidateConn->getDomainID()] = $candidateConn;
              $this->connLogger->debug( __METHOD__ .
                  ": reusing free connection from $oldDomain for $domain" );
              $conn = $candidateConn;
              break;
          }
      }

      if ( !$conn ) {
          // Open a new connection
          $server = $this->getServerInfoStrict( $i );
          $server['serverIndex'] = $i;
          $server['foreignPoolRefCount'] = 0;
          $server['foreign'] = true;
          $server['autoCommitOnly'] = $autoCommit;
          $conn = $this->reallyOpenConnection( $server, $domainInstance );
          if ( !$conn->isOpen() ) {
              $this->connLogger->warning( __METHOD__ . ": connection error for $i/$domain" );
              $this->errorConnection = $conn;
              $conn = false;
          } else {
              // Note that if $domain is an empty string, getDomainID() might not match it
              $this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
              $this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
          }
      }

      if ( $conn instanceof IDatabase ) {
          // Final sanity check to make sure the right domain is selected
          if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
              throw new UnexpectedValueException(
                  "Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
          }
          // Increment reference count
          $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
          $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
      }

      return $conn;
  }
  /**
   * Get the attributes that Database::attributesFromType() reports for a server's
   * configured type and (optional) driver
   *
   * @param int $i Server index
   * @return mixed Whatever Database::attributesFromType() returns
   */
  public function getServerAttributes( $i ) {
      $type = $this->getServerType( $i );
      $driver = $this->servers[$i]['driver'] ?? null;

      return Database::attributesFromType( $type, $driver );
  }
  /**
   * Check whether getAnyOpenConnection() finds an open connection for the given index
   *
   * @param int $index Server index
   * @return bool
   */
  private function isOpen( $index ) {
      $conn = $this->getAnyOpenConnection( $index );

      return $conn ? true : false;
  }
  /**
   * Open a new network connection to a server (uncached)
   *
   * A Database object is returned whether or not the connection attempt succeeded, since
   * a DBConnectionError raised while connecting is caught here.
   *
   * @param array $server Server configuration; must include "type" and "serverIndex"
   * @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
   * @return Database
   * @throws DBAccessError When this LoadBalancer is disabled
   * @throws InvalidArgumentException
   */
  protected function reallyOpenConnection( array $server, DatabaseDomain $domain ) {
      if ( $this->disabled ) {
          throw new DBAccessError();
      }

      $dbName = $domain->getDatabase();
      if ( $dbName !== null ) {
          $server['dbname'] = $dbName;
      } elseif ( $server['type'] === 'mysql' ) {
          // The database domain does not specify a DB name and some database systems require
          // a valid DB specified on connection. The $server configuration array contains a
          // default DB name to use for connections in such cases.
          // For MySQL, DATABASE and SCHEMA are synonyms, connections need not specify a DB,
          // and the DB name in $server might not exist due to legacy reasons (the default
          // domain used to ignore the local LB domain, even when mismatched).
          $server['dbname'] = null;
      }

      $schema = $domain->getSchema();
      if ( $schema !== null ) {
          $server['schema'] = $schema;
      }

      // It is always possible to connect with any prefix, even the empty string
      $server['tablePrefix'] = $domain->getTablePrefix();

      // Let the handle know what the cluster master is (e.g. "db1052")
      $server['clusterMasterHost'] = $this->getServerName( $this->getWriterIndex() );

      $server['srvCache'] = $this->srvCache;
      // Set loggers and profilers
      $server['connLogger'] = $this->connLogger;
      $server['queryLogger'] = $this->queryLogger;
      $server['errorLogger'] = $this->errorLogger;
      $server['deprecationLogger'] = $this->deprecationLogger;
      $server['profiler'] = $this->profiler;
      $server['trxProfiler'] = $this->trxProfiler;
      // Use the same agent and PHP mode for all DB handles
      $server['cliMode'] = $this->cliMode;
      $server['agent'] = $this->agent;
      // Use DBO_DEFAULT flags by default for LoadBalancer managed databases. Assume that the
      // application calls LoadBalancer::commitMasterChanges() before the PHP script completes.
      if ( !isset( $server['flags'] ) ) {
          $server['flags'] = IDatabase::DBO_DEFAULT;
      }
      $server['ownerId'] = $this->id;

      // Create the handle without connecting yet
      $conn = Database::factory( $server['type'], $server, Database::NEW_UNCONNECTED );
      $conn->setLBInfo( $server );
      $lazyMaster = $this->getLazyConnectionRef( self::DB_MASTER, [], $conn->getDomainID() );
      $conn->setLazyMasterHandle( $lazyMaster );
      $conn->setTableAliases( $this->tableAliases );
      $conn->setIndexAliases( $this->indexAliases );

      try {
          $conn->initConnection();
          // Only successful attempts are counted
          ++$this->connectionCounter;
      } catch ( DBConnectionError $e ) {
          // ignore; let the DB handle the logging
      }

      if ( $server['serverIndex'] === $this->getWriterIndex() ) {
          if ( $this->trxRoundId !== false ) {
              $this->applyTransactionRoundFlags( $conn );
          }
          foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
              $conn->setTransactionListener( $name, $callback );
          }
      }

      $this->lazyLoadReplicationPositions(); // session consistency

      // Log when many connections are made on requests
      $held = $this->getCurrentConnectionCount();
      if ( $held >= self::CONN_HELD_WARN_THRESHOLD ) {
          $this->perfLogger->warning(
              __METHOD__ . ": {connections}+ connections made (master={masterdb})",
              [
                  'connections' => $held,
                  'dbserver' => $conn->getServer(),
                  'masterdb' => $conn->getLBInfo( 'clusterMasterHost' )
              ]
          );
      }

      return $conn;
  }
  /**
   * Make sure that any "waitForPos" positions are loaded and available to doWait()
   *
   * The chronology callback, if there is one, is invoked at most once.
   */
  private function lazyLoadReplicationPositions() {
      if ( $this->connectionAttempted || !$this->chronologyCallback ) {
          return;
      }

      $this->connectionAttempted = true;
      $callback = $this->chronologyCallback;
      $callback( $this ); // generally calls waitFor()
      $this->connLogger->debug( __METHOD__ . ': executed chronology callback.' );
  }
  /**
   * Log the last connection failure and throw an exception for it
   *
   * This method always throws.
   *
   * @throws DBConnectionError
   */
  private function reportConnectionError() {
      $conn = $this->errorConnection; // the connection which caused the error
      $context = [
          'method' => __METHOD__,
          'last_error' => $this->lastError,
      ];

      if ( !( $conn instanceof IDatabase ) ) {
          // No last connection, probably due to all servers being too busy
          $this->connLogger->error(
              __METHOD__ .
              ": LB failure with no last connection. Connection error: {last_error}",
              $context
          );

          // If all servers were busy, "lastError" will contain something sensible
          throw new DBConnectionError( null, $this->lastError );
      }

      $context['db_server'] = $conn->getServer();
      $this->connLogger->warning(
          __METHOD__ . ": connection error: {last_error} ({db_server})",
          $context
      );

      throw new DBConnectionError( $conn, "{$this->lastError} ({$context['db_server']})" );
  }
  /**
   * Get the server index of the writer (master) server
   *
   * @return int Always 0 in this class
   */
  public function getWriterIndex() {
      return 0;
  }
  /**
   * Check whether the given index is a key of the configured server list
   *
   * @param int $i
   * @return bool
   * @deprecated Since 1.34
   */
  public function haveIndex( $i ) {
      $known = array_key_exists( $i, $this->servers );

      return $known;
  }
  /**
   * Check whether a server index is configured and has a positive generic group load
   *
   * @param int $i Server index
   * @return bool
   * @deprecated Since 1.34
   */
  public function isNonZeroLoad( $i ) {
      if ( !isset( $this->servers[$i] ) ) {
          return false; // unknown server index
      }

      return ( $this->groupLoads[self::GROUP_GENERIC][$i] > 0 );
  }
  /**
   * Get the number of configured servers, the writer included
   *
   * @return int
   */
  public function getServerCount() {
      $total = count( $this->servers );

      return $total;
  }
  /**
   * Check whether more than one server is configured
   *
   * @return bool
   */
  public function hasReplicaServers() {
      $serverCount = $this->getServerCount();

      return ( $serverCount > 1 );
  }
  /**
   * Check whether any configured server other than the writer lacks the "is static" flag
   *
   * @return bool
   */
  public function hasStreamingReplicaServers() {
      foreach ( $this->servers as $index => $info ) {
          if ( $index === $this->getWriterIndex() ) {
              continue; // the writer itself does not count
          }
          if ( empty( $info['is static'] ) ) {
              return true; // found a non-static, non-writer server
          }
      }

      return false;
  }
  /**
   * Get the display name of a server, preferring "hostName" over "host"
   *
   * @param int $i Server index
   * @return string The configured name, or "localhost" if none is set or it is empty
   */
  public function getServerName( $i ) {
      if ( isset( $this->servers[$i]['hostName'] ) ) {
          $name = $this->servers[$i]['hostName'];
      } elseif ( isset( $this->servers[$i]['host'] ) ) {
          $name = $this->servers[$i]['host'];
      } else {
          $name = '';
      }

      if ( $name == '' ) {
          return 'localhost';
      }

      return $name;
  }
  /**
   * Get the configuration entry of a server
   *
   * @param int $i Server index
   * @return array|bool The server entry, or false if there is none for $i
   */
  public function getServerInfo( $i ) {
      if ( isset( $this->servers[$i] ) ) {
          return $this->servers[$i];
      }

      return false;
  }
  /**
   * Get the "type" field of a server's configuration entry
   *
   * @param int $i Server index
   * @return string The configured type, or "unknown" if it is not set
   */
  public function getServerType( $i ) {
      if ( isset( $this->servers[$i]['type'] ) ) {
          return $this->servers[$i]['type'];
      }

      return 'unknown';
  }
  /**
   * Get the current replication position of the writer (master) server
   *
   * An already open writer handle is used when available. Otherwise, a handle is
   * opened just for this call and closed again afterwards.
   *
   * @return mixed Result of getMasterPos() on the writer handle
   * @throws DBConnectionError If no writer connection could be obtained
   */
  public function getMasterPos() {
      $writerIndex = $this->getWriterIndex();

      // Prefer a handle that is already open; it is left open for its other users
      $openConn = $this->getAnyOpenConnection( $writerIndex );
      if ( $openConn ) {
          return $openConn->getMasterPos();
      }

      $tempConn = $this->getConnection( $writerIndex, self::CONN_SILENCE_ERRORS );
      if ( !$tempConn ) {
          $this->reportConnectionError();

          return null; // unreachable due to exception
      }

      try {
          return $tempConn->getMasterPos();
      } finally {
          // The handle was only needed for this query
          $this->closeConnection( $tempConn );
      }
  }
  /**
   * Get the highest replication position known from the currently open connections
   *
   * No new connections are opened. An open writer handle takes precedence;
   * otherwise the open handles of non-static servers at indexes 1 and up are compared.
   *
   * @return mixed The writer's getMasterPos() result, the most advanced replica
   *   position, or false if no position could be determined
   */
  public function getReplicaResumePos() {
      // Use the position of any existing master server connection
      $writerConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
      if ( $writerConn ) {
          return $writerConn->getMasterPos();
      }

      // Otherwise, find the highest position of any existing replica server connection
      $best = false;
      $total = $this->getServerCount();
      for ( $index = 1; $index < $total; ++$index ) {
          if ( !empty( $this->servers[$index]['is static'] ) ) {
              continue; // server does not use replication
          }

          $replicaConn = $this->getAnyOpenConnection( $index );
          if ( !$replicaConn ) {
              continue; // no open connection
          }

          $pos = $replicaConn->getReplicaPos();
          if ( !$pos ) {
              continue; // could not get position
          }

          if ( !$best ) {
              $best = $pos; // first usable position seen
          }
          if ( $pos->hasReached( $best ) ) {
              $best = $pos;
          }
      }

      return $best;
  }
  /**
   * Close all open connections and flag this load balancer as disabled
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If the ownership check fails
   */
  public function disable( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );

      // Drop every open handle before raising the "disabled" flag
      $this->closeAll( $fname, $owner );

      $this->disabled = true;
  }
  /**
   * Close every open connection and reset the connection tracking array
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If the ownership check fails
   */
  public function closeAll( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );
      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      $closeHandle = function ( IDatabase $conn ) use ( $fname ) {
          $host = $conn->getServer();
          $this->connLogger->debug( "$fname: closing connection to database '$host'." );
          // Close on behalf of this load balancer instance
          $conn->close( $fname, $this->id );
      };
      $this->forEachOpenConnection( $closeHandle );

      // Forget all of the (now closed) handles
      $this->conns = self::newTrackedConnectionsArray();
  }
  /**
   * Close a connection and remove it from the tracked connection pools
   *
   * The handle is closed even if it is not found in any of the pools.
   *
   * @param IDatabase $conn Raw connection handle (not a DBConnRef)
   * @throws RuntimeException If $conn is a DBConnRef instance
   */
  public function closeConnection( IDatabase $conn ) {
      if ( $conn instanceof DBConnRef ) {
          // Avoid calling close() but still leaving the handle in the pool
          throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
      }

      $serverIndex = $conn->getLBInfo( 'serverIndex' );
      foreach ( $this->conns as $type => $connsByServer ) {
          if ( !isset( $connsByServer[$serverIndex] ) ) {
              continue; // no handles tracked for this server in this pool type
          }

          foreach ( $connsByServer[$serverIndex] as $i => $trackedConn ) {
              if ( $conn === $trackedConn ) {
                  // $i is the key of the handle within this server's pool, not a server
                  // index, so the host name has to be resolved from $serverIndex
                  $host = $this->getServerName( $serverIndex );
                  $this->connLogger->debug(
                      __METHOD__ . ": closing connection to database $i at '$host'." );
                  unset( $this->conns[$type][$serverIndex][$i] );
                  break 2; // a handle is only looked up once; stop scanning all pools
              }
          }
      }

      $conn->close( __METHOD__ );
  }
  /**
   * Commit master changes and then flush the snapshots of all master and replica handles
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   */
  public function commitAll( $fname = __METHOD__, $owner = null ) {
      // Writes are committed first...
      $this->commitMasterChanges( $fname, $owner );

      // ...then the snapshots are cleared, master handles before replica handles
      $this->flushMasterSnapshots( $fname, $owner );
      $this->flushReplicaSnapshots( $fname, $owner );
  }
  /**
   * Run pre-commit callbacks on all open master handles and suppress post-commit callbacks
   *
   * The round stage must be ROUND_CURSORY or ROUND_FINALIZED on entry. It is set to
   * ROUND_ERROR while callbacks run and to ROUND_FINALIZED on success.
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @return int Sum of the values returned by the pre-commit callback runs
   * @throws DBTransactionError If the ownership or stage check fails
   */
  public function finalizeMasterChanges( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );
      $this->assertTransactionRoundStage( [ self::ROUND_CURSORY, self::ROUND_FINALIZED ] );
      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

      // Keep going until a full pass over the handles runs no further callbacks,
      // since callbacks can add callbacks on other connections
      $totalRuns = 0;
      while ( true ) {
          $passRuns = 0; // callbacks execution attempts
          $this->forEachOpenMasterConnection( function ( Database $conn ) use ( &$passRuns ) {
              // Run any pre-commit callbacks while leaving the post-commit ones suppressed.
              // Any error should cause all (peer) transactions to be rolled back together.
              $passRuns += $conn->runOnTransactionPreCommitCallbacks();
          } );
          $totalRuns += $passRuns;
          if ( $passRuns <= 0 ) {
              break;
          }
      }

      // Defer post-commit callbacks until after COMMIT/ROLLBACK happens on all handles
      $this->forEachOpenMasterConnection( function ( Database $conn ) {
          $conn->setTrxEndCallbackSuppression( true );
      } );

      $this->trxRoundStage = self::ROUND_FINALIZED;

      return $totalRuns;
  }
  /**
   * Run sanity checks on all open master handles before the commit step
   *
   * The round stage must be ROUND_FINALIZED on entry. It is set to ROUND_ERROR while
   * the checks run and to ROUND_APPROVED once every handle has passed.
   *
   * @param array $options Supports "maxWriteDuration" (seconds); values of 0 or less
   *   disable the write duration check
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionSizeError If a handle spent longer than the limit in writes
   * @throws DBTransactionError If a check fails or a needed connection was lost
   */
  public function approveMasterChanges( array $options, $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );
      $this->assertTransactionRoundStage( self::ROUND_FINALIZED );
      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      $limit = $options['maxWriteDuration'] ?? 0;

      $checkHandle = function ( IDatabase $conn ) use ( $limit ) {
          // If atomic sections or explicit transactions are still open, some caller must
          // have caught an exception but failed to properly roll back any changes.
          // Detect that and throw an error (causing rollback).
          $conn->assertNoOpenTransactions();

          // Assert that the time to replicate the transaction will be sane.
          // If this fails, then all DB transactions will be rolled back together.
          $time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
          if ( $limit > 0 && $time > $limit ) {
              throw new DBTransactionSizeError(
                  $conn,
                  "Transaction spent $time second(s) in writes, exceeding the limit of $limit",
                  [ $time, $limit ]
              );
          }

          // If a connection sits idle while slow queries execute on another, that
          // connection may end up dropped before the commit round is reached.
          // Ping servers to detect this.
          if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
              throw new DBTransactionError(
                  $conn,
                  "A connection to the {$conn->getDBname()} database was lost before commit"
              );
          }
      };

      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
      $this->forEachOpenMasterConnection( $checkHandle );
      $this->trxRoundStage = self::ROUND_APPROVED;
  }
  /**
   * Start an explicit transaction round named after the caller
   *
   * The round stage must be ROUND_CURSORY and no round may be active. Master
   * snapshots are flushed first, then the transaction round flags are applied to
   * every open master handle.
   *
   * @param string $fname Caller name; also used as the round ID
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If a round is already active or a check fails
   */
  public function beginMasterChanges( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );

      $roundActive = ( $this->trxRoundId !== false );
      if ( $roundActive ) {
          throw new DBTransactionError(
              null,
              "$fname: Transaction round '{$this->trxRoundId}' already started"
          );
      }

      $this->assertTransactionRoundStage( self::ROUND_CURSORY );
      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      // Clear any empty transactions (no writes/callbacks) from the implicit round
      $this->flushMasterSnapshots( $fname, $owner );

      $this->trxRoundId = $fname;
      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

      // Mark applicable handles as participating in this explicit transaction round.
      // For each of these handles, any writes and callbacks will be tied to a single
      // transaction. The (peer) handles will reject begin()/commit() calls unless they
      // are part of an en masse commit or an en masse rollback.
      $joinRound = function ( Database $conn ) {
          $this->applyTransactionRoundFlags( $conn );
      };
      $this->forEachOpenMasterConnection( $joinRound );

      $this->trxRoundStage = self::ROUND_CURSORY;
  }
  /**
   * Commit the transactions of all open master handles and end the current round
   *
   * The round stage must be ROUND_APPROVED on entry. Commit failures are collected
   * from all handles before a single exception is thrown. On success the stage
   * becomes ROUND_COMMIT_CALLBACKS.
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If a check fails or any handle failed to commit
   */
  public function commitMasterChanges( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );
      $this->assertTransactionRoundStage( self::ROUND_APPROVED );
      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      // Remember whether an explicit round was active, then leave it
      $wasExplicitRound = ( $this->trxRoundId !== false );
      $this->trxRoundId = false;
      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

      // Commit any writes and clear any snapshots as well (callbacks require AUTOCOMMIT).
      // Note that callbacks should already be suppressed due to finalizeMasterChanges().
      $errors = [];
      $commitHandle = function ( IDatabase $conn ) use ( $fname, &$errors ) {
          try {
              $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
          } catch ( DBError $e ) {
              // Log the error and keep going so the remaining handles still commit
              ( $this->errorLogger )( $e );
              $errors[] = "{$conn->getServer()}: {$e->getMessage()}";
          }
      };
      $this->forEachOpenMasterConnection( $commitHandle );

      if ( $errors ) {
          throw new DBTransactionError(
              null,
              "$fname: Commit failed on server(s) " . implode( "\n", array_unique( $errors ) )
          );
      }

      if ( $wasExplicitRound ) {
          // Unmark handles as participating in this explicit transaction round
          $this->forEachOpenMasterConnection( function ( Database $conn ) {
              $this->undoTransactionRoundFlags( $conn );
          } );
      }

      $this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
  }
  /**
   * Run the pending post-commit/post-rollback "idle" callbacks of all open master handles
   *
   * The round stage must be ROUND_COMMIT_CALLBACKS or ROUND_ROLLBACK_CALLBACKS; it
   * selects the trigger type given to the callbacks and is restored before returning.
   * Exceptions from callbacks and commits are caught, and only the first one is kept.
   *
   * @param string $fname Caller name; only used for the ownership check, as log and
   *   commit calls are attributed to this method itself
   * @param int|null $owner Owner ID of the caller
   * @return Exception|null The first exception caught, if any
   * @throws DBTransactionError If the ownership check fails or the stage is wrong
   */
  public function runMasterTransactionIdleCallbacks( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );

      if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
          $type = IDatabase::TRIGGER_COMMIT;
      } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
          $type = IDatabase::TRIGGER_ROLLBACK;
      } else {
          throw new DBTransactionError(
              null,
              "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
          );
      }

      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      $priorStage = $this->trxRoundStage;
      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

      // Now that the COMMIT/ROLLBACK step is over, enable post-commit callback runs
      $this->forEachOpenMasterConnection( function ( Database $conn ) {
          $conn->setTrxEndCallbackSuppression( false );
      } );

      $firstError = null; // first exception
      $fname = __METHOD__;
      $passRuns = 0; // callback execution attempts of the current pass

      // Step 1 of each pass: run any pending callbacks for a connection
      $runCallbacks = function ( Database $conn ) use ( $type, &$firstError, &$passRuns ) {
          if ( $conn->trxLevel() ) {
              return; // retry in the next iteration, after commit() is called
          }
          try {
              $passRuns += $conn->runOnTransactionIdleCallbacks( $type );
          } catch ( Exception $ex ) {
              $firstError = $firstError ?: $ex;
          }
      };

      // Step 2 of each pass: clear out any active transaction left over from callbacks
      $flushLeftovers = function ( Database $conn ) use ( &$firstError, $fname ) {
          if ( $conn->writesPending() ) {
              // A callback from another handle wrote to this one and DBO_TRX is set
              $this->queryLogger->warning( $fname . ": found writes pending." );
              $fnames = implode( ', ', $conn->pendingWriteAndCallbackCallers() );
              $this->queryLogger->warning(
                  "$fname: found writes pending ($fnames).",
                  [
                      'db_server' => $conn->getServer(),
                      'db_name' => $conn->getDBname()
                  ]
              );
          } elseif ( $conn->trxLevel() ) {
              // A callback from another handle read from this one and DBO_TRX is set,
              // which can easily happen if there is only one DB (no replicas)
              $this->queryLogger->debug( "$fname: found empty transaction." );
          }
          try {
              $conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
          } catch ( Exception $ex ) {
              $firstError = $firstError ?: $ex;
          }
      };

      // Loop until callbacks stop adding callbacks on other connections
      do {
          $passRuns = 0;
          $this->forEachOpenMasterConnection( $runCallbacks );
          $this->forEachOpenMasterConnection( $flushLeftovers );
      } while ( $passRuns > 0 );

      $this->trxRoundStage = $priorStage;

      return $firstError;
  }
  /**
   * Run the transaction listener callbacks of all open master handles
   *
   * The round stage must be ROUND_COMMIT_CALLBACKS or ROUND_ROLLBACK_CALLBACKS; it
   * selects the trigger type given to the listeners. The stage is ROUND_CURSORY
   * afterwards. Exceptions from listeners are caught, and only the first one is kept.
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @return Exception|null The first exception caught, if any
   * @throws DBTransactionError If the ownership check fails or the stage is wrong
   */
  public function runMasterTransactionListenerCallbacks( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );

      if ( $this->trxRoundStage === self::ROUND_COMMIT_CALLBACKS ) {
          $type = IDatabase::TRIGGER_COMMIT;
      } elseif ( $this->trxRoundStage === self::ROUND_ROLLBACK_CALLBACKS ) {
          $type = IDatabase::TRIGGER_ROLLBACK;
      } else {
          throw new DBTransactionError(
              null,
              "Transaction should be in the callback stage (not '{$this->trxRoundStage}')"
          );
      }

      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      $firstError = null;
      $notifyListeners = function ( Database $conn ) use ( $type, &$firstError ) {
          try {
              $conn->runTransactionListenerCallbacks( $type );
          } catch ( Exception $ex ) {
              // Keep notifying the other handles; only the first failure is reported
              $firstError = $firstError ?: $ex;
          }
      };

      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
      $this->forEachOpenMasterConnection( $notifyListeners );
      $this->trxRoundStage = self::ROUND_CURSORY;

      return $firstError;
  }
  /**
   * Roll back the transactions of all open master handles and end the current round
   *
   * The stage is set to ROUND_ERROR while rolling back and to
   * ROUND_ROLLBACK_CALLBACKS once all handles were rolled back.
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If the ownership check fails
   */
  public function rollbackMasterChanges( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );
      if ( $this->ownerId === null ) {
          // Kept in a local so that the scoped object lives until this method returns
          /** @noinspection PhpUnusedLocalVariableInspection */
          $scope = ScopedCallback::newScopedIgnoreUserAbort();
      }

      // Remember whether an explicit round was active, then leave it
      $wasExplicitRound = ( $this->trxRoundId !== false );
      $this->trxRoundId = false;
      $this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise

      $rollbackHandle = function ( IDatabase $conn ) use ( $fname ) {
          $conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
      };
      $this->forEachOpenMasterConnection( $rollbackHandle );

      if ( $wasExplicitRound ) {
          // Unmark handles as participating in this explicit transaction round
          $this->forEachOpenMasterConnection( function ( Database $conn ) {
              $this->undoTransactionRoundFlags( $conn );
          } );
      }

      $this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
  }
  /**
   * Require the current transaction round stage to be one of the given stages
   *
   * @param string|string[] $stage Acceptable stage, or list of acceptable stages
   * @throws DBTransactionError If the current stage is not acceptable
   */
  private function assertTransactionRoundStage( $stage ) {
      $allowed = (array)$stage;
      if ( in_array( $this->trxRoundStage, $allowed, true ) ) {
          return;
      }

      // Build a list such as 'a'/'b' for the error message
      $quoted = [];
      foreach ( $allowed as $name ) {
          $quoted[] = "'$name'";
      }
      $stageList = implode( '/', $quoted );

      throw new DBTransactionError(
          null,
          "Transaction round stage must be $stageList (not '{$this->trxRoundStage}')"
      );
  }
  /**
   * Assure that if this instance is owned, the caller is either the owner or is internal
   *
   * If an LBFactory owns the LoadBalancer, then certain methods should only be called
   * through that LBFactory to avoid broken contracts. Otherwise, those methods can
   * publicly be called by anything. In any case, internal methods from the LoadBalancer
   * itself should always be allowed.
   *
   * @param string $fname Caller name, used in the error message
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If this instance is owned and $owner is neither the
   *   owner ID nor the ID of this instance
   */
  private function assertOwnership( $fname, $owner ) {
      if ( $this->ownerId === null ) {
          return; // not owned; anything may call
      }

      if ( $owner === $this->ownerId || $owner === $this->id ) {
          return; // called by the owner or by this instance itself
      }

      throw new DBTransactionError(
          null,
          "$fname: LoadBalancer is owned by ID '{$this->ownerId}' (got '$owner')."
      );
  }
  /**
   * Make a DB handle with DBO_DEFAULT/DBO_TRX set join the transaction round
   *
   * Some servers may have neither flag enabled, meaning that they opt out of such
   * transaction rounds and remain in auto-commit mode. Such behavior might be desired
   * when a DB server is used for something like simple key/value storage.
   *
   * @param Database $conn Handle to tag with the current round ID
   */
  private function applyTransactionRoundFlags( Database $conn ) {
      $autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
      if ( $autoCommitOnly ) {
          return; // transaction rounds do not apply to these connections
      }

      $usesDefaultMode = $conn->getFlag( $conn::DBO_DEFAULT );
      if ( $usesDefaultMode ) {
          // DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
          // Force DBO_TRX even in CLI mode since a commit round is expected soon.
          $conn->setFlag( $conn::DBO_TRX, $conn::REMEMBER_PRIOR );
      }

      // Checked after the possible setFlag() call above, so forced handles are tagged too
      $usesTrxMode = $conn->getFlag( $conn::DBO_TRX );
      if ( $usesTrxMode ) {
          $conn->setLBInfo( 'trxRoundId', $this->trxRoundId );
      }
  }
  /**
   * Make a DB handle leave the transaction round, reverting applyTransactionRoundFlags()
   *
   * @param Database $conn Handle to remove the round ID from
   */
  private function undoTransactionRoundFlags( Database $conn ) {
      $autoCommitOnly = $conn->getLBInfo( 'autoCommitOnly' );
      if ( $autoCommitOnly ) {
          return; // transaction rounds do not apply to these connections
      }

      $usesTrxMode = $conn->getFlag( $conn::DBO_TRX );
      if ( $usesTrxMode ) {
          $conn->setLBInfo( 'trxRoundId', null ); // remove the round ID
      }

      $usesDefaultMode = $conn->getFlag( $conn::DBO_DEFAULT );
      if ( $usesDefaultMode ) {
          // Bring back the flags that were remembered when the handle joined the round
          $conn->restoreFlags( $conn::RESTORE_PRIOR );
      }
  }
  /**
   * Flush the snapshot of every open replica handle
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If the ownership check fails
   */
  public function flushReplicaSnapshots( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );

      $flushHandle = function ( IDatabase $conn ) use ( $fname ) {
          $conn->flushSnapshot( $fname );
      };
      $this->forEachOpenReplicaConnection( $flushHandle );
  }
  /**
   * Flush the snapshot of every open master handle
   *
   * @param string $fname Caller name
   * @param int|null $owner Owner ID of the caller
   * @throws DBTransactionError If the ownership check fails
   */
  public function flushMasterSnapshots( $fname = __METHOD__, $owner = null ) {
      $this->assertOwnership( $fname, $owner );

      $flushHandle = function ( IDatabase $conn ) use ( $fname ) {
          $conn->flushSnapshot( $fname );
      };
      $this->forEachOpenMasterConnection( $flushHandle );
  }
  /**
   * Get the current stage of the transaction round
   *
   * @return string One of the ROUND_* stage values
   * @since 1.32
   */
  public function getTransactionRoundStage() {
      $stage = $this->trxRoundStage;

      return $stage;
  }
  /**
   * Check whether a connection to the writer (master) server is open
   *
   * @return bool Result of isOpen() for the writer index
   */
  public function hasMasterConnection() {
      $writerIndex = $this->getWriterIndex();

      return $this->isOpen( $writerIndex );
  }
  /**
   * Check whether any open master handle has writes or callbacks pending
   *
   * Every open master handle is queried, even after a pending one was found.
   *
   * @return bool
   */
  public function hasMasterChanges() {
      $anyPending = false;
      $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$anyPending ) {
          if ( $conn->writesOrCallbacksPending() ) {
              $anyPending = true;
          }
      } );

      return $anyPending;
  }
  /**
   * Get the latest lastDoneWrites() value among all open master handles
   *
   * @return mixed The greatest value found, or false if there are no open master handles
   */
  public function lastMasterChangeTimestamp() {
      $latest = false;
      $trackLatest = function ( IDatabase $conn ) use ( &$latest ) {
          $candidate = $conn->lastDoneWrites();
          $latest = max( $latest, $candidate );
      };
      $this->forEachOpenMasterConnection( $trackLatest );

      return $latest;
  }
  /**
   * Check whether master changes are pending or were made within the last $age seconds
   *
   * @param float|null $age Age threshold in seconds; null uses the "waitTimeout" setting
   * @return bool
   */
  public function hasOrMadeRecentMasterChanges( $age = null ) {
      $age = $age ?? $this->waitTimeout;

      if ( $this->hasMasterChanges() ) {
          return true;
      }

      $cutoff = microtime( true ) - $age;

      return ( $this->lastMasterChangeTimestamp() > $cutoff );
  }
  /**
   * Get the combined pendingWriteCallers() lists of all open master handles
   *
   * @return array Merged caller list, in connection iteration order
   */
  public function pendingMasterChangeCallers() {
      $callerLists = [];
      $this->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$callerLists ) {
          $callerLists[] = $conn->pendingWriteCallers();
      } );

      // The leading empty array keeps the call valid when there are no open handles
      return array_merge( [], ...$callerLists );
  }
  /**
   * Check whether this load balancer is in "lagged replica" mode
   *
   * If the mode is not set yet and streaming replicas exist, a generic group reader
   * index is resolved first, since that sets the mode as needed.
   *
   * @param string|bool $domain Domain ID, or false for the current domain
   * @return bool
   */
  public function getLaggedReplicaMode( $domain = false ) {
      if ( !$this->laggedReplicaMode ) {
          if ( $this->hasStreamingReplicaServers() ) {
              // This will set "laggedReplicaMode" as needed
              $this->getReaderIndex( self::GROUP_GENERIC, $domain );
          }

          return $this->laggedReplicaMode;
      }

      return true; // stay in lagged replica mode
  }
  /**
   * Get the "lagged replica" mode flag as it currently is, without triggering any
   * reader index selection
   *
   * @return bool
   */
  public function laggedReplicaUsed() {
      $mode = $this->laggedReplicaMode;

      return $mode;
  }
  /**
   * Get the reason why the given domain is read-only, if it is
   *
   * The checks, in order: the configured read-only reason, the master server
   * running in read-only mode, and the lagged replica mode.
   *
   * @param string|bool $domain Domain ID, or false for the current domain
   * @return string|bool Reason text, or false if the domain is not read-only
   */
  public function getReadOnlyReason( $domain = false ) {
      $domainInstance = DatabaseDomain::newFromId( $this->resolveDomainID( $domain ) );

      if ( $this->readOnlyReason !== false ) {
          return $this->readOnlyReason;
      }

      if ( $this->isMasterRunningReadOnly( $domainInstance ) ) {
          return 'The master database server is running in read-only mode.';
      }

      if ( !$this->getLaggedReplicaMode( $domain ) ) {
          return false;
      }

      // Lagged replica mode: the wording depends on whether a reader index exists
      if ( $this->getExistingReaderIndex( self::GROUP_GENERIC ) >= 0 ) {
          return 'The database is read-only until replication lag decreases.';
      }

      return 'The database is read-only until a replica database server becomes reachable.';
  }
  /**
   * Check whether a master connection is read-only, using the server cache
   *
   * With CONN_REFRESH_READ_ONLY set, the server is always queried and the cached
   * value is overwritten. Otherwise, the server is only queried on cache miss.
   * A DBError during the query is treated as "not read-only".
   *
   * @param IDatabase $conn Master connection
   * @param int $flags Bitfield of class CONN_* constants
   * @return bool Whether the entire server or currently selected DB/schema is read-only
   */
  private function isMasterConnectionReadOnly( IDatabase $conn, $flags = 0 ) {
      // Note that table prefixes are not related to server-side read-only mode
      $cacheKey = $this->srvCache->makeGlobalKey(
          'rdbms-server-readonly',
          $conn->getServer(),
          $conn->getDBname(),
          $conn->dbSchema()
      );

      // Ask the server itself; yields 1 or 0 so that the result can be cached
      $probeServer = function () use ( $conn ) {
          try {
              return (int)$conn->serverIsReadOnly();
          } catch ( DBError $e ) {
              return 0;
          }
      };

      $mustRefresh =
          ( ( $flags & self::CONN_REFRESH_READ_ONLY ) == self::CONN_REFRESH_READ_ONLY );
      if ( $mustRefresh ) {
          $readOnly = $probeServer();
          $this->srvCache->set( $cacheKey, $readOnly, BagOStuff::TTL_PROC_SHORT );
      } else {
          $readOnly = $this->srvCache->getWithSetCallback(
              $cacheKey,
              BagOStuff::TTL_PROC_SHORT,
              $probeServer
          );
      }

      return (bool)$readOnly;
  }
  /**
   * Check whether the master server is running in read-only mode, using the WAN cache
   *
   * On cache miss, a master connection for the domain is obtained and checked with the
   * transaction profiler silenced. A DBError during the check is treated as "not
   * read-only".
   *
   * @param DatabaseDomain $domain
   * @return bool Whether the entire master server or the local domain DB is read-only
   */
  private function isMasterRunningReadOnly( DatabaseDomain $domain ) {
      // Context will often be HTTP GET/HEAD; heavily cache the results
      return (bool)$this->wanCache->getWithSetCallback(
          // Note that table prefixes are not related to server-side read-only mode
          $this->wanCache->makeGlobalKey(
              'rdbms-server-readonly',
              $this->getMasterServerName(),
              $domain->getDatabase(),
              $domain->getSchema()
          ),
          self::TTL_CACHE_READONLY,
          function () use ( $domain ) {
              $old = $this->trxProfiler->setSilenced( true );
              try {
                  $index = $this->getWriterIndex();
                  // Reset the cache for isMasterConnectionReadOnly()
                  $flags = self::CONN_REFRESH_READ_ONLY;
                  $conn = $this->getServerConnection( $index, $domain->getId(), $flags );
                  // Reuse the process cache set above
                  $readOnly = (int)$this->isMasterConnectionReadOnly( $conn );
                  $this->reuseConnection( $conn );
              } catch ( DBError $e ) {
                  $readOnly = 0;
              } finally {
                  // Restore the profiler state even if an exception other than DBError
                  // propagates, so that the profiler is never left silenced
                  $this->trxProfiler->setSilenced( $old );
              }

              return $readOnly;
          },
          [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG, 'lockTSE' => 10, 'busyValue' => 0 ]
      );
  }
  /**
   * Get, and optionally set, the "allow lagged" setting
   *
   * @param bool|null $mode New value, or null to leave the setting unchanged
   * @return bool The setting after any change was applied
   */
  public function allowLagged( $mode = null ) {
      if ( $mode !== null ) {
          $this->allowLagged = $mode;
      }

      return $this->allowLagged;
  }
  /**
   * Ping every open connection
   *
   * All handles are pinged, even after one of them failed.
   *
   * @return bool True if every ping succeeded (or there are no open connections)
   */
  public function pingAll() {
      $allAlive = true;
      $pingHandle = function ( IDatabase $conn ) use ( &$allAlive ) {
          $alive = $conn->ping();
          if ( !$alive ) {
              $allAlive = false;
          }
      };
      $this->forEachOpenConnection( $pingHandle );

      return $allAlive;
  }
  /**
   * Call a function with each tracked connection handle
   *
   * @param callable $callback Called as $callback( $conn, ...$params )
   * @param array $params Additional parameters to pass after the handle
   */
  public function forEachOpenConnection( $callback, array $params = [] ) {
      // Tracked handles are nested as: pool type => server index => handles
      foreach ( $this->conns as $poolsByServer ) {
          foreach ( $poolsByServer as $handles ) {
              foreach ( $handles as $handle ) {
                  $callback( $handle, ...$params );
              }
          }
      }
  }
  /**
   * Call a function with each tracked connection handle of the writer (master) server
   *
   * @param callable $callback Called as $callback( $conn, ...$params )
   * @param array $params Additional parameters to pass after the handle
   */
  public function forEachOpenMasterConnection( $callback, array $params = [] ) {
      $writerIndex = $this->getWriterIndex();
      foreach ( $this->conns as $poolsByServer ) {
          if ( !isset( $poolsByServer[$writerIndex] ) ) {
              continue; // this pool type tracks no handles for the writer
          }

          /** @var IDatabase $handle */
          foreach ( $poolsByServer[$writerIndex] as $handle ) {
              $callback( $handle, ...$params );
          }
      }
  }
  /**
   * Call a function with each tracked connection handle of the non-writer servers
   *
   * @param callable $callback Called as $callback( $conn, ...$params )
   * @param array $params Additional parameters to pass after the handle
   */
  public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
      foreach ( $this->conns as $poolsByServer ) {
          foreach ( $poolsByServer as $serverIndex => $handles ) {
              if ( $serverIndex !== $this->getWriterIndex() ) {
                  foreach ( $handles as $handle ) {
                      $callback( $handle, ...$params );
                  }
              }
              // else: skip master
          }
      }
  }
  /**
   * Count the tracked connection handles across all pool types and servers
   *
   * @return int
   */
  private function getCurrentConnectionCount() {
      $total = 0;
      foreach ( $this->conns as $poolsByServer ) {
          // Add up the sizes of the per-server handle lists of this pool type
          $total += array_sum( array_map( 'count', $poolsByServer ) );
      }

      return $total;
  }
  /**
   * Get the host, lag, and index of the most lagged server with a positive generic load
   *
   * @param string|bool $domain Domain ID, or false for the current domain
   * @return array Tuple of (host, lag, server index); this is ('', -1, 0) if there
   *   are no replica servers or no server qualified
   */
  public function getMaxLag( $domain = false ) {
      $worst = [ '', -1, 0 ]; // (host, lag, server index)
      if ( !$this->hasReplicaServers() ) {
          return $worst;
      }

      foreach ( $this->getLagTimes( $domain ) as $index => $lag ) {
          // Only servers with positive generic load are considered
          $hasLoad = ( $this->groupLoads[self::GROUP_GENERIC][$index] > 0 );
          if ( $hasLoad && $lag > $worst[1] ) {
              $worst = [ $this->getServerInfoStrict( $index, 'host' ), $lag, $index ];
          }
      }

      return $worst;
  }
  /**
   * Get the replication lag of each configured server
   *
   * Servers flagged as "is static" are reported with a lag of 0 without asking the
   * load monitor. All other servers get the value reported by the load monitor.
   *
   * @param string|bool $domain Domain ID, or false for the current domain
   * @return array Map of (server index => lag)
   */
  public function getLagTimes( $domain = false ) {
      if ( !$this->hasReplicaServers() ) {
          return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
      }

      $staticLagTimes = []; // map of (server index => 0 seconds)
      $replicatingIndexes = [];
      foreach ( $this->servers as $index => $info ) {
          if ( !empty( $info['is static'] ) ) {
              $staticLagTimes[$index] = 0; // DB server is a non-replicating and read-only archive
          } else {
              $replicatingIndexes[] = $index; // DB server might have replication lag
          }
      }

      $measuredLagTimes = $this->getLoadMonitor()->getLagTimes( $replicatingIndexes, $domain );

      // On key conflicts, the measured values take precedence
      return $measuredLagTimes + $staticLagTimes;
  }
  /**
   * Get the lag in seconds for a given connection, or zero if the connection is to a
   * static dataset or to the master server.
   *
   * This should be used in preference to Database::getLag() in cases where
   * replication may not be in use, since there is no way to determine if
   * replication is in use at the connection level without running
   * potentially restricted queries such as SHOW SLAVE STATUS. Using this
   * function instead of Database::getLag() avoids a fatal error in this
   * case on many installations.
   *
   * @param IDatabase $conn
   * @return int|bool Returns false on error
   * @deprecated Since 1.34 Use IDatabase::getLag() instead
   */
  public function safeGetLag( IDatabase $conn ) {
      $lagFree =
          // static dataset
          $conn->getLBInfo( 'is static' ) ||
          // this is the master
          $conn->getLBInfo( 'serverIndex' ) == $this->getWriterIndex();

      if ( $lagFree ) {
          return 0;
      }

      return $conn->getLag();
  }
  /**
   * Wait for a replica DB handle to reach a given master position
   *
   * If no position is given, the current master position is used. It is read from an
   * already open master handle if there is one, or from a temporary master
   * connection that is closed again right away.
   *
   * @param IDatabase $conn Replica DB handle
   * @param DBMasterPos|bool $pos Master position; falsy to use the current position
   * @param int|null $timeout Timeout in seconds (at least 1 is used); falsy values
   *   use the "waitTimeout" setting
   * @return bool True if $conn is not a replica handle or the wait succeeded; false
   *   if the wait timed out or no usable master position was available
   * @throws DBReplicationWaitError If a master connection is needed but unavailable
   */
  public function waitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
      $timeout = max( 1, $timeout ?: $this->waitTimeout );

      if ( $this->getServerCount() <= 1 || !$conn->getLBInfo( 'replica' ) ) {
          return true; // server is not a replica DB
      }

      if ( !$pos ) {
          // Get the current master position, opening a connection if needed
          $index = $this->getWriterIndex();
          $flags = self::CONN_SILENCE_ERRORS;
          $masterConn = $this->getAnyOpenConnection( $index, $flags );
          if ( $masterConn ) {
              $pos = $masterConn->getMasterPos();
          } else {
              $masterConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
              if ( !$masterConn ) {
                  throw new DBReplicationWaitError(
                      null,
                      "Could not obtain a master database connection to get the position"
                  );
              }
              try {
                  $pos = $masterConn->getMasterPos();
              } finally {
                  // Always release the temporary handle, even if the position query
                  // throws; this mirrors the cleanup done by getMasterPos()
                  $this->closeConnection( $masterConn );
              }
          }
      }

      if ( $pos instanceof DBMasterPos ) {
          $start = microtime( true );
          $result = $conn->masterPosWait( $pos, $timeout );
          $seconds = max( microtime( true ) - $start, 0 );
          if ( $result == -1 || is_null( $result ) ) {
              $msg = __METHOD__ . ': timed out waiting on {host} pos {pos} [{seconds}s]';
              $this->replLogger->warning( $msg, [
                  'host' => $conn->getServer(),
                  'pos' => $pos,
                  'seconds' => round( $seconds, 6 ),
                  'trace' => ( new RuntimeException() )->getTraceAsString()
              ] );
              $ok = false;
          } else {
              $this->replLogger->debug( __METHOD__ . ': done waiting' );
              $ok = true;
          }
      } else {
          $ok = false; // something is misconfigured
          $this->replLogger->error(
              __METHOD__ . ': could not get master pos for {host}',
              [
                  'host' => $conn->getServer(),
                  'trace' => ( new RuntimeException() )->getTraceAsString()
              ]
          );
      }

      return $ok;
  }
  /**
   * Wait for a replica DB to reach a specified master position
   *
   * This is a plain alias of waitForMasterPos(), which will connect to the master to
   * get an accurate position if $pos is not given.
   *
   * @param IDatabase $conn Replica DB
   * @param DBMasterPos|bool $pos Master position; default: current position
   * @param int $timeout Timeout in seconds [optional]
   * @return bool Success
   * @since 1.28
   * @deprecated Since 1.34 Use waitForMasterPos() instead
   */
  public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = null ) {
      $reached = $this->waitForMasterPos( $conn, $pos, $timeout );

      return $reached;
  }
  /**
   * Register or remove a named transaction listener
   *
   * The change is recorded on this load balancer and also forwarded to every open
   * master handle.
   *
   * @param string $name Listener name
   * @param callable|null $callback Listener, or null to remove the one registered as $name
   */
  public function setTransactionListener( $name, callable $callback = null ) {
      if ( !$callback ) {
          unset( $this->trxRecurringCallbacks[$name] );
      } else {
          $this->trxRecurringCallbacks[$name] = $callback;
      }

      $applyToHandle = function ( IDatabase $conn ) use ( $name, $callback ) {
          $conn->setTransactionListener( $name, $callback );
      };
      $this->forEachOpenMasterConnection( $applyToHandle );
  }
  /**
   * Replace the stored table alias map
   *
   * @param array $aliases New alias map; stored as given, overwriting the previous one
   */
  public function setTableAliases( array $aliases ) {
      $this->tableAliases = $aliases;
  }
  /**
   * Replace the stored index alias map
   *
   * @param array $aliases New alias map; stored as given, overwriting the previous one
   */
  public function setIndexAliases( array $aliases ) {
      $this->indexAliases = $aliases;
  }
  /**
   * Switch the table prefix of the local domain
   *
   * Refuses to proceed while any open connection still reports a positive
   * 'foreignPoolRefCount'. Otherwise the local domain is rebuilt with the same
   * database and schema but the new prefix, and the prefix is applied to every
   * open connection that is not flagged as 'foreign'.
   *
   * @param string $prefix New table prefix
   * @throws DBUnexpectedError If foreign domain connections are still in use
   */
  public function setLocalDomainPrefix( $prefix ) {
      // Gather the domains of explicit foreign-domain handles that are still checked out.
      // A handle's reference count only goes from 1 to 0 once reuseConnection() is called
      // on it; before that the caller still holds it (explicitly or via DBConnRef scope).
      $busyDomains = [];
      $this->forEachOpenConnection( function ( IDatabase $conn ) use ( &$busyDomains ) {
          $refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
          if ( $refCount > 0 ) {
              $busyDomains[] = $conn->getDomainID();
          }
      } );

      // Switching is only safe when no explicit foreign-domain handle is in use
      if ( count( $busyDomains ) > 0 ) {
          $list = implode( ', ', $busyDomains );
          $message = 'Foreign domain connections are still in use (' . $list . ')';
          throw new DBUnexpectedError( null, $message );
      }

      // Same database and schema as before, only the prefix changes
      $database = $this->localDomain->getDatabase();
      $schema = $this->localDomain->getSchema();
      $this->setLocalDomain( new DatabaseDomain( $database, $schema, $prefix ) );

      // Bring all open local connections in line with the new prefix
      $this->forEachOpenConnection( function ( IDatabase $conn ) use ( $prefix ) {
          if ( $conn->getLBInfo( 'foreign' ) ) {
              return;
          }
          $conn->tablePrefix( $prefix );
      } );
  }
  /**
   * Close all connections and point the local domain at a different domain
   *
   * @param string $domain Domain ID, parsed with DatabaseDomain::newFromId()
   */
  public function redefineLocalDomain( $domain ) {
      // Connections are closed before the local domain is swapped
      $this->closeAll( __METHOD__, $this->id );

      $newDomain = DatabaseDomain::newFromId( $domain );
      $this->setLocalDomain( $newDomain );
  }
  /**
   * Set or clear the temp-tables-only flag for one domain
   *
   * @param bool $value Whether to enable the mode (only its truthiness is tested)
   * @param string $domain Domain the flag is keyed by
   * @return bool Value recorded for the domain before this call (false if none)
   */
  public function setTempTablesOnlyMode( $value, $domain ) {
      $previous = $this->tempTablesOnlyMode[$domain] ?? false;

      if ( !$value ) {
          // Disabling removes the entry rather than storing false
          unset( $this->tempTablesOnlyMode[$domain] );

          return $previous;
      }

      $this->tempTablesOnlyMode[$domain] = true;

      return $previous;
  }
  /**
   * Store the local domain and derive its "<db>-<prefix>" style alias
   *
   * @param DatabaseDomain $domain
   */
  private function setLocalDomain( DatabaseDomain $domain ) {
      $this->localDomain = $domain;

      // Callers sometimes assume the domain ID is simply <db>-<prefix>, which is almost
      // always true; keeping that unescaped form as an alias lets such callers be
      // handled gracefully when they fail to account for escaping.
      $prefix = $this->localDomain->getTablePrefix();
      $database = $this->localDomain->getDatabase();
      $this->localDomainIdAlias = ( $prefix != '' )
          ? $database . '-' . $prefix
          : $database;
  }
  /**
   * Look up a server entry, or one field of it, failing loudly when it is missing
   *
   * @param int $i Server index
   * @param string|null $field Server index field [optional]; null returns the whole entry
   * @return array|mixed The server entry array, or the value of $field within it
   * @throws InvalidArgumentException If the index is unknown or the field does not exist
   */
  private function getServerInfoStrict( $i, $field = null ) {
      $isKnown = isset( $this->servers[$i] ) && is_array( $this->servers[$i] );
      if ( !$isKnown ) {
          throw new InvalidArgumentException( "No server with index '$i'" );
      }

      $info = $this->servers[$i];
      if ( $field === null ) {
          return $info;
      }

      // array_key_exists() so that a field explicitly set to null still counts as present
      if ( !array_key_exists( $field, $info ) ) {
          throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
      }

      return $info[$field];
  }
  /**
   * Get the server name registered for the writer index
   *
   * @return string
   */
  private function getMasterServerName() {
      $writerIndex = $this->getWriterIndex();

      return $this->getServerName( $writerIndex );
  }
  /**
   * Disable this instance when the object is destroyed
   *
   * Visibility is declared explicitly, like on every other method here; an
   * undeclared method is public, so behaviour is unchanged.
   */
  public function __destruct() {
      // Avoid connection leaks for sanity
      $this->disable( __METHOD__, $this->ownerId );
  }
  2106. }
  /**
   * Backwards-compatibility alias: registers the global (un-namespaced) class name
   * 'LoadBalancer' as another name for this class.
   *
   * @deprecated since 1.29
   */
  class_alias( LoadBalancer::class, 'LoadBalancer' );