#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Andreas Rumpf
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".}

## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
##
## Unstable API.
##
## See also
## ========
## * `threads module <typedthreads.html>`_ for basic thread support
## * `locks module <locks.html>`_ for locks and condition variables
## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO

when not compileOption("threads"):
  {.error: "Threadpool requires --threads:on option.".}

import std/[cpuinfo, cpuload, locks, os]

when defined(nimPreviewSlimSystem):
  import std/[assertions, typedthreads, sysatomics]

{.push stackTrace: off.}
type
  Semaphore = object
    c: Cond
    L: Lock
    counter: int

proc initSemaphore(cv: var Semaphore) =
  initCond(cv.c)
  initLock(cv.L)

proc destroySemaphore(cv: var Semaphore) {.inline.} =
  deinitCond(cv.c)
  deinitLock(cv.L)

proc blockUntil(cv: var Semaphore) =
  acquire(cv.L)
  while cv.counter <= 0:
    wait(cv.c, cv.L)
  dec cv.counter
  release(cv.L)

proc signal(cv: var Semaphore) =
  acquire(cv.L)
  inc cv.counter
  release(cv.L)
  signal(cv.c)

const CacheLineSize = 64 # true for most archs

type
  Barrier {.compilerproc.} = object
    entered: int
    cv: Semaphore # Semaphore takes 3 words at least
    left {.align(CacheLineSize).}: int
    interest {.align(CacheLineSize).}: bool # whether the master is interested in the "all done" event
proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
  # due to the signaling between threads, it is ensured we are the only
  # one with access to 'entered', so we don't need 'atomicInc' here:
  inc b.entered
  # nor do we need 'fence' instructions here: 'nimArgsPassingDone' will be
  # called soon and it already performs a fence for us.
proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
  atomicInc b.left
  when not defined(x86): fence()
  # We may not have seen the final value of b.entered yet,
  # so we need to check for >= instead of ==.
  if b.interest and b.left >= b.entered: signal(b.cv)

proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
  b.entered = 0
  b.left = 0
  b.interest = false

proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
  fence()
  if b.left != b.entered:
    b.cv.initSemaphore()
    fence()
    b.interest = true
    fence()
    while b.left != b.entered: blockUntil(b.cv)
    destroySemaphore(b.cv)

{.pop.}
# ----------------------------------------------------------------------------

type
  AwaitInfo = object
    cv: Semaphore
    idx: int

  FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
  FlowVarBaseObj {.acyclic.} = object of RootObj
    ready, usesSemaphore, awaited: bool
    cv: Semaphore # for 'blockUntilAny' support
    ai: ptr AwaitInfo
    idx: int
    data: pointer # we incRef and unref it to keep it alive; note this MUST NOT
                  # be RootRef here otherwise the wrong GC keeps track of it!
    owner: pointer # ptr Worker

  FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj
    blob: T

  FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.

  ToFreeQueue = object
    len: int
    lock: Lock
    empty: Semaphore
    data: array[128, pointer]

  WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
  Worker = object
    taskArrived: Semaphore
    taskStarted: Semaphore #\
    # task data:
    f: WorkerProc
    data: pointer
    ready: bool # put it here for correct alignment!
    initialized: bool # whether it has even been initialized
    shutdown: bool # the pool requests to shut down this worker thread
    q: ToFreeQueue
    readyForTask: Semaphore

const threadpoolWaitMs {.intdefine.}: int = 100
proc blockUntil*(fv: var FlowVarBaseObj) =
  ## Waits until the value for `fv` arrives.
  ##
  ## Usually it is not necessary to call this explicitly.
  if fv.usesSemaphore and not fv.awaited:
    fv.awaited = true
    blockUntil(fv.cv)
    destroySemaphore(fv.cv)

proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
  if cas(addr w.ready, true, false):
    w.data = data
    w.f = fn
    signal(w.taskArrived)
    blockUntil(w.taskStarted)
    result = true

proc cleanFlowVars(w: ptr Worker) =
  let q = addr(w.q)
  acquire(q.lock)
  for i in 0 ..< q.len:
    GC_unref(cast[RootRef](q.data[i]))
    #echo "GC_unref"
  q.len = 0
  release(q.lock)

proc wakeupWorkerToProcessQueue(w: ptr Worker) =
  # we have to ensure it's us who wakes up the owning thread.
  # This is quite horrible code, but it runs so rarely that it doesn't matter:
  while not cas(addr w.ready, true, false):
    cpuRelax()
    discard
  w.data = nil
  w.f = proc (w, a: pointer) {.nimcall.} =
    let w = cast[ptr Worker](w)
    cleanFlowVars(w)
    signal(w.q.empty)
  signal(w.taskArrived)

proc attach(fv: FlowVarBase; i: int): bool =
  acquire(fv.cv.L)
  if fv.cv.counter <= 0:
    fv.idx = i
    result = true
  else:
    result = false
  release(fv.cv.L)
proc finished(fv: var FlowVarBaseObj) =
  doAssert fv.ai.isNil, "flowVar is still attached to a 'blockUntilAny'"
  # we have to protect against the rare case where the owner of the flowVar
  # simply disregards the flowVar while the worker has not yet written
  # anything to it:
  blockUntil(fv)
  if fv.data.isNil: return
  let owner = cast[ptr Worker](fv.owner)
  let q = addr(owner.q)
  acquire(q.lock)
  while not (q.len < q.data.len):
    #echo "EXHAUSTED!"
    release(q.lock)
    wakeupWorkerToProcessQueue(owner)
    blockUntil(q.empty)
    acquire(q.lock)
  q.data[q.len] = cast[pointer](fv.data)
  inc q.len
  release(q.lock)
  fv.data = nil
  # the worker thread waits for "data" to be set to nil before shutting down
  owner.data = nil

proc `=destroy`[T](fv: var FlowVarObj[T]) =
  finished(fv)
  `=destroy`(fv.blob)
proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
  new(result)

proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
  fv.cv.initSemaphore()
  fv.usesSemaphore = true

proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
  if fv.ai != nil:
    acquire(fv.ai.cv.L)
    fv.ai.idx = fv.idx
    inc fv.ai.cv.counter
    release(fv.ai.cv.L)
    signal(fv.ai.cv.c)
  if fv.usesSemaphore:
    signal(fv.cv)
proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
  ## Blocks until `fv` is available and then passes its value
  ## to `action`.
  ##
  ## Note that due to Nim's parameter passing semantics, this
  ## means that `T` doesn't need to be copied, so `awaitAndThen` can
  ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
  blockUntil(fv[])
  when defined(nimV2):
    action(fv.blob)
  elif T is string or T is seq:
    action(cast[T](fv.data))
  elif T is ref:
    {.error: "'awaitAndThen' not available for FlowVar[ref]".}
  else:
    action(fv.blob)
  finished(fv[])
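
# A minimal usage sketch for `awaitAndThen`, kept as a comment so it is not
# compiled with the module. `makeGreeting` is a hypothetical gcsafe helper;
# the closure receives the produced string without an extra copy:
#
#   import std/threadpool
#
#   proc makeGreeting(name: string): string =
#     "Hello, " & name
#
#   let fv = spawn makeGreeting("threadpool")
#   fv.awaitAndThen(proc (s: string) = echo s)
#   sync()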
proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
  ## Blocks until the value is available and then returns this value.
  blockUntil(fv[])
  when defined(nimV2):
    result = cast[ptr T](fv.blob)
  else:
    result = cast[ptr T](fv.data)
  finished(fv[])
proc `^`*[T](fv: FlowVar[T]): T =
  ## Blocks until the value is available and then returns this value.
  blockUntil(fv[])
  when not defined(nimV2) and (T is string or T is seq or T is ref):
    deepCopy result, cast[T](fv.data)
  else:
    result = fv.blob
  finished(fv[])
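
# A minimal usage sketch for `^`, kept as a comment. `slowAdd` is a
# hypothetical gcsafe proc; `^fv` blocks until the spawned task has
# produced its value and then returns it:
#
#   import std/threadpool
#
#   proc slowAdd(a, b: int): int =
#     a + b
#
#   let fv = spawn slowAdd(2, 3)
#   echo ^fv  # prints 5 once the task has finished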
proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
  ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
  ## for which a value arrived.
  ##
  ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
  ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`,
  ## the second call will only block until `c`. If there is no `flowVar` left
  ## to wait on, -1 is returned.
  ##
  ## **Note:** This results in non-deterministic behaviour and should be avoided.
  var ai: AwaitInfo
  ai.cv.initSemaphore()
  var conflicts = 0
  result = -1
  for i in 0 .. flowVars.high:
    if cas(addr flowVars[i].ai, nil, addr ai):
      if not attach(flowVars[i], i):
        result = i
        break
    else:
      inc conflicts
  if conflicts < flowVars.len:
    if result < 0:
      blockUntil(ai.cv)
      result = ai.idx
    for i in 0 .. flowVars.high:
      discard cas(addr flowVars[i].ai, addr ai, nil)
  destroySemaphore(ai.cv)
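
# A sketch of waiting for whichever task finishes first, kept as a comment.
# `fetch` is a hypothetical gcsafe proc; the flow vars are stored as the
# untyped `FlowVarBase` so they can be passed to `blockUntilAny`:
#
#   import std/threadpool
#
#   proc fetch(id: int): int =
#     id * 10
#
#   var fvs: seq[FlowVarBase]
#   for i in 0 .. 3:
#     let fv = spawn fetch(i)
#     fvs.add fv
#   echo "task ", blockUntilAny(fvs), " finished first"
#   sync()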
proc isReady*(fv: FlowVarBase): bool =
  ## Determines whether the specified `FlowVarBase`'s value is available.
  ##
  ## If `true`, awaiting `fv` will not block.
  if fv.usesSemaphore and not fv.awaited:
    acquire(fv.cv.L)
    result = fv.cv.counter > 0
    release(fv.cv.L)
  else:
    result = true
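
# A polling sketch using `isReady`, kept as a comment. `slowSquare` is a
# hypothetical gcsafe proc; once `isReady` reports true, reading with `^`
# does not block:
#
#   import std/[os, threadpool]
#
#   proc slowSquare(x: int): int =
#     sleep(50)
#     x * x
#
#   let fv = spawn slowSquare(7)
#   while not fv.isReady:
#     sleep(10)
#   echo ^fv  # 49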
proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
  let w = cast[ptr Worker](p)
  signal(w.taskStarted)

const
  MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
                                         ## should be good enough for anybody ;-)
  MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.

type
  ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.

var
  currentPoolSize: int
  maxPoolSize = MaxThreadPoolSize
  minPoolSize = 4
  gSomeReady: Semaphore
  readyWorker: ptr Worker

# A workaround for recursion deadlock issue
# https://github.com/nim-lang/Nim/issues/4597
var
  numSlavesLock: Lock
  numSlavesRunning {.guard: numSlavesLock.}: int
  numSlavesWaiting {.guard: numSlavesLock.}: int
  isSlave {.threadvar.}: bool

numSlavesLock.initLock

gSomeReady.initSemaphore()
proc slave(w: ptr Worker) {.thread.} =
  isSlave = true
  while true:
    if w.shutdown:
      w.shutdown = false
      atomicDec currentPoolSize

      while true:
        if w.data != nil:
          sleep(threadpoolWaitMs)
        else:
          # The flowvar finalizer ("finished()") set w.data to nil, so we can
          # safely terminate the thread.
          #
          # TODO: look for scenarios in which the flowvar is never finalized, so
          # a shut down thread gets stuck in this loop until the main thread exits.
          break

      break
    when declared(atomicStoreN):
      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
    else:
      w.ready = true
    readyWorker = w
    signal(gSomeReady)
    blockUntil(w.taskArrived)
    # XXX Somebody needs to look into this (why does this assertion fail
    # in Visual Studio?)
    when not defined(vcc) and not defined(tcc): assert(not w.ready)

    withLock numSlavesLock:
      inc numSlavesRunning

    w.f(w, w.data)

    withLock numSlavesLock:
      dec numSlavesRunning

    if w.q.len != 0: w.cleanFlowVars
proc distinguishedSlave(w: ptr Worker) {.thread.} =
  while true:
    when declared(atomicStoreN):
      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
    else:
      w.ready = true
    signal(w.readyForTask)
    blockUntil(w.taskArrived)
    assert(not w.ready)
    w.f(w, w.data)
    if w.q.len != 0: w.cleanFlowVars

var
  workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
  workersData: array[MaxThreadPoolSize, Worker]

  distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
  distinguishedData: array[MaxDistinguishedThread, Worker]

when defined(nimPinToCpu):
  var gCpus: Natural
proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
  ## Sets the minimum thread pool size. The default value of this is 4.
  minPoolSize = size

proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
  ## Sets the maximum thread pool size. The default value of this
  ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
  maxPoolSize = size
  if currentPoolSize > maxPoolSize:
    for i in maxPoolSize..currentPoolSize-1:
      let w = addr(workersData[i])
      w.shutdown = true
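
# A configuration sketch, kept as a comment: the pool starts with one worker
# per processor (capped at `MaxThreadPoolSize`), and these setters only bound
# how far it may later shrink or grow:
#
#   import std/threadpool
#
#   setMinPoolSize(2)
#   setMaxPoolSize(8)
#   # ... spawn tasks as usual ...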
when defined(nimRecursiveSpawn):
  var localThreadId {.threadvar.}: int

proc activateWorkerThread(i: int) {.noinline.} =
  workersData[i].taskArrived.initSemaphore()
  workersData[i].taskStarted.initSemaphore()
  workersData[i].initialized = true
  workersData[i].q.empty.initSemaphore()
  initLock(workersData[i].q.lock)
  createThread(workers[i], slave, addr(workersData[i]))
  when defined(nimRecursiveSpawn):
    localThreadId = i+1
  when defined(nimPinToCpu):
    if gCpus > 0: pinToCpu(workers[i], i mod gCpus)

proc activateDistinguishedThread(i: int) {.noinline.} =
  distinguishedData[i].taskArrived.initSemaphore()
  distinguishedData[i].taskStarted.initSemaphore()
  distinguishedData[i].initialized = true
  distinguishedData[i].q.empty.initSemaphore()
  initLock(distinguishedData[i].q.lock)
  distinguishedData[i].readyForTask.initSemaphore()
  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))

proc setup() =
  let p = countProcessors()
  when defined(nimPinToCpu):
    gCpus = p
  currentPoolSize = min(p, MaxThreadPoolSize)
  readyWorker = addr(workersData[0])
  for i in 0..<currentPoolSize: activateWorkerThread(i)
proc preferSpawn*(): bool =
  ## Use this proc to determine quickly if a `spawn` or a direct call is
  ## preferable.
  ##
  ## If it returns `true`, a `spawn` may make sense. In general
  ## it is not necessary to call this directly; use the `spawnX template
  ## <#spawnX.t>`_ instead.
  result = gSomeReady.counter > 0

proc spawn*(call: sink typed) {.magic: "Spawn".} =
  ## Always spawns a new task, so that the `call` is never executed on
  ## the calling thread.
  ##
  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
  ## return type that is either `void` or compatible with `FlowVar[T]`.
  discard "It uses `nimSpawn3` internally"
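
# A fire-and-forget sketch for `spawn` with a `void` call, kept as a comment.
# `logLine` is a hypothetical gcsafe proc; `sync()` waits until all spawned
# tasks have completed:
#
#   import std/threadpool
#
#   proc logLine(i: int) =
#     echo "task ", i
#
#   for i in 1 .. 4:
#     spawn logLine(i)
#   sync()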
proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} =
  ## Always spawns a new task on the worker thread with `id`, so that
  ## the `call` is **always** executed on that thread.
  ##
  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
  ## return type that is either `void` or compatible with `FlowVar[T]`.
  discard "It uses `nimSpawn4` internally"
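
# A sketch for `pinnedSpawn`, kept as a comment: calls with the same
# `ThreadId` run on the same distinguished worker, which helps with APIs
# that must be driven from a single thread. `initAudio` and `playBeep`
# are hypothetical gcsafe procs:
#
#   import std/threadpool
#
#   proc initAudio() = discard
#   proc playBeep() = discard
#
#   pinnedSpawn(0, initAudio())
#   pinnedSpawn(0, playBeep())  # runs on the same thread as the call above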
template spawnX*(call) =
  ## Spawns a new task if a CPU core is ready, otherwise executes the
  ## call in the calling thread.
  ##
  ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
  ## in order to not block the producer for an unknown amount of time.
  ##
  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
  ## return type that is either `void` or compatible with `FlowVar[T]`.
  (if preferSpawn(): spawn call else: call)
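
# A sketch for `spawnX`, kept as a comment: each call either goes to an idle
# worker (when `preferSpawn()` is true) or runs inline on the calling thread.
# `process` is a hypothetical gcsafe proc:
#
#   import std/threadpool
#
#   proc process(i: int) =
#     echo "processing ", i
#
#   for i in 1 .. 100:
#     spawnX process(i)
#   sync()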
proc parallel*(body: untyped) {.magic: "Parallel".}
  ## A parallel section can be used to execute a block in parallel.
  ##
  ## `body` has to be in a DSL that is a particular subset of the language.
  ##
  ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
  ## for further information.
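
# A sketch of a `parallel` section, kept as a comment and following the
# pattern from the manual: every `spawn` writes to a disjoint slot, which the
# parallel DSL can verify. `intensiveWork` is a hypothetical gcsafe proc, and
# the `parallel` statement additionally needs the experimental switch:
#
#   {.experimental: "parallel".}
#   import std/threadpool
#
#   proc intensiveWork(i: int): int =
#     i * i
#
#   var results = newSeq[int](10)
#   parallel:
#     for i in 0 .. results.high:
#       results[i] = spawn intensiveWork(i)
#   echo results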
var
  state: ThreadPoolState
  stateLock: Lock

initLock stateLock
proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
  # implementation of 'spawn' that is used by the code generator.
  while true:
    if selectWorker(readyWorker, fn, data): return
    for i in 0..<currentPoolSize:
      if selectWorker(addr(workersData[i]), fn, data): return

    # determine what to do, but keep in mind this is expensive too:
    # state.calls < maxPoolSize: warmup phase
    # (state.calls and 127) == 0: periodic check
    if state.calls < maxPoolSize or (state.calls and 127) == 0:
      # ensure the call to 'advice' is atomic:
      if tryAcquire(stateLock):
        if currentPoolSize < minPoolSize:
          if not workersData[currentPoolSize].initialized:
            activateWorkerThread(currentPoolSize)
          let w = addr(workersData[currentPoolSize])
          atomicInc currentPoolSize
          if selectWorker(w, fn, data):
            release(stateLock)
            return
        case advice(state)
        of doNothing: discard
        of doCreateThread:
          if currentPoolSize < maxPoolSize:
            if not workersData[currentPoolSize].initialized:
              activateWorkerThread(currentPoolSize)
            let w = addr(workersData[currentPoolSize])
            atomicInc currentPoolSize
            if selectWorker(w, fn, data):
              release(stateLock)
              return
            # else we didn't succeed, but some other thread did, so do nothing.
        of doShutdownThread:
          if currentPoolSize > minPoolSize:
            let w = addr(workersData[currentPoolSize-1])
            w.shutdown = true
          # we don't free anything here. Too dangerous.
        release(stateLock)
      # else the acquire failed, but this means some
      # other thread succeeded, so we don't need to do anything here.
    when defined(nimRecursiveSpawn):
      if localThreadId > 0:
        # we are a worker thread, so instead of waiting for something which
        # might as well never happen (see tparallel_quicksort), we run the task
        # on the current thread instead.
        var self = addr(workersData[localThreadId-1])
        fn(self, data)
        blockUntil(self.taskStarted)
        return

    if isSlave:
      # Run under the lock until `numSlavesWaiting` is incremented, to avoid a
      # race (otherwise the two last threads might start waiting together).
      withLock numSlavesLock:
        if numSlavesRunning <= numSlavesWaiting + 1:
          # All the other slaves are waiting.
          # If we wait now, we're deadlocked until
          # an external spawn happens!
          if currentPoolSize < maxPoolSize:
            if not workersData[currentPoolSize].initialized:
              activateWorkerThread(currentPoolSize)
            let w = addr(workersData[currentPoolSize])
            atomicInc currentPoolSize
            if selectWorker(w, fn, data):
              return
          else:
            # There is no place in the pool. We're deadlocked.
            # echo "Deadlock!"
            discard

        inc numSlavesWaiting

    blockUntil(gSomeReady)

    if isSlave:
      withLock numSlavesLock:
        dec numSlavesWaiting
var
  distinguishedLock: Lock

initLock distinguishedLock

proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
  acquire(distinguishedLock)
  if not distinguishedData[id].initialized:
    activateDistinguishedThread(id)
  release(distinguishedLock)
  while true:
    if selectWorker(addr(distinguishedData[id]), fn, data): break
    blockUntil(distinguishedData[id].readyForTask)
proc sync*() =
  ## A simple barrier to wait for all `spawn`ed tasks.
  ##
  ## If you need more elaborate waiting, you have to use an explicit barrier.
  while true:
    var allReady = true
    for i in 0 ..< currentPoolSize:
      if not allReady: break
      allReady = allReady and workersData[i].ready
    if allReady: break
    sleep(threadpoolWaitMs)
    # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
    # the time we establish that some are not "ready" and the time we wait for a
    # "signal(gSomeReady)" from inside "slave()" that can never come.

setup()