sysspawn.nim 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Andreas Rumpf
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## Implements Nim's `spawn`.
# Guard: this module must not be imported explicitly. `NimString` serves as
# the marker; presumably it is only declared when this file is included
# from the system module (TODO confirm).
when not declared(NimString):
  {.error: "You must not import this module explicitly".}

# Stack traces are disabled for the low-level synchronisation procs below;
# the matching `{.pop.}` follows `closeBarrier`.
{.push stackTrace:off.}

# We declare our own condition variables here to get rid of the dummy lock
# on Windows:
type
  # Counting condition variable: `signal` increments `counter`, and `await`
  # blocks until `counter` is positive and then decrements it. A signal
  # sent before anybody waits is therefore remembered, not lost.
  CondVar = object
    c: SysCond          # OS condition variable that `await` blocks on
    stupidLock: SysLock # guards `counter`; passed to `waitSysCond`
    counter: int        # signals posted but not yet consumed by `await`
  20. proc createCondVar(): CondVar =
  21. initSysCond(result.c)
  22. initSysLock(result.stupidLock)
  23. #acquireSys(result.stupidLock)
  24. proc destroyCondVar(c: var CondVar) {.inline.} =
  25. deinitSysCond(c.c)
  26. proc await(cv: var CondVar) =
  27. acquireSys(cv.stupidLock)
  28. while cv.counter <= 0:
  29. waitSysCond(cv.c, cv.stupidLock)
  30. dec cv.counter
  31. releaseSys(cv.stupidLock)
  32. proc signal(cv: var CondVar) =
  33. acquireSys(cv.stupidLock)
  34. inc cv.counter
  35. releaseSys(cv.stupidLock)
  36. signalSysCond(cv.c)
type
  # A `CondVar` plus flags for an (currently disabled) spin-wait fast path;
  # every wait and signal goes through `slow`.
  FastCondVar = object
    event, slowPath: bool # `event`: set by `signal`, cleared by `await`;
                          # `slowPath`: only cleared at creation, its other
                          # uses are commented out
    slow: CondVar         # the blocking condition variable actually used
  41. proc createFastCondVar(): FastCondVar =
  42. initSysCond(result.slow.c)
  43. initSysLock(result.slow.stupidLock)
  44. #acquireSys(result.slow.stupidLock)
  45. result.event = false
  46. result.slowPath = false
  47. proc await(cv: var FastCondVar) =
  48. #for i in 0 .. 50:
  49. # if cas(addr cv.event, true, false):
  50. # # this is a HIT: Triggers > 95% in my tests.
  51. # return
  52. # cpuRelax()
  53. #cv.slowPath = true
  54. # XXX For some reason this crashes some test programs
  55. await(cv.slow)
  56. cv.event = false
  57. proc signal(cv: var FastCondVar) =
  58. cv.event = true
  59. #if cas(addr cv.slowPath, true, false):
  60. signal(cv.slow)
type
  # Counts `barrierEnter` calls not yet matched by `barrierLeave`;
  # `closeBarrier` waits on `cv` until `barrierLeave` reports zero.
  Barrier* {.compilerproc.} = object
    counter: int # enters minus leaves; changed via atomicInc/atomicDec
    cv: CondVar  # signalled by `barrierLeave` when `counter` drops to <= 0
  65. proc barrierEnter*(b: ptr Barrier) {.compilerproc.} =
  66. atomicInc b.counter
  67. proc barrierLeave*(b: ptr Barrier) {.compilerproc.} =
  68. atomicDec b.counter
  69. if b.counter <= 0: signal(b.cv)
  70. proc openBarrier*(b: ptr Barrier) {.compilerproc.} =
  71. b.counter = 0
  72. b.cv = createCondVar()
  73. proc closeBarrier*(b: ptr Barrier) {.compilerproc.} =
  74. await(b.cv)
  75. destroyCondVar(b.cv)
{.pop.} # ends the `{.push stackTrace:off.}` region opened near the top
# ----------------------------------------------------------------------------
type
  # Task entry point; `slave` invokes it as `f(worker, data)`.
  WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
  # Per-thread mailbox shared between `nimSpawn` (which posts tasks) and
  # the thread running `slave` on it.
  Worker = object
    taskArrived: CondVar     # signalled by `nimSpawn` after `f`/`data` are set
    taskStarted: FastCondVar # signalled by `nimArgsPassingDone`; `nimSpawn`
                             # waits on it before returning
    # task data:
    f: WorkerProc
    data: pointer # nil when no task is pending; `slave` resets it to nil
    ready: bool # put it here for correct alignment!
                # true while idle; `nimSpawn` claims the worker via `cas`
  87. proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
  88. let w = cast[ptr Worker](p)
  89. signal(w.taskStarted)
# Signalled by each worker every time it becomes idle (see `slave`) and
# awaited by `nimSpawn` and `sync`; `preferSpawn` reports its `event` flag.
var gSomeReady = createFastCondVar()
proc slave(w: ptr Worker) {.thread.} =
  ## Worker thread main loop: announce readiness, wait for a task posted by
  ## `nimSpawn`, run it, repeat. The loop has no exit, so this never returns.
  while true:
    w.ready = true # If we instead signal "workerReady" we need the scheduler
                   # to notice this. The scheduler could then optimize the
                   # layout of the worker threads (e.g. keep the list sorted)
                   # so that no search for a "ready" thread is necessary.
                   # This might be implemented later, but is more tricky than
                   # it looks because 'spawn' itself can run concurrently.
    signal(gSomeReady)
    await(w.taskArrived)
    # `nimSpawn` claims this worker by flipping `ready` back to false with
    # `cas` before it signals `taskArrived`.
    assert(not w.ready)
    # shield against spurious wakeups:
    if w.data != nil:
      w.f(w, w.data)
      # Clear the slot so a wake-up without a new task does not re-run it.
      w.data = nil
  106. const NumThreads = 4
  107. var
  108. workers: array[NumThreads, Thread[ptr Worker]]
  109. workersData: array[NumThreads, Worker]
  110. proc setup() =
  111. for i in 0 ..< NumThreads:
  112. workersData[i].taskArrived = createCondVar()
  113. workersData[i].taskStarted = createFastCondVar()
  114. createThread(workers[i], slave, addr(workersData[i]))
  115. proc preferSpawn*(): bool =
  116. ## Use this proc to determine quickly if a 'spawn' or a direct call is
  117. ## preferable. If it returns 'true' a 'spawn' may make sense. In general
  118. ## it is not necessary to call this directly; use 'spawnX' instead.
  119. result = gSomeReady.event
proc spawn*(call: typed) {.magic: "Spawn".}
  ## Always spawns a new task, so that the 'call' is never executed on
  ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
  ## is gcsafe and has 'void' as the return type.
  ## The implementation is supplied by the compiler; at run time the task
  ## is handed to `nimSpawn`, which blocks until a worker thread has
  ## accepted it and signalled that argument passing is done.
  124. template spawnX*(call: typed) =
  125. ## spawns a new task if a CPU core is ready, otherwise executes the
  126. ## call in the calling thread. Usually it is advised to
  127. ## use 'spawn' in order to not block the producer for an unknown
  128. ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
  129. ## is gcsafe and has 'void' as the return type.
  130. if preferSpawn(): spawn call
  131. else: call
  132. proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerproc.} =
  133. # implementation of 'spawn' that is used by the code generator.
  134. while true:
  135. for i in 0.. high(workers):
  136. let w = addr(workersData[i])
  137. if cas(addr w.ready, true, false):
  138. w.data = data
  139. w.f = fn
  140. signal(w.taskArrived)
  141. await(w.taskStarted)
  142. return
  143. await(gSomeReady)
  144. proc sync*() =
  145. ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
  146. ## waiting, you have to use an explicit barrier.
  147. while true:
  148. var allReady = true
  149. for i in 0 .. high(workers):
  150. if not allReady: break
  151. allReady = allReady and workersData[i].ready
  152. if allReady: break
  153. await(gSomeReady)
# Start the worker pool as soon as this module's top-level code runs.
setup()