<?php
// This file is part of GNU social - https://www.gnu.org/software/social
//
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <http://www.gnu.org/licenses/>.
  16. /**
  17. * Table Definition for queue_item
  18. */
  19. defined('GNUSOCIAL') || die();
  20. class Queue_item extends Managed_DataObject
  21. {
  22. ###START_AUTOCODE
  23. /* the code below is auto generated do not remove the above tag */
  24. public $__table = 'queue_item'; // table name
  25. public $id; // int(4) primary_key not_null
  26. public $frame; // blob not_null
  27. public $transport; // varchar(32)
  28. public $created; // datetime()
  29. public $claimed; // datetime()
  30. /* the code above is auto generated do not remove the tag below */
  31. ###END_AUTOCODE
  32. public static function schemaDef()
  33. {
  34. return array(
  35. 'fields' => array(
  36. 'id' => array('type' => 'serial', 'not null' => true, 'description' => 'unique identifier'),
  37. 'frame' => array('type' => 'blob', 'not null' => true, 'description' => 'data: object reference or opaque string'),
  38. 'transport' => array('type' => 'varchar', 'length' => 32, 'not null' => true, 'description' => 'queue for what? "email", "xmpp", "sms", "irc", ...'),
  39. 'created' => array('type' => 'datetime', 'description' => 'date this record was created'),
  40. 'claimed' => array('type' => 'datetime', 'description' => 'date this item was claimed'),
  41. ),
  42. 'primary key' => array('id'),
  43. 'indexes' => array(
  44. 'queue_item_created_idx' => array('created'),
  45. ),
  46. );
  47. }
  48. /**
  49. * @param mixed $transports name of a single queue or array of queues to pull from
  50. * If not specified, checks all queues in the system.
  51. */
  52. public static function top($transports = null, array $ignored_transports = [])
  53. {
  54. $qi = new Queue_item();
  55. if ($transports) {
  56. if (is_array($transports)) {
  57. // @fixme use safer escaping
  58. $list = implode("','", array_map(array($qi, 'escape'), $transports));
  59. $qi->whereAdd("transport in ('$list')");
  60. } else {
  61. $qi->transport = $transports;
  62. }
  63. }
  64. if (!empty($ignored_transports)) {
  65. // @fixme use safer escaping
  66. $list = implode("','", array_map(array($qi, 'escape'), $ignored_transports));
  67. $qi->whereAdd("transport NOT IN ('$list')");
  68. }
  69. $qi->orderBy('created');
  70. $qi->whereAdd('claimed is null');
  71. $qi->limit(1);
  72. $cnt = $qi->find(true);
  73. if ($cnt) {
  74. // XXX: potential race condition
  75. // can we force it to only update if claimed is still null
  76. // (or old)?
  77. common_log(LOG_INFO, 'claiming queue item id = ' . $qi->getID() . ' for transport ' . $qi->transport);
  78. $orig = clone($qi);
  79. $qi->claimed = common_sql_now();
  80. $result = $qi->update($orig);
  81. if ($result) {
  82. common_log(LOG_DEBUG, 'claim succeeded.');
  83. return $qi;
  84. } else {
  85. common_log(LOG_ERR, 'claim of queue item id= ' . $qi->getID() . ' for transport ' . $qi->transport . ' failed.');
  86. }
  87. }
  88. $qi = null;
  89. return null;
  90. }
  91. /**
  92. * Release a claimed item.
  93. */
  94. public function releaseClaim()
  95. {
  96. // DB_DataObject doesn't let us save nulls right now
  97. $sql = sprintf("UPDATE queue_item SET claimed=NULL WHERE id=%d", $this->getID());
  98. $this->query($sql);
  99. $this->claimed = null;
  100. $this->encache();
  101. }
  102. }