JobQueueDB.php 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913
  1. <?php
  2. /**
  3. * Database-backed job queue code.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. */
  22. use Wikimedia\Rdbms\IDatabase;
  23. use Wikimedia\Rdbms\Database;
  24. use Wikimedia\Rdbms\DBConnectionError;
  25. use Wikimedia\Rdbms\DBError;
  26. use MediaWiki\MediaWikiServices;
  27. use Wikimedia\Rdbms\IMaintainableDatabase;
  28. use Wikimedia\ScopedCallback;
  29. /**
  30. * Class to handle job queues stored in the DB
  31. *
  32. * @ingroup JobQueue
  33. * @since 1.21
  34. */
  35. class JobQueueDB extends JobQueue {
  36. const CACHE_TTL_SHORT = 30; // integer; seconds that cached queue counts are served before the DB is queried again
  37. const MAX_AGE_PRUNE = 604800; // integer; seconds (7 days) a claimed job may linger before it is pruned
  38. const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, upper bound of the values stored in job_random
  39. const MAX_OFFSET = 255; // integer; maximum number of rows to skip when picking a random job
  40. /** @var IMaintainableDatabase|DBError|null Handle (or the connection error) kept when "server" is configured */
  41. protected $conn;
  42. /** @var array|null Server configuration array passed to Database::factory() */
  43. protected $server;
  44. /** @var string|null Name of an external DB cluster or null for the local DB cluster */
  45. protected $cluster;
  /**
   * Set up a DB-backed queue from its configuration.
   *
   * Besides the parameters handled by the parent class, these are recognized:
   *   - server  : Server configuration array for Database::factory. Takes precedence
   *               over "cluster" when both are given.
   *   - cluster : The name of an external cluster registered via LBFactory.
   *               When it is absent (or not a string), the primary DB cluster for
   *               the wiki is used. With a custom cluster, DB handles are retrieved
   *               via LBFactory::getExternalLB() and getConnection().
   *
   * @param array $params
   */
  protected function __construct( array $params ) {
    parent::__construct( $params );

    if ( isset( $params['server'] ) ) {
      // An explicit server wins; "cluster" is not even looked at
      $this->server = $params['server'];

      return;
    }

    $cluster = $params['cluster'] ?? null;
    if ( is_string( $cluster ) ) {
      $this->cluster = $cluster;
    }
  }
  /**
   * List the pop orderings that this queue backend can honor.
   *
   * @return string[]
   */
  protected function supportedOrders() {
    $orders = [ 'random', 'timestamp', 'fifo' ];

    return $orders;
  }
  /**
   * Name the pop ordering that this queue backend prefers.
   *
   * @return string One of the values listed by supportedOrders()
   */
  protected function optimalOrder() {
    $preferred = 'random';

    return $preferred;
  }
  /**
   * Check whether the queue has no unclaimed jobs of this type.
   *
   * @see JobQueue::doIsEmpty()
   * @return bool True if no unclaimed row was found on the replica
   * @throws JobQueueError If the query fails
   */
  protected function doIsEmpty() {
    $db = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $db );

    $unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
    try {
      // A single matching row is enough to know the queue is not empty
      $hit = $db->selectField( 'job', '1', $unclaimed, __METHOD__ );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    return $hit ? false : true;
  }
  /**
   * Count the unclaimed jobs of this type, using a short-lived cached value if present.
   *
   * @see JobQueue::doGetSize()
   * @return int
   * @throws JobQueueError If the query fails
   */
  protected function doGetSize() {
    $cacheKey = $this->getCacheKey( 'size' );
    $cached = $this->wanCache->get( $cacheKey );
    if ( is_int( $cached ) ) {
      return $cached; // still fresh enough
    }

    $db = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $db );

    $unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
    try {
      $total = (int)$db->selectField( 'job', 'COUNT(*)', $unclaimed, __METHOD__ );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    $this->wanCache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

    return $total;
  }
  /**
   * Count the jobs of this type that currently carry a claim token.
   *
   * The result is cached briefly. Queues without acknowledgements
   * (claimTTL <= 0) always report zero.
   *
   * @see JobQueue::doGetAcquiredCount()
   * @return int
   * @throws JobQueueError If the query fails
   */
  protected function doGetAcquiredCount() {
    if ( $this->claimTTL <= 0 ) {
      return 0; // no acknowledgements
    }

    $cacheKey = $this->getCacheKey( 'acquiredcount' );
    $cached = $this->wanCache->get( $cacheKey );
    if ( is_int( $cached ) ) {
      return $cached;
    }

    $db = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $db );

    $claimed = [
      'job_cmd' => $this->type,
      'job_token != ' . $db->addQuotes( '' )
    ];
    try {
      $total = (int)$db->selectField( 'job', 'COUNT(*)', $claimed, __METHOD__ );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    $this->wanCache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

    return $total;
  }
  /**
   * Count the claimed jobs of this type that have used up all of their attempts.
   *
   * The result is cached briefly. Queues without acknowledgements
   * (claimTTL <= 0) always report zero.
   *
   * @see JobQueue::doGetAbandonedCount()
   * @return int
   * @throws MWException
   */
  protected function doGetAbandonedCount() {
    if ( $this->claimTTL <= 0 ) {
      return 0; // no acknowledgements
    }

    $cacheKey = $this->getCacheKey( 'abandonedcount' );
    $cached = $this->wanCache->get( $cacheKey );
    if ( is_int( $cached ) ) {
      return $cached;
    }

    $db = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $db );

    $abandoned = [
      'job_cmd' => $this->type,
      'job_token != ' . $db->addQuotes( '' ), // was claimed
      'job_attempts >= ' . $db->addQuotes( $this->maxTries ) // no retries left
    ];
    try {
      $total = (int)$db->selectField( 'job', 'COUNT(*)', $abandoned, __METHOD__ );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    $this->wanCache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

    return $total;
  }
  /**
   * Schedule a batch of jobs for insertion on the master DB.
   *
   * @see JobQueue::doBatchPush()
   * @param IJobSpecification[] $jobs
   * @param int $flags
   * @throws DBError|Exception
   * @return void
   */
  protected function doBatchPush( array $jobs, $flags ) {
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );

    // Two situations are expected here:
    // a) sqlite; the DB connection is probably a regular round-aware handle.
    //    If it is busy with a transaction, the job writes are deferred until right
    //    before the main round commit step. Errors that bubble up will rollback
    //    the main commit round.
    // b) mysql/postgres; the DB connection is generally a separate
    //    CONN_TRX_AUTOCOMMIT handle. No transaction is active nor will one be
    //    started by writes, so the jobs are enqueued right away and errors show up
    //    immediately, as the interface expects. Errors that bubble up will rollback
    //    the main commit round.
    $caller = __METHOD__;
    $enqueue = function ( IDatabase $conn ) use ( $jobs, $flags, $caller ) {
      $this->doBatchPushInternal( $conn, $jobs, $flags, $caller );
    };

    $dbw->onTransactionPreCommitOrIdle( $enqueue, $caller );
  }
  /**
   * Insert a batch of jobs into the job table, skipping de-duplicated jobs
   * that already have an unclaimed row with the same hash.
   *
   * This function should *not* be called outside of JobQueueDB
   *
   * @suppress SecurityCheck-SQLInjection Bug in phan-taint-check handling bulk inserts
   * @param IDatabase $dbw
   * @param IJobSpecification[] $jobs
   * @param int $flags Bitfield; QOS_ATOMIC wraps all inserts in one atomic section
   * @param string $method Caller name used for query attribution
   * @throws JobQueueError If any of the queries fail
   * @return void
   */
  public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
    if ( $jobs === [] ) {
      return;
    }

    $rowSet = []; // (sha1 => row) map for jobs that are de-duplicated
    $rowList = []; // list of rows for jobs that are not de-duplicated
    foreach ( $jobs as $job ) {
      $row = $this->insertFields( $job, $dbw );
      if ( $job->ignoreDuplicates() ) {
        // Identical jobs within this batch collapse onto the same key
        $rowSet[$row['job_sha1']] = $row;
      } else {
        $rowList[] = $row;
      }
    }

    if ( $flags & self::QOS_ATOMIC ) {
      $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
    }
    try {
      // Strip out any duplicate jobs that are already in the queue...
      if ( count( $rowSet ) ) {
        $res = $dbw->select( 'job', 'job_sha1',
          [
            // No job_type condition since it's part of the job_sha1 hash
            'job_sha1' => array_keys( $rowSet ),
            'job_token' => '' // unclaimed
          ],
          $method
        );
        foreach ( $res as $row ) {
          wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
          unset( $rowSet[$row->job_sha1] ); // already enqueued
        }
      }
      // Build the full list of job rows to insert
      $rows = array_merge( $rowList, array_values( $rowSet ) );
      // Insert the job rows in chunks to avoid replica DB lag...
      foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
        $dbw->insert( 'job', $rowBatch, $method );
      }
      $this->incrStats( 'inserts', $this->type, count( $rows ) );
      // Every job that was requested but did not produce a row was a duplicate,
      // either of another job in this batch or of a row already in the queue.
      // (Comparing against $rowSet/$rowList here would always yield zero, since
      // $rows is built from exactly those two collections.)
      $this->incrStats( 'dupe_inserts', $this->type, count( $jobs ) - count( $rows ) );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }
    if ( $flags & self::QOS_ATOMIC ) {
      $dbw->endAtomic( $method );
    }
  }
  /**
   * Claim one job from the queue and build its job object.
   *
   * When nothing could be claimed, and otherwise on roughly one call in ten,
   * stale claimed jobs are recycled/deleted as a side effect.
   *
   * @see JobQueue::doPop()
   * @return RunnableJob|bool The claimed job, or false if none could be claimed
   * @throws JobQueueError If a query fails
   */
  protected function doPop() {
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );

    $job = false; // job popped off
    try {
      $token = wfRandomString( 32 ); // identifies this pop attempt

      // Try to reserve a row in the DB...
      if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
        $claimed = $this->claimOldest( $token );
      } else {
        // Random first: a random pivot encourages concurrent UPDATEs, and a
        // random direction decides whether rows before or after it are searched
        $claimed = $this->claimRandom(
          $token,
          mt_rand( 0, self::MAX_JOB_RANDOM ),
          (bool)mt_rand( 0, 1 )
        );
      }

      if ( $claimed ) {
        $this->incrStats( 'pops', $this->type );
        // Get the job object from the row...
        $job = $this->jobFromRow( $claimed );
      }

      if ( !$job || mt_rand( 0, 9 ) == 0 ) {
        // Handle jobs that need to be recycled/deleted;
        // any recycled jobs will be picked up next attempt
        $this->recycleAndDeleteStaleJobs();
      }
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    return $job;
  }
  /**
   * Pick a pseudo-random unclaimed job and reserve it with an UPDATE by primary key,
   * without holding row locks over RTTs.
   *
   * @param string $uuid 32 char hex string
   * @param int $rand Random unsigned integer (31 bits)
   * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
   * @return stdClass|bool Row|false
   */
  protected function claimRandom( $uuid, $rand, $gte ) {
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );

    // Check cache to see if the queue has <= OFFSET items
    $smallKey = $this->getCacheKey( 'small' );
    $tinyQueue = $this->wanCache->get( $smallKey );
    $triedBothWays = false; // whether one job_random direction was already scanned

    // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
    // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
    // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
    // be used here with MySQL.
    while ( true ) {
      if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
        // For small queues, using OFFSET will overshoot and return no rows more often.
        // Instead, this uses job_random to pick a row (possibly checking both directions).
        $operator = $gte ? '>=' : '<=';
        $direction = $gte ? 'ASC' : 'DESC';
        $candidate = $dbw->selectRow(
          'job',
          self::selectFields(),
          [
            'job_cmd' => $this->type,
            'job_token' => '', // unclaimed
            "job_random {$operator} {$dbw->addQuotes( $rand )}"
          ],
          __METHOD__,
          [ 'ORDER BY' => "job_random {$direction}" ]
        );
        if ( !$candidate ) {
          if ( $triedBothWays ) {
            return false; // nothing on either side of $rand
          }
          // Try the other direction once
          $gte = !$gte;
          $triedBothWays = true;
          continue;
        }
      } else { // table *may* have >= MAX_OFFSET rows
        // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
        // in MySQL if there are many rows for some reason. This uses a small OFFSET
        // instead of job_random for reducing excess claim retries.
        $candidate = $dbw->selectRow(
          'job',
          self::selectFields(),
          [
            'job_cmd' => $this->type,
            'job_token' => '', // unclaimed
          ],
          __METHOD__,
          [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
        );
        if ( !$candidate ) {
          // The offset overshot, so the queue must have <= MAX_OFFSET rows;
          // remember that and fall back to job_random
          $tinyQueue = true;
          $this->wanCache->set( $smallKey, 1, 30 );
          continue;
        }
      }

      // Claim the job; the empty-token condition makes the UPDATE a no-op
      // if the row was claimed by someone else in the meantime
      $dbw->update(
        'job',
        [
          'job_token' => $uuid,
          'job_token_timestamp' => $dbw->timestamp(),
          'job_attempts = job_attempts+1'
        ],
        [ 'job_cmd' => $this->type, 'job_id' => $candidate->job_id, 'job_token' => '' ],
        __METHOD__
      );
      if ( $dbw->affectedRows() ) {
        return $candidate;
      }
      // Raced out by another runner while claiming the selected row; pick again.
      // The use of job_random should minimize this problem, however.
    }
  }
  /**
   * Reserve the unclaimed job with the lowest job_id using a single UPDATE,
   * without holding row locks over RTTs, and then fetch the reserved row.
   *
   * @param string $uuid 32 char hex string
   * @return stdClass|bool Row|false
   */
  protected function claimOldest( $uuid ) {
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );

    while ( true ) {
      if ( $dbw->getType() === 'mysql' ) {
        // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
        // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
        // Postgres has no such limitation. However, MySQL offers an
        // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
        $table = $dbw->tableName( 'job' );
        $assignments = implode( ', ', [
          'job_token = ' . $dbw->addQuotes( $uuid ),
          'job_token_timestamp = ' . $dbw->addQuotes( $dbw->timestamp() ),
          'job_attempts = job_attempts+1'
        ] );
        $where = implode( ' AND ', [
          'job_cmd = ' . $dbw->addQuotes( $this->type ),
          'job_token = ' . $dbw->addQuotes( '' )
        ] );
        $dbw->query(
          "UPDATE {$table} SET {$assignments} WHERE ( {$where} ) ORDER BY job_id ASC LIMIT 1",
          __METHOD__
        );
      } else {
        // Use a subquery to find the job, within an UPDATE to claim it.
        // This uses as much of the DB wrapper functions as possible.
        $oldestIdSql = $dbw->selectSQLText(
          'job',
          'job_id',
          [ 'job_cmd' => $this->type, 'job_token' => '' ],
          __METHOD__,
          [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
        );
        $dbw->update(
          'job',
          [
            'job_token' => $uuid,
            'job_token_timestamp' => $dbw->timestamp(),
            'job_attempts = job_attempts+1'
          ],
          [ 'job_id = (' . $oldestIdSql . ')' ],
          __METHOD__
        );
      }

      if ( !$dbw->affectedRows() ) {
        return false; // nothing to do
      }

      // Fetch the row that was just reserved...
      $reserved = $dbw->selectRow(
        'job',
        self::selectFields(),
        [ 'job_cmd' => $this->type, 'job_token' => $uuid ],
        __METHOD__
      );
      if ( $reserved ) {
        return $reserved;
      }

      // Raced out by duplicate job removal; try to reserve another row
      wfDebug( "Row deleted as duplicate by another process.\n" );
    }
  }
  /**
   * Acknowledge a finished job by deleting its row.
   *
   * @see JobQueue::doAck()
   * @param RunnableJob $job Job carrying the row ID in its 'id' metadata
   * @throws MWException If the job has no ID
   * @throws JobQueueError If the query fails
   */
  protected function doAck( RunnableJob $job ) {
    $jobId = $job->getMetadata( 'id' );
    if ( $jobId === null ) {
      throw new MWException( "Job of type '{$job->getType()}' has no ID." );
    }

    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );

    $target = [ 'job_cmd' => $this->type, 'job_id' => $jobId ];
    try {
      // Delete a row with a single DELETE without holding row locks over RTTs...
      $dbw->delete( 'job', $target, __METHOD__ );
      $this->incrStats( 'acks', $this->type );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }
  }
  /**
   * Register root job de-duplication once the master connection has committed or is idle.
   *
   * @see JobQueue::doDeduplicateRootJob()
   * @param IJobSpecification $job
   * @throws MWException
   * @return bool Always true; the registration itself runs in a deferred callback
   */
  protected function doDeduplicateRootJob( IJobSpecification $job ) {
    // Callers should call JobQueueGroup::push() before this method so that if the
    // insert fails, the de-duplication registration will be aborted. Since the insert
    // is deferred till "transaction idle", do the same here, so that the ordering is
    // maintained. Having only the de-duplication registration succeed would cause
    // jobs to become no-ops without any actual jobs that made them redundant.
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );

    $register = function () use ( $job ) {
      parent::doDeduplicateRootJob( $job );
    };
    $dbw->onTransactionCommitOrIdle( $register, __METHOD__ );

    return true;
  }
  /**
   * Delete every job of this type, claimed or not.
   *
   * @see JobQueue::doDelete()
   * @return bool Always true when the query succeeds
   * @throws JobQueueError If the query fails
   */
  protected function doDelete() {
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );
    try {
      // Pass the caller name like every other query in this class, so the
      // statement is attributed to this method rather than to the DB layer
      $dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    return true;
  }
  /**
   * Wait for replication on the cluster that holds this queue.
   *
   * Does nothing for queues on a dedicated "server", since those are not
   * managed by the LBFactory instance.
   *
   * @see JobQueue::doWaitForBackups()
   * @return void
   */
  protected function doWaitForBackups() {
    if ( $this->server ) {
      return; // not using LBFactory instance
    }

    $options = [
      'domain' => $this->domain,
      'cluster' => is_string( $this->cluster ) ? $this->cluster : false
    ];
    MediaWikiServices::getInstance()
      ->getDBLoadBalancerFactory()
      ->waitForReplication( $options );
  }
  /**
   * Drop the cached job counts for this queue.
   *
   * Covers every count that this class caches: the unclaimed size, the
   * acquired count and the abandoned count.
   *
   * @return void
   */
  protected function doFlushCaches() {
    // 'abandonedcount' is cached by doGetAbandonedCount() just like the other
    // two, so it has to be cleared here as well or it would be served stale
    foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
      $this->wanCache->delete( $this->getCacheKey( $type ) );
    }
  }
  /**
   * Iterate over the jobs of this type that carry no claim token.
   *
   * @see JobQueue::getAllQueuedJobs()
   * @return Iterator
   */
  public function getAllQueuedJobs() {
    $unclaimed = [
      'job_cmd' => $this->getType(),
      'job_token' => ''
    ];

    return $this->getJobIterator( $unclaimed );
  }
  /**
   * Iterate over the jobs of this type that carry a claim token.
   *
   * @see JobQueue::getAllAcquiredJobs()
   * @return Iterator
   */
  public function getAllAcquiredJobs() {
    $claimed = [
      'job_cmd' => $this->getType(),
      "job_token > ''"
    ];

    return $this->getJobIterator( $claimed );
  }
  /**
   * Select job rows from the replica and map each row to a job object
   * as the result is iterated.
   *
   * @param array $conds Query conditions
   * @return Iterator
   * @throws JobQueueError If the query fails
   */
  protected function getJobIterator( array $conds ) {
    $dbr = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbr );
    try {
      return new MappedIterator(
        // Pass the caller name like every other query in this class, so the
        // statement is attributed to this method rather than to the DB layer
        $dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
        function ( $row ) {
          return $this->jobFromRow( $row );
        }
      );
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }
  }
  /**
   * Build a string identifying where this queue is stored.
   *
   * @return string|null Null for queues on a dedicated "server", which does
   *  not use the LBFactory instance
   */
  public function getCoalesceLocationInternal() {
    if ( $this->server ) {
      return null; // not using the LBFactory instance
    }

    if ( is_string( $this->cluster ) ) {
      return "DBCluster:{$this->cluster}:{$this->domain}";
    }

    return "LBFactory:{$this->domain}";
  }
  /**
   * Find which of the given job types have at least one row in the job table.
   *
   * @param string[] $types Job types to check
   * @return string[] The subset of $types that has rows
   */
  protected function doGetSiblingQueuesWithJobs( array $types ) {
    $dbr = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbr );

    // @note: this does not check whether the jobs are claimed or not.
    // This is useful so JobQueueGroup::pop() also sees queues that only
    // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
    // failed jobs so that they can be popped again for that edge case.
    $res = $dbr->select(
      'job',
      'DISTINCT job_cmd',
      [ 'job_cmd' => $types ],
      __METHOD__
    );

    $nonEmpty = [];
    foreach ( $res as $row ) {
      $nonEmpty[] = $row->job_cmd;
    }

    return $nonEmpty;
  }
  /**
   * Count the rows in the job table for each of the given job types.
   *
   * Types without any rows are absent from the result.
   *
   * @param string[] $types Job types to count
   * @return int[] Map of (job type => row count)
   */
  protected function doGetSiblingQueueSizes( array $types ) {
    $dbr = $this->getReplicaDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbr );

    $res = $dbr->select(
      'job',
      [ 'job_cmd', 'COUNT(*) AS count' ],
      [ 'job_cmd' => $types ],
      __METHOD__,
      [ 'GROUP BY' => 'job_cmd' ]
    );

    $countsByType = [];
    foreach ( $res as $row ) {
      $countsByType[$row->job_cmd] = (int)$row->count;
    }

    return $countsByType;
  }
  /**
   * Recycle or destroy any jobs that have been claimed for too long
   *
   * Work is serialized per job type through a named DB lock. If the lock is
   * already held, nothing is done. Once acquired, the lock is released
   * even when one of the queries fails.
   *
   * @return int Number of jobs recycled/deleted
   * @throws JobQueueError If a query fails
   */
  public function recycleAndDeleteStaleJobs() {
    $now = time();
    $count = 0; // affected rows
    $dbw = $this->getMasterDB();
    /** @noinspection PhpUnusedLocalVariableInspection */
    $scope = $this->getScopedNoTrxFlag( $dbw );
    $lockKey = "jobqueue-recycle-{$this->type}";

    try {
      if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
        return $count; // already in progress
      }

      try {
        // Remove claims on jobs acquired for too long if enabled...
        if ( $this->claimTTL > 0 ) {
          $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
          // Get the IDs of jobs that have been claimed but not finished after too long.
          // These jobs can be recycled into the queue by expiring the claim. Selecting
          // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
          $res = $dbw->select( 'job', 'job_id',
            [
              'job_cmd' => $this->type,
              "job_token != {$dbw->addQuotes( '' )}", // was acquired
              "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
              "job_attempts < {$dbw->addQuotes( $this->maxTries )}" // retries left
            ],
            __METHOD__
          );
          $ids = array_map(
            function ( $o ) {
              return $o->job_id;
            }, iterator_to_array( $res )
          );
          if ( count( $ids ) ) {
            // Reset job_token for these jobs so that other runners will pick them up.
            // Set the timestamp to the current time, as it is useful to know that the job
            // was already tried before (the timestamp becomes the "released" time).
            $dbw->update( 'job',
              [
                'job_token' => '',
                'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
              ],
              [ 'job_id' => $ids, "job_token != ''" ],
              __METHOD__
            );
            $affected = $dbw->affectedRows();
            $count += $affected;
            $this->incrStats( 'recycles', $this->type, $affected );
          }
        }

        // Just destroy any stale jobs...
        $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
        $conds = [
          'job_cmd' => $this->type,
          "job_token != {$dbw->addQuotes( '' )}", // was acquired
          "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
        ];
        if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
          $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
        }
        // Get the IDs of jobs that are considered stale and should be removed. Selecting
        // the IDs first means that the DELETE can be done by primary key (less deadlocks).
        $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
        $ids = array_map(
          function ( $o ) {
            return $o->job_id;
          }, iterator_to_array( $res )
        );
        if ( count( $ids ) ) {
          $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
          $affected = $dbw->affectedRows();
          $count += $affected;
          $this->incrStats( 'abandons', $this->type, $affected );
        }
      } finally {
        // Always give the lock back; without this, a failed query would leave
        // it held and recycling for this job type would be reported as
        // "already in progress" to other runners
        $dbw->unlock( $lockKey, __METHOD__ );
      }
    } catch ( DBError $e ) {
      throw $this->getDBException( $e );
    }

    return $count;
  }
  /**
   * Build the job table row for a job specification.
   *
   * @param IJobSpecification $job
   * @param IDatabase $db Used to format the insertion timestamp
   * @return array Map of (column name => value)
   */
  protected function insertFields( IJobSpecification $job, IDatabase $db ) {
    $params = $job->getParams();
    // Hash of the de-duplication info, converted from base 16 to base 36
    $sha1 = Wikimedia\base_convert(
      sha1( serialize( $job->getDeduplicationInfo() ) ),
      16, 36, 31
    );

    $row = [];
    // Fields that describe the nature of the job
    $row['job_cmd'] = $job->getType();
    $row['job_namespace'] = $params['namespace'] ?? NS_SPECIAL;
    $row['job_title'] = $params['title'] ?? '';
    $row['job_params'] = self::makeBlob( $params );
    // Additional job metadata
    $row['job_timestamp'] = $db->timestamp();
    $row['job_sha1'] = $sha1;
    $row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

    return $row;
  }
  /**
   * Get a DB handle for reading queue state.
   *
   * @throws JobQueueConnectionError If the connection cannot be established
   * @return IDatabase
   */
  protected function getReplicaDB() {
    try {
      $conn = $this->getDB( DB_REPLICA );
    } catch ( DBConnectionError $e ) {
      throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
    }

    return $conn;
  }
  /**
   * Get a DB handle for modifying the queue.
   *
   * @throws JobQueueConnectionError If the connection cannot be established
   * @return IMaintainableDatabase
   */
  protected function getMasterDB() {
    try {
      $conn = $this->getDB( DB_MASTER );
    } catch ( DBConnectionError $e ) {
      throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
    }

    return $conn;
  }
  /**
   * Get a DB handle for the configured "server", or one from the load balancer.
   *
   * With a "server" configured, one connection is created on first use and
   * then reused; $index is not consulted in that mode. A failed connection
   * attempt is remembered and its error is thrown again on later calls.
   *
   * @param int $index (DB_REPLICA/DB_MASTER)
   * @return IMaintainableDatabase
   * @throws DBError If connecting to the configured "server" fails
   */
  protected function getDB( $index ) {
    if ( !$this->server ) {
      $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
      if ( is_string( $this->cluster ) ) {
        $lb = $lbFactory->getExternalLB( $this->cluster );
      } else {
        $lb = $lbFactory->getMainLB( $this->domain );
      }

      // Keep a separate connection to avoid contention and deadlocks.
      // SQLite has the opposite behavior due to DB-level locking, so there the
      // regular handle is used and job insertion is deferred until the PRESEND
      // stage to reduce contention.
      $isSqlite = ( $lb->getServerType( $lb->getWriterIndex() ) === 'sqlite' );
      $flags = $isSqlite ? 0 : $lb::CONN_TRX_AUTOCOMMIT;

      return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
    }

    if ( $this->conn instanceof DBError ) {
      throw $this->conn; // an earlier attempt already failed
    }

    if ( !( $this->conn instanceof IDatabase ) ) {
      try {
        $this->conn = Database::factory( $this->server['type'], $this->server );
      } catch ( DBError $e ) {
        $this->conn = $e; // remember the failure for later calls
        throw $e;
      }
    }

    return $this->conn;
  }
  /**
   * Clear DBO_TRX on the handle for as long as the returned object exists.
   *
   * The flag is set again by the returned callback, but only if it was set
   * when this method was called.
   *
   * @param IDatabase $db
   * @return ScopedCallback
   */
  private function getScopedNoTrxFlag( IDatabase $db ) {
    $hadAutoTrx = $db->getFlag( DBO_TRX ); // get current setting
    $db->clearFlag( DBO_TRX ); // make each query its own transaction

    $restore = function () use ( $db, $hadAutoTrx ) {
      if ( $hadAutoTrx ) {
        $db->setFlag( DBO_TRX ); // restore old setting
      }
    };

    return new ScopedCallback( $restore );
  }
  /**
   * Build the global cache key for one cached property of this queue.
   *
   * The key is made from the DB domain, the cluster name ('main' when no
   * external cluster is configured), the job type and the property name.
   *
   * @param string $property
   * @return string
   */
  private function getCacheKey( $property ) {
    $cluster = 'main';
    if ( is_string( $this->cluster ) ) {
      $cluster = $this->cluster;
    }

    return $this->wanCache->makeGlobalKey(
      'jobqueue', $this->domain, $cluster, $this->type, $property
    );
  }
  /**
   * Serialize job parameters for storage in the job_params column.
   *
   * @param array|bool $params Parameters, or false for none
   * @return string Serialized parameters, or an empty string if $params is false
   */
  protected static function makeBlob( $params ) {
    return ( $params === false ) ? '' : serialize( $params );
  }
  /**
   * Build a job object from a row of the job table.
   *
   * The namespace and title columns are used as parameters only when the
   * stored parameters do not already contain them.
   *
   * @param stdClass $row
   * @return RunnableJob
   * @throws UnexpectedValueException If the stored parameters do not unserialize to an array
   */
  protected function jobFromRow( $row ) {
    if ( (string)$row->job_params !== '' ) {
      $params = unserialize( $row->job_params );
    } else {
      $params = [];
    }
    if ( !is_array( $params ) ) { // this shouldn't happen
      throw new UnexpectedValueException(
        "Could not unserialize job with ID '{$row->job_id}'." );
    }

    // Array union: keys already present in $params are kept as they are
    $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];

    $runnable = $this->factoryJob( $row->job_cmd, $params );
    $runnable->setMetadata( 'id', $row->job_id );
    $runnable->setMetadata( 'timestamp', $row->job_timestamp );

    return $runnable;
  }
  /**
   * Convert a DB error into the error type reported by job queues.
   *
   * @param DBError $e
   * @return JobQueueError Error whose message is the DB error's class name and message
   */
  protected function getDBException( DBError $e ) {
    $message = get_class( $e ) . ": " . $e->getMessage();

    return new JobQueueError( $message );
  }
  /**
   * Return the list of job fields that should be selected.
   *
   * @since 1.23
   * @return string[] Column names of the job table
   */
  public static function selectFields() {
    $fields = [
      // Identity and nature of the job
      'job_id',
      'job_cmd',
      'job_namespace',
      'job_title',
      'job_timestamp',
      'job_params',
      // Claim bookkeeping
      'job_random',
      'job_attempts',
      'job_token',
      'job_token_timestamp',
      'job_sha1',
    ];

    return $fields;
  }
  826. }