futures.scm 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. ;;; -*- mode: scheme; coding: utf-8; -*-
  2. ;;;
  3. ;;; Copyright (C) 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
  4. ;;;
  5. ;;; This library is free software; you can redistribute it and/or
  6. ;;; modify it under the terms of the GNU Lesser General Public
  7. ;;; License as published by the Free Software Foundation; either
  8. ;;; version 3 of the License, or (at your option) any later version.
  9. ;;;
  10. ;;; This library is distributed in the hope that it will be useful,
  11. ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. ;;; Lesser General Public License for more details.
  14. ;;;
  15. ;;; You should have received a copy of the GNU Lesser General Public
  16. ;;; License along with this library; if not, write to the Free Software
  17. ;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. (define-module (ice-9 futures)
  19. #:use-module (srfi srfi-1)
  20. #:use-module (srfi srfi-9)
  21. #:use-module (srfi srfi-9 gnu)
  22. #:use-module (srfi srfi-11)
  23. #:use-module (ice-9 q)
  24. #:use-module (ice-9 match)
  25. #:use-module (ice-9 control)
  26. #:export (future make-future future? touch))
  27. ;;; Author: Ludovic Courtès <ludo@gnu.org>
  28. ;;;
  29. ;;; Commentary:
  30. ;;;
  31. ;;; This module provides an implementation of futures, a mechanism for
  32. ;;; fine-grain parallelism. Futures were first described by Henry Baker
  33. ;;; in ``The Incremental Garbage Collection of Processes'', 1977, and
  34. ;;; then implemented in MultiLisp (an implicit variant thereof, i.e.,
  35. ;;; without `touch'.)
  36. ;;;
  37. ;;; This module uses a fixed thread pool, normally one per CPU core.
  38. ;;; Futures are off-loaded to these threads, when they are idle.
  39. ;;;
  40. ;;; Code:
  41. ;;;
  42. ;;; Futures.
  43. ;;;
  44. (define-record-type <future>
  45. (%make-future thunk state mutex completion)
  46. future?
  47. (thunk future-thunk set-future-thunk!)
  48. (state future-state set-future-state!) ; done | started | queued
  49. (result future-result set-future-result!)
  50. (mutex future-mutex)
  51. (completion future-completion)) ; completion cond. var.
  52. (set-record-type-printer!
  53. <future>
  54. (lambda (future port)
  55. (simple-format port "#<future ~a ~a ~s>"
  56. (number->string (object-address future) 16)
  57. (future-state future)
  58. (future-thunk future))))
  59. (define (make-future thunk)
  60. "Return a new future for THUNK. Execution may start at any point
  61. concurrently, or it can start at the time when the returned future is
  62. touched."
  63. (create-workers!)
  64. (let ((future (%make-future thunk 'queued
  65. (make-mutex) (make-condition-variable))))
  66. (register-future! future)
  67. future))
  68. ;;;
  69. ;;; Future queues.
  70. ;;;
  71. ;; Global queue of pending futures.
  72. ;; TODO: Use per-worker queues to reduce contention.
  73. (define %futures (make-q))
  74. ;; Lock for %FUTURES and %FUTURES-WAITING.
  75. (define %futures-mutex (make-mutex))
  76. (define %futures-available (make-condition-variable))
  77. ;; A mapping of nested futures to futures waiting for them to complete.
  78. (define %futures-waiting '())
  79. ;; Nesting level of futures. Incremented each time a future is touched
  80. ;; from within a future.
  81. (define %nesting-level (make-parameter 0))
  82. ;; Maximum nesting level. The point is to avoid stack overflows when
  83. ;; nested futures are executed on the same stack. See
  84. ;; <http://bugs.gnu.org/13188>.
  85. (define %max-nesting-level 200)
  86. (define-syntax-rule (with-mutex m e0 e1 ...)
  87. ;; Copied from (ice-9 threads) to avoid circular dependency.
  88. (let ((x m))
  89. (dynamic-wind
  90. (lambda () (lock-mutex x))
  91. (lambda () (begin e0 e1 ...))
  92. (lambda () (unlock-mutex x)))))
  93. (define %future-prompt
  94. ;; The prompt futures abort to when they want to wait for another
  95. ;; future.
  96. (make-prompt-tag))
  97. (define (register-future! future)
  98. ;; Register FUTURE as being processable.
  99. (lock-mutex %futures-mutex)
  100. (enq! %futures future)
  101. (signal-condition-variable %futures-available)
  102. (unlock-mutex %futures-mutex))
  103. (define (process-future! future)
  104. "Process FUTURE. When FUTURE completes, return #t and update its
  105. result; otherwise, when FUTURE touches a nested future that has not
  106. completed yet, then suspend it and return #f. Suspending a future
  107. consists in capturing its continuation, marking it as `queued', and
  108. adding it to the waiter queue."
  109. (let/ec return
  110. (let* ((suspend
  111. (lambda (cont future-to-wait)
  112. ;; FUTURE wishes to wait for the completion of FUTURE-TO-WAIT.
  113. ;; At this point, FUTURE is unlocked and in `started' state,
  114. ;; and FUTURE-TO-WAIT is unlocked.
  115. (with-mutex %futures-mutex
  116. (with-mutex (future-mutex future)
  117. (set-future-thunk! future cont)
  118. (set-future-state! future 'queued))
  119. (with-mutex (future-mutex future-to-wait)
  120. ;; If FUTURE-TO-WAIT completed in the meantime, then
  121. ;; reschedule FUTURE directly; otherwise, add it to the
  122. ;; waiter queue.
  123. (if (eq? 'done (future-state future-to-wait))
  124. (begin
  125. (enq! %futures future)
  126. (signal-condition-variable %futures-available))
  127. (set! %futures-waiting
  128. (alist-cons future-to-wait future
  129. %futures-waiting))))
  130. (return #f))))
  131. (thunk (lambda ()
  132. (call-with-prompt %future-prompt
  133. (lambda ()
  134. (parameterize ((%nesting-level
  135. (1+ (%nesting-level))))
  136. ((future-thunk future))))
  137. suspend))))
  138. (set-future-result! future
  139. (catch #t
  140. (lambda ()
  141. (call-with-values thunk
  142. (lambda results
  143. (lambda ()
  144. (apply values results)))))
  145. (lambda args
  146. (lambda ()
  147. (apply throw args)))))
  148. #t)))
  149. (define (process-one-future)
  150. "Attempt to pick one future from the queue and process it."
  151. ;; %FUTURES-MUTEX must be locked on entry, and is locked on exit.
  152. (or (q-empty? %futures)
  153. (let ((future (deq! %futures)))
  154. (lock-mutex (future-mutex future))
  155. (case (future-state future)
  156. ((done started)
  157. ;; Nothing to do.
  158. (unlock-mutex (future-mutex future)))
  159. (else
  160. ;; Do the actual work.
  161. ;; We want to release %FUTURES-MUTEX so that other workers can
  162. ;; progress. However, to avoid deadlocks, we have to unlock
  163. ;; FUTURE as well, to preserve lock ordering.
  164. (unlock-mutex (future-mutex future))
  165. (unlock-mutex %futures-mutex)
  166. (lock-mutex (future-mutex future))
  167. (if (eq? (future-state future) 'queued) ; lost the race?
  168. (begin ; no, so let's process it
  169. (set-future-state! future 'started)
  170. (unlock-mutex (future-mutex future))
  171. (let ((done? (process-future! future)))
  172. (when done?
  173. (with-mutex %futures-mutex
  174. (with-mutex (future-mutex future)
  175. (set-future-state! future 'done)
  176. (notify-completion future))))))
  177. (unlock-mutex (future-mutex future))) ; yes
  178. (lock-mutex %futures-mutex))))))
  179. (define (process-futures)
  180. "Continuously process futures from the queue."
  181. (lock-mutex %futures-mutex)
  182. (let loop ()
  183. (when (q-empty? %futures)
  184. (wait-condition-variable %futures-available
  185. %futures-mutex))
  186. (process-one-future)
  187. (loop)))
  188. (define (notify-completion future)
  189. "Notify futures and callers waiting that FUTURE completed."
  190. ;; FUTURE and %FUTURES-MUTEX are locked.
  191. (broadcast-condition-variable (future-completion future))
  192. (let-values (((waiting remaining)
  193. (partition (match-lambda ; TODO: optimize
  194. ((waitee . _)
  195. (eq? waitee future)))
  196. %futures-waiting)))
  197. (set! %futures-waiting remaining)
  198. (for-each (match-lambda
  199. ((_ . waiter)
  200. (enq! %futures waiter)))
  201. waiting)))
  202. (define (touch future)
  203. "Return the result of FUTURE, computing it if not already done."
  204. (define (work)
  205. ;; Do some work while waiting for FUTURE to complete.
  206. (lock-mutex %futures-mutex)
  207. (if (q-empty? %futures)
  208. (begin
  209. (unlock-mutex %futures-mutex)
  210. (with-mutex (future-mutex future)
  211. (unless (eq? 'done (future-state future))
  212. (wait-condition-variable (future-completion future)
  213. (future-mutex future)))))
  214. (begin
  215. (process-one-future)
  216. (unlock-mutex %futures-mutex))))
  217. (let loop ()
  218. (lock-mutex (future-mutex future))
  219. (case (future-state future)
  220. ((done)
  221. (unlock-mutex (future-mutex future)))
  222. ((started)
  223. (unlock-mutex (future-mutex future))
  224. (if (> (%nesting-level) 0)
  225. (abort-to-prompt %future-prompt future)
  226. (begin
  227. (work)
  228. (loop))))
  229. (else ; queued
  230. (unlock-mutex (future-mutex future))
  231. (if (> (%nesting-level) %max-nesting-level)
  232. (abort-to-prompt %future-prompt future)
  233. (work))
  234. (loop))))
  235. ((future-result future)))
  236. ;;;
  237. ;;; Workers.
  238. ;;;
  239. (define %worker-count
  240. (if (provided? 'threads)
  241. (- (current-processor-count) 1)
  242. 0))
  243. ;; A dock of workers that stay here forever.
  244. ;; TODO
  245. ;; 1. Allow the pool to be shrunk, as in libgomp (though that we'd
  246. ;; need semaphores, which aren't yet in libguile!).
  247. ;; 2. Provide a `worker-count' fluid.
  248. (define %workers '())
  249. (define (%create-workers!)
  250. (with-mutex
  251. %futures-mutex
  252. ;; Setting 'create-workers!' to a no-op is an optimization, but it is
  253. ;; still possible for '%create-workers!' to be called more than once
  254. ;; from different threads. Therefore, to avoid creating %workers more
  255. ;; than once (and thus creating too many threads), we check to make
  256. ;; sure %workers is empty within the critical section.
  257. (when (null? %workers)
  258. (set! %workers
  259. (unfold (lambda (i) (>= i %worker-count))
  260. (lambda (i) (call-with-new-thread process-futures))
  261. 1+
  262. 0))
  263. (set! create-workers! (lambda () #t)))))
  264. (define create-workers!
  265. (lambda () (%create-workers!)))
  266. ;;;
  267. ;;; Syntax.
  268. ;;;
  269. (define-syntax-rule (future body)
  270. "Return a new future for BODY."
  271. (make-future (lambda () body)))
  272. ;;; Local Variables:
  273. ;;; eval: (put 'with-mutex 'scheme-indent-function 1)
  274. ;;; End: