runq.scm 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. ;;;; runq.scm --- the runq data structure
  2. ;;;;
  3. ;;;; Copyright (C) 1996, 2001, 2006, 2010 Free Software Foundation, Inc.
  4. ;;;;
  5. ;;;; This library is free software; you can redistribute it and/or
  6. ;;;; modify it under the terms of the GNU Lesser General Public
  7. ;;;; License as published by the Free Software Foundation; either
  8. ;;;; version 3 of the License, or (at your option) any later version.
  9. ;;;;
  10. ;;;; This library is distributed in the hope that it will be useful,
  11. ;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. ;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. ;;;; Lesser General Public License for more details.
  14. ;;;;
  15. ;;;; You should have received a copy of the GNU Lesser General Public
  16. ;;;; License along with this library; if not, write to the Free Software
  17. ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. ;;;;
  19. ;;; Commentary:
  20. ;;; One way to schedule parallel computations in a serial environment is
  21. ;;; to explicitly divide each task up into small, finite execution time,
  22. ;;; strips. Then you interleave the execution of strips from various
  23. ;;; tasks to achieve a kind of parallelism. Runqs are a handy data
  24. ;;; structure for this style of programming.
  25. ;;;
  26. ;;; We use thunks (nullary procedures) and lists of thunks to represent
  27. ;;; strips. By convention, the return value of a strip-thunk must either
  28. ;;; be another strip or the value #f.
  29. ;;;
  30. ;;; A runq is a procedure that manages a queue of strips. Called with no
  31. ;;; arguments, it processes one strip from the queue. Called with
  32. ;;; arguments, the arguments form a control message for the queue. The
  33. ;;; first argument is a symbol which is the message selector.
  34. ;;;
  35. ;;; A strip is processed this way: If the strip is a thunk, the thunk is
  36. ;;; called -- if it returns a strip, that strip is added back to the
  37. ;;; queue. To process a strip which is a list of thunks, the CAR of that
  38. ;;; list is called. After a call to that CAR, there are 0, 1, or 2 strips
  39. ;;; -- perhaps one returned by the thunk, and perhaps the CDR of the
  40. ;;; original strip if that CDR is not nil. The runq puts whichever of
  41. ;;; these strips exist back on the queue. (The exact order in which
  42. ;;; strips are put back on the queue determines the scheduling behavior of
  43. ;;; a particular queue -- it's a parameter.)
  44. ;;; Code:
  45. (define-module (ice-9 runq)
  46. :use-module (ice-9 q)
  47. :export (runq-control make-void-runq make-fair-runq
  48. make-exclusive-runq make-subordinate-runq-to strip-sequence
  49. fair-strip-subtask))
  50. ;;;;
  51. ;;; (runq-control q msg . args)
  52. ;;;
  53. ;;; processes in the default way the control messages that
  54. ;;; can be sent to a runq. Q should be an ordinary
  55. ;;; Q (see the (ice-9 q) module).
  56. ;;;
  57. ;;; The standard runq messages are:
  58. ;;;
  59. ;;; 'add! strip0 strip1... ;; to enqueue one or more strips
  60. ;;; 'enqueue! strip0 strip1... ;; to enqueue one or more strips
  61. ;;; 'push! strip0 ... ;; add strips to the front of the queue
  62. ;;; 'empty? ;; true if it is
  63. ;;; 'length ;; how many strips in the queue?
  64. ;;; 'kill! ;; empty the queue
  65. ;;; else ;; throw 'not-understood
  66. ;;;
  (define (runq-control q msg . args)
    ;; Default handler for runq control messages.
    ;;
    ;; Q is a queue from (ice-9 q); MSG is the message selector symbol and
    ;; ARGS are the message arguments (strips, for the enqueueing messages).
    ;; The mutating messages return the symbol *unspecified*; 'empty? and
    ;; 'length return the queue's answer; any other selector throws
    ;; 'not-understood with MSG and ARGS.
    (case msg
      ;; 'add! and 'enqueue! are synonyms: append each strip to the tail.
      ((add! enqueue!)
       (for-each (lambda (strip) (enq! q strip)) args)
       '*unspecified*)
      ;; Each strip is pushed on the front in turn, so the last argument
      ;; ends up first in the queue.
      ((push!)
       (for-each (lambda (strip) (q-push! q strip)) args)
       '*unspecified*)
      ((empty?) (q-empty? q))
      ((length) (q-length q))
      ;; The queue object is shared with the caller, so it has to be
      ;; emptied in place.  Rebinding the local Q (as `set!' would) leaves
      ;; the caller's queue untouched.
      ((kill!)
       (let drain ()
         (if (not (q-empty? q))
             (begin
               (deq! q)
               (drain))))
       '*unspecified*)
      (else (throw 'not-understood msg args))))
  ;;; (run-strip thunk)
  ;;;
  ;;; Call THUNK and return its value.  If THUNK throws anything, emit a
  ;;; warning that names the thunk and the throw arguments, and return #f.
  ;;;
  (define (run-strip thunk)
    (define (report-failure . throw-args)
      (warn 'runq-strip thunk throw-args)
      #f)
    (catch #t thunk report-failure))
  77. ;;;;
  78. ;;; make-void-runq
  79. ;;;
  80. ;;; Make a runq that discards all messages except "length", for which
  81. ;;; it returns 0.
  82. ;;;
  (define (make-void-runq)
    ;; Return a runq that holds nothing: every message is discarded and
    ;; answered with #f, except 'length, which is answered with 0.
    ;; Called with no arguments (the "process one strip" form) it also
    ;; returns #f.
    (lambda opts
      ;; An empty rest-list is '(), which counts as true in Scheme, so the
      ;; no-argument call must be detected with `pair?'.
      (and (pair? opts)
           (case (car opts)
             ((length) 0)
             (else #f)))))
  91. ;;;;
  92. ;;; (make-fair-runq)
  93. ;;;
  94. ;;; Returns a runq procedure.
  95. ;;; Called with no arguments, the procedure processes one strip from the queue.
  96. ;;; Called with arguments, it uses runq-control.
  97. ;;;
  98. ;;; In a fair runq, if a strip returns a new strip X, X is added
  99. ;;; to the end of the queue, meaning it will be the last to execute
  100. ;;; of all the remaining procedures.
  101. ;;;
  (define (make-fair-runq)
    ;; Return a runq procedure backed by a fresh queue.
    ;;
    ;; Called with arguments, they are forwarded to `runq-control' as a
    ;; control message.  Called with no arguments, one strip is dequeued
    ;; and run: the result is #f if the queue was empty, otherwise the
    ;; runq itself.  Strips produced by running a strip are appended to
    ;; the END of the queue.
    (let ((q (make-q)))
      (define (self . ctl)
        ;; An empty rest-list is '(), which is true in Scheme, so the
        ;; control-message case must be detected with `pair?'.
        (if (pair? ctl)
            (apply runq-control q ctl)
            (and (not (q-empty? q))
                 (let ((next-strip (deq! q)))
                   (cond
                    ((procedure? next-strip)
                     (let ((k (run-strip next-strip)))
                       (if k (enq! q k))))
                    ((pair? next-strip)
                     ;; Run the head thunk; its continuation (if any) is
                     ;; queued first, then the remaining thunks.
                     (let ((k (run-strip (car next-strip))))
                       (if k (enq! q k)))
                     ;; Only a non-empty tail is a strip.  '() and any
                     ;; non-pair tail (e.g. #f) are dropped.
                     (if (pair? (cdr next-strip))
                         (enq! q (cdr next-strip)))))
                   self))))
      self))
  119. ;;;;
  120. ;;; (make-exclusive-runq)
  121. ;;;
  122. ;;; Returns a runq procedure.
  123. ;;; Called with no arguments, the procedure processes one strip from the queue.
  124. ;;; Called with arguments, it uses runq-control.
  125. ;;;
  126. ;;; In an exclusive runq, if a strip W returns a new strip X, X is added
  127. ;;; to the front of the queue, meaning it will be the next to execute
  128. ;;; of all the remaining procedures.
  129. ;;;
  130. ;;; An exception to this occurs if W was the CAR of a list of strips.
  131. ;;; In that case, after the return value of W is pushed onto the front
  132. ;;; of the queue, the CDR of the list of strips is pushed in front
  133. ;;; of that (if the CDR is not nil). This way, the rest of the thunks
  134. ;;; in the list that contained W have priority over the return value of W.
  135. ;;;
  (define (make-exclusive-runq)
    ;; Return a runq procedure backed by a fresh queue.
    ;;
    ;; Called with arguments, they are forwarded to `runq-control' as a
    ;; control message.  Called with no arguments, one strip is dequeued
    ;; and run: the result is #f if the queue was empty, otherwise the
    ;; runq itself.  Strips produced by running a strip are pushed on the
    ;; FRONT of the queue, so they run next.
    (let ((q (make-q)))
      (define (self . ctl)
        ;; An empty rest-list is '(), which is true in Scheme, so the
        ;; control-message case must be detected with `pair?'.
        (if (pair? ctl)
            (apply runq-control q ctl)
            (and (not (q-empty? q))
                 (let ((next-strip (deq! q)))
                   (cond
                    ((procedure? next-strip)
                     (let ((k (run-strip next-strip)))
                       (if k (q-push! q k))))
                    ((pair? next-strip)
                     (let ((k (run-strip (car next-strip))))
                       (if k (q-push! q k)))
                     ;; The tail is pushed after the continuation, so it
                     ;; sits in front of it: the rest of the list runs
                     ;; before the head thunk's continuation.  Only a
                     ;; non-empty tail is a strip; '() and any non-pair
                     ;; tail (e.g. #f) are dropped.
                     (if (pair? (cdr next-strip))
                         (q-push! q (cdr next-strip)))))
                   self))))
      self))
  153. ;;;;
  154. ;;; (make-subordinate-runq-to superior basic-inferior)
  155. ;;;
  156. ;;; Returns a runq proxy for the runq basic-inferior.
  157. ;;;
  158. ;;; The proxy watches for operations on the basic-inferior that cause
  159. ;;; a transition from a queue length of 0 to a non-zero length and
  160. ;;; vice versa. While the basic-inferior queue is not empty,
  161. ;;; the proxy installs a task on the superior runq. Each strip
  162. ;;; of that task processes N strips from the basic-inferior where
  163. ;;; N is the length of the basic-inferior queue when the proxy
  164. ;;; strip is entered. [Countless scheduling variations are possible.]
  165. ;;;
  (define (make-subordinate-runq-to superior-runq basic-runq)
    ;; Return a proxy runq for BASIC-RUNQ that schedules itself on
    ;; SUPERIOR-RUNQ while BASIC-RUNQ has work.
    ;;
    ;; RUNQ-TASK is the strip installed on the superior.  Its CAR is the
    ;; thunk that does the work; its CDR is RUNQ-TASK itself while the task
    ;; is installed (a circular list, so the superior keeps re-queueing it)
    ;; and '() while it is not.  '() rather than #f is used as the "not
    ;; installed" marker so that a superior testing the tail with `null?'
    ;; or `pair?' does not mistake it for another strip.
    (let ((runq-task (cons #f '())))
      (set-car! runq-task
                (lambda ()
                  (if (basic-runq 'empty?)
                      ;; Nothing left: unlink so the superior drops the task.
                      (set-cdr! runq-task '())
                      ;; Run as many strips as were queued on entry.
                      (do ((n (basic-runq 'length) (1- n)))
                          ((<= n 0))
                        (basic-runq)))
                  ;; A strip thunk must return a strip or #f.  Returning the
                  ;; unspecified value of `set-cdr!' would be treated as a
                  ;; strip by the superior.
                  #f))
      (letrec ((self
                (lambda ctl
                  ;; An empty rest-list is '(), which is true in Scheme, so
                  ;; the no-argument call must be detected with `null?'.
                  (if (null? ctl)
                      (let ((answer (basic-runq)))
                        ;; Sending any message re-checks whether the task
                        ;; needs to be (re)installed on the superior.
                        (self 'empty?)
                        answer)
                      (case (car ctl)
                        ((suspend) (set-cdr! runq-task '()))
                        (else
                         (let ((answer (apply basic-runq ctl)))
                           (if (and (null? (cdr runq-task))
                                    (not (basic-runq 'empty?)))
                               (begin
                                 (set-cdr! runq-task runq-task)
                                 (superior-runq 'add! runq-task)))
                           answer)))))))
        self)))
  191. ;;;;
  192. ;;; (define fork-strips (lambda args args))
  193. ;;; Return a strip that starts several strips in
  194. ;;; parallel. If this strip is enqueued on a fair
  195. ;;; runq, strips of the parallel subtasks will run
  196. ;;; round-robin style.
  197. ;;;
  198. ;;;;
  199. ;;; (strip-sequence . strips)
  200. ;;;
  201. ;;; Returns a new strip which is the concatenation of the argument strips.
  202. ;;;
  (define (strip-sequence . strips)
    ;; Build one strip that runs the thunks in STRIPS one after another.
    ;; Each step runs the current thunk once; if it hands back a
    ;; continuation, that continuation takes its place, otherwise the
    ;; sequence advances to the next thunk.  A step on an exhausted
    ;; sequence returns #f.
    (lambda ()
      ;; Take the list and clear the closure's own reference to it.
      (let ((pending strips))
        (set! strips #f)
        (let step ((remaining pending))
          (cond
           ((null? remaining) #f)
           (else
            (let ((continuation ((car remaining))))
              (lambda ()
                (step (if continuation
                          (cons continuation (cdr remaining))
                          (cdr remaining)))))))))))
  211. ;;;;
  212. ;;; (fair-strip-subtask . initial-strips)
  213. ;;;
  214. ;;; Returns a new strip which is the synchronous, fair,
  215. ;;; parallel execution of the argument strips.
  216. ;;;
  217. ;;;
  218. ;;;
  (define (fair-strip-subtask . initial-strips)
    ;; Create a fresh fair runq, load it with INITIAL-STRIPS via its 'add!
    ;; message, and return the runq itself to be used as a strip.
    (let ((subtask-runq (make-fair-runq)))
      (apply subtask-runq (cons 'add! initial-strips))
      subtask-runq))
  223. ;;; runq.scm ends here