ChronologyProtector.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. <?php
  2. /**
  3. * Helper for providing "session consistency" across requests despite DB replication lag.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. * @ingroup Database
  22. */
  23. namespace Wikimedia\Rdbms;
  24. use Psr\Log\LoggerAwareInterface;
  25. use Psr\Log\LoggerInterface;
  26. use Psr\Log\NullLogger;
  27. use Wikimedia\WaitConditionLoop;
  28. use BagOStuff;
  29. /**
  30. * Helper class for mitigating DB replication lag in order to provide "session consistency"
  31. *
  32. * This helps to ensure a consistent ordering of events as seen by an client
  33. *
  34. * Kind of like Hawking's [[Chronology Protection Agency]].
  35. */
  36. class ChronologyProtector implements LoggerAwareInterface {
  37. /** @var BagOStuff */
  38. protected $store;
  39. /** @var LoggerInterface */
  40. protected $logger;
  41. /** @var string Storage key name */
  42. protected $key;
  43. /** @var string Hash of client parameters */
  44. protected $clientId;
  45. /** @var string[] Map of client information fields for logging */
  46. protected $clientLogInfo;
  47. /** @var int|null Expected minimum index of the last write to the position store */
  48. protected $waitForPosIndex;
  49. /** @var int Max seconds to wait on positions to appear */
  50. protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
  51. /** @var bool Whether to no-op all method calls */
  52. protected $enabled = true;
  53. /** @var bool Whether to check and wait on positions */
  54. protected $wait = true;
  55. /** @var bool Whether the client data was loaded */
  56. protected $initialized = false;
  57. /** @var DBMasterPos[] Map of (DB master name => position) */
  58. protected $startupPositions = [];
  59. /** @var DBMasterPos[] Map of (DB master name => position) */
  60. protected $shutdownPositions = [];
  61. /** @var float[] Map of (DB master name => 1) */
  62. protected $shutdownTouchDBs = [];
  63. /** @var int Seconds to store positions */
  64. const POSITION_TTL = 60;
  65. /** @var int Seconds to store position write index cookies (safely less than POSITION_TTL) */
  66. const POSITION_COOKIE_TTL = 10;
  67. /** @var int Max time to wait for positions to appear */
  68. const POS_STORE_WAIT_TIMEOUT = 5;
  69. /**
  70. * @param BagOStuff $store
  71. * @param array $client Map of (ip: <IP>, agent: <user-agent> [, clientId: <hash>] )
  72. * @param int|null $posIndex Write counter index
  73. * @param string $secret Secret string for HMAC hashing [optional]
  74. * @since 1.27
  75. */
  76. public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
  77. $this->store = $store;
  78. if ( isset( $client['clientId'] ) ) {
  79. $this->clientId = $client['clientId'];
  80. } else {
  81. $this->clientId = ( $secret != '' )
  82. ? hash_hmac( 'md5', $client['ip'] . "\n" . $client['agent'], $secret )
  83. : md5( $client['ip'] . "\n" . $client['agent'] );
  84. }
  85. $this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
  86. $this->waitForPosIndex = $posIndex;
  87. $this->clientLogInfo = [
  88. 'clientIP' => $client['ip'],
  89. 'clientAgent' => $client['agent'],
  90. 'clientId' => $client['clientId'] ?? null
  91. ];
  92. $this->logger = new NullLogger();
  93. }
  94. public function setLogger( LoggerInterface $logger ) {
  95. $this->logger = $logger;
  96. }
  97. /**
  98. * @return string Client ID hash
  99. * @since 1.32
  100. */
  101. public function getClientId() {
  102. return $this->clientId;
  103. }
  104. /**
  105. * @param bool $enabled Whether to no-op all method calls
  106. * @since 1.27
  107. */
  108. public function setEnabled( $enabled ) {
  109. $this->enabled = $enabled;
  110. }
  111. /**
  112. * @param bool $enabled Whether to check and wait on positions
  113. * @since 1.27
  114. */
  115. public function setWaitEnabled( $enabled ) {
  116. $this->wait = $enabled;
  117. }
  118. /**
  119. * Apply the "session consistency" DB replication position to a new ILoadBalancer
  120. *
  121. * If the stash has a previous master position recorded, this will try to make
  122. * sure that the next query to a replica DB of that master will see changes up
  123. * to that position by delaying execution. The delay may timeout and allow stale
  124. * data if no non-lagged replica DBs are available.
  125. *
  126. * This method should only be called from LBFactory.
  127. *
  128. * @param ILoadBalancer $lb
  129. * @return void
  130. */
  131. public function applySessionReplicationPosition( ILoadBalancer $lb ) {
  132. if ( !$this->enabled ) {
  133. return; // disabled
  134. }
  135. $masterName = $lb->getServerName( $lb->getWriterIndex() );
  136. $startupPositions = $this->getStartupMasterPositions();
  137. $pos = $startupPositions[$masterName] ?? null;
  138. if ( $pos instanceof DBMasterPos ) {
  139. $this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'\n" );
  140. $lb->waitFor( $pos );
  141. }
  142. }
  143. /**
  144. * Save the "session consistency" DB replication position for an end-of-life ILoadBalancer
  145. *
  146. * This saves the replication position of the master DB if this request made writes to it.
  147. *
  148. * This method should only be called from LBFactory.
  149. *
  150. * @param ILoadBalancer $lb
  151. * @return void
  152. */
  153. public function storeSessionReplicationPosition( ILoadBalancer $lb ) {
  154. if ( !$this->enabled ) {
  155. return; // disabled
  156. } elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
  157. // Only save the position if writes have been done on the connection
  158. return;
  159. }
  160. $masterName = $lb->getServerName( $lb->getWriterIndex() );
  161. if ( $lb->hasStreamingReplicaServers() ) {
  162. $pos = $lb->getReplicaResumePos();
  163. if ( $pos ) {
  164. $this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
  165. $this->shutdownPositions[$masterName] = $pos;
  166. }
  167. } else {
  168. $this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
  169. }
  170. $this->shutdownTouchDBs[$masterName] = 1;
  171. }
  172. /**
  173. * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
  174. * May commit chronology data to persistent storage.
  175. *
  176. * @param callable|null $workCallback Work to do instead of waiting on syncing positions
  177. * @param string $mode One of (sync, async); whether to wait on remote datacenters
  178. * @param int|null &$cpIndex DB position key write counter; incremented on update
  179. * @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
  180. */
  181. public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
  182. if ( !$this->enabled ) {
  183. return [];
  184. }
  185. $store = $this->store;
  186. // Some callers might want to know if a user recently touched a DB.
  187. // These writes do not need to block on all datacenters receiving them.
  188. foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
  189. $store->set(
  190. $this->getTouchedKey( $this->store, $dbName ),
  191. microtime( true ),
  192. $store::TTL_DAY
  193. );
  194. }
  195. if ( $this->shutdownPositions === [] ) {
  196. $this->logger->debug( __METHOD__ . ": no master positions to save\n" );
  197. return []; // nothing to save
  198. }
  199. $this->logger->debug(
  200. __METHOD__ . ": saving master pos for " .
  201. implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
  202. );
  203. // CP-protected writes should overwhelmingly go to the master datacenter, so merge the
  204. // positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
  205. // If set() returns success, then any get() should be able to see the new positions.
  206. if ( $store->lock( $this->key, 3 ) ) {
  207. if ( $workCallback ) {
  208. // Let the store run the work before blocking on a replication sync barrier.
  209. // If replication caught up while the work finished, the barrier will be fast.
  210. $store->addBusyCallback( $workCallback );
  211. }
  212. $ok = $store->set(
  213. $this->key,
  214. $this->mergePositions(
  215. $store->get( $this->key ),
  216. $this->shutdownPositions,
  217. $cpIndex
  218. ),
  219. self::POSITION_TTL,
  220. ( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
  221. );
  222. $store->unlock( $this->key );
  223. } else {
  224. $ok = false;
  225. }
  226. if ( !$ok ) {
  227. $cpIndex = null; // nothing saved
  228. $bouncedPositions = $this->shutdownPositions;
  229. // Raced out too many times or stash is down
  230. $this->logger->warning( __METHOD__ . ": failed to save master pos for " .
  231. implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
  232. );
  233. } elseif ( $mode === 'sync' &&
  234. $store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
  235. ) {
  236. // Positions may not be in all datacenters, force LBFactory to play it safe
  237. $this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
  238. $bouncedPositions = $this->shutdownPositions;
  239. } else {
  240. $bouncedPositions = [];
  241. }
  242. return $bouncedPositions;
  243. }
  244. /**
  245. * @param string $dbName DB master name (e.g. "db1052")
  246. * @return float|bool UNIX timestamp when client last touched the DB; false if not on record
  247. * @since 1.28
  248. */
  249. public function getTouched( $dbName ) {
  250. return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
  251. }
  252. /**
  253. * @param BagOStuff $store
  254. * @param string $dbName
  255. * @return string
  256. */
  257. private function getTouchedKey( BagOStuff $store, $dbName ) {
  258. return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
  259. }
  260. /**
  261. * Load in previous master positions for the client
  262. */
  263. protected function getStartupMasterPositions() {
  264. if ( $this->initialized ) {
  265. return $this->startupPositions;
  266. }
  267. $this->initialized = true;
  268. $this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)\n" );
  269. if ( $this->wait ) {
  270. // If there is an expectation to see master positions from a certain write
  271. // index or higher, then block until it appears, or until a timeout is reached.
  272. // Since the write index restarts each time the key is created, it is possible that
  273. // a lagged store has a matching key write index. However, in that case, it should
  274. // already be expired and thus treated as non-existing, maintaining correctness.
  275. if ( $this->waitForPosIndex > 0 ) {
  276. $data = null;
  277. $indexReached = null; // highest index reached in the position store
  278. $loop = new WaitConditionLoop(
  279. function () use ( &$data, &$indexReached ) {
  280. $data = $this->store->get( $this->key );
  281. if ( !is_array( $data ) ) {
  282. return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
  283. } elseif ( !isset( $data['writeIndex'] ) ) {
  284. return WaitConditionLoop::CONDITION_REACHED; // b/c
  285. }
  286. $indexReached = max( $data['writeIndex'], $indexReached );
  287. return ( $data['writeIndex'] >= $this->waitForPosIndex )
  288. ? WaitConditionLoop::CONDITION_REACHED
  289. : WaitConditionLoop::CONDITION_CONTINUE;
  290. },
  291. $this->waitForPosStoreTimeout
  292. );
  293. $result = $loop->invoke();
  294. $waitedMs = $loop->getLastWaitTime() * 1e3;
  295. if ( $result == $loop::CONDITION_REACHED ) {
  296. $this->logger->debug(
  297. __METHOD__ . ": expected and found position index.",
  298. [
  299. 'cpPosIndex' => $this->waitForPosIndex,
  300. 'waitTimeMs' => $waitedMs
  301. ] + $this->clientLogInfo
  302. );
  303. } else {
  304. $this->logger->warning(
  305. __METHOD__ . ": expected but failed to find position index.",
  306. [
  307. 'cpPosIndex' => $this->waitForPosIndex,
  308. 'indexReached' => $indexReached,
  309. 'waitTimeMs' => $waitedMs
  310. ] + $this->clientLogInfo
  311. );
  312. }
  313. } else {
  314. $data = $this->store->get( $this->key );
  315. }
  316. $this->startupPositions = $data ? $data['positions'] : [];
  317. $this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );
  318. } else {
  319. $this->startupPositions = [];
  320. $this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );
  321. }
  322. return $this->startupPositions;
  323. }
  324. /**
  325. * @param array|bool $curValue
  326. * @param DBMasterPos[] $shutdownPositions
  327. * @param int|null &$cpIndex
  328. * @return array
  329. */
  330. protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
  331. /** @var DBMasterPos[] $curPositions */
  332. $curPositions = $curValue['positions'] ?? [];
  333. // Use the newest positions for each DB master
  334. foreach ( $shutdownPositions as $db => $pos ) {
  335. if (
  336. !isset( $curPositions[$db] ) ||
  337. !( $curPositions[$db] instanceof DBMasterPos ) ||
  338. $pos->asOfTime() > $curPositions[$db]->asOfTime()
  339. ) {
  340. $curPositions[$db] = $pos;
  341. }
  342. }
  343. $cpIndex = $curValue['writeIndex'] ?? 0;
  344. return [
  345. 'positions' => $curPositions,
  346. 'writeIndex' => ++$cpIndex
  347. ];
  348. }
  349. }