123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- #
- #
- # Nim's Runtime Library
- # (c) Copyright 2015 Andreas Rumpf
- #
- # See the file "copying.txt", included in this
- # distribution, for details about the copyright.
- #
- ## Implements Nim's 'spawn'.
- when not declared(NimString):
- {.error: "You must not import this module explicitly".}
- {.push stackTrace:off.}
- # We declare our own condition variables here to get rid of the dummy lock
- # on Windows:
- type
- CondVar = object
- c: SysCond
- when defined(posix):
- stupidLock: SysLock
- counter: int
- proc createCondVar(): CondVar =
- initSysCond(result.c)
- when defined(posix):
- initSysLock(result.stupidLock)
- #acquireSys(result.stupidLock)
- proc destroyCondVar(c: var CondVar) {.inline.} =
- deinitSysCond(c.c)
- proc await(cv: var CondVar) =
- when defined(posix):
- acquireSys(cv.stupidLock)
- while cv.counter <= 0:
- waitSysCond(cv.c, cv.stupidLock)
- dec cv.counter
- releaseSys(cv.stupidLock)
- else:
- waitSysCondWindows(cv.c)
- proc signal(cv: var CondVar) =
- when defined(posix):
- acquireSys(cv.stupidLock)
- inc cv.counter
- releaseSys(cv.stupidLock)
- signalSysCond(cv.c)
- type
- FastCondVar = object
- event, slowPath: bool
- slow: CondVar
- proc createFastCondVar(): FastCondVar =
- initSysCond(result.slow.c)
- when defined(posix):
- initSysLock(result.slow.stupidLock)
- #acquireSys(result.slow.stupidLock)
- result.event = false
- result.slowPath = false
- proc await(cv: var FastCondVar) =
- #for i in 0 .. 50:
- # if cas(addr cv.event, true, false):
- # # this is a HIT: Triggers > 95% in my tests.
- # return
- # cpuRelax()
- #cv.slowPath = true
- # XXX For some reason this crashes some test programs
- await(cv.slow)
- cv.event = false
- proc signal(cv: var FastCondVar) =
- cv.event = true
- #if cas(addr cv.slowPath, true, false):
- signal(cv.slow)
- type
- Barrier* {.compilerProc.} = object
- counter: int
- cv: CondVar
- proc barrierEnter*(b: ptr Barrier) {.compilerProc.} =
- atomicInc b.counter
- proc barrierLeave*(b: ptr Barrier) {.compilerProc.} =
- atomicDec b.counter
- if b.counter <= 0: signal(b.cv)
- proc openBarrier*(b: ptr Barrier) {.compilerProc.} =
- b.counter = 0
- b.cv = createCondVar()
- proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
- await(b.cv)
- destroyCondVar(b.cv)
- {.pop.}
- # ----------------------------------------------------------------------------
- type
- WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
- Worker = object
- taskArrived: CondVar
- taskStarted: FastCondVar #\
- # task data:
- f: WorkerProc
- data: pointer
- ready: bool # put it here for correct alignment!
- proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
- let w = cast[ptr Worker](p)
- signal(w.taskStarted)
- var gSomeReady = createFastCondVar()
- proc slave(w: ptr Worker) {.thread.} =
- while true:
- w.ready = true # If we instead signal "workerReady" we need the scheduler
- # to notice this. The scheduler could then optimize the
- # layout of the worker threads (e.g. keep the list sorted)
- # so that no search for a "ready" thread is necessary.
- # This might be implemented later, but is more tricky than
- # it looks because 'spawn' itself can run concurrently.
- signal(gSomeReady)
- await(w.taskArrived)
- assert(not w.ready)
- # shield against spurious wakeups:
- if w.data != nil:
- w.f(w, w.data)
- w.data = nil
- const NumThreads = 4
- var
- workers: array[NumThreads, Thread[ptr Worker]]
- workersData: array[NumThreads, Worker]
- proc setup() =
- for i in 0 ..< NumThreads:
- workersData[i].taskArrived = createCondVar()
- workersData[i].taskStarted = createFastCondVar()
- createThread(workers[i], slave, addr(workersData[i]))
- proc preferSpawn*(): bool =
- ## Use this proc to determine quickly if a 'spawn' or a direct call is
- ## preferable. If it returns 'true' a 'spawn' may make sense. In general
- ## it is not necessary to call this directly; use 'spawnX' instead.
- result = gSomeReady.event
- proc spawn*(call: typed) {.magic: "Spawn".}
- ## always spawns a new task, so that the 'call' is never executed on
- ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
- ## is gcsafe and has 'void' as the return type.
- template spawnX*(call: typed) =
- ## spawns a new task if a CPU core is ready, otherwise executes the
- ## call in the calling thread. Usually it is advised to
- ## use 'spawn' in order to not block the producer for an unknown
- ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
- ## is gcsafe and has 'void' as the return type.
- if preferSpawn(): spawn call
- else: call
- proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
- # implementation of 'spawn' that is used by the code generator.
- while true:
- for i in 0.. high(workers):
- let w = addr(workersData[i])
- if cas(addr w.ready, true, false):
- w.data = data
- w.f = fn
- signal(w.taskArrived)
- await(w.taskStarted)
- return
- await(gSomeReady)
- proc sync*() =
- ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
- ## waiting, you have to use an explicit barrier.
- while true:
- var allReady = true
- for i in 0 .. high(workers):
- if not allReady: break
- allReady = allReady and workersData[i].ready
- if allReady: break
- await(gSomeReady)
- setup()
|