123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- <?php
- /**
- * Job queue task base code.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- * @defgroup JobQueue JobQueue
- */
- /**
- * Class to both describe a background job and handle jobs.
- * To push jobs onto queues, use JobQueueGroup::singleton()->push();
- *
- * @ingroup JobQueue
- */
- abstract class Job implements RunnableJob {
- /** @var string */
- public $command;
- /** @var array Array of job parameters */
- public $params;
- /** @var array Additional queue metadata */
- public $metadata = [];
- /** @var Title */
- protected $title;
- /** @var bool Expensive jobs may set this to true */
- protected $removeDuplicates = false;
- /** @var string Text for error that occurred last */
- protected $error;
- /** @var callable[] */
- protected $teardownCallbacks = [];
- /** @var int Bitfield of JOB_* class constants */
- protected $executionFlags = 0;
- /**
- * Create the appropriate object to handle a specific job
- *
- * @param string $command Job command
- * @param array|Title $params Job parameters
- * @throws InvalidArgumentException
- * @return Job
- */
- public static function factory( $command, $params = [] ) {
- global $wgJobClasses;
- if ( $params instanceof Title ) {
- // Backwards compatibility for old signature ($command, $title, $params)
- $title = $params;
- $params = func_num_args() >= 3 ? func_get_arg( 2 ) : [];
- } elseif ( isset( $params['namespace'] ) && isset( $params['title'] ) ) {
- // Handle job classes that take title as constructor parameter.
- // If a newer classes like GenericParameterJob uses these parameters,
- // then this happens in Job::__construct instead.
- $title = Title::makeTitle( $params['namespace'], $params['title'] );
- } else {
- // Default title for job classes not implementing GenericParameterJob.
- // This must be a valid title because it not directly passed to
- // our Job constructor, but rather it's subclasses which may expect
- // to be able to use it.
- $title = Title::makeTitle( NS_SPECIAL, 'Blankpage' );
- }
- if ( isset( $wgJobClasses[$command] ) ) {
- $handler = $wgJobClasses[$command];
- if ( is_callable( $handler ) ) {
- $job = call_user_func( $handler, $title, $params );
- } elseif ( class_exists( $handler ) ) {
- if ( is_subclass_of( $handler, GenericParameterJob::class ) ) {
- $job = new $handler( $params );
- } else {
- $job = new $handler( $title, $params );
- }
- } else {
- $job = null;
- }
- if ( $job instanceof Job ) {
- $job->command = $command;
- return $job;
- } else {
- throw new InvalidArgumentException(
- "Could not instantiate job '$command': bad spec!"
- );
- }
- }
- throw new InvalidArgumentException( "Invalid job command '{$command}'" );
- }
- /**
- * @param string $command
- * @param array|Title|null $params
- */
- public function __construct( $command, $params = null ) {
- if ( $params instanceof Title ) {
- // Backwards compatibility for old signature ($command, $title, $params)
- $title = $params;
- $params = func_num_args() >= 3 ? func_get_arg( 2 ) : [];
- } else {
- // Newer jobs may choose to not have a top-level title (e.g. GenericParameterJob)
- $title = null;
- }
- if ( !is_array( $params ) ) {
- throw new InvalidArgumentException( '$params must be an array' );
- }
- if (
- $title &&
- !isset( $params['namespace'] ) &&
- !isset( $params['title'] )
- ) {
- // When constructing this class for submitting to the queue,
- // normalise the $title arg of old job classes as part of $params.
- $params['namespace'] = $title->getNamespace();
- $params['title'] = $title->getDBkey();
- }
- $this->command = $command;
- $this->params = $params + [ 'requestId' => WebRequest::getRequestId() ];
- if ( $this->title === null ) {
- // Set this field for access via getTitle().
- $this->title = ( isset( $params['namespace'] ) && isset( $params['title'] ) )
- ? Title::makeTitle( $params['namespace'], $params['title'] )
- // GenericParameterJob classes without namespace/title params
- // should not use getTitle(). Set an invalid title as placeholder.
- : Title::makeTitle( NS_SPECIAL, '' );
- }
- }
- public function hasExecutionFlag( $flag ) {
- return ( $this->executionFlags & $flag ) === $flag;
- }
- /**
- * @return string
- */
- public function getType() {
- return $this->command;
- }
- /**
- * @return Title
- */
- final public function getTitle() {
- return $this->title;
- }
- /**
- * @return array
- */
- public function getParams() {
- return $this->params;
- }
- /**
- * @param string|null $field Metadata field or null to get all the metadata
- * @return mixed|null Value; null if missing
- * @since 1.33
- */
- public function getMetadata( $field = null ) {
- if ( $field === null ) {
- return $this->metadata;
- }
- return $this->metadata[$field] ?? null;
- }
- /**
- * @param string $field Key name to set the value for
- * @param mixed $value The value to set the field for
- * @return mixed|null The prior field value; null if missing
- * @since 1.33
- */
- public function setMetadata( $field, $value ) {
- $old = $this->getMetadata( $field );
- if ( $value === null ) {
- unset( $this->metadata[$field] );
- } else {
- $this->metadata[$field] = $value;
- }
- return $old;
- }
- /**
- * @return int|null UNIX timestamp to delay running this job until, otherwise null
- * @since 1.22
- */
- public function getReleaseTimestamp() {
- return isset( $this->params['jobReleaseTimestamp'] )
- ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
- : null;
- }
- /**
- * @return int|null UNIX timestamp of when the job was queued, or null
- * @since 1.26
- */
- public function getQueuedTimestamp() {
- return isset( $this->metadata['timestamp'] )
- ? wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] )
- : null;
- }
- public function getRequestId() {
- return $this->params['requestId'] ?? null;
- }
- public function getReadyTimestamp() {
- return $this->getReleaseTimestamp() ?: $this->getQueuedTimestamp();
- }
- /**
- * Whether the queue should reject insertion of this job if a duplicate exists
- *
- * This can be used to avoid duplicated effort or combined with delayed jobs to
- * coalesce updates into larger batches. Claimed jobs are never treated as
- * duplicates of new jobs, and some queues may allow a few duplicates due to
- * network partitions and fail-over. Thus, additional locking is needed to
- * enforce mutual exclusion if this is really needed.
- *
- * @return bool
- */
- public function ignoreDuplicates() {
- return $this->removeDuplicates;
- }
- public function allowRetries() {
- return true;
- }
- public function workItemCount() {
- return 1;
- }
- /**
- * Subclasses may need to override this to make duplication detection work.
- * The resulting map conveys everything that makes the job unique. This is
- * only checked if ignoreDuplicates() returns true, meaning that duplicate
- * jobs are supposed to be ignored.
- *
- * @return array Map of key/values
- * @since 1.21
- */
- public function getDeduplicationInfo() {
- $info = [
- 'type' => $this->getType(),
- 'params' => $this->getParams()
- ];
- if ( is_array( $info['params'] ) ) {
- // Identical jobs with different "root" jobs should count as duplicates
- unset( $info['params']['rootJobSignature'] );
- unset( $info['params']['rootJobTimestamp'] );
- // Likewise for jobs with different delay times
- unset( $info['params']['jobReleaseTimestamp'] );
- // Identical jobs from different requests should count as duplicates
- unset( $info['params']['requestId'] );
- // Queues pack and hash this array, so normalize the order
- ksort( $info['params'] );
- }
- return $info;
- }
- /**
- * Get "root job" parameters for a task
- *
- * This is used to no-op redundant jobs, including child jobs of jobs,
- * as long as the children inherit the root job parameters. When a job
- * with root job parameters and "rootJobIsSelf" set is pushed, the
- * deduplicateRootJob() method is automatically called on it. If the
- * root job is only virtual and not actually pushed (e.g. the sub-jobs
- * are inserted directly), then call deduplicateRootJob() directly.
- *
- * @see JobQueue::deduplicateRootJob()
- *
- * @param string $key A key that identifies the task
- * @return array Map of:
- * - rootJobIsSelf : true
- * - rootJobSignature : hash (e.g. SHA1) that identifies the task
- * - rootJobTimestamp : TS_MW timestamp of this instance of the task
- * @since 1.21
- */
- public static function newRootJobParams( $key ) {
- return [
- 'rootJobIsSelf' => true,
- 'rootJobSignature' => sha1( $key ),
- 'rootJobTimestamp' => wfTimestampNow()
- ];
- }
- /**
- * @see JobQueue::deduplicateRootJob()
- * @return array
- * @since 1.21
- */
- public function getRootJobParams() {
- return [
- 'rootJobSignature' => $this->params['rootJobSignature'] ?? null,
- 'rootJobTimestamp' => $this->params['rootJobTimestamp'] ?? null
- ];
- }
- /**
- * @see JobQueue::deduplicateRootJob()
- * @return bool
- * @since 1.22
- */
- public function hasRootJobParams() {
- return isset( $this->params['rootJobSignature'] )
- && isset( $this->params['rootJobTimestamp'] );
- }
- /**
- * @see JobQueue::deduplicateRootJob()
- * @return bool Whether this is job is a root job
- */
- public function isRootJob() {
- return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
- }
- /**
- * @param callable $callback A function with one parameter, the success status, which will be
- * false if the job failed or it succeeded but the DB changes could not be committed or
- * any deferred updates threw an exception. (This parameter was added in 1.28.)
- * @since 1.27
- */
- protected function addTeardownCallback( $callback ) {
- $this->teardownCallbacks[] = $callback;
- }
- public function teardown( $status ) {
- foreach ( $this->teardownCallbacks as $callback ) {
- call_user_func( $callback, $status );
- }
- }
- public function toString() {
- $paramString = '';
- if ( $this->params ) {
- foreach ( $this->params as $key => $value ) {
- if ( $paramString != '' ) {
- $paramString .= ' ';
- }
- if ( is_array( $value ) ) {
- $filteredValue = [];
- foreach ( $value as $k => $v ) {
- $json = FormatJson::encode( $v );
- if ( $json === false || mb_strlen( $json ) > 512 ) {
- $filteredValue[$k] = gettype( $v ) . '(...)';
- } else {
- $filteredValue[$k] = $v;
- }
- }
- if ( count( $filteredValue ) <= 10 ) {
- $value = FormatJson::encode( $filteredValue );
- } else {
- $value = "array(" . count( $value ) . ")";
- }
- } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
- $value = "object(" . get_class( $value ) . ")";
- }
- $flatValue = (string)$value;
- if ( mb_strlen( $value ) > 1024 ) {
- $flatValue = "string(" . mb_strlen( $value ) . ")";
- }
- $paramString .= "$key={$flatValue}";
- }
- }
- $metaString = '';
- foreach ( $this->metadata as $key => $value ) {
- if ( is_scalar( $value ) && mb_strlen( $value ) < 1024 ) {
- $metaString .= ( $metaString ? ",$key=$value" : "$key=$value" );
- }
- }
- $s = $this->command;
- if ( is_object( $this->title ) ) {
- $s .= " {$this->title->getPrefixedDBkey()}";
- }
- if ( $paramString != '' ) {
- $s .= " $paramString";
- }
- if ( $metaString != '' ) {
- $s .= " ($metaString)";
- }
- return $s;
- }
- protected function setLastError( $error ) {
- $this->error = $error;
- }
- public function getLastError() {
- return $this->error;
- }
- }
|