123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
- #
- #
- # The Nim Compiler
- # (c) Copyright 2021 Andreas Prell, Mamy André-Ratsimbazafy & Nim Contributors
- #
- # See the file "copying.txt", included in this
- # distribution, for details about the copyright.
- #
- # Based on https://github.com/mratsim/weave/blob/5696d94e6358711e840f8c0b7c684fcc5cbd4472/unused/channels/channels_legacy.nim
- # Those are translations of @aprell (Andreas Prell) original channels from C to Nim
- # (https://github.com/aprell/tasking-2.0/blob/master/src/channel_shm/channel.c)
- # And in turn they are an implementation of Michael & Scott lock-based queues
- # (note the paper has 2 channels: lock-free and lock-based) with additional caching:
- # Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
- # Maged M. Michael, Michael L. Scott, 1996
- # https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf
- ## This module only works with `--gc:arc` or `--gc:orc`.
- ##
- ## .. warning:: This module is experimental and its interface may change.
- ##
- ## The following is a simple example of two different ways to use channels:
- ## blocking and non-blocking.
- ##
- runnableExamples("--threads:on --gc:orc"):
- import std/os
- # In this example a channel is declared at module scope.
- # Channels are generic, and they include support for passing objects between
- # threads.
- # Note that isolated data passed through channels is moved around.
- var chan = newChannel[string]()
- # This proc will be run in another thread using the threads module.
- proc firstWorker() =
- chan.send("Hello World!")
- # This is another proc to run in a background thread. This proc takes a while
- # to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
- proc secondWorker() =
- sleep(2000)
- chan.send("Another message")
- # Launch the worker.
- var worker1: Thread[void]
- createThread(worker1, firstWorker)
- # Block until the message arrives, then print it out.
- let dest = chan.recv()
- assert dest == "Hello World!"
- # Wait for the thread to exit before moving on to the next example.
- worker1.joinThread()
- # Launch the other worker.
- var worker2: Thread[void]
- createThread(worker2, secondWorker)
- # This time, use a non-blocking approach with tryRecv.
- # Since the main thread is not blocked, it could be used to perform other
- # useful work while it waits for data to arrive on the channel.
- var messages: seq[string]
- while true:
- var msg = ""
- if chan.tryRecv(msg):
- messages.add msg # "Another message"
- break
- messages.add "Pretend I'm doing useful work..."
- # For this example, sleep in order not to flood stdout with the above
- # message.
- sleep(400)
- # Wait for the second thread to exit before cleaning up the channel.
- worker2.joinThread()
- # Clean up the channel.
- assert chan.close()
- assert messages[^1] == "Another message"
- assert messages.len >= 2
- when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc):
- {.error: "This channel implementation requires --gc:arc or --gc:orc".}
- import std/[locks, atomics, isolation]
- import system/ansi_c
- # Channel (Shared memory channels)
- # ----------------------------------------------------------------------------------
- const
- cacheLineSize {.intdefine.} = 64 # TODO: some Samsung phone have 128 cache-line
- nimChannelCacheSize* {.intdefine.} = 100
- type
- ChannelRaw = ptr ChannelObj
- ChannelObj = object
- headLock, tailLock: Lock
- notFullCond, notEmptyCond: Cond
- closed: Atomic[bool]
- size: int
- itemsize: int # up to itemsize bytes can be exchanged over this channel
- head {.align: cacheLineSize.} : int # Items are taken from head and new items are inserted at tail
- tail: int
- buffer: ptr UncheckedArray[byte]
- atomicCounter: Atomic[int]
- ChannelCache = ptr ChannelCacheObj
- ChannelCacheObj = object
- next: ChannelCache
- chanSize: int
- chanN: int
- numCached: int
- cache: array[nimChannelCacheSize, ChannelRaw]
- # ----------------------------------------------------------------------------------
- proc numItems(chan: ChannelRaw): int {.inline.} =
- result = chan.tail - chan.head
- if result < 0:
- inc(result, 2 * chan.size)
- assert result <= chan.size
- template isFull(chan: ChannelRaw): bool =
- abs(chan.tail - chan.head) == chan.size
- template isEmpty(chan: ChannelRaw): bool =
- chan.head == chan.tail
- # Unbuffered / synchronous channels
- # ----------------------------------------------------------------------------------
- template numItemsUnbuf(chan: ChannelRaw): int =
- chan.head
- template isFullUnbuf(chan: ChannelRaw): bool =
- chan.head == 1
- template isEmptyUnbuf(chan: ChannelRaw): bool =
- chan.head == 0
- # ChannelRaw kinds
- # ----------------------------------------------------------------------------------
- func isUnbuffered(chan: ChannelRaw): bool =
- chan.size - 1 == 0
- # ChannelRaw status and properties
- # ----------------------------------------------------------------------------------
- proc isClosed(chan: ChannelRaw): bool {.inline.} = load(chan.closed, moRelaxed)
- proc peek(chan: ChannelRaw): int {.inline.} =
- (if chan.isUnbuffered: numItemsUnbuf(chan) else: numItems(chan))
- # Per-thread channel cache
- # ----------------------------------------------------------------------------------
- var channelCache {.threadvar.}: ChannelCache
- var channelCacheLen {.threadvar.}: int
- proc allocChannelCache(size, n: int): bool =
- ## Allocate a free list for storing channels of a given type
- var p = channelCache
- # Avoid multiple free lists for the exact same type of channel
- while not p.isNil:
- if size == p.chanSize and n == p.chanN:
- return false
- p = p.next
- p = cast[ptr ChannelCacheObj](c_malloc(csize_t sizeof(ChannelCacheObj)))
- if p.isNil:
- raise newException(OutOfMemDefect, "Could not allocate memory")
- p.chanSize = size
- p.chanN = n
- p.numCached = 0
- p.next = channelCache
- channelCache = p
- inc channelCacheLen
- result = true
- proc freeChannelCache*() =
- ## Frees the entire channel cache, including all channels
- var p = channelCache
- var q: ChannelCache
- while not p.isNil:
- q = p.next
- for i in 0 ..< p.numCached:
- let chan = p.cache[i]
- if not chan.buffer.isNil:
- c_free(chan.buffer)
- deinitLock(chan.headLock)
- deinitLock(chan.tailLock)
- deinitCond(chan.notFullCond)
- deinitCond(chan.notEmptyCond)
- c_free(chan)
- c_free(p)
- dec channelCacheLen
- p = q
- assert(channelCacheLen == 0)
- channelCache = nil
- # Channels memory ops
- # ----------------------------------------------------------------------------------
- proc allocChannel(size, n: int): ChannelRaw =
- when nimChannelCacheSize > 0:
- var p = channelCache
- while not p.isNil:
- if size == p.chanSize and n == p.chanN:
- # Check if free list contains channel
- if p.numCached > 0:
- dec p.numCached
- result = p.cache[p.numCached]
- assert(result.isEmpty)
- return
- else:
- # All the other lists in cache won't match
- break
- p = p.next
- result = cast[ChannelRaw](c_malloc(csize_t sizeof(ChannelObj)))
- if result.isNil:
- raise newException(OutOfMemDefect, "Could not allocate memory")
- # To buffer n items, we allocate for n
- result.buffer = cast[ptr UncheckedArray[byte]](c_malloc(csize_t n*size))
- if result.buffer.isNil:
- raise newException(OutOfMemDefect, "Could not allocate memory")
- initLock(result.headLock)
- initLock(result.tailLock)
- initCond(result.notFullCond)
- initCond(result.notEmptyCond)
- result.closed.store(false, moRelaxed) # We don't need atomic here, how to?
- result.size = n
- result.itemsize = size
- result.head = 0
- result.tail = 0
- result.atomicCounter.store(0, moRelaxed)
- when nimChannelCacheSize > 0:
- # Allocate a cache as well if one of the proper size doesn't exist
- discard allocChannelCache(size, n)
- proc freeChannel(chan: ChannelRaw) =
- if chan.isNil:
- return
- when nimChannelCacheSize > 0:
- var p = channelCache
- while not p.isNil:
- if chan.itemsize == p.chanSize and
- chan.size == p.chanN:
- if p.numCached < nimChannelCacheSize:
- # If space left in cache, cache it
- p.cache[p.numCached] = chan
- inc p.numCached
- return
- else:
- # All the other lists in cache won't match
- break
- p = p.next
- if not chan.buffer.isNil:
- c_free(chan.buffer)
- deinitLock(chan.headLock)
- deinitLock(chan.tailLock)
- deinitCond(chan.notFullCond)
- deinitCond(chan.notEmptyCond)
- c_free(chan)
- # MPMC Channels (Multi-Producer Multi-Consumer)
- # ----------------------------------------------------------------------------------
- proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
- if nonBlocking and chan.isFullUnbuf:
- return false
- acquire(chan.headLock)
- if nonBlocking and chan.isFullUnbuf:
- # Another thread was faster
- release(chan.headLock)
- return false
- while chan.isFullUnbuf:
- wait(chan.notFullcond, chan.headLock)
- assert chan.isEmptyUnbuf
- assert size <= chan.itemsize
- copyMem(chan.buffer, data, size)
- chan.head = 1
- signal(chan.notEmptyCond)
- release(chan.headLock)
- result = true
- proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
- assert not chan.isNil
- assert not data.isNil
- if isUnbuffered(chan):
- return sendUnbufferedMpmc(chan, data, size, nonBlocking)
- if nonBlocking and chan.isFull:
- return false
- acquire(chan.tailLock)
- if nonBlocking and chan.isFull:
- # Another thread was faster
- release(chan.tailLock)
- return false
- while chan.isFull:
- wait(chan.notFullcond, chan.tailLock)
- assert not chan.isFull
- assert size <= chan.itemsize
- let writeIdx = if chan.tail < chan.size: chan.tail
- else: chan.tail - chan.size
- copyMem(chan.buffer[writeIdx * chan.itemsize].addr, data, size)
- inc chan.tail
- if chan.tail == 2 * chan.size:
- chan.tail = 0
- signal(chan.notEmptyCond)
- release(chan.tailLock)
- result = true
- proc recvUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
- if nonBlocking and chan.isEmptyUnbuf:
- return false
- acquire(chan.headLock)
- if nonBlocking and chan.isEmptyUnbuf:
- # Another thread was faster
- release(chan.headLock)
- return false
- while chan.isEmptyUnbuf:
- wait(chan.notEmptyCond, chan.headLock)
- assert chan.isFullUnbuf
- assert size <= chan.itemsize
- copyMem(data, chan.buffer, size)
- chan.head = 0
- signal(chan.notFullCond)
- release(chan.headLock)
- result = true
- proc recvMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
- assert not chan.isNil
- assert not data.isNil
- if isUnbuffered(chan):
- return recvUnbufferedMpmc(chan, data, size, nonBlocking)
- if nonBlocking and chan.isEmpty:
- return false
- acquire(chan.headLock)
- if nonBlocking and chan.isEmpty:
- # Another thread took the last data
- release(chan.headLock)
- return false
- while chan.isEmpty:
- wait(chan.notEmptyCond, chan.headLock)
- assert not chan.isEmpty
- assert size <= chan.itemsize
- let readIdx = if chan.head < chan.size: chan.head
- else: chan.head - chan.size
- copyMem(data, chan.buffer[readIdx * chan.itemsize].addr, size)
- inc chan.head
- if chan.head == 2 * chan.size:
- chan.head = 0
- signal(chan.notFullCond)
- release(chan.headLock)
- result = true
- proc channelCloseMpmc(chan: ChannelRaw): bool =
- # Unsynchronized
- if chan.isClosed:
- # ChannelRaw already closed
- return false
- store(chan.closed, true, moRelaxed)
- result = true
- proc channelOpenMpmc(chan: ChannelRaw): bool =
- # Unsynchronized
- if not chan.isClosed:
- # ChannelRaw already open
- return false
- store(chan.closed, false, moRelaxed)
- result = true
- # Public API
- # ----------------------------------------------------------------------------------
- type
- Channel*[T] = object ## Typed channels
- d: ChannelRaw
- proc `=destroy`*[T](c: var Channel[T]) =
- if c.d != nil:
- if load(c.d.atomicCounter, moAcquire) == 0:
- if c.d.buffer != nil:
- freeChannel(c.d)
- else:
- atomicDec(c.d.atomicCounter)
- proc `=copy`*[T](dest: var Channel[T], src: Channel[T]) =
- ## Shares `Channel` by reference counting.
- if src.d != nil:
- atomicInc(src.d.atomicCounter)
- if dest.d != nil:
- `=destroy`(dest)
- dest.d = src.d
- func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
- ## Sends item to the channel(non blocking).
- var data = src.extract
- result = sendMpmc(c.d, data.addr, sizeof(T), true)
- if result:
- wasMoved(data)
- template trySend*[T](c: Channel[T], src: T): bool =
- ## Helper templates for `trySend`.
- trySend(c, isolate(src))
- func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
- ## Receives item from the channel(non blocking).
- recvMpmc(c.d, dst.addr, sizeof(T), true)
- func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
- ## Sends item to the channel(blocking).
- var data = src.extract
- discard sendMpmc(c.d, data.addr, sizeof(T), false)
- wasMoved(data)
- template send*[T](c: Channel[T]; src: T) =
- ## Helper templates for `send`.
- send(c, isolate(src))
- func recv*[T](c: Channel[T]): T {.inline.} =
- ## Receives item from the channel(blocking).
- discard recvMpmc(c.d, result.addr, sizeof(result), false)
- func open*[T](c: Channel[T]): bool {.inline.} =
- result = c.d.channelOpenMpmc()
- func close*[T](c: Channel[T]): bool {.inline.} =
- result = c.d.channelCloseMpmc()
- func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d)
- proc newChannel*[T](elements = 30): Channel[T] =
- ## Returns a new `Channel`. `elements` should be positive.
- ## `elements` is used to specify whether a channel is buffered or not.
- ## If `elements` = 1, the channel is unbuffered. If `elements` > 1, the
- ## channel is buffered.
- assert elements >= 1, "Elements must be positive!"
- result = Channel[T](d: allocChannel(sizeof(T), elements))
|