JobRunner.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. <?php
  2. /**
  3. * Job queue runner utility methods
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. * @ingroup JobQueue
  22. */
  23. use MediaWiki\MediaWikiServices;
  24. use MediaWiki\Logger\LoggerFactory;
  25. use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
  26. use Psr\Log\LoggerAwareInterface;
  27. use Psr\Log\LoggerInterface;
  28. use Wikimedia\ScopedCallback;
  29. use Wikimedia\Rdbms\LBFactory;
  30. use Wikimedia\Rdbms\DBError;
  /**
   * Job queue runner utility methods
   *
   * Pops jobs from the JobQueueGroup, runs each one inside a database transaction
   * round, records statsd metrics about it, and shares per-type backoff expiries
   * with other runners through a JSON file in wfTempDir().
   *
   * @ingroup JobQueue
   * @since 1.24
   */
  class JobRunner implements LoggerAwareInterface {
      /** @var Config */
      protected $config;
      /** @var callable|null Debug output handler */
      protected $debug;
      /**
       * @var LoggerInterface $logger
       */
      protected $logger;

      const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
      const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds
      const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
      const READONLY_BACKOFF_TTL = 30; // seconds to back off a queue due to read-only errors

      /**
       * @param callable $debug Optional debug output handler; it receives one
       *  timestamped, newline-terminated status line per call
       */
      public function setDebugHandler( $debug ) {
          $this->debug = $debug;
      }

      /**
       * @param LoggerInterface $logger
       * @return void
       */
      public function setLogger( LoggerInterface $logger ) {
          $this->logger = $logger;
      }

      /**
       * @param LoggerInterface|null $logger Defaults to the 'runJobs' channel logger
       */
      public function __construct( LoggerInterface $logger = null ) {
          if ( $logger === null ) {
              $logger = LoggerFactory::getInstance( 'runJobs' );
          }
          $this->setLogger( $logger );
          $this->config = MediaWikiServices::getInstance()->getMainConfig();
      }

      /**
       * Run jobs of the specified number/type for the specified time
       *
       * The response map has a 'jobs' field that lists the status of each job run, with:
       *   - type   : the job type
       *   - status : ok/failed
       *   - error  : any error message string
       *   - time   : the job run time in ms
       * The response map also has:
       *   - reached  : the reason the method finished, one of (none-ready, none-possible,
       *                read-only, replica-lag-limit, job-limit, time-limit, memory-limit)
       *   - backoffs : the (job type => UNIX expiry timestamp) map of active backoffs
       *   - elapsed  : the total time spent running tasks in ms
       * 'backoffs' and 'elapsed' are only present once the job loop has been entered;
       * the early bail-outs (none-possible, read-only, initial replica-lag-limit) omit them.
       *
       * This method outputs status information only if a debug handler was set.
       * Exceptions thrown while running a job are caught and logged, but are not
       * reported as output.
       *
       * @param array $options Map of parameters:
       *   - type     : the job type (or false for the default types)
       *   - maxJobs  : maximum number of jobs to run
       *   - maxTime  : maximum time in seconds before stopping (checked between jobs,
       *                so a long job can overrun it)
       *   - throttle : whether to respect job backoff configuration
       * @return array Summary response that can easily be JSON serialized
       * @throws LogicException If called while a DB transaction round is active
       */
      public function run( array $options ) {
          $jobClasses = $this->config->get( 'JobClasses' );
          $profilerLimits = $this->config->get( 'TrxProfilerLimits' );

          $response = [ 'jobs' => [], 'reached' => 'none-ready' ];

          $type = $options['type'] ?? false;
          $maxJobs = $options['maxJobs'] ?? false;
          $maxTime = $options['maxTime'] ?? false;
          // Throttling is only disabled when 'throttle' is given and falsy
          $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];

          // Bail if job type is invalid
          if ( $type !== false && !isset( $jobClasses[$type] ) ) {
              $response['reached'] = 'none-possible';
              return $response;
          }

          // Bail out if DB is in read-only mode
          if ( wfReadOnly() ) {
              $response['reached'] = 'read-only';
              return $response;
          }

          $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
          if ( $lbFactory->hasTransactionRound() ) {
              throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
          }

          // Bail out if there is too much DB lag.
          // This check should not block as we want to try other wiki queues.
          list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag();
          if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
              $response['reached'] = 'replica-lag-limit';
              return $response;
          }

          // Catch huge single updates that lead to replica DB lag
          $trxProfiler = Profiler::instance()->getTransactionProfiler();
          $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
          $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );

          // Some jobs types should not run until a certain timestamp
          $backoffs = []; // map of (type => UNIX expiry)
          $backoffDeltas = []; // map of (type => seconds)
          $wait = 'wait'; // block to read backoffs the first time

          $group = JobQueueGroup::singleton();
          $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
          $jobsPopped = 0;
          $timeMsTotal = 0;
          $startTime = microtime( true ); // time since jobs started running
          // A tiny non-zero value makes the first replica check cover all writes so far
          $lastCheckTime = 1; // timestamp of last replica DB check
          do {
              // Sync the persistent backoffs with concurrent runners
              $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
              $blacklist = $noThrottle ? [] : array_keys( $backoffs );
              $wait = 'nowait'; // less important now

              if ( $type === false ) {
                  $job = $group->pop(
                      JobQueueGroup::TYPE_DEFAULT,
                      JobQueueGroup::USE_CACHE,
                      $blacklist
                  );
              } elseif ( in_array( $type, $blacklist ) ) {
                  $job = false; // requested queue in backoff state
              } else {
                  $job = $group->pop( $type ); // job from a single queue
              }

              if ( $job ) { // found a job
                  ++$jobsPopped;
                  $popTime = time();
                  $jType = $job->getType();

                  WebRequest::overrideRequestId( $job->getRequestId() );

                  // Back off of certain jobs for a while (for throttling and for errors)
                  $ttw = $this->getBackoffTimeToWait( $job );
                  if ( $ttw > 0 ) {
                      // Always add the delta for other runners in case the time running the
                      // job negated the backoff for each individually but not collectively.
                      $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                          ? $backoffDeltas[$jType] + $ttw
                          : $ttw;
                      $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
                  }

                  $info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
                  if ( $info['status'] !== false || !$job->allowRetries() ) {
                      $group->ack( $job ); // succeeded or job cannot be retried
                  }

                  // Back off of certain jobs for a while (for throttling and for errors).
                  // Only ~1 in 50 failures triggers an error backoff.
                  if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
                      $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['error'] ) );
                      $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
                          ? $backoffDeltas[$jType] + $ttw
                          : $ttw;
                  }

                  $response['jobs'][] = [
                      'type' => $jType,
                      'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
                      'error' => $info['error'],
                      'time' => $info['timeMs']
                  ];
                  $timeMsTotal += $info['timeMs'];

                  // Break out if we hit the job count or wall time limits...
                  if ( $maxJobs && $jobsPopped >= $maxJobs ) {
                      $response['reached'] = 'job-limit';
                      break;
                  } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
                      $response['reached'] = 'time-limit';
                      break;
                  }

                  // Don't let any of the main DB replica DBs get backed up.
                  // This only waits for so long before exiting and letting
                  // other wikis in the farm (on different masters) get a chance.
                  $timePassed = microtime( true ) - $lastCheckTime;
                  // A negative value means the clock went backwards; re-check then too
                  if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
                      $success = $lbFactory->waitForReplication( [
                          'ifWritesSince' => $lastCheckTime,
                          'timeout' => self::MAX_ALLOWED_LAG,
                      ] );
                      if ( !$success ) {
                          $response['reached'] = 'replica-lag-limit';
                          break;
                      }
                      $lastCheckTime = microtime( true );
                  }

                  // Bail if near-OOM instead of in a job
                  if ( !$this->checkMemoryOK() ) {
                      $response['reached'] = 'memory-limit';
                      break;
                  }
              }
          } while ( $job ); // stop when there are no jobs

          // Sync the persistent backoffs for the next runJobs.php pass
          if ( $backoffDeltas ) {
              $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
          }

          $response['backoffs'] = $backoffs;
          $response['elapsed'] = $timeMsTotal;

          return $response;
      }

      /**
       * Pick how long to back off a job type after a failed job.
       *
       * @param string $error Error message of the failed job
       * @return int TTL in seconds; longer when the error mentions DBReadOnlyError
       */
      private function getErrorBackoffTTL( $error ) {
          return strpos( $error, 'DBReadOnlyError' ) !== false
              ? self::READONLY_BACKOFF_TTL
              : self::ERROR_BACKOFF_TTL;
      }

      /**
       * Run a single job inside a DB transaction round and report on it.
       *
       * Any Throwable raised by the job, the commit, or deferred updates rolls back
       * master changes, is logged, and marks the job as failed. tearDown() is always
       * attempted afterwards. Pickup delay, root job age, run time, and RSS delta
       * are recorded to statsd.
       *
       * @param RunnableJob $job
       * @param LBFactory $lbFactory
       * @param StatsdDataFactoryInterface $stats
       * @param int|float $popTime UNIX timestamp at which the job was popped
       * @return array Map of status/error/timeMs
       */
      private function executeJob( RunnableJob $job, LBFactory $lbFactory, $stats, $popTime ) {
          $jType = $job->getType();
          $msg = $job->toString() . " STARTING";
          $this->logger->debug( $msg, [
              'job_type' => $job->getType(),
          ] );
          $this->debugCallback( $msg );

          // Clear out title cache data from prior snapshots
          // (e.g. from before JobRunner was invoked in this process)
          MediaWikiServices::getInstance()->getLinkCache()->clear();

          // Run the job...
          $rssStart = $this->getMaxRssKb();
          $jobStartTime = microtime( true );
          try {
              $fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
              // Flush any pending changes left over from an implicit transaction round
              if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
                  $lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
              } else {
                  $lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
              }
              // Clear any stale REPEATABLE-READ snapshots from replica DB connections
              $lbFactory->flushReplicaSnapshots( $fnameTrxOwner );
              $status = $job->run();
              $error = $job->getLastError();
              // Commit all pending changes from this job
              $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
              // Run any deferred update tasks; doUpdates() manages transactions itself
              DeferredUpdates::doUpdates();
          } catch ( Throwable $e ) {
              // Throwable (not just Exception) so PHP Errors in a job also roll back
              // and are recorded as failures instead of killing the whole runner.
              MWExceptionHandler::rollbackMasterChangesAndLog( $e );
              $status = false;
              $error = get_class( $e ) . ': ' . $e->getMessage();
          }
          // Always attempt to call teardown() even if Job throws exception.
          try {
              $job->tearDown( $status );
          } catch ( Throwable $e ) {
              MWExceptionHandler::logException( $e );
          }

          $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
          $rssEnd = $this->getMaxRssKb();

          // Record how long jobs wait before getting popped
          $readyTs = $job->getReadyTimestamp();
          if ( $readyTs ) {
              $pickupDelay = max( 0, $popTime - $readyTs );
              $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
              $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
          }
          // Record root job age for jobs being run
          $rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
          if ( $rootTimestamp ) {
              $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
              $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
          }
          // Track the execution time for jobs
          $stats->timing( "jobqueue.run.$jType", $timeMs );
          // Track RSS increases for jobs (in case of memory leaks)
          if ( $rssStart && $rssEnd ) {
              $stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
          }

          if ( $status === false ) {
              $msg = $job->toString() . " t={job_duration} error={job_error}";
              $this->logger->error( $msg, [
                  'job_type' => $job->getType(),
                  'job_duration' => $timeMs,
                  'job_error' => $error,
              ] );

              $msg = $job->toString() . " t=$timeMs error={$error}";
              $this->debugCallback( $msg );
          } else {
              $msg = $job->toString() . " t={job_duration} good";
              $this->logger->info( $msg, [
                  'job_type' => $job->getType(),
                  'job_duration' => $timeMs,
              ] );

              $msg = $job->toString() . " t=$timeMs good";
              $this->debugCallback( $msg );
          }

          return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ];
      }

      /**
       * @return int|null Max memory RSS in kilobytes, or null if unavailable
       */
      private function getMaxRssKb() {
          $info = wfGetRusage() ?: [];
          // see https://linux.die.net/man/2/getrusage
          return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
      }

      /**
       * @param RunnableJob $job
       * @return int Seconds for this runner to avoid doing more jobs of this type
       * @see $wgJobBackoffThrottling
       */
      private function getBackoffTimeToWait( RunnableJob $job ) {
          $throttling = $this->config->get( 'JobBackoffThrottling' );

          if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
              return 0; // not throttled
          }

          $itemsPerSecond = $throttling[$job->getType()];
          if ( $itemsPerSecond <= 0 ) {
              return 0; // not throttled
          }

          $seconds = 0;
          if ( $job->workItemCount() > 0 ) {
              $exactSeconds = $job->workItemCount() / $itemsPerSecond;
              // use randomized rounding so the expected value equals $exactSeconds
              $seconds = floor( $exactSeconds );
              $remainder = $exactSeconds - $seconds;
              $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
          }

          return (int)$seconds;
      }

      /**
       * @return string Path of the JSON file in which runners share backoff expiries
       */
      private function getBackoffFilePath() {
          return wfTempDir() . '/mw-runJobs-backoffs.json';
      }

      /**
       * Get the previous backoff expiries from persistent storage
       * On I/O or lock acquisition failure this returns the original $backoffs.
       * Entries that have already expired are dropped.
       *
       * @param array $backoffs Map of (job type => UNIX timestamp)
       * @param string $mode Lock wait mode - "wait" or "nowait"
       * @return array Map of (job type => backoff expiry timestamp)
       */
      private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
          $file = $this->getBackoffFilePath();
          if ( !is_file( $file ) ) {
              return []; // nothing persisted yet
          }

          $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
          $handle = fopen( $file, 'rb' );
          if ( $handle === false ) {
              return $backoffs; // I/O failure (e.g. unreadable, or removed since is_file())
          }
          if ( !flock( $handle, LOCK_SH | $noblock ) ) {
              fclose( $handle );
              return $backoffs; // don't wait on lock
          }
          $content = stream_get_contents( $handle );
          flock( $handle, LOCK_UN );
          fclose( $handle );

          $ctime = microtime( true );
          $cBackoffs = json_decode( $content, true ) ?: [];
          foreach ( $cBackoffs as $type => $timestamp ) {
              if ( $timestamp < $ctime ) {
                  unset( $cBackoffs[$type] );
              }
          }

          return $cBackoffs;
      }

      /**
       * Merge the current backoff expiries from persistent storage
       *
       * The $deltas map is set to an empty array on success.
       * On I/O or lock acquisition failure this returns the original $backoffs
       * and leaves $deltas untouched so they can be applied on a later call.
       *
       * @param array $backoffs Map of (job type => UNIX timestamp)
       * @param array &$deltas Map of (job type => seconds)
       * @param string $mode Lock wait mode - "wait" or "nowait"
       * @return array The new backoffs account for $backoffs and the latest file data
       */
      private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
          if ( !$deltas ) {
              return $this->loadBackoffs( $backoffs, $mode );
          }

          $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
          // Open read/write WITHOUT truncating ("c" rather than "w"). The file must only
          // be modified while holding the exclusive lock, and the entries written by
          // other runners have to be read back so they can be merged, not discarded.
          $handle = fopen( $this->getBackoffFilePath(), 'cb+' );
          if ( $handle === false ) {
              return $backoffs; // I/O failure
          }
          if ( !flock( $handle, LOCK_EX | $noblock ) ) {
              fclose( $handle );
              return $backoffs; // don't wait on lock
          }

          $ctime = microtime( true );
          $content = stream_get_contents( $handle );
          $cBackoffs = json_decode( $content, true ) ?: [];
          foreach ( $deltas as $type => $seconds ) {
              // Extend a still-active backoff; otherwise start a new one from now
              $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
                  ? $cBackoffs[$type] + $seconds
                  : $ctime + $seconds;
          }
          foreach ( $cBackoffs as $type => $timestamp ) {
              if ( $timestamp < $ctime ) {
                  unset( $cBackoffs[$type] );
              }
          }

          // Replace the contents. Reading left the pointer at EOF and ftruncate() does
          // not move it, so rewind first to avoid writing after a NUL-filled gap.
          ftruncate( $handle, 0 );
          rewind( $handle );
          fwrite( $handle, json_encode( $cBackoffs ) );
          fflush( $handle );
          flock( $handle, LOCK_UN );
          fclose( $handle );

          $deltas = [];

          return $cBackoffs;
      }

      /**
       * Make sure that this script is not too close to the memory usage limit.
       * It is better to die in between jobs than OOM right in the middle of one.
       *
       * The parsed memory_limit is cached in a static local for the process. A limit
       * that does not match the pattern (e.g. "-1") disables the check.
       *
       * @return bool False if usage is at or above 95% of memory_limit
       */
      private function checkMemoryOK() {
          static $maxBytes = null;
          if ( $maxBytes === null ) {
              $m = [];
              if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
                  list( , $num, $unit ) = $m;
                  $conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
                  $maxBytes = $num * $conv[strtolower( $unit )];
              } else {
                  $maxBytes = 0;
              }
          }
          $usedBytes = memory_get_usage();
          if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
              $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
              $this->logger->error( $msg, [
                  'used_bytes' => $usedBytes,
                  'max_bytes' => $maxBytes,
              ] );

              $msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
              $this->debugCallback( $msg );

              return false;
          }

          return true;
      }

      /**
       * Log the job message to the debug handler, if one was set
       * @param string $msg The message to log (a timestamp prefix and newline are added)
       */
      private function debugCallback( $msg ) {
          if ( $this->debug ) {
              call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] );
          }
      }

      /**
       * Issue a commit on all masters who are currently in a transaction and have
       * made changes to the database. It also supports sometimes waiting for the
       * local wiki's replica DBs to catch up. See the documentation for
       * $wgJobSerialCommitThreshold for more.
       *
       * When the estimated apply time of pending writes reaches the threshold, the
       * commit is serialized behind the 'jobrunner-serial-commit' named lock. Replica
       * DBs are waited on before committing, and the lock is released afterwards
       * (also if the commit throws, via ScopedCallback).
       *
       * @param LBFactory $lbFactory
       * @param RunnableJob $job
       * @param string $fnameTrxOwner
       * @throws DBError If the serial-commit lock cannot be acquired within 30 seconds
       */
      private function commitMasterChanges( LBFactory $lbFactory, RunnableJob $job, $fnameTrxOwner ) {
          $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );

          $time = false;
          $lb = $lbFactory->getMainLB();
          if ( $syncThreshold !== false && $lb->hasStreamingReplicaServers() ) {
              // Generally, there is one master connection to the local DB
              $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
              // We need natively blocking fast locks
              if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
                  $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
                  if ( $time < $syncThreshold ) {
                      $dbwSerial = false;
                  }
              } else {
                  $dbwSerial = false;
              }
          } else {
              // There are no replica DBs or writes are all to foreign DB (we don't handle that)
              $dbwSerial = false;
          }

          if ( !$dbwSerial ) {
              $lbFactory->commitMasterChanges(
                  $fnameTrxOwner,
                  // Abort if any transaction was too big
                  [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
              );

              return;
          }

          $ms = intval( 1000 * $time );

          $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
          $this->logger->info( $msg, [
              'job_type' => $job->getType(),
              'job_commit_write_ms' => $ms,
          ] );

          $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
          $this->debugCallback( $msg );

          // Wait for an exclusive lock to commit
          if ( !$dbwSerial->lock( 'jobrunner-serial-commit', $fnameTrxOwner, 30 ) ) {
              // This will trigger a rollback in the main loop
              throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
          }
          $unlocker = new ScopedCallback( function () use ( $dbwSerial, $fnameTrxOwner ) {
              $dbwSerial->unlock( 'jobrunner-serial-commit', $fnameTrxOwner );
          } );

          // Wait for the replica DBs to catch up
          $pos = $lb->getMasterPos();
          if ( $pos ) {
              $lb->waitForAll( $pos );
          }

          // Actually commit the DB master changes
          $lbFactory->commitMasterChanges(
              $fnameTrxOwner,
              // Abort if any transaction was too big
              [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
          );
          ScopedCallback::consume( $unlocker );
      }
  }