threads.nim 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Thread support for Nim.
  10. ##
  11. ## **Note**: This is part of the system module. Do not import it directly.
  12. ## To activate thread support you need to compile
  13. ## with the `--threads:on`:option: command line switch.
  14. ##
  15. ## Nim's memory model for threads is quite different from other common
  16. ## programming languages (C, Pascal): Each thread has its own
  17. ## (garbage collected) heap and sharing of memory is restricted. This helps
  18. ## to prevent race conditions and improves efficiency. See `the manual for
  19. ## details of this memory model <manual.html#threads>`_.
  20. ##
  21. ## Examples
  22. ## ========
  23. ##
  24. ## .. code-block:: Nim
  25. ##
  26. ## import std/locks
  27. ##
  28. ## var
  29. ## thr: array[0..4, Thread[tuple[a,b: int]]]
  30. ## L: Lock
  31. ##
  32. ## proc threadFunc(interval: tuple[a,b: int]) {.thread.} =
  33. ## for i in interval.a..interval.b:
  34. ## acquire(L) # lock stdout
  35. ## echo i
  36. ## release(L)
  37. ##
  38. ## initLock(L)
  39. ##
  40. ## for i in 0..high(thr):
  41. ## createThread(thr[i], threadFunc, (i*10, i*10+5))
  42. ## joinThreads(thr)
  43. ##
  44. ## deinitLock(L)
  45. when not declared(ThisIsSystem):
  46. {.error: "You must not import this module explicitly".}
  47. when defined(nimPreviewSlimSystem):
  48. import std/assertions
  49. const
  50. hasAllocStack = defined(zephyr) # maybe freertos too?
  51. when defined(gcDestructors):
  52. proc allocThreadStorage(size: int): pointer =
  53. result = c_malloc(csize_t size)
  54. zeroMem(result, size)
  55. proc deallocThreadStorage(p: pointer) = c_free(p)
  56. else:
  57. template allocThreadStorage(size: untyped): untyped = allocShared0(size)
  58. template deallocThreadStorage(p: pointer) = deallocShared(p)
  59. when hasAllocStack or defined(zephyr) or defined(freertos):
  60. const
  61. nimThreadStackSize {.intdefine.} = 8192
  62. nimThreadStackGuard {.intdefine.} = 128
  63. StackGuardSize = nimThreadStackGuard
  64. ThreadStackSize = nimThreadStackSize - nimThreadStackGuard
  65. else:
  66. const
  67. StackGuardSize = 4096
  68. ThreadStackMask =
  69. when defined(genode):
  70. 1024*64*sizeof(int)-1
  71. else:
  72. 1024*256*sizeof(int)-1
  73. ThreadStackSize = ThreadStackMask+1 - StackGuardSize
  74. #const globalsSlot = ThreadVarSlot(0)
  75. #sysAssert checkSlot.int == globalsSlot.int
  76. # Zephyr doesn't include this properly without some help
  77. when defined(zephyr):
  78. {.emit: """/*INCLUDESECTION*/
  79. #include <pthread.h>
  80. """.}
  81. # create for the main thread. Note: do not insert this data into the list
  82. # of all threads; it's not to be stopped etc.
  83. when not defined(useNimRtl):
  84. #when not defined(createNimRtl): initStackBottom()
  85. when declared(initGC):
  86. initGC()
  87. when not emulatedThreadVars:
  88. type ThreadType {.pure.} = enum
  89. None = 0,
  90. NimThread = 1,
  91. ForeignThread = 2
  92. var
  93. threadType {.rtlThreadVar.}: ThreadType
  94. threadType = ThreadType.NimThread
  95. # We jump through some hops here to ensure that Nim thread procs can have
  96. # the Nim calling convention. This is needed because thread procs are
  97. # ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just
  98. # use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway.
  99. type
  100. Thread*[TArg] = object
  101. core: PGcThread
  102. sys: SysThread
  103. when TArg is void:
  104. dataFn: proc () {.nimcall, gcsafe.}
  105. else:
  106. dataFn: proc (m: TArg) {.nimcall, gcsafe.}
  107. data: TArg
  108. when hasAllocStack:
  109. rawStack: pointer
  110. proc `=copy`*[TArg](x: var Thread[TArg], y: Thread[TArg]) {.error.}
  111. var
  112. threadDestructionHandlers {.rtlThreadVar.}: seq[proc () {.closure, gcsafe, raises: [].}]
  113. proc onThreadDestruction*(handler: proc () {.closure, gcsafe, raises: [].}) =
  114. ## Registers a *thread local* handler that is called at the thread's
  115. ## destruction.
  116. ##
  117. ## A thread is destructed when the `.thread` proc returns
  118. ## normally or when it raises an exception. Note that unhandled exceptions
  119. ## in a thread nevertheless cause the whole process to die.
  120. threadDestructionHandlers.add handler
  121. template afterThreadRuns() =
  122. for i in countdown(threadDestructionHandlers.len-1, 0):
  123. threadDestructionHandlers[i]()
  124. when not defined(boehmgc) and not hasSharedHeap and not defined(gogc) and not defined(gcRegions):
  125. proc deallocOsPages() {.rtl, raises: [].}
  126. proc threadTrouble() {.raises: [], gcsafe.}
  127. ## defined in system/excpt.nim
  128. when defined(boehmgc):
  129. type GCStackBaseProc = proc(sb: pointer, t: pointer) {.noconv.}
  130. proc boehmGC_call_with_stack_base(sbp: GCStackBaseProc, p: pointer)
  131. {.importc: "GC_call_with_stack_base", boehmGC.}
  132. proc boehmGC_register_my_thread(sb: pointer)
  133. {.importc: "GC_register_my_thread", boehmGC.}
  134. proc boehmGC_unregister_my_thread()
  135. {.importc: "GC_unregister_my_thread", boehmGC.}
  136. proc threadProcWrapDispatch[TArg](sb: pointer, thrd: pointer) {.noconv, raises: [].} =
  137. boehmGC_register_my_thread(sb)
  138. try:
  139. let thrd = cast[ptr Thread[TArg]](thrd)
  140. when TArg is void:
  141. thrd.dataFn()
  142. else:
  143. thrd.dataFn(thrd.data)
  144. except:
  145. threadTrouble()
  146. finally:
  147. afterThreadRuns()
  148. boehmGC_unregister_my_thread()
  149. else:
  150. proc threadProcWrapDispatch[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
  151. try:
  152. when TArg is void:
  153. thrd.dataFn()
  154. else:
  155. when defined(nimV2):
  156. thrd.dataFn(thrd.data)
  157. else:
  158. var x: TArg
  159. deepCopy(x, thrd.data)
  160. thrd.dataFn(x)
  161. except:
  162. threadTrouble()
  163. finally:
  164. afterThreadRuns()
  165. when hasAllocStack:
  166. deallocThreadStorage(thrd.rawStack)
  167. proc threadProcWrapStackFrame[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
  168. when defined(boehmgc):
  169. boehmGC_call_with_stack_base(threadProcWrapDispatch[TArg], thrd)
  170. elif not defined(nogc) and not defined(gogc) and not defined(gcRegions) and not usesDestructors:
  171. var p {.volatile.}: pointer
  172. # init the GC for refc/markandsweep
  173. nimGC_setStackBottom(addr(p))
  174. initGC()
  175. when declared(threadType):
  176. threadType = ThreadType.NimThread
  177. threadProcWrapDispatch[TArg](thrd)
  178. when declared(deallocOsPages): deallocOsPages()
  179. else:
  180. threadProcWrapDispatch(thrd)
  181. template threadProcWrapperBody(closure: untyped): untyped =
  182. var thrd = cast[ptr Thread[TArg]](closure)
  183. var core = thrd.core
  184. when declared(globalsSlot): threadVarSetValue(globalsSlot, thrd.core)
  185. threadProcWrapStackFrame(thrd)
  186. # Since an unhandled exception terminates the whole process (!), there is
  187. # no need for a ``try finally`` here, nor would it be correct: The current
  188. # exception is tried to be re-raised by the code-gen after the ``finally``!
  189. # However this is doomed to fail, because we already unmapped every heap
  190. # page!
  191. # mark as not running anymore:
  192. thrd.core = nil
  193. thrd.dataFn = nil
  194. deallocThreadStorage(cast[pointer](core))
  195. {.push stack_trace:off.}
  196. when defined(windows):
  197. proc threadProcWrapper[TArg](closure: pointer): int32 {.stdcall.} =
  198. threadProcWrapperBody(closure)
  199. # implicitly return 0
  200. elif defined(genode):
  201. proc threadProcWrapper[TArg](closure: pointer) {.noconv.} =
  202. threadProcWrapperBody(closure)
  203. else:
  204. proc threadProcWrapper[TArg](closure: pointer): pointer {.noconv.} =
  205. threadProcWrapperBody(closure)
  206. {.pop.}
  207. proc running*[TArg](t: Thread[TArg]): bool {.inline.} =
  208. ## Returns true if `t` is running.
  209. result = t.dataFn != nil
  210. proc handle*[TArg](t: Thread[TArg]): SysThread {.inline.} =
  211. ## Returns the thread handle of `t`.
  212. result = t.sys
  213. when hostOS == "windows":
  214. const MAXIMUM_WAIT_OBJECTS = 64
  215. proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
  216. ## Waits for the thread `t` to finish.
  217. discard waitForSingleObject(t.sys, -1'i32)
  218. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  219. ## Waits for every thread in `t` to finish.
  220. var a: array[MAXIMUM_WAIT_OBJECTS, SysThread]
  221. var k = 0
  222. while k < len(t):
  223. var count = min(len(t) - k, MAXIMUM_WAIT_OBJECTS)
  224. for i in 0..(count - 1): a[i] = t[i + k].sys
  225. discard waitForMultipleObjects(int32(count),
  226. cast[ptr SysThread](addr(a)), 1, -1)
  227. inc(k, MAXIMUM_WAIT_OBJECTS)
  228. elif defined(genode):
  229. proc joinThread*[TArg](t: Thread[TArg]) {.importcpp.}
  230. ## Waits for the thread `t` to finish.
  231. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  232. ## Waits for every thread in `t` to finish.
  233. for i in 0..t.high: joinThread(t[i])
  234. else:
  235. proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
  236. ## Waits for the thread `t` to finish.
  237. discard pthread_join(t.sys, nil)
  238. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  239. ## Waits for every thread in `t` to finish.
  240. for i in 0..t.high: joinThread(t[i])
  241. when false:
  242. # XXX a thread should really release its heap here somehow:
  243. proc destroyThread*[TArg](t: var Thread[TArg]) =
  244. ## Forces the thread `t` to terminate. This is potentially dangerous if
  245. ## you don't have full control over `t` and its acquired resources.
  246. when hostOS == "windows":
  247. discard TerminateThread(t.sys, 1'i32)
  248. else:
  249. discard pthread_cancel(t.sys)
  250. when declared(registerThread): unregisterThread(addr(t))
  251. t.dataFn = nil
  252. ## if thread `t` already exited, `t.core` will be `null`.
  253. if not isNil(t.core):
  254. deallocThreadStorage(t.core)
  255. t.core = nil
  256. when hostOS == "windows":
  257. proc createThread*[TArg](t: var Thread[TArg],
  258. tp: proc (arg: TArg) {.thread, nimcall.},
  259. param: TArg) =
  260. ## Creates a new thread `t` and starts its execution.
  261. ##
  262. ## Entry point is the proc `tp`.
  263. ## `param` is passed to `tp`. `TArg` can be `void` if you
  264. ## don't need to pass any data to the thread.
  265. t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
  266. when TArg isnot void: t.data = param
  267. t.dataFn = tp
  268. when hasSharedHeap: t.core.stackSize = ThreadStackSize
  269. var dummyThreadId: int32
  270. t.sys = createThread(nil, ThreadStackSize, threadProcWrapper[TArg],
  271. addr(t), 0'i32, dummyThreadId)
  272. if t.sys <= 0:
  273. raise newException(ResourceExhaustedError, "cannot create thread")
  274. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  275. ## Pins a thread to a `CPU`:idx:.
  276. ##
  277. ## In other words sets a thread's `affinity`:idx:.
  278. ## If you don't know what this means, you shouldn't use this proc.
  279. setThreadAffinityMask(t.sys, uint(1 shl cpu))
  280. elif defined(genode):
  281. var affinityOffset: cuint = 1
  282. ## CPU affinity offset for next thread, safe to roll-over.
  283. proc createThread*[TArg](t: var Thread[TArg],
  284. tp: proc (arg: TArg) {.thread, nimcall.},
  285. param: TArg) =
  286. t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
  287. when TArg isnot void: t.data = param
  288. t.dataFn = tp
  289. when hasSharedHeap: t.stackSize = ThreadStackSize
  290. t.sys.initThread(
  291. runtimeEnv,
  292. ThreadStackSize.culonglong,
  293. threadProcWrapper[TArg], addr(t), affinityOffset)
  294. inc affinityOffset
  295. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  296. {.hint: "cannot change Genode thread CPU affinity after initialization".}
  297. discard
  298. else:
  299. proc createThread*[TArg](t: var Thread[TArg],
  300. tp: proc (arg: TArg) {.thread, nimcall.},
  301. param: TArg) =
  302. ## Creates a new thread `t` and starts its execution.
  303. ##
  304. ## Entry point is the proc `tp`. `param` is passed to `tp`.
  305. ## `TArg` can be `void` if you
  306. ## don't need to pass any data to the thread.
  307. t.core = cast[PGcThread](allocThreadStorage(sizeof(GcThread)))
  308. when TArg isnot void: t.data = param
  309. t.dataFn = tp
  310. when hasSharedHeap: t.core.stackSize = ThreadStackSize
  311. var a {.noinit.}: Pthread_attr
  312. doAssert pthread_attr_init(a) == 0
  313. when hasAllocStack:
  314. var
  315. rawstk = allocThreadStorage(ThreadStackSize + StackGuardSize)
  316. stk = cast[pointer](cast[uint](rawstk) + StackGuardSize)
  317. let setstacksizeResult = pthread_attr_setstack(addr a, stk, ThreadStackSize)
  318. t.rawStack = rawstk
  319. else:
  320. let setstacksizeResult = pthread_attr_setstacksize(a, ThreadStackSize)
  321. when not defined(ios):
  322. # This fails on iOS
  323. doAssert(setstacksizeResult == 0)
  324. if pthread_create(t.sys, a, threadProcWrapper[TArg], addr(t)) != 0:
  325. raise newException(ResourceExhaustedError, "cannot create thread")
  326. doAssert pthread_attr_destroy(a) == 0
  327. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  328. ## Pins a thread to a `CPU`:idx:.
  329. ##
  330. ## In other words sets a thread's `affinity`:idx:.
  331. ## If you don't know what this means, you shouldn't use this proc.
  332. when not defined(macosx):
  333. var s {.noinit.}: CpuSet
  334. cpusetZero(s)
  335. cpusetIncl(cpu.cint, s)
  336. setAffinity(t.sys, csize_t(sizeof(s)), s)
  337. proc createThread*(t: var Thread[void], tp: proc () {.thread, nimcall.}) =
  338. createThread[void](t, tp)
  339. when not defined(gcOrc):
  340. include threadids