123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605 |
- #
- #
- # Nim's Runtime Library
- # (c) Copyright 2015 Andreas Rumpf
- #
- # See the file "copying.txt", included in this
- # distribution, for details about the copyright.
- #
- ## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
- ##
- ## Unstable API.
- ##
- ## See also
- ## ========
- ## * `threads module <threads.html>`_ for basic thread support
- ## * `locks module <locks.html>`_ for locks and condition variables
- ## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
- when not compileOption("threads"):
- {.error: "Threadpool requires --threads:on option.".}
- import cpuinfo, cpuload, locks, os
- when defined(nimPreviewSlimSystem):
- import std/[assertions, typedthreads, sysatomics]
- {.push stackTrace:off.}
- type
- Semaphore = object
- c: Cond
- L: Lock
- counter: int
- proc initSemaphore(cv: var Semaphore) =
- initCond(cv.c)
- initLock(cv.L)
- proc destroySemaphore(cv: var Semaphore) {.inline.} =
- deinitCond(cv.c)
- deinitLock(cv.L)
- proc blockUntil(cv: var Semaphore) =
- acquire(cv.L)
- while cv.counter <= 0:
- wait(cv.c, cv.L)
- dec cv.counter
- release(cv.L)
- proc signal(cv: var Semaphore) =
- acquire(cv.L)
- inc cv.counter
- release(cv.L)
- signal(cv.c)
- const CacheLineSize = 64 # true for most archs
- type
- Barrier {.compilerproc.} = object
- entered: int
- cv: Semaphore # Semaphore takes 3 words at least
- left {.align(CacheLineSize).}: int
- interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event
- proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
- # due to the signaling between threads, it is ensured we are the only
- # one with access to 'entered' so we don't need 'atomicInc' here:
- inc b.entered
- # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
- # will be called which already will perform a fence for us.
- proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
- atomicInc b.left
- when not defined(x86): fence()
- # We may not have seen the final value of b.entered yet,
- # so we need to check for >= instead of ==.
- if b.interest and b.left >= b.entered: signal(b.cv)
- proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
- b.entered = 0
- b.left = 0
- b.interest = false
- proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
- fence()
- if b.left != b.entered:
- b.cv.initSemaphore()
- fence()
- b.interest = true
- fence()
- while b.left != b.entered: blockUntil(b.cv)
- destroySemaphore(b.cv)
- {.pop.}
- # ----------------------------------------------------------------------------
- type
- AwaitInfo = object
- cv: Semaphore
- idx: int
- FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
- FlowVarBaseObj {.acyclic.} = object of RootObj
- ready, usesSemaphore, awaited: bool
- cv: Semaphore # for 'blockUntilAny' support
- ai: ptr AwaitInfo
- idx: int
- data: pointer # we incRef and unref it to keep it alive; note this MUST NOT
- # be RootRef here otherwise the wrong GC keeps track of it!
- owner: pointer # ptr Worker
- FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj
- blob: T
- FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
- ToFreeQueue = object
- len: int
- lock: Lock
- empty: Semaphore
- data: array[128, pointer]
- WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
- Worker = object
- taskArrived: Semaphore
- taskStarted: Semaphore #\
- # task data:
- f: WorkerProc
- data: pointer
- ready: bool # put it here for correct alignment!
- initialized: bool # whether it has even been initialized
- shutdown: bool # the pool requests to shut down this worker thread
- q: ToFreeQueue
- readyForTask: Semaphore
- const threadpoolWaitMs {.intdefine.}: int = 100
- proc blockUntil*(fv: var FlowVarBaseObj) =
- ## Waits until the value for `fv` arrives.
- ##
- ## Usually it is not necessary to call this explicitly.
- if fv.usesSemaphore and not fv.awaited:
- fv.awaited = true
- blockUntil(fv.cv)
- destroySemaphore(fv.cv)
- proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
- if cas(addr w.ready, true, false):
- w.data = data
- w.f = fn
- signal(w.taskArrived)
- blockUntil(w.taskStarted)
- result = true
- proc cleanFlowVars(w: ptr Worker) =
- let q = addr(w.q)
- acquire(q.lock)
- for i in 0 ..< q.len:
- GC_unref(cast[RootRef](q.data[i]))
- #echo "GC_unref"
- q.len = 0
- release(q.lock)
- proc wakeupWorkerToProcessQueue(w: ptr Worker) =
- # we have to ensure it's us who wakes up the owning thread.
- # This is quite horrible code, but it runs so rarely that it doesn't matter:
- while not cas(addr w.ready, true, false):
- cpuRelax()
- discard
- w.data = nil
- w.f = proc (w, a: pointer) {.nimcall.} =
- let w = cast[ptr Worker](w)
- cleanFlowVars(w)
- signal(w.q.empty)
- signal(w.taskArrived)
- proc attach(fv: FlowVarBase; i: int): bool =
- acquire(fv.cv.L)
- if fv.cv.counter <= 0:
- fv.idx = i
- result = true
- else:
- result = false
- release(fv.cv.L)
- proc finished(fv: var FlowVarBaseObj) =
- doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
- # we have to protect against the rare cases where the owner of the flowVar
- # simply disregards the flowVar and yet the "flowVar" has not yet written
- # anything to it:
- blockUntil(fv)
- if fv.data.isNil: return
- let owner = cast[ptr Worker](fv.owner)
- let q = addr(owner.q)
- acquire(q.lock)
- while not (q.len < q.data.len):
- #echo "EXHAUSTED!"
- release(q.lock)
- wakeupWorkerToProcessQueue(owner)
- blockUntil(q.empty)
- acquire(q.lock)
- q.data[q.len] = cast[pointer](fv.data)
- inc q.len
- release(q.lock)
- fv.data = nil
- # the worker thread waits for "data" to be set to nil before shutting down
- owner.data = nil
- proc `=destroy`[T](fv: var FlowVarObj[T]) =
- finished(fv)
- `=destroy`(fv.blob)
- proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
- new(result)
- proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
- fv.cv.initSemaphore()
- fv.usesSemaphore = true
- proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
- if fv.ai != nil:
- acquire(fv.ai.cv.L)
- fv.ai.idx = fv.idx
- inc fv.ai.cv.counter
- release(fv.ai.cv.L)
- signal(fv.ai.cv.c)
- if fv.usesSemaphore:
- signal(fv.cv)
- proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
- ## Blocks until `fv` is available and then passes its value
- ## to `action`.
- ##
- ## Note that due to Nim's parameter passing semantics, this
- ## means that `T` doesn't need to be copied, so `awaitAndThen` can
- ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
- blockUntil(fv[])
- when defined(nimV2):
- action(fv.blob)
- elif T is string or T is seq:
- action(cast[T](fv.data))
- elif T is ref:
- {.error: "'awaitAndThen' not available for FlowVar[ref]".}
- else:
- action(fv.blob)
- finished(fv[])
- proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
- ## Blocks until the value is available and then returns this value.
- blockUntil(fv[])
- when defined(nimV2):
- result = cast[ptr T](fv.blob)
- else:
- result = cast[ptr T](fv.data)
- finished(fv[])
- proc `^`*[T](fv: FlowVar[T]): T =
- ## Blocks until the value is available and then returns this value.
- blockUntil(fv[])
- when not defined(nimV2) and (T is string or T is seq or T is ref):
- deepCopy result, cast[T](fv.data)
- else:
- result = fv.blob
- finished(fv[])
- proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
- ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
- ## for which a value arrived.
- ##
- ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
- ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
- ## the second call will only block until `c`. If there is no `flowVar` left
- ## to be able to wait on, -1 is returned.
- ##
- ## **Note:** This results in non-deterministic behaviour and should be avoided.
- var ai: AwaitInfo
- ai.cv.initSemaphore()
- var conflicts = 0
- result = -1
- for i in 0 .. flowVars.high:
- if cas(addr flowVars[i].ai, nil, addr ai):
- if not attach(flowVars[i], i):
- result = i
- break
- else:
- inc conflicts
- if conflicts < flowVars.len:
- if result < 0:
- blockUntil(ai.cv)
- result = ai.idx
- for i in 0 .. flowVars.high:
- discard cas(addr flowVars[i].ai, addr ai, nil)
- destroySemaphore(ai.cv)
- proc isReady*(fv: FlowVarBase): bool =
- ## Determines whether the specified `FlowVarBase`'s value is available.
- ##
- ## If `true`, awaiting `fv` will not block.
- if fv.usesSemaphore and not fv.awaited:
- acquire(fv.cv.L)
- result = fv.cv.counter > 0
- release(fv.cv.L)
- else:
- result = true
- proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
- let w = cast[ptr Worker](p)
- signal(w.taskStarted)
- const
- MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
- ## should be good enough for anybody ;-)
- MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
- type
- ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
- var
- currentPoolSize: int
- maxPoolSize = MaxThreadPoolSize
- minPoolSize = 4
- gSomeReady: Semaphore
- readyWorker: ptr Worker
- # A workaround for recursion deadlock issue
- # https://github.com/nim-lang/Nim/issues/4597
- var
- numSlavesLock: Lock
- numSlavesRunning {.guard: numSlavesLock.}: int
- numSlavesWaiting {.guard: numSlavesLock.}: int
- isSlave {.threadvar.}: bool
- numSlavesLock.initLock
- gSomeReady.initSemaphore()
- proc slave(w: ptr Worker) {.thread.} =
- isSlave = true
- while true:
- if w.shutdown:
- w.shutdown = false
- atomicDec currentPoolSize
- while true:
- if w.data != nil:
- sleep(threadpoolWaitMs)
- else:
- # The flowvar finalizer ("finished()") set w.data to nil, so we can
- # safely terminate the thread.
- #
- # TODO: look for scenarios in which the flowvar is never finalized, so
- # a shut down thread gets stuck in this loop until the main thread exits.
- break
- break
- when declared(atomicStoreN):
- atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
- else:
- w.ready = true
- readyWorker = w
- signal(gSomeReady)
- blockUntil(w.taskArrived)
- # XXX Somebody needs to look into this (why does this assertion fail
- # in Visual Studio?)
- when not defined(vcc) and not defined(tcc): assert(not w.ready)
- withLock numSlavesLock:
- inc numSlavesRunning
- w.f(w, w.data)
- withLock numSlavesLock:
- dec numSlavesRunning
- if w.q.len != 0: w.cleanFlowVars
- proc distinguishedSlave(w: ptr Worker) {.thread.} =
- while true:
- when declared(atomicStoreN):
- atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
- else:
- w.ready = true
- signal(w.readyForTask)
- blockUntil(w.taskArrived)
- assert(not w.ready)
- w.f(w, w.data)
- if w.q.len != 0: w.cleanFlowVars
- var
- workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
- workersData: array[MaxThreadPoolSize, Worker]
- distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
- distinguishedData: array[MaxDistinguishedThread, Worker]
- when defined(nimPinToCpu):
- var gCpus: Natural
- proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
- ## Sets the minimum thread pool size. The default value of this is 4.
- minPoolSize = size
- proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
- ## Sets the maximum thread pool size. The default value of this
- ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
- maxPoolSize = size
- if currentPoolSize > maxPoolSize:
- for i in maxPoolSize..currentPoolSize-1:
- let w = addr(workersData[i])
- w.shutdown = true
- when defined(nimRecursiveSpawn):
- var localThreadId {.threadvar.}: int
- proc activateWorkerThread(i: int) {.noinline.} =
- workersData[i].taskArrived.initSemaphore()
- workersData[i].taskStarted.initSemaphore()
- workersData[i].initialized = true
- workersData[i].q.empty.initSemaphore()
- initLock(workersData[i].q.lock)
- createThread(workers[i], slave, addr(workersData[i]))
- when defined(nimRecursiveSpawn):
- localThreadId = i+1
- when defined(nimPinToCpu):
- if gCpus > 0: pinToCpu(workers[i], i mod gCpus)
- proc activateDistinguishedThread(i: int) {.noinline.} =
- distinguishedData[i].taskArrived.initSemaphore()
- distinguishedData[i].taskStarted.initSemaphore()
- distinguishedData[i].initialized = true
- distinguishedData[i].q.empty.initSemaphore()
- initLock(distinguishedData[i].q.lock)
- distinguishedData[i].readyForTask.initSemaphore()
- createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
- proc setup() =
- let p = countProcessors()
- when defined(nimPinToCpu):
- gCpus = p
- currentPoolSize = min(p, MaxThreadPoolSize)
- readyWorker = addr(workersData[0])
- for i in 0..<currentPoolSize: activateWorkerThread(i)
- proc preferSpawn*(): bool =
- ## Use this proc to determine quickly if a `spawn` or a direct call is
- ## preferable.
- ##
- ## If it returns `true`, a `spawn` may make sense. In general
- ## it is not necessary to call this directly; use the `spawnX template
- ## <#spawnX.t>`_ instead.
- result = gSomeReady.counter > 0
- proc spawn*(call: sink typed) {.magic: "Spawn".} =
- ## Always spawns a new task, so that the `call` is never executed on
- ## the calling thread.
- ##
- ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
- ## return type that is either `void` or compatible with `FlowVar[T]`.
- discard "It uses `nimSpawn3` internally"
- proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} =
- ## Always spawns a new task on the worker thread with `id`, so that
- ## the `call` is **always** executed on the thread.
- ##
- ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
- ## return type that is either `void` or compatible with `FlowVar[T]`.
- discard "It uses `nimSpawn4` internally"
- template spawnX*(call) =
- ## Spawns a new task if a CPU core is ready, otherwise executes the
- ## call in the calling thread.
- ##
- ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
- ## in order to not block the producer for an unknown amount of time.
- ##
- ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
- ## return type that is either 'void' or compatible with `FlowVar[T]`.
- (if preferSpawn(): spawn call else: call)
- proc parallel*(body: untyped) {.magic: "Parallel".}
- ## A parallel section can be used to execute a block in parallel.
- ##
- ## `body` has to be in a DSL that is a particular subset of the language.
- ##
- ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
- ## for further information.
- var
- state: ThreadPoolState
- stateLock: Lock
- initLock stateLock
- proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
- # implementation of 'spawn' that is used by the code generator.
- while true:
- if selectWorker(readyWorker, fn, data): return
- for i in 0..<currentPoolSize:
- if selectWorker(addr(workersData[i]), fn, data): return
- # determine what to do, but keep in mind this is expensive too:
- # state.calls < maxPoolSize: warmup phase
- # (state.calls and 127) == 0: periodic check
- if state.calls < maxPoolSize or (state.calls and 127) == 0:
- # ensure the call to 'advice' is atomic:
- if tryAcquire(stateLock):
- if currentPoolSize < minPoolSize:
- if not workersData[currentPoolSize].initialized:
- activateWorkerThread(currentPoolSize)
- let w = addr(workersData[currentPoolSize])
- atomicInc currentPoolSize
- if selectWorker(w, fn, data):
- release(stateLock)
- return
- case advice(state)
- of doNothing: discard
- of doCreateThread:
- if currentPoolSize < maxPoolSize:
- if not workersData[currentPoolSize].initialized:
- activateWorkerThread(currentPoolSize)
- let w = addr(workersData[currentPoolSize])
- atomicInc currentPoolSize
- if selectWorker(w, fn, data):
- release(stateLock)
- return
- # else we didn't succeed but some other thread, so do nothing.
- of doShutdownThread:
- if currentPoolSize > minPoolSize:
- let w = addr(workersData[currentPoolSize-1])
- w.shutdown = true
- # we don't free anything here. Too dangerous.
- release(stateLock)
- # else the acquire failed, but this means some
- # other thread succeeded, so we don't need to do anything here.
- when defined(nimRecursiveSpawn):
- if localThreadId > 0:
- # we are a worker thread, so instead of waiting for something which
- # might as well never happen (see tparallel_quicksort), we run the task
- # on the current thread instead.
- var self = addr(workersData[localThreadId-1])
- fn(self, data)
- blockUntil(self.taskStarted)
- return
- if isSlave:
- # Run under lock until `numSlavesWaiting` increment to avoid a
- # race (otherwise two last threads might start waiting together)
- withLock numSlavesLock:
- if numSlavesRunning <= numSlavesWaiting + 1:
- # All the other slaves are waiting
- # If we wait now, we-re deadlocked until
- # an external spawn happens !
- if currentPoolSize < maxPoolSize:
- if not workersData[currentPoolSize].initialized:
- activateWorkerThread(currentPoolSize)
- let w = addr(workersData[currentPoolSize])
- atomicInc currentPoolSize
- if selectWorker(w, fn, data):
- return
- else:
- # There is no place in the pool. We're deadlocked.
- # echo "Deadlock!"
- discard
- inc numSlavesWaiting
- blockUntil(gSomeReady)
- if isSlave:
- withLock numSlavesLock:
- dec numSlavesWaiting
- var
- distinguishedLock: Lock
- initLock distinguishedLock
- proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
- acquire(distinguishedLock)
- if not distinguishedData[id].initialized:
- activateDistinguishedThread(id)
- release(distinguishedLock)
- while true:
- if selectWorker(addr(distinguishedData[id]), fn, data): break
- blockUntil(distinguishedData[id].readyForTask)
- proc sync*() =
- ## A simple barrier to wait for all `spawn`ed tasks.
- ##
- ## If you need more elaborate waiting, you have to use an explicit barrier.
- while true:
- var allReady = true
- for i in 0 ..< currentPoolSize:
- if not allReady: break
- allReady = allReady and workersData[i].ready
- if allReady: break
- sleep(threadpoolWaitMs)
- # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
- # the time we establish that some are not "ready" and the time we wait for a
- # "signal(gSomeReady)" from inside "slave()" that can never come.
- setup()
|