#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Andreas Rumpf
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## Implements Nim's 'spawn'.
  10. when not declared(NimString):
  11. {.error: "You must not import this module explicitly".}
  12. {.push stackTrace:off.}
  13. # We declare our own condition variables here to get rid of the dummy lock
  14. # on Windows:
  15. type
  16. CondVar = object
  17. c: SysCond
  18. when defined(posix):
  19. stupidLock: SysLock
  20. counter: int
  21. proc createCondVar(): CondVar =
  22. initSysCond(result.c)
  23. when defined(posix):
  24. initSysLock(result.stupidLock)
  25. #acquireSys(result.stupidLock)
  26. proc destroyCondVar(c: var CondVar) {.inline.} =
  27. deinitSysCond(c.c)
  28. proc await(cv: var CondVar) =
  29. when defined(posix):
  30. acquireSys(cv.stupidLock)
  31. while cv.counter <= 0:
  32. waitSysCond(cv.c, cv.stupidLock)
  33. dec cv.counter
  34. releaseSys(cv.stupidLock)
  35. else:
  36. waitSysCondWindows(cv.c)
  37. proc signal(cv: var CondVar) =
  38. when defined(posix):
  39. acquireSys(cv.stupidLock)
  40. inc cv.counter
  41. releaseSys(cv.stupidLock)
  42. signalSysCond(cv.c)
  43. type
  44. FastCondVar = object
  45. event, slowPath: bool
  46. slow: CondVar
  47. proc createFastCondVar(): FastCondVar =
  48. initSysCond(result.slow.c)
  49. when defined(posix):
  50. initSysLock(result.slow.stupidLock)
  51. #acquireSys(result.slow.stupidLock)
  52. result.event = false
  53. result.slowPath = false
  54. proc await(cv: var FastCondVar) =
  55. #for i in 0 .. 50:
  56. # if cas(addr cv.event, true, false):
  57. # # this is a HIT: Triggers > 95% in my tests.
  58. # return
  59. # cpuRelax()
  60. #cv.slowPath = true
  61. # XXX For some reason this crashes some test programs
  62. await(cv.slow)
  63. cv.event = false
  64. proc signal(cv: var FastCondVar) =
  65. cv.event = true
  66. #if cas(addr cv.slowPath, true, false):
  67. signal(cv.slow)
  68. type
  69. Barrier* {.compilerProc.} = object
  70. counter: int
  71. cv: CondVar
  72. proc barrierEnter*(b: ptr Barrier) {.compilerProc.} =
  73. atomicInc b.counter
  74. proc barrierLeave*(b: ptr Barrier) {.compilerProc.} =
  75. atomicDec b.counter
  76. if b.counter <= 0: signal(b.cv)
  77. proc openBarrier*(b: ptr Barrier) {.compilerProc.} =
  78. b.counter = 0
  79. b.cv = createCondVar()
  80. proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
  81. await(b.cv)
  82. destroyCondVar(b.cv)
  83. {.pop.}
  84. # ----------------------------------------------------------------------------
  85. type
  86. WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
  87. Worker = object
  88. taskArrived: CondVar
  89. taskStarted: FastCondVar #\
  90. # task data:
  91. f: WorkerProc
  92. data: pointer
  93. ready: bool # put it here for correct alignment!
  94. proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
  95. let w = cast[ptr Worker](p)
  96. signal(w.taskStarted)
  97. var gSomeReady = createFastCondVar()
  98. proc slave(w: ptr Worker) {.thread.} =
  99. while true:
  100. w.ready = true # If we instead signal "workerReady" we need the scheduler
  101. # to notice this. The scheduler could then optimize the
  102. # layout of the worker threads (e.g. keep the list sorted)
  103. # so that no search for a "ready" thread is necessary.
  104. # This might be implemented later, but is more tricky than
  105. # it looks because 'spawn' itself can run concurrently.
  106. signal(gSomeReady)
  107. await(w.taskArrived)
  108. assert(not w.ready)
  109. # shield against spurious wakeups:
  110. if w.data != nil:
  111. w.f(w, w.data)
  112. w.data = nil
  113. const NumThreads = 4
  114. var
  115. workers: array[NumThreads, Thread[ptr Worker]]
  116. workersData: array[NumThreads, Worker]
  117. proc setup() =
  118. for i in 0 ..< NumThreads:
  119. workersData[i].taskArrived = createCondVar()
  120. workersData[i].taskStarted = createFastCondVar()
  121. createThread(workers[i], slave, addr(workersData[i]))
  122. proc preferSpawn*(): bool =
  123. ## Use this proc to determine quickly if a 'spawn' or a direct call is
  124. ## preferable. If it returns 'true' a 'spawn' may make sense. In general
  125. ## it is not necessary to call this directly; use 'spawnX' instead.
  126. result = gSomeReady.event
  127. proc spawn*(call: typed) {.magic: "Spawn".}
  128. ## always spawns a new task, so that the 'call' is never executed on
  129. ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
  130. ## is gcsafe and has 'void' as the return type.
  131. template spawnX*(call: typed) =
  132. ## spawns a new task if a CPU core is ready, otherwise executes the
  133. ## call in the calling thread. Usually it is advised to
  134. ## use 'spawn' in order to not block the producer for an unknown
  135. ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
  136. ## is gcsafe and has 'void' as the return type.
  137. if preferSpawn(): spawn call
  138. else: call
  139. proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
  140. # implementation of 'spawn' that is used by the code generator.
  141. while true:
  142. for i in 0.. high(workers):
  143. let w = addr(workersData[i])
  144. if cas(addr w.ready, true, false):
  145. w.data = data
  146. w.f = fn
  147. signal(w.taskArrived)
  148. await(w.taskStarted)
  149. return
  150. await(gSomeReady)
  151. proc sync*() =
  152. ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
  153. ## waiting, you have to use an explicit barrier.
  154. while true:
  155. var allReady = true
  156. for i in 0 .. high(workers):
  157. if not allReady: break
  158. allReady = allReady and workersData[i].ready
  159. if allReady: break
  160. await(gSomeReady)
  161. setup()