channels.nim 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. #
  2. #
  3. # The Nim Compiler
  4. # (c) Copyright 2021 Andreas Prell, Mamy André-Ratsimbazafy & Nim Contributors
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # Based on https://github.com/mratsim/weave/blob/5696d94e6358711e840f8c0b7c684fcc5cbd4472/unused/channels/channels_legacy.nim
  10. # Those are translations of @aprell (Andreas Prell) original channels from C to Nim
  11. # (https://github.com/aprell/tasking-2.0/blob/master/src/channel_shm/channel.c)
  12. # And in turn they are an implementation of Michael & Scott lock-based queues
  13. # (note the paper has 2 channels: lock-free and lock-based) with additional caching:
  14. # Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
  15. # Maged M. Michael, Michael L. Scott, 1996
  16. # https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf
  17. ## This module only works with `--gc:arc` or `--gc:orc`.
  18. ##
  19. ## .. warning:: This module is experimental and its interface may change.
  20. ##
  21. ## The following is a simple example of two different ways to use channels:
  22. ## blocking and non-blocking.
  23. ##
  24. runnableExamples("--threads:on --gc:orc"):
  25. import std/os
  26. # In this example a channel is declared at module scope.
  27. # Channels are generic, and they include support for passing objects between
  28. # threads.
  29. # Note that isolated data passed through channels is moved around.
  30. var chan = newChannel[string]()
  31. # This proc will be run in another thread using the threads module.
  32. proc firstWorker() =
  33. chan.send("Hello World!")
  34. # This is another proc to run in a background thread. This proc takes a while
  35. # to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
  36. proc secondWorker() =
  37. sleep(2000)
  38. chan.send("Another message")
  39. # Launch the worker.
  40. var worker1: Thread[void]
  41. createThread(worker1, firstWorker)
  42. # Block until the message arrives, then print it out.
  43. let dest = chan.recv()
  44. assert dest == "Hello World!"
  45. # Wait for the thread to exit before moving on to the next example.
  46. worker1.joinThread()
  47. # Launch the other worker.
  48. var worker2: Thread[void]
  49. createThread(worker2, secondWorker)
  50. # This time, use a non-blocking approach with tryRecv.
  51. # Since the main thread is not blocked, it could be used to perform other
  52. # useful work while it waits for data to arrive on the channel.
  53. var messages: seq[string]
  54. while true:
  55. var msg = ""
  56. if chan.tryRecv(msg):
  57. messages.add msg # "Another message"
  58. break
  59. messages.add "Pretend I'm doing useful work..."
  60. # For this example, sleep in order not to flood stdout with the above
  61. # message.
  62. sleep(400)
  63. # Wait for the second thread to exit before cleaning up the channel.
  64. worker2.joinThread()
  65. # Clean up the channel.
  66. assert chan.close()
  67. assert messages[^1] == "Another message"
  68. assert messages.len >= 2
  69. when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc):
  70. {.error: "This channel implementation requires --gc:arc or --gc:orc".}
  71. import std/[locks, atomics, isolation]
  72. import system/ansi_c
  73. # Channel (Shared memory channels)
  74. # ----------------------------------------------------------------------------------
  75. const
  76. cacheLineSize {.intdefine.} = 64 # TODO: some Samsung phone have 128 cache-line
  77. nimChannelCacheSize* {.intdefine.} = 100
  78. type
  79. ChannelRaw = ptr ChannelObj
  80. ChannelObj = object
  81. headLock, tailLock: Lock
  82. notFullCond, notEmptyCond: Cond
  83. closed: Atomic[bool]
  84. size: int
  85. itemsize: int # up to itemsize bytes can be exchanged over this channel
  86. head {.align: cacheLineSize.} : int # Items are taken from head and new items are inserted at tail
  87. tail: int
  88. buffer: ptr UncheckedArray[byte]
  89. atomicCounter: Atomic[int]
  90. ChannelCache = ptr ChannelCacheObj
  91. ChannelCacheObj = object
  92. next: ChannelCache
  93. chanSize: int
  94. chanN: int
  95. numCached: int
  96. cache: array[nimChannelCacheSize, ChannelRaw]
  97. # ----------------------------------------------------------------------------------
  98. proc numItems(chan: ChannelRaw): int {.inline.} =
  99. result = chan.tail - chan.head
  100. if result < 0:
  101. inc(result, 2 * chan.size)
  102. assert result <= chan.size
  103. template isFull(chan: ChannelRaw): bool =
  104. abs(chan.tail - chan.head) == chan.size
  105. template isEmpty(chan: ChannelRaw): bool =
  106. chan.head == chan.tail
  107. # Unbuffered / synchronous channels
  108. # ----------------------------------------------------------------------------------
  109. template numItemsUnbuf(chan: ChannelRaw): int =
  110. chan.head
  111. template isFullUnbuf(chan: ChannelRaw): bool =
  112. chan.head == 1
  113. template isEmptyUnbuf(chan: ChannelRaw): bool =
  114. chan.head == 0
  115. # ChannelRaw kinds
  116. # ----------------------------------------------------------------------------------
  117. func isUnbuffered(chan: ChannelRaw): bool =
  118. chan.size - 1 == 0
  119. # ChannelRaw status and properties
  120. # ----------------------------------------------------------------------------------
  121. proc isClosed(chan: ChannelRaw): bool {.inline.} = load(chan.closed, moRelaxed)
  122. proc peek(chan: ChannelRaw): int {.inline.} =
  123. (if chan.isUnbuffered: numItemsUnbuf(chan) else: numItems(chan))
  124. # Per-thread channel cache
  125. # ----------------------------------------------------------------------------------
  126. var channelCache {.threadvar.}: ChannelCache
  127. var channelCacheLen {.threadvar.}: int
  128. proc allocChannelCache(size, n: int): bool =
  129. ## Allocate a free list for storing channels of a given type
  130. var p = channelCache
  131. # Avoid multiple free lists for the exact same type of channel
  132. while not p.isNil:
  133. if size == p.chanSize and n == p.chanN:
  134. return false
  135. p = p.next
  136. p = cast[ptr ChannelCacheObj](c_malloc(csize_t sizeof(ChannelCacheObj)))
  137. if p.isNil:
  138. raise newException(OutOfMemDefect, "Could not allocate memory")
  139. p.chanSize = size
  140. p.chanN = n
  141. p.numCached = 0
  142. p.next = channelCache
  143. channelCache = p
  144. inc channelCacheLen
  145. result = true
  146. proc freeChannelCache*() =
  147. ## Frees the entire channel cache, including all channels
  148. var p = channelCache
  149. var q: ChannelCache
  150. while not p.isNil:
  151. q = p.next
  152. for i in 0 ..< p.numCached:
  153. let chan = p.cache[i]
  154. if not chan.buffer.isNil:
  155. c_free(chan.buffer)
  156. deinitLock(chan.headLock)
  157. deinitLock(chan.tailLock)
  158. deinitCond(chan.notFullCond)
  159. deinitCond(chan.notEmptyCond)
  160. c_free(chan)
  161. c_free(p)
  162. dec channelCacheLen
  163. p = q
  164. assert(channelCacheLen == 0)
  165. channelCache = nil
  166. # Channels memory ops
  167. # ----------------------------------------------------------------------------------
  168. proc allocChannel(size, n: int): ChannelRaw =
  169. when nimChannelCacheSize > 0:
  170. var p = channelCache
  171. while not p.isNil:
  172. if size == p.chanSize and n == p.chanN:
  173. # Check if free list contains channel
  174. if p.numCached > 0:
  175. dec p.numCached
  176. result = p.cache[p.numCached]
  177. assert(result.isEmpty)
  178. return
  179. else:
  180. # All the other lists in cache won't match
  181. break
  182. p = p.next
  183. result = cast[ChannelRaw](c_malloc(csize_t sizeof(ChannelObj)))
  184. if result.isNil:
  185. raise newException(OutOfMemDefect, "Could not allocate memory")
  186. # To buffer n items, we allocate for n
  187. result.buffer = cast[ptr UncheckedArray[byte]](c_malloc(csize_t n*size))
  188. if result.buffer.isNil:
  189. raise newException(OutOfMemDefect, "Could not allocate memory")
  190. initLock(result.headLock)
  191. initLock(result.tailLock)
  192. initCond(result.notFullCond)
  193. initCond(result.notEmptyCond)
  194. result.closed.store(false, moRelaxed) # We don't need atomic here, how to?
  195. result.size = n
  196. result.itemsize = size
  197. result.head = 0
  198. result.tail = 0
  199. result.atomicCounter.store(0, moRelaxed)
  200. when nimChannelCacheSize > 0:
  201. # Allocate a cache as well if one of the proper size doesn't exist
  202. discard allocChannelCache(size, n)
  203. proc freeChannel(chan: ChannelRaw) =
  204. if chan.isNil:
  205. return
  206. when nimChannelCacheSize > 0:
  207. var p = channelCache
  208. while not p.isNil:
  209. if chan.itemsize == p.chanSize and
  210. chan.size == p.chanN:
  211. if p.numCached < nimChannelCacheSize:
  212. # If space left in cache, cache it
  213. p.cache[p.numCached] = chan
  214. inc p.numCached
  215. return
  216. else:
  217. # All the other lists in cache won't match
  218. break
  219. p = p.next
  220. if not chan.buffer.isNil:
  221. c_free(chan.buffer)
  222. deinitLock(chan.headLock)
  223. deinitLock(chan.tailLock)
  224. deinitCond(chan.notFullCond)
  225. deinitCond(chan.notEmptyCond)
  226. c_free(chan)
  227. # MPMC Channels (Multi-Producer Multi-Consumer)
  228. # ----------------------------------------------------------------------------------
  229. proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
  230. if nonBlocking and chan.isFullUnbuf:
  231. return false
  232. acquire(chan.headLock)
  233. if nonBlocking and chan.isFullUnbuf:
  234. # Another thread was faster
  235. release(chan.headLock)
  236. return false
  237. while chan.isFullUnbuf:
  238. wait(chan.notFullcond, chan.headLock)
  239. assert chan.isEmptyUnbuf
  240. assert size <= chan.itemsize
  241. copyMem(chan.buffer, data, size)
  242. chan.head = 1
  243. signal(chan.notEmptyCond)
  244. release(chan.headLock)
  245. result = true
  246. proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
  247. assert not chan.isNil
  248. assert not data.isNil
  249. if isUnbuffered(chan):
  250. return sendUnbufferedMpmc(chan, data, size, nonBlocking)
  251. if nonBlocking and chan.isFull:
  252. return false
  253. acquire(chan.tailLock)
  254. if nonBlocking and chan.isFull:
  255. # Another thread was faster
  256. release(chan.tailLock)
  257. return false
  258. while chan.isFull:
  259. wait(chan.notFullcond, chan.tailLock)
  260. assert not chan.isFull
  261. assert size <= chan.itemsize
  262. let writeIdx = if chan.tail < chan.size: chan.tail
  263. else: chan.tail - chan.size
  264. copyMem(chan.buffer[writeIdx * chan.itemsize].addr, data, size)
  265. inc chan.tail
  266. if chan.tail == 2 * chan.size:
  267. chan.tail = 0
  268. signal(chan.notEmptyCond)
  269. release(chan.tailLock)
  270. result = true
  271. proc recvUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
  272. if nonBlocking and chan.isEmptyUnbuf:
  273. return false
  274. acquire(chan.headLock)
  275. if nonBlocking and chan.isEmptyUnbuf:
  276. # Another thread was faster
  277. release(chan.headLock)
  278. return false
  279. while chan.isEmptyUnbuf:
  280. wait(chan.notEmptyCond, chan.headLock)
  281. assert chan.isFullUnbuf
  282. assert size <= chan.itemsize
  283. copyMem(data, chan.buffer, size)
  284. chan.head = 0
  285. signal(chan.notFullCond)
  286. release(chan.headLock)
  287. result = true
  288. proc recvMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
  289. assert not chan.isNil
  290. assert not data.isNil
  291. if isUnbuffered(chan):
  292. return recvUnbufferedMpmc(chan, data, size, nonBlocking)
  293. if nonBlocking and chan.isEmpty:
  294. return false
  295. acquire(chan.headLock)
  296. if nonBlocking and chan.isEmpty:
  297. # Another thread took the last data
  298. release(chan.headLock)
  299. return false
  300. while chan.isEmpty:
  301. wait(chan.notEmptyCond, chan.headLock)
  302. assert not chan.isEmpty
  303. assert size <= chan.itemsize
  304. let readIdx = if chan.head < chan.size: chan.head
  305. else: chan.head - chan.size
  306. copyMem(data, chan.buffer[readIdx * chan.itemsize].addr, size)
  307. inc chan.head
  308. if chan.head == 2 * chan.size:
  309. chan.head = 0
  310. signal(chan.notFullCond)
  311. release(chan.headLock)
  312. result = true
  313. proc channelCloseMpmc(chan: ChannelRaw): bool =
  314. # Unsynchronized
  315. if chan.isClosed:
  316. # ChannelRaw already closed
  317. return false
  318. store(chan.closed, true, moRelaxed)
  319. result = true
  320. proc channelOpenMpmc(chan: ChannelRaw): bool =
  321. # Unsynchronized
  322. if not chan.isClosed:
  323. # ChannelRaw already open
  324. return false
  325. store(chan.closed, false, moRelaxed)
  326. result = true
  327. # Public API
  328. # ----------------------------------------------------------------------------------
  329. type
  330. Channel*[T] = object ## Typed channels
  331. d: ChannelRaw
  332. proc `=destroy`*[T](c: var Channel[T]) =
  333. if c.d != nil:
  334. if load(c.d.atomicCounter, moAcquire) == 0:
  335. if c.d.buffer != nil:
  336. freeChannel(c.d)
  337. else:
  338. atomicDec(c.d.atomicCounter)
  339. proc `=copy`*[T](dest: var Channel[T], src: Channel[T]) =
  340. ## Shares `Channel` by reference counting.
  341. if src.d != nil:
  342. atomicInc(src.d.atomicCounter)
  343. if dest.d != nil:
  344. `=destroy`(dest)
  345. dest.d = src.d
  346. func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
  347. ## Sends item to the channel(non blocking).
  348. var data = src.extract
  349. result = sendMpmc(c.d, data.addr, sizeof(T), true)
  350. if result:
  351. wasMoved(data)
  352. template trySend*[T](c: Channel[T], src: T): bool =
  353. ## Helper templates for `trySend`.
  354. trySend(c, isolate(src))
  355. func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
  356. ## Receives item from the channel(non blocking).
  357. recvMpmc(c.d, dst.addr, sizeof(T), true)
  358. func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
  359. ## Sends item to the channel(blocking).
  360. var data = src.extract
  361. discard sendMpmc(c.d, data.addr, sizeof(T), false)
  362. wasMoved(data)
  363. template send*[T](c: Channel[T]; src: T) =
  364. ## Helper templates for `send`.
  365. send(c, isolate(src))
  366. func recv*[T](c: Channel[T]): T {.inline.} =
  367. ## Receives item from the channel(blocking).
  368. discard recvMpmc(c.d, result.addr, sizeof(result), false)
  369. func open*[T](c: Channel[T]): bool {.inline.} =
  370. result = c.d.channelOpenMpmc()
  371. func close*[T](c: Channel[T]): bool {.inline.} =
  372. result = c.d.channelCloseMpmc()
  373. func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d)
  374. proc newChannel*[T](elements = 30): Channel[T] =
  375. ## Returns a new `Channel`. `elements` should be positive.
  376. ## `elements` is used to specify whether a channel is buffered or not.
  377. ## If `elements` = 1, the channel is unbuffered. If `elements` > 1, the
  378. ## channel is buffered.
  379. assert elements >= 1, "Elements must be positive!"
  380. result = Channel[T](d: allocChannel(sizeof(T), elements))