typedthreads.nim 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Thread support for Nim.
  10. ##
  11. ## Nim's memory model for threads is quite different from other common
  12. ## programming languages (C, Pascal): Each thread has its own
  13. ## (garbage collected) heap and sharing of memory is restricted. This helps
  14. ## to prevent race conditions and improves efficiency. See `the manual for
  15. ## details of this memory model <manual.html#threads>`_.
  16. ##
  17. ## Examples
  18. ## ========
  19. ##
  20. ## .. code-block:: Nim
  21. ##
  22. ## import std/locks
  23. ##
  24. ## var
  25. ## thr: array[0..4, Thread[tuple[a,b: int]]]
  26. ## L: Lock
  27. ##
  28. ## proc threadFunc(interval: tuple[a,b: int]) {.thread.} =
  29. ## for i in interval.a..interval.b:
  30. ## acquire(L) # lock stdout
  31. ## echo i
  32. ## release(L)
  33. ##
  34. ## initLock(L)
  35. ##
  36. ## for i in 0..high(thr):
  37. ## createThread(thr[i], threadFunc, (i*10, i*10+5))
  38. ## joinThreads(thr)
  39. ##
  40. ## deinitLock(L)
  41. import std/private/[threadtypes]
  42. export Thread
  43. import system/ansi_c
  44. when defined(nimPreviewSlimSystem):
  45. import std/assertions
  46. when defined(genode):
  47. import genode/env
  48. when hasAllocStack or defined(zephyr) or defined(freertos):
  49. const
  50. nimThreadStackSize {.intdefine.} = 8192
  51. nimThreadStackGuard {.intdefine.} = 128
  52. StackGuardSize = nimThreadStackGuard
  53. ThreadStackSize = nimThreadStackSize - nimThreadStackGuard
  54. else:
  55. const
  56. StackGuardSize = 4096
  57. ThreadStackMask =
  58. when defined(genode):
  59. 1024*64*sizeof(int)-1
  60. else:
  61. 1024*256*sizeof(int)-1
  62. ThreadStackSize = ThreadStackMask+1 - StackGuardSize
  63. when defined(gcDestructors):
  64. proc allocThreadStorage(size: int): pointer =
  65. result = c_malloc(csize_t size)
  66. zeroMem(result, size)
  67. else:
  68. template allocThreadStorage(size: untyped): untyped = allocShared0(size)
  69. #const globalsSlot = ThreadVarSlot(0)
  70. #sysAssert checkSlot.int == globalsSlot.int
  71. # Zephyr doesn't include this properly without some help
  72. when defined(zephyr):
  73. {.emit: """/*INCLUDESECTION*/
  74. #include <pthread.h>
  75. """.}
  76. # We jump through some hops here to ensure that Nim thread procs can have
  77. # the Nim calling convention. This is needed because thread procs are
  78. # ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just
  79. # use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway.
  80. {.push stack_trace:off.}
  81. when defined(windows):
  82. proc threadProcWrapper[TArg](closure: pointer): int32 {.stdcall.} =
  83. nimThreadProcWrapperBody(closure)
  84. # implicitly return 0
  85. elif defined(genode):
  86. proc threadProcWrapper[TArg](closure: pointer) {.noconv.} =
  87. nimThreadProcWrapperBody(closure)
  88. else:
  89. proc threadProcWrapper[TArg](closure: pointer): pointer {.noconv.} =
  90. nimThreadProcWrapperBody(closure)
  91. {.pop.}
  92. proc running*[TArg](t: Thread[TArg]): bool {.inline.} =
  93. ## Returns true if `t` is running.
  94. result = t.dataFn != nil
  95. proc handle*[TArg](t: Thread[TArg]): SysThread {.inline.} =
  96. ## Returns the thread handle of `t`.
  97. result = t.sys
  98. when hostOS == "windows":
  99. const MAXIMUM_WAIT_OBJECTS = 64
  100. proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
  101. ## Waits for the thread `t` to finish.
  102. discard waitForSingleObject(t.sys, -1'i32)
  103. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  104. ## Waits for every thread in `t` to finish.
  105. var a: array[MAXIMUM_WAIT_OBJECTS, SysThread]
  106. var k = 0
  107. while k < len(t):
  108. var count = min(len(t) - k, MAXIMUM_WAIT_OBJECTS)
  109. for i in 0..(count - 1): a[i] = t[i + k].sys
  110. discard waitForMultipleObjects(int32(count),
  111. cast[ptr SysThread](addr(a)), 1, -1)
  112. inc(k, MAXIMUM_WAIT_OBJECTS)
  113. elif defined(genode):
  114. proc joinThread*[TArg](t: Thread[TArg]) {.importcpp.}
  115. ## Waits for the thread `t` to finish.
  116. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  117. ## Waits for every thread in `t` to finish.
  118. for i in 0..t.high: joinThread(t[i])
  119. else:
  120. proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
  121. ## Waits for the thread `t` to finish.
  122. discard pthread_join(t.sys, nil)
  123. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  124. ## Waits for every thread in `t` to finish.
  125. for i in 0..t.high: joinThread(t[i])
  126. when false:
  127. # XXX a thread should really release its heap here somehow:
  128. proc destroyThread*[TArg](t: var Thread[TArg]) =
  129. ## Forces the thread `t` to terminate. This is potentially dangerous if
  130. ## you don't have full control over `t` and its acquired resources.
  131. when hostOS == "windows":
  132. discard TerminateThread(t.sys, 1'i32)
  133. else:
  134. discard pthread_cancel(t.sys)
  135. when declared(registerThread): unregisterThread(addr(t))
  136. t.dataFn = nil
  137. ## if thread `t` already exited, `t.core` will be `null`.
  138. if not isNil(t.core):
  139. deallocThreadStorage(t.core)
  140. t.core = nil
  141. when hostOS == "windows":
  142. proc createThread*[TArg](t: var Thread[TArg],
  143. tp: proc (arg: TArg) {.thread, nimcall.},
  144. param: TArg) =
  145. ## Creates a new thread `t` and starts its execution.
  146. ##
  147. ## Entry point is the proc `tp`.
  148. ## `param` is passed to `tp`. `TArg` can be `void` if you
  149. ## don't need to pass any data to the thread.
  150. t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
  151. when TArg isnot void: t.data = param
  152. t.dataFn = tp
  153. when hasSharedHeap: t.core.stackSize = ThreadStackSize
  154. var dummyThreadId: int32
  155. t.sys = createThread(nil, ThreadStackSize, threadProcWrapper[TArg],
  156. addr(t), 0'i32, dummyThreadId)
  157. if t.sys <= 0:
  158. raise newException(ResourceExhaustedError, "cannot create thread")
  159. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  160. ## Pins a thread to a `CPU`:idx:.
  161. ##
  162. ## In other words sets a thread's `affinity`:idx:.
  163. ## If you don't know what this means, you shouldn't use this proc.
  164. setThreadAffinityMask(t.sys, uint(1 shl cpu))
  165. elif defined(genode):
  166. var affinityOffset: cuint = 1
  167. ## CPU affinity offset for next thread, safe to roll-over.
  168. proc createThread*[TArg](t: var Thread[TArg],
  169. tp: proc (arg: TArg) {.thread, nimcall.},
  170. param: TArg) =
  171. t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
  172. when TArg isnot void: t.data = param
  173. t.dataFn = tp
  174. when hasSharedHeap: t.stackSize = ThreadStackSize
  175. t.sys.initThread(
  176. runtimeEnv,
  177. ThreadStackSize.culonglong,
  178. threadProcWrapper[TArg], addr(t), affinityOffset)
  179. inc affinityOffset
  180. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  181. {.hint: "cannot change Genode thread CPU affinity after initialization".}
  182. discard
  183. else:
  184. proc createThread*[TArg](t: var Thread[TArg],
  185. tp: proc (arg: TArg) {.thread, nimcall.},
  186. param: TArg) =
  187. ## Creates a new thread `t` and starts its execution.
  188. ##
  189. ## Entry point is the proc `tp`. `param` is passed to `tp`.
  190. ## `TArg` can be `void` if you
  191. ## don't need to pass any data to the thread.
  192. t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
  193. when TArg isnot void: t.data = param
  194. t.dataFn = tp
  195. when hasSharedHeap: t.core.stackSize = ThreadStackSize
  196. var a {.noinit.}: Pthread_attr
  197. doAssert pthread_attr_init(a) == 0
  198. when hasAllocStack:
  199. var
  200. rawstk = allocThreadStorage(ThreadStackSize + StackGuardSize)
  201. stk = cast[pointer](cast[uint](rawstk) + StackGuardSize)
  202. let setstacksizeResult = pthread_attr_setstack(addr a, stk, ThreadStackSize)
  203. t.rawStack = rawstk
  204. else:
  205. let setstacksizeResult = pthread_attr_setstacksize(a, ThreadStackSize)
  206. when not defined(ios):
  207. # This fails on iOS
  208. doAssert(setstacksizeResult == 0)
  209. if pthread_create(t.sys, a, threadProcWrapper[TArg], addr(t)) != 0:
  210. raise newException(ResourceExhaustedError, "cannot create thread")
  211. doAssert pthread_attr_destroy(a) == 0
  212. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  213. ## Pins a thread to a `CPU`:idx:.
  214. ##
  215. ## In other words sets a thread's `affinity`:idx:.
  216. ## If you don't know what this means, you shouldn't use this proc.
  217. when not defined(macosx):
  218. var s {.noinit.}: CpuSet
  219. cpusetZero(s)
  220. cpusetIncl(cpu.cint, s)
  221. setAffinity(t.sys, csize_t(sizeof(s)), s)
  222. proc createThread*(t: var Thread[void], tp: proc () {.thread, nimcall.}) =
  223. createThread[void](t, tp)
  224. when not defined(gcOrc):
  225. include system/threadids