threadpool.nim

#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Andreas Rumpf
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## Implements Nim's `spawn <manual.html#parallel-amp-spawn>`_.

when not compileOption("threads"):
  {.error: "Threadpool requires --threads:on option.".}

import cpuinfo, cpuload, locks

{.push stackTrace:off.}

type
  Semaphore = object
    c: Cond
    L: Lock
    counter: int

proc initSemaphore(cv: var Semaphore) =
  initCond(cv.c)
  initLock(cv.L)

proc destroySemaphore(cv: var Semaphore) {.inline.} =
  deinitCond(cv.c)
  deinitLock(cv.L)

proc blockUntil(cv: var Semaphore) =
  acquire(cv.L)
  while cv.counter <= 0:
    wait(cv.c, cv.L)
  dec cv.counter
  release(cv.L)

proc signal(cv: var Semaphore) =
  acquire(cv.L)
  inc cv.counter
  release(cv.L)
  signal(cv.c)

const CacheLineSize = 32 # true for most archs

type
  Barrier {.compilerProc.} = object
    entered: int
    cv: Semaphore # Semaphore takes 3 words at least
    when sizeof(int) < 8:
      cacheAlign: array[CacheLineSize-4*sizeof(int), byte]
    left: int
    cacheAlign2: array[CacheLineSize-sizeof(int), byte]
    interest: bool ## whether the master is interested in the "all done" event

proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
  # due to the signaling between threads, it is ensured we are the only
  # one with access to 'entered' so we don't need 'atomicInc' here:
  inc b.entered
  # also we need no 'fence' instructions here because 'nimArgsPassingDone'
  # will be called soon, which already performs a fence for us.

proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
  atomicInc b.left
  when not defined(x86): fence()
  # We may not have seen the final value of b.entered yet,
  # so we need to check for >= instead of ==.
  if b.interest and b.left >= b.entered: signal(b.cv)

proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
  b.entered = 0
  b.left = 0
  b.interest = false

proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
  fence()
  if b.left != b.entered:
    b.cv.initSemaphore()
    fence()
    b.interest = true
    fence()
    while b.left != b.entered: blockUntil(b.cv)
    destroySemaphore(b.cv)

{.pop.}

# ----------------------------------------------------------------------------

type
  AwaitInfo = object
    cv: Semaphore
    idx: int

  FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
  FlowVarBaseObj = object of RootObj
    ready, usesSemaphore, awaited: bool
    cv: Semaphore #\
    # for 'blockUntilAny' support
    ai: ptr AwaitInfo
    idx: int
    data: pointer  # we incRef and unref it to keep it alive; note this MUST NOT
                   # be RootRef here otherwise the wrong GC keeps track of it!
    owner: pointer # ptr Worker

  FlowVarObj[T] = object of FlowVarBaseObj
    blob: T

  FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable

  ToFreeQueue = object
    len: int
    lock: Lock
    empty: Semaphore
    data: array[128, pointer]

  WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}

  Worker = object
    taskArrived: Semaphore
    taskStarted: Semaphore #\
    # task data:
    f: WorkerProc
    data: pointer
    ready: bool # put it here for correct alignment!
    initialized: bool # whether it has even been initialized
    shutdown: bool # the pool requests to shut down this worker thread
    q: ToFreeQueue
    readyForTask: Semaphore

proc blockUntil*(fv: FlowVarBase) =
  ## waits until the value for the flowVar arrives. Usually it is not necessary
  ## to call this explicitly.
  if fv.usesSemaphore and not fv.awaited:
    fv.awaited = true
    blockUntil(fv.cv)
    destroySemaphore(fv.cv)

proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
  if cas(addr w.ready, true, false):
    w.data = data
    w.f = fn
    signal(w.taskArrived)
    blockUntil(w.taskStarted)
    result = true

proc cleanFlowVars(w: ptr Worker) =
  let q = addr(w.q)
  acquire(q.lock)
  for i in 0 ..< q.len:
    GC_unref(cast[RootRef](q.data[i]))
    #echo "GC_unref"
  q.len = 0
  release(q.lock)

proc wakeupWorkerToProcessQueue(w: ptr Worker) =
  # we have to ensure it's us who wakes up the owning thread.
  # This is quite horrible code, but it runs so rarely that it doesn't matter:
  while not cas(addr w.ready, true, false):
    cpuRelax()
    discard
  w.data = nil
  w.f = proc (w, a: pointer) {.nimcall.} =
    let w = cast[ptr Worker](w)
    cleanFlowVars(w)
    signal(w.q.empty)
  signal(w.taskArrived)

proc attach(fv: FlowVarBase; i: int): bool =
  acquire(fv.cv.L)
  if fv.cv.counter <= 0:
    fv.idx = i
    result = true
  else:
    result = false
  release(fv.cv.L)

proc finished(fv: FlowVarBase) =
  doAssert fv.ai.isNil, "flowVar is still attached to a 'blockUntilAny'"
  # we have to protect against the rare cases where the owner of the flowVar
  # simply disregards the flowVar and yet the worker has not yet written
  # anything to it:
  blockUntil(fv)
  if fv.data.isNil: return
  let owner = cast[ptr Worker](fv.owner)
  let q = addr(owner.q)
  acquire(q.lock)
  while not (q.len < q.data.len):
    #echo "EXHAUSTED!"
    release(q.lock)
    wakeupWorkerToProcessQueue(owner)
    blockUntil(q.empty)
    acquire(q.lock)
  q.data[q.len] = cast[pointer](fv.data)
  inc q.len
  release(q.lock)
  fv.data = nil

proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)

proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
  new(result, fvFinalizer)

proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
  fv.cv.initSemaphore()
  fv.usesSemaphore = true

proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
  if fv.ai != nil:
    acquire(fv.ai.cv.L)
    fv.ai.idx = fv.idx
    inc fv.ai.cv.counter
    release(fv.ai.cv.L)
    signal(fv.ai.cv.c)
  if fv.usesSemaphore:
    signal(fv.cv)

proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
  ## blocks until the ``fv`` is available and then passes its value
  ## to ``action``. Note that due to Nim's parameter passing semantics this
  ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
  ## sometimes be more efficient than ``^``.
  blockUntil(fv)
  when T is string or T is seq:
    action(cast[T](fv.data))
  elif T is ref:
    {.error: "'awaitAndThen' not available for FlowVar[ref]".}
  else:
    action(fv.blob)
  finished(fv)

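# A minimal usage sketch for `awaitAndThen` (illustrative addition, not part of
# the original module; `fetchLine` and `report` are hypothetical gcsafe procs):
#
#   import threadpool
#
#   proc fetchLine(path: string): string =
#     readFile(path)
#
#   proc report(x: string) =
#     echo "got ", x.len, " bytes"
#
#   let fv = spawn fetchLine("input.txt")
#   awaitAndThen(fv, report)   # `x` is handed to `report` without an extra copy
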
proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
  ## blocks until the value is available and then returns this value.
  blockUntil(fv)
  result = cast[ptr T](fv.data)

proc `^`*[T](fv: FlowVar[ref T]): ref T =
  ## blocks until the value is available and then returns this value.
  blockUntil(fv)
  let src = cast[ref T](fv.data)
  deepCopy result, src

proc `^`*[T](fv: FlowVar[T]): T =
  ## blocks until the value is available and then returns this value.
  blockUntil(fv)
  when T is string or T is seq:
    # XXX closures? deepCopy?
    result = cast[T](fv.data)
  else:
    result = fv.blob

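# A minimal sketch of the typical spawn/`^` workflow (illustrative addition;
# `sum` is a hypothetical gcsafe proc):
#
#   import threadpool
#
#   proc sum(a, b: int): int = a + b
#
#   let fv: FlowVar[int] = spawn sum(2, 3)
#   echo(^fv)                  # blocks until a worker has produced 5
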
proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
  ## awaits any of the given flowVars. Returns the index of one flowVar for
  ## which a value arrived. A flowVar only supports one call to 'blockUntilAny'
  ## at the same time. That means if you blockUntilAny([a, b]) and
  ## blockUntilAny([b, c]), the second call will only block until 'c' arrives.
  ## If there is no flowVar left to wait on, -1 is returned.
  ## **Note**: This results in non-deterministic behaviour and should be avoided.
  var ai: AwaitInfo
  ai.cv.initSemaphore()
  var conflicts = 0
  result = -1
  for i in 0 .. flowVars.high:
    if cas(addr flowVars[i].ai, nil, addr ai):
      if not attach(flowVars[i], i):
        result = i
        break
    else:
      inc conflicts
  if conflicts < flowVars.len:
    if result < 0:
      blockUntil(ai.cv)
      result = ai.idx
    for i in 0 .. flowVars.high:
      discard cas(addr flowVars[i].ai, addr ai, nil)
  destroySemaphore(ai.cv)

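# Sketch: waiting for whichever of several tasks finishes first (illustrative
# addition; `fetch` is a hypothetical gcsafe proc):
#
#   import threadpool
#
#   proc fetch(url: string): string = "reply from " & url
#
#   let a = spawn fetch("mirror-a")
#   let b = spawn fetch("mirror-b")
#   let idx = blockUntilAny([FlowVarBase(a), FlowVarBase(b)])
#   echo "task ", idx, " finished first"   # -1 means nothing was left to wait on
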
proc isReady*(fv: FlowVarBase): bool =
  ## Determines whether the specified ``FlowVarBase``'s value is available.
  ##
  ## If ``true``, awaiting ``fv`` will not block.
  if fv.usesSemaphore and not fv.awaited:
    acquire(fv.cv.L)
    result = fv.cv.counter > 0
    release(fv.cv.L)
  else:
    result = true

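# Sketch of non-blocking polling with `isReady` (illustrative addition;
# `slowDouble` is a hypothetical gcsafe proc):
#
#   import threadpool, os
#
#   proc slowDouble(x: int): int =
#     sleep(200)
#     result = x * 2
#
#   let fv = spawn slowDouble(21)
#   while not fv.isReady:
#     sleep(50)                # do other work instead of blocking
#   echo(^fv)                  # guaranteed not to block at this point
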
proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
  let w = cast[ptr Worker](p)
  signal(w.taskStarted)

const
  MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
                           ## should be good enough for anybody ;-)
  MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.

type
  ThreadId* = range[0..MaxDistinguishedThread-1]

var
  currentPoolSize: int
  maxPoolSize = MaxThreadPoolSize
  minPoolSize = 4
  gSomeReady: Semaphore
  readyWorker: ptr Worker

# A workaround for recursion deadlock issue
# https://github.com/nim-lang/Nim/issues/4597
var
  numSlavesLock: Lock
  numSlavesRunning {.guard: numSlavesLock.}: int
  numSlavesWaiting {.guard: numSlavesLock.}: int
  isSlave {.threadvar.}: bool

numSlavesLock.initLock

gSomeReady.initSemaphore()

proc slave(w: ptr Worker) {.thread.} =
  isSlave = true
  while true:
    when declared(atomicStoreN):
      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
    else:
      w.ready = true
    readyWorker = w
    signal(gSomeReady)
    blockUntil(w.taskArrived)
    # XXX Somebody needs to look into this (why does this assertion fail
    # in Visual Studio?)
    when not defined(vcc) and not defined(tcc): assert(not w.ready)

    withLock numSlavesLock:
      inc numSlavesRunning

    w.f(w, w.data)

    withLock numSlavesLock:
      dec numSlavesRunning

    if w.q.len != 0: w.cleanFlowVars
    if w.shutdown:
      w.shutdown = false
      atomicDec currentPoolSize

proc distinguishedSlave(w: ptr Worker) {.thread.} =
  while true:
    when declared(atomicStoreN):
      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
    else:
      w.ready = true
    signal(w.readyForTask)
    blockUntil(w.taskArrived)
    assert(not w.ready)
    w.f(w, w.data)
    if w.q.len != 0: w.cleanFlowVars

var
  workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
  workersData: array[MaxThreadPoolSize, Worker]

  distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
  distinguishedData: array[MaxDistinguishedThread, Worker]

when defined(nimPinToCpu):
  var gCpus: Natural

proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
  ## sets the minimal thread pool size. The default value of this is 4.
  minPoolSize = size

proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
  ## sets the maximal thread pool size. The default value of this
  ## is ``MaxThreadPoolSize``.
  maxPoolSize = size
  if currentPoolSize > maxPoolSize:
    for i in maxPoolSize..currentPoolSize-1:
      let w = addr(workersData[i])
      w.shutdown = true

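# Sketch of tuning the pool before spawning work (illustrative addition):
#
#   import threadpool
#
#   setMinPoolSize(2)   # keep at least 2 workers around
#   setMaxPoolSize(8)   # never grow beyond 8 worker threads; surplus workers
#                       # are flagged for shutdown via their `shutdown` field
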
when defined(nimRecursiveSpawn):
  var localThreadId {.threadvar.}: int

proc activateWorkerThread(i: int) {.noinline.} =
  workersData[i].taskArrived.initSemaphore()
  workersData[i].taskStarted.initSemaphore()
  workersData[i].initialized = true
  workersData[i].q.empty.initSemaphore()
  initLock(workersData[i].q.lock)
  createThread(workers[i], slave, addr(workersData[i]))
  when defined(nimRecursiveSpawn):
    localThreadId = i+1
  when defined(nimPinToCpu):
    if gCpus > 0: pinToCpu(workers[i], i mod gCpus)

proc activateDistinguishedThread(i: int) {.noinline.} =
  distinguishedData[i].taskArrived.initSemaphore()
  distinguishedData[i].taskStarted.initSemaphore()
  distinguishedData[i].initialized = true
  distinguishedData[i].q.empty.initSemaphore()
  initLock(distinguishedData[i].q.lock)
  distinguishedData[i].readyForTask.initSemaphore()
  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))

proc setup() =
  let p = countProcessors()
  when defined(nimPinToCpu):
    gCpus = p
  currentPoolSize = min(p, MaxThreadPoolSize)
  readyWorker = addr(workersData[0])
  for i in 0..<currentPoolSize: activateWorkerThread(i)

proc preferSpawn*(): bool =
  ## Use this proc to determine quickly if a 'spawn' or a direct call is
  ## preferable. If it returns 'true', a 'spawn' may make sense. In general
  ## it is not necessary to call this directly; use 'spawnX' instead.
  result = gSomeReady.counter > 0

proc spawn*(call: typed): void {.magic: "Spawn".}
  ## always spawns a new task, so that the 'call' is never executed on
  ## the calling thread. 'call' has to be a proc call 'p(...)' where 'p'
  ## is gcsafe and has a return type that is either 'void' or compatible
  ## with ``FlowVar[T]``.

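# Sketch of spawning a call whose result is not needed (illustrative addition;
# `log` is a hypothetical gcsafe proc):
#
#   import threadpool
#
#   proc log(msg: string) =
#     echo msg
#
#   spawn log("this never runs on the calling thread")
#   # ... the calling thread continues immediately ...
#   sync()              # later: wait for all outstanding tasks
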
proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".}
  ## always spawns a new task on the worker thread with ``id``, so that
  ## the 'call' is **always** executed on that thread.
  ## 'call' has to be a proc call 'p(...)' where 'p'
  ## is gcsafe and has a return type that is either 'void' or compatible
  ## with ``FlowVar[T]``.

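# Sketch of pinning tasks to one distinguished thread (illustrative addition;
# `pollDevice` is a hypothetical gcsafe proc):
#
#   import threadpool
#
#   proc pollDevice(n: int) =
#     echo "polling device ", n
#
#   pinnedSpawn(0, pollDevice(1))   # always runs on distinguished thread 0
#   pinnedSpawn(0, pollDevice(2))   # runs on the same thread, after the first call
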
template spawnX*(call): void =
  ## spawns a new task if a CPU core is ready, otherwise executes the
  ## call in the calling thread. Usually it is advised to
  ## use 'spawn' in order to not block the producer for an unknown
  ## amount of time. 'call' has to be a proc call 'p(...)' where 'p'
  ## is gcsafe and has a return type that is either 'void' or compatible
  ## with ``FlowVar[T]``.
  (if preferSpawn(): spawn call else: call)

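# Sketch of `spawnX` (illustrative addition; `work` is a hypothetical gcsafe
# proc returning void, so both branches of the `if` above have the same type):
#
#   import threadpool
#
#   proc work(x: int) =
#     echo x * x
#
#   spawnX work(3)      # spawns only if a CPU core is ready, else runs inline
#   sync()
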
proc parallel*(body: untyped) {.magic: "Parallel".}
  ## a parallel section can be used to execute a block in parallel. ``body``
  ## has to be in a DSL that is a particular subset of the language. Please
  ## refer to the manual for further information.

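# Sketch of a `parallel` section (illustrative addition, adapted from the pi
# example in the Nim manual; requires the `parallel` experimental feature):
#
#   import threadpool, math
#   {.experimental: "parallel".}
#
#   proc term(k: float): float = 4 * math.pow(-1, k) / (2*k + 1)
#
#   proc pi(n: int): float =
#     var ch = newSeq[float](n + 1)
#     parallel:
#       for k in 0..ch.high:
#         ch[k] = spawn term(float(k))
#     for k in 0..ch.high:
#       result += ch[k]
#
#   echo pi(5000)
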
var
  state: ThreadPoolState
  stateLock: Lock

initLock stateLock

proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
  # implementation of 'spawn' that is used by the code generator.
  while true:
    if selectWorker(readyWorker, fn, data): return
    for i in 0..<currentPoolSize:
      if selectWorker(addr(workersData[i]), fn, data): return

    # determine what to do, but keep in mind this is expensive too:
    # state.calls < maxPoolSize: warmup phase
    # (state.calls and 127) == 0: periodic check
    if state.calls < maxPoolSize or (state.calls and 127) == 0:
      # ensure the call to 'advice' is atomic:
      if tryAcquire(stateLock):
        if currentPoolSize < minPoolSize:
          if not workersData[currentPoolSize].initialized:
            activateWorkerThread(currentPoolSize)
          let w = addr(workersData[currentPoolSize])
          atomicInc currentPoolSize
          if selectWorker(w, fn, data):
            release(stateLock)
            return
        case advice(state)
        of doNothing: discard
        of doCreateThread:
          if currentPoolSize < maxPoolSize:
            if not workersData[currentPoolSize].initialized:
              activateWorkerThread(currentPoolSize)
            let w = addr(workersData[currentPoolSize])
            atomicInc currentPoolSize
            if selectWorker(w, fn, data):
              release(stateLock)
              return
            # else we didn't succeed, but some other thread did, so do nothing.
        of doShutdownThread:
          if currentPoolSize > minPoolSize:
            let w = addr(workersData[currentPoolSize-1])
            w.shutdown = true
          # we don't free anything here. Too dangerous.
        release(stateLock)
      # else the acquire failed, but this means some
      # other thread succeeded, so we don't need to do anything here.
    when defined(nimRecursiveSpawn):
      if localThreadId > 0:
        # we are a worker thread, so instead of waiting for something which
        # might as well never happen (see tparallel_quicksort), we run the task
        # on the current thread instead.
        var self = addr(workersData[localThreadId-1])
        fn(self, data)
        blockUntil(self.taskStarted)
        return

    if isSlave:
      # Run under the lock until `numSlavesWaiting` is incremented, to avoid a
      # race (otherwise the two last threads might start waiting together):
      withLock numSlavesLock:
        if numSlavesRunning <= numSlavesWaiting + 1:
          # All the other slaves are waiting.
          # If we wait now, we're deadlocked until
          # an external spawn happens!
          if currentPoolSize < maxPoolSize:
            if not workersData[currentPoolSize].initialized:
              activateWorkerThread(currentPoolSize)
            let w = addr(workersData[currentPoolSize])
            atomicInc currentPoolSize
            if selectWorker(w, fn, data):
              return
          else:
            # There is no place in the pool. We're deadlocked.
            # echo "Deadlock!"
            discard

        inc numSlavesWaiting

    blockUntil(gSomeReady)

    if isSlave:
      withLock numSlavesLock:
        dec numSlavesWaiting

var
  distinguishedLock: Lock

initLock distinguishedLock

proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
  acquire(distinguishedLock)
  if not distinguishedData[id].initialized:
    activateDistinguishedThread(id)
  release(distinguishedLock)
  while true:
    if selectWorker(addr(distinguishedData[id]), fn, data): break
    blockUntil(distinguishedData[id].readyForTask)

proc sync*() =
  ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
  ## waiting, you have to use an explicit barrier.
  var toRelease = 0
  while true:
    var allReady = true
    for i in 0 ..< currentPoolSize:
      if not allReady: break
      allReady = allReady and workersData[i].ready
    if allReady: break
    blockUntil(gSomeReady)
    inc toRelease

  for i in 0 ..< toRelease:
    signal(gSomeReady)

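# Sketch of the usual fire-and-forget plus `sync` pattern (illustrative
# addition; `process` is a hypothetical gcsafe proc):
#
#   import threadpool
#
#   proc process(i: int) =
#     echo "processing chunk ", i
#
#   for i in 0 ..< 10:
#     spawn process(i)
#   sync()    # returns once every worker is idle again
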
setup()