Job.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. <?php
  2. /**
  3. * Job queue task base code.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation; either version 2 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License along
  16. * with this program; if not, write to the Free Software Foundation, Inc.,
  17. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. * http://www.gnu.org/copyleft/gpl.html
  19. *
  20. * @file
  21. * @defgroup JobQueue JobQueue
  22. */
  23. /**
  24. * Class to both describe a background job and handle jobs.
  25. * To push jobs onto queues, use JobQueueGroup::singleton()->push();
  26. *
  27. * @ingroup JobQueue
  28. */
  29. abstract class Job implements RunnableJob {
  30. /** @var string */
  31. public $command;
  32. /** @var array Array of job parameters */
  33. public $params;
  34. /** @var array Additional queue metadata */
  35. public $metadata = [];
  36. /** @var Title */
  37. protected $title;
  38. /** @var bool Expensive jobs may set this to true */
  39. protected $removeDuplicates = false;
  40. /** @var string Text for error that occurred last */
  41. protected $error;
  42. /** @var callable[] */
  43. protected $teardownCallbacks = [];
  44. /** @var int Bitfield of JOB_* class constants */
  45. protected $executionFlags = 0;
  46. /**
  47. * Create the appropriate object to handle a specific job
  48. *
  49. * @param string $command Job command
  50. * @param array|Title $params Job parameters
  51. * @throws InvalidArgumentException
  52. * @return Job
  53. */
  54. public static function factory( $command, $params = [] ) {
  55. global $wgJobClasses;
  56. if ( $params instanceof Title ) {
  57. // Backwards compatibility for old signature ($command, $title, $params)
  58. $title = $params;
  59. $params = func_num_args() >= 3 ? func_get_arg( 2 ) : [];
  60. } elseif ( isset( $params['namespace'] ) && isset( $params['title'] ) ) {
  61. // Handle job classes that take title as constructor parameter.
  62. // If a newer classes like GenericParameterJob uses these parameters,
  63. // then this happens in Job::__construct instead.
  64. $title = Title::makeTitle( $params['namespace'], $params['title'] );
  65. } else {
  66. // Default title for job classes not implementing GenericParameterJob.
  67. // This must be a valid title because it not directly passed to
  68. // our Job constructor, but rather it's subclasses which may expect
  69. // to be able to use it.
  70. $title = Title::makeTitle( NS_SPECIAL, 'Blankpage' );
  71. }
  72. if ( isset( $wgJobClasses[$command] ) ) {
  73. $handler = $wgJobClasses[$command];
  74. if ( is_callable( $handler ) ) {
  75. $job = call_user_func( $handler, $title, $params );
  76. } elseif ( class_exists( $handler ) ) {
  77. if ( is_subclass_of( $handler, GenericParameterJob::class ) ) {
  78. $job = new $handler( $params );
  79. } else {
  80. $job = new $handler( $title, $params );
  81. }
  82. } else {
  83. $job = null;
  84. }
  85. if ( $job instanceof Job ) {
  86. $job->command = $command;
  87. return $job;
  88. } else {
  89. throw new InvalidArgumentException(
  90. "Could not instantiate job '$command': bad spec!"
  91. );
  92. }
  93. }
  94. throw new InvalidArgumentException( "Invalid job command '{$command}'" );
  95. }
  96. /**
  97. * @param string $command
  98. * @param array|Title|null $params
  99. */
  100. public function __construct( $command, $params = null ) {
  101. if ( $params instanceof Title ) {
  102. // Backwards compatibility for old signature ($command, $title, $params)
  103. $title = $params;
  104. $params = func_num_args() >= 3 ? func_get_arg( 2 ) : [];
  105. } else {
  106. // Newer jobs may choose to not have a top-level title (e.g. GenericParameterJob)
  107. $title = null;
  108. }
  109. if ( !is_array( $params ) ) {
  110. throw new InvalidArgumentException( '$params must be an array' );
  111. }
  112. if (
  113. $title &&
  114. !isset( $params['namespace'] ) &&
  115. !isset( $params['title'] )
  116. ) {
  117. // When constructing this class for submitting to the queue,
  118. // normalise the $title arg of old job classes as part of $params.
  119. $params['namespace'] = $title->getNamespace();
  120. $params['title'] = $title->getDBkey();
  121. }
  122. $this->command = $command;
  123. $this->params = $params + [ 'requestId' => WebRequest::getRequestId() ];
  124. if ( $this->title === null ) {
  125. // Set this field for access via getTitle().
  126. $this->title = ( isset( $params['namespace'] ) && isset( $params['title'] ) )
  127. ? Title::makeTitle( $params['namespace'], $params['title'] )
  128. // GenericParameterJob classes without namespace/title params
  129. // should not use getTitle(). Set an invalid title as placeholder.
  130. : Title::makeTitle( NS_SPECIAL, '' );
  131. }
  132. }
  133. public function hasExecutionFlag( $flag ) {
  134. return ( $this->executionFlags & $flag ) === $flag;
  135. }
  136. /**
  137. * @return string
  138. */
  139. public function getType() {
  140. return $this->command;
  141. }
  142. /**
  143. * @return Title
  144. */
  145. final public function getTitle() {
  146. return $this->title;
  147. }
  148. /**
  149. * @return array
  150. */
  151. public function getParams() {
  152. return $this->params;
  153. }
  154. /**
  155. * @param string|null $field Metadata field or null to get all the metadata
  156. * @return mixed|null Value; null if missing
  157. * @since 1.33
  158. */
  159. public function getMetadata( $field = null ) {
  160. if ( $field === null ) {
  161. return $this->metadata;
  162. }
  163. return $this->metadata[$field] ?? null;
  164. }
  165. /**
  166. * @param string $field Key name to set the value for
  167. * @param mixed $value The value to set the field for
  168. * @return mixed|null The prior field value; null if missing
  169. * @since 1.33
  170. */
  171. public function setMetadata( $field, $value ) {
  172. $old = $this->getMetadata( $field );
  173. if ( $value === null ) {
  174. unset( $this->metadata[$field] );
  175. } else {
  176. $this->metadata[$field] = $value;
  177. }
  178. return $old;
  179. }
  180. /**
  181. * @return int|null UNIX timestamp to delay running this job until, otherwise null
  182. * @since 1.22
  183. */
  184. public function getReleaseTimestamp() {
  185. return isset( $this->params['jobReleaseTimestamp'] )
  186. ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
  187. : null;
  188. }
  189. /**
  190. * @return int|null UNIX timestamp of when the job was queued, or null
  191. * @since 1.26
  192. */
  193. public function getQueuedTimestamp() {
  194. return isset( $this->metadata['timestamp'] )
  195. ? wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] )
  196. : null;
  197. }
  198. public function getRequestId() {
  199. return $this->params['requestId'] ?? null;
  200. }
  201. public function getReadyTimestamp() {
  202. return $this->getReleaseTimestamp() ?: $this->getQueuedTimestamp();
  203. }
  204. /**
  205. * Whether the queue should reject insertion of this job if a duplicate exists
  206. *
  207. * This can be used to avoid duplicated effort or combined with delayed jobs to
  208. * coalesce updates into larger batches. Claimed jobs are never treated as
  209. * duplicates of new jobs, and some queues may allow a few duplicates due to
  210. * network partitions and fail-over. Thus, additional locking is needed to
  211. * enforce mutual exclusion if this is really needed.
  212. *
  213. * @return bool
  214. */
  215. public function ignoreDuplicates() {
  216. return $this->removeDuplicates;
  217. }
  218. public function allowRetries() {
  219. return true;
  220. }
  221. public function workItemCount() {
  222. return 1;
  223. }
  224. /**
  225. * Subclasses may need to override this to make duplication detection work.
  226. * The resulting map conveys everything that makes the job unique. This is
  227. * only checked if ignoreDuplicates() returns true, meaning that duplicate
  228. * jobs are supposed to be ignored.
  229. *
  230. * @return array Map of key/values
  231. * @since 1.21
  232. */
  233. public function getDeduplicationInfo() {
  234. $info = [
  235. 'type' => $this->getType(),
  236. 'params' => $this->getParams()
  237. ];
  238. if ( is_array( $info['params'] ) ) {
  239. // Identical jobs with different "root" jobs should count as duplicates
  240. unset( $info['params']['rootJobSignature'] );
  241. unset( $info['params']['rootJobTimestamp'] );
  242. // Likewise for jobs with different delay times
  243. unset( $info['params']['jobReleaseTimestamp'] );
  244. // Identical jobs from different requests should count as duplicates
  245. unset( $info['params']['requestId'] );
  246. // Queues pack and hash this array, so normalize the order
  247. ksort( $info['params'] );
  248. }
  249. return $info;
  250. }
  251. /**
  252. * Get "root job" parameters for a task
  253. *
  254. * This is used to no-op redundant jobs, including child jobs of jobs,
  255. * as long as the children inherit the root job parameters. When a job
  256. * with root job parameters and "rootJobIsSelf" set is pushed, the
  257. * deduplicateRootJob() method is automatically called on it. If the
  258. * root job is only virtual and not actually pushed (e.g. the sub-jobs
  259. * are inserted directly), then call deduplicateRootJob() directly.
  260. *
  261. * @see JobQueue::deduplicateRootJob()
  262. *
  263. * @param string $key A key that identifies the task
  264. * @return array Map of:
  265. * - rootJobIsSelf : true
  266. * - rootJobSignature : hash (e.g. SHA1) that identifies the task
  267. * - rootJobTimestamp : TS_MW timestamp of this instance of the task
  268. * @since 1.21
  269. */
  270. public static function newRootJobParams( $key ) {
  271. return [
  272. 'rootJobIsSelf' => true,
  273. 'rootJobSignature' => sha1( $key ),
  274. 'rootJobTimestamp' => wfTimestampNow()
  275. ];
  276. }
  277. /**
  278. * @see JobQueue::deduplicateRootJob()
  279. * @return array
  280. * @since 1.21
  281. */
  282. public function getRootJobParams() {
  283. return [
  284. 'rootJobSignature' => $this->params['rootJobSignature'] ?? null,
  285. 'rootJobTimestamp' => $this->params['rootJobTimestamp'] ?? null
  286. ];
  287. }
  288. /**
  289. * @see JobQueue::deduplicateRootJob()
  290. * @return bool
  291. * @since 1.22
  292. */
  293. public function hasRootJobParams() {
  294. return isset( $this->params['rootJobSignature'] )
  295. && isset( $this->params['rootJobTimestamp'] );
  296. }
  297. /**
  298. * @see JobQueue::deduplicateRootJob()
  299. * @return bool Whether this is job is a root job
  300. */
  301. public function isRootJob() {
  302. return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
  303. }
  304. /**
  305. * @param callable $callback A function with one parameter, the success status, which will be
  306. * false if the job failed or it succeeded but the DB changes could not be committed or
  307. * any deferred updates threw an exception. (This parameter was added in 1.28.)
  308. * @since 1.27
  309. */
  310. protected function addTeardownCallback( $callback ) {
  311. $this->teardownCallbacks[] = $callback;
  312. }
  313. public function teardown( $status ) {
  314. foreach ( $this->teardownCallbacks as $callback ) {
  315. call_user_func( $callback, $status );
  316. }
  317. }
  318. public function toString() {
  319. $paramString = '';
  320. if ( $this->params ) {
  321. foreach ( $this->params as $key => $value ) {
  322. if ( $paramString != '' ) {
  323. $paramString .= ' ';
  324. }
  325. if ( is_array( $value ) ) {
  326. $filteredValue = [];
  327. foreach ( $value as $k => $v ) {
  328. $json = FormatJson::encode( $v );
  329. if ( $json === false || mb_strlen( $json ) > 512 ) {
  330. $filteredValue[$k] = gettype( $v ) . '(...)';
  331. } else {
  332. $filteredValue[$k] = $v;
  333. }
  334. }
  335. if ( count( $filteredValue ) <= 10 ) {
  336. $value = FormatJson::encode( $filteredValue );
  337. } else {
  338. $value = "array(" . count( $value ) . ")";
  339. }
  340. } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
  341. $value = "object(" . get_class( $value ) . ")";
  342. }
  343. $flatValue = (string)$value;
  344. if ( mb_strlen( $value ) > 1024 ) {
  345. $flatValue = "string(" . mb_strlen( $value ) . ")";
  346. }
  347. $paramString .= "$key={$flatValue}";
  348. }
  349. }
  350. $metaString = '';
  351. foreach ( $this->metadata as $key => $value ) {
  352. if ( is_scalar( $value ) && mb_strlen( $value ) < 1024 ) {
  353. $metaString .= ( $metaString ? ",$key=$value" : "$key=$value" );
  354. }
  355. }
  356. $s = $this->command;
  357. if ( is_object( $this->title ) ) {
  358. $s .= " {$this->title->getPrefixedDBkey()}";
  359. }
  360. if ( $paramString != '' ) {
  361. $s .= " $paramString";
  362. }
  363. if ( $metaString != '' ) {
  364. $s .= " ($metaString)";
  365. }
  366. return $s;
  367. }
  368. protected function setLastError( $error ) {
  369. $this->error = $error;
  370. }
  371. public function getLastError() {
  372. return $this->error;
  373. }
  374. }