tweave.nim 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. discard """
  2. outputsub: '''Success'''
  3. cmd: '''nim c --gc:arc --threads:on $file'''
  4. disabled: "bsd"
  5. """
  6. # bug #13936
  7. import std/atomics
  8. const MemBlockSize = 256
  9. type
  10. ChannelSPSCSingle* = object
  11. full{.align: 128.}: Atomic[bool]
  12. itemSize*: uint8
  13. buffer*{.align: 8.}: UncheckedArray[byte]
  14. proc `=`(
  15. dest: var ChannelSPSCSingle,
  16. source: ChannelSPSCSingle
  17. ) {.error: "A channel cannot be copied".}
  18. proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} =
  19. ## If ChannelSPSCSingle is used intrusive another data structure
  20. ## be aware that it should be the last part due to ending by UncheckedArray
  21. ## Also due to 128 bytes padding, it automatically takes half
  22. ## of the default MemBlockSize
  23. assert itemsize.int in 0 .. int high(uint8)
  24. assert itemSize.int +
  25. sizeof(chan.itemsize) +
  26. sizeof(chan.full) < MemBlockSize
  27. chan.itemSize = uint8 itemsize
  28. chan.full.store(false, moRelaxed)
  29. func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} =
  30. not chan.full.load(moAcquire)
  31. func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
  32. ## Try receiving the item buffered in the channel
  33. ## Returns true if successful (channel was not empty)
  34. ##
  35. ## ⚠ Use only in the consumer thread that reads from the channel.
  36. assert (sizeof(T) == chan.itemsize.int) or
  37. # Support dummy object
  38. (sizeof(T) == 0 and chan.itemsize == 1)
  39. let full = chan.full.load(moAcquire)
  40. if not full:
  41. return false
  42. dst = cast[ptr T](chan.buffer.addr)[]
  43. chan.full.store(false, moRelease)
  44. return true
  45. func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
  46. ## Try sending an item into the channel
  47. ## Reurns true if successful (channel was empty)
  48. ##
  49. ## ⚠ Use only in the producer thread that writes from the channel.
  50. assert (sizeof(T) == chan.itemsize.int) or
  51. # Support dummy object
  52. (sizeof(T) == 0 and chan.itemsize == 1)
  53. let full = chan.full.load(moAcquire)
  54. if full:
  55. return false
  56. cast[ptr T](chan.buffer.addr)[] = src
  57. chan.full.store(true, moRelease)
  58. return true
  59. # Sanity checks
  60. # ------------------------------------------------------------------------------
  61. when isMainModule:
  62. when not compileOption("threads"):
  63. {.error: "This requires --threads:on compilation flag".}
  64. template sendLoop[T](chan: var ChannelSPSCSingle,
  65. data: sink T,
  66. body: untyped): untyped =
  67. while not chan.trySend(data):
  68. body
  69. template recvLoop[T](chan: var ChannelSPSCSingle,
  70. data: var T,
  71. body: untyped): untyped =
  72. while not chan.tryRecv(data):
  73. body
  74. type
  75. ThreadArgs = object
  76. ID: WorkerKind
  77. chan: ptr ChannelSPSCSingle
  78. WorkerKind = enum
  79. Sender
  80. Receiver
  81. template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} =
  82. if args.ID == id:
  83. body
  84. proc thread_func(args: ThreadArgs) =
  85. # Worker RECEIVER:
  86. # ---------
  87. # <- chan
  88. # <- chan
  89. # <- chan
  90. #
  91. # Worker SENDER:
  92. # ---------
  93. # chan <- 42
  94. # chan <- 53
  95. # chan <- 64
  96. Worker(Receiver):
  97. var val: int
  98. for j in 0 ..< 10:
  99. args.chan[].recvLoop(val):
  100. # Busy loop, in prod we might want to yield the core/thread timeslice
  101. discard
  102. echo " Receiver got: ", val
  103. doAssert val == 42 + j*11
  104. Worker(Sender):
  105. doAssert args.chan.full.load(moRelaxed) == false
  106. for j in 0 ..< 10:
  107. let val = 42 + j*11
  108. args.chan[].sendLoop(val):
  109. # Busy loop, in prod we might want to yield the core/thread timeslice
  110. discard
  111. echo "Sender sent: ", val
  112. proc main() =
  113. echo "Testing if 2 threads can send data"
  114. echo "-----------------------------------"
  115. var threads: array[2, Thread[ThreadArgs]]
  116. var chan = cast[ptr ChannelSPSCSingle](allocShared(MemBlockSize))
  117. chan[].initialize(itemSize = sizeof(int))
  118. createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan))
  119. createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan))
  120. joinThread(threads[0])
  121. joinThread(threads[1])
  122. freeShared(chan)
  123. echo "-----------------------------------"
  124. echo "Success"
  125. main()