threads.nim 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2012 Andreas Rumpf
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Thread support for Nim.
  10. ##
  11. ## **Note**: This is part of the system module. Do not import it directly.
  12. ## To activate thread support you need to compile
  13. ## with the `--threads:on`:option: command line switch.
  14. ##
  15. ## Nim's memory model for threads is quite different from other common
  16. ## programming languages (C, Pascal): Each thread has its own
  17. ## (garbage collected) heap and sharing of memory is restricted. This helps
  18. ## to prevent race conditions and improves efficiency. See `the manual for
  19. ## details of this memory model <manual.html#threads>`_.
  20. ##
  21. ## Examples
  22. ## ========
  23. ##
  24. ## .. code-block:: Nim
  25. ##
  26. ## import std/locks
  27. ##
  28. ## var
  29. ## thr: array[0..4, Thread[tuple[a,b: int]]]
  30. ## L: Lock
  31. ##
  32. ## proc threadFunc(interval: tuple[a,b: int]) {.thread.} =
  33. ## for i in interval.a..interval.b:
  34. ## acquire(L) # lock stdout
  35. ## echo i
  36. ## release(L)
  37. ##
  38. ## initLock(L)
  39. ##
  40. ## for i in 0..high(thr):
  41. ## createThread(thr[i], threadFunc, (i*10, i*10+5))
  42. ## joinThreads(thr)
  43. ##
  44. ## deinitLock(L)
  45. when not declared(ThisIsSystem):
  46. {.error: "You must not import this module explicitly".}
  47. const
  48. hasAllocStack = defined(zephyr) # maybe freertos too?
  49. when hasAllocStack or defined(zephyr) or defined(freertos):
  50. const
  51. nimThreadStackSize {.intdefine.} = 8192
  52. nimThreadStackGuard {.intdefine.} = 128
  53. StackGuardSize = nimThreadStackGuard
  54. ThreadStackSize = nimThreadStackSize - nimThreadStackGuard
  55. else:
  56. const
  57. StackGuardSize = 4096
  58. ThreadStackMask =
  59. when defined(genode):
  60. 1024*64*sizeof(int)-1
  61. else:
  62. 1024*256*sizeof(int)-1
  63. ThreadStackSize = ThreadStackMask+1 - StackGuardSize
  64. #const globalsSlot = ThreadVarSlot(0)
  65. #sysAssert checkSlot.int == globalsSlot.int
  66. # Zephyr doesn't include this properly without some help
  67. when defined(zephyr):
  68. {.emit: """/*INCLUDESECTION*/
  69. #include <pthread.h>
  70. """.}
  71. # create for the main thread. Note: do not insert this data into the list
  72. # of all threads; it's not to be stopped etc.
  73. when not defined(useNimRtl):
  74. #when not defined(createNimRtl): initStackBottom()
  75. when declared(initGC):
  76. initGC()
  77. when not emulatedThreadVars:
  78. type ThreadType {.pure.} = enum
  79. None = 0,
  80. NimThread = 1,
  81. ForeignThread = 2
  82. var
  83. threadType {.rtlThreadVar.}: ThreadType
  84. threadType = ThreadType.NimThread
  85. # We jump through some hops here to ensure that Nim thread procs can have
  86. # the Nim calling convention. This is needed because thread procs are
  87. # ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just
  88. # use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway.
  89. type
  90. Thread*[TArg] = object
  91. core: PGcThread
  92. sys: SysThread
  93. when TArg is void:
  94. dataFn: proc () {.nimcall, gcsafe.}
  95. else:
  96. dataFn: proc (m: TArg) {.nimcall, gcsafe.}
  97. data: TArg
  98. when hasAllocStack:
  99. rawStack: pointer
  100. proc `=copy`*[TArg](x: var Thread[TArg], y: Thread[TArg]) {.error.}
  101. var
  102. threadDestructionHandlers {.rtlThreadVar.}: seq[proc () {.closure, gcsafe, raises: [].}]
  103. proc onThreadDestruction*(handler: proc () {.closure, gcsafe, raises: [].}) =
  104. ## Registers a *thread local* handler that is called at the thread's
  105. ## destruction.
  106. ##
  107. ## A thread is destructed when the `.thread` proc returns
  108. ## normally or when it raises an exception. Note that unhandled exceptions
  109. ## in a thread nevertheless cause the whole process to die.
  110. threadDestructionHandlers.add handler
  111. template afterThreadRuns() =
  112. for i in countdown(threadDestructionHandlers.len-1, 0):
  113. threadDestructionHandlers[i]()
  114. when not defined(boehmgc) and not hasSharedHeap and not defined(gogc) and not defined(gcRegions):
  115. proc deallocOsPages() {.rtl, raises: [].}
  116. proc threadTrouble() {.raises: [], gcsafe.}
  117. ## defined in system/excpt.nim
  118. when defined(boehmgc):
  119. type GCStackBaseProc = proc(sb: pointer, t: pointer) {.noconv.}
  120. proc boehmGC_call_with_stack_base(sbp: GCStackBaseProc, p: pointer)
  121. {.importc: "GC_call_with_stack_base", boehmGC.}
  122. proc boehmGC_register_my_thread(sb: pointer)
  123. {.importc: "GC_register_my_thread", boehmGC.}
  124. proc boehmGC_unregister_my_thread()
  125. {.importc: "GC_unregister_my_thread", boehmGC.}
  126. proc threadProcWrapDispatch[TArg](sb: pointer, thrd: pointer) {.noconv, raises: [].} =
  127. boehmGC_register_my_thread(sb)
  128. try:
  129. let thrd = cast[ptr Thread[TArg]](thrd)
  130. when TArg is void:
  131. thrd.dataFn()
  132. else:
  133. thrd.dataFn(thrd.data)
  134. except:
  135. threadTrouble()
  136. finally:
  137. afterThreadRuns()
  138. boehmGC_unregister_my_thread()
  139. else:
  140. proc threadProcWrapDispatch[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
  141. try:
  142. when TArg is void:
  143. thrd.dataFn()
  144. else:
  145. when defined(nimV2):
  146. thrd.dataFn(thrd.data)
  147. else:
  148. var x: TArg
  149. deepCopy(x, thrd.data)
  150. thrd.dataFn(x)
  151. except:
  152. threadTrouble()
  153. finally:
  154. afterThreadRuns()
  155. when hasAllocStack:
  156. deallocShared(thrd.rawStack)
  157. proc threadProcWrapStackFrame[TArg](thrd: ptr Thread[TArg]) {.raises: [].} =
  158. when defined(boehmgc):
  159. boehmGC_call_with_stack_base(threadProcWrapDispatch[TArg], thrd)
  160. elif not defined(nogc) and not defined(gogc) and not defined(gcRegions) and not usesDestructors:
  161. var p {.volatile.}: pointer
  162. # init the GC for refc/markandsweep
  163. nimGC_setStackBottom(addr(p))
  164. initGC()
  165. when declared(threadType):
  166. threadType = ThreadType.NimThread
  167. threadProcWrapDispatch[TArg](thrd)
  168. when declared(deallocOsPages): deallocOsPages()
  169. else:
  170. threadProcWrapDispatch(thrd)
  171. template threadProcWrapperBody(closure: untyped): untyped =
  172. var thrd = cast[ptr Thread[TArg]](closure)
  173. var core = thrd.core
  174. when declared(globalsSlot): threadVarSetValue(globalsSlot, thrd.core)
  175. threadProcWrapStackFrame(thrd)
  176. # Since an unhandled exception terminates the whole process (!), there is
  177. # no need for a ``try finally`` here, nor would it be correct: The current
  178. # exception is tried to be re-raised by the code-gen after the ``finally``!
  179. # However this is doomed to fail, because we already unmapped every heap
  180. # page!
  181. # mark as not running anymore:
  182. thrd.core = nil
  183. thrd.dataFn = nil
  184. deallocShared(cast[pointer](core))
  185. {.push stack_trace:off.}
  186. when defined(windows):
  187. proc threadProcWrapper[TArg](closure: pointer): int32 {.stdcall.} =
  188. threadProcWrapperBody(closure)
  189. # implicitly return 0
  190. elif defined(genode):
  191. proc threadProcWrapper[TArg](closure: pointer) {.noconv.} =
  192. threadProcWrapperBody(closure)
  193. else:
  194. proc threadProcWrapper[TArg](closure: pointer): pointer {.noconv.} =
  195. threadProcWrapperBody(closure)
  196. {.pop.}
  197. proc running*[TArg](t: Thread[TArg]): bool {.inline.} =
  198. ## Returns true if `t` is running.
  199. result = t.dataFn != nil
  200. proc handle*[TArg](t: Thread[TArg]): SysThread {.inline.} =
  201. ## Returns the thread handle of `t`.
  202. result = t.sys
  203. when hostOS == "windows":
  204. const MAXIMUM_WAIT_OBJECTS = 64
  205. proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
  206. ## Waits for the thread `t` to finish.
  207. discard waitForSingleObject(t.sys, -1'i32)
  208. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  209. ## Waits for every thread in `t` to finish.
  210. var a: array[MAXIMUM_WAIT_OBJECTS, SysThread]
  211. var k = 0
  212. while k < len(t):
  213. var count = min(len(t) - k, MAXIMUM_WAIT_OBJECTS)
  214. for i in 0..(count - 1): a[i] = t[i + k].sys
  215. discard waitForMultipleObjects(int32(count),
  216. cast[ptr SysThread](addr(a)), 1, -1)
  217. inc(k, MAXIMUM_WAIT_OBJECTS)
  218. elif defined(genode):
  219. proc joinThread*[TArg](t: Thread[TArg]) {.importcpp.}
  220. ## Waits for the thread `t` to finish.
  221. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  222. ## Waits for every thread in `t` to finish.
  223. for i in 0..t.high: joinThread(t[i])
  224. else:
  225. proc joinThread*[TArg](t: Thread[TArg]) {.inline.} =
  226. ## Waits for the thread `t` to finish.
  227. discard pthread_join(t.sys, nil)
  228. proc joinThreads*[TArg](t: varargs[Thread[TArg]]) =
  229. ## Waits for every thread in `t` to finish.
  230. for i in 0..t.high: joinThread(t[i])
  231. when false:
  232. # XXX a thread should really release its heap here somehow:
  233. proc destroyThread*[TArg](t: var Thread[TArg]) =
  234. ## Forces the thread `t` to terminate. This is potentially dangerous if
  235. ## you don't have full control over `t` and its acquired resources.
  236. when hostOS == "windows":
  237. discard TerminateThread(t.sys, 1'i32)
  238. else:
  239. discard pthread_cancel(t.sys)
  240. when declared(registerThread): unregisterThread(addr(t))
  241. t.dataFn = nil
  242. ## if thread `t` already exited, `t.core` will be `null`.
  243. if not isNil(t.core):
  244. deallocShared(t.core)
  245. t.core = nil
  246. when hostOS == "windows":
  247. proc createThread*[TArg](t: var Thread[TArg],
  248. tp: proc (arg: TArg) {.thread, nimcall.},
  249. param: TArg) =
  250. ## Creates a new thread `t` and starts its execution.
  251. ##
  252. ## Entry point is the proc `tp`.
  253. ## `param` is passed to `tp`. `TArg` can be `void` if you
  254. ## don't need to pass any data to the thread.
  255. t.core = cast[PGcThread](allocShared0(sizeof(GcThread)))
  256. when TArg isnot void: t.data = param
  257. t.dataFn = tp
  258. when hasSharedHeap: t.core.stackSize = ThreadStackSize
  259. var dummyThreadId: int32
  260. t.sys = createThread(nil, ThreadStackSize, threadProcWrapper[TArg],
  261. addr(t), 0'i32, dummyThreadId)
  262. if t.sys <= 0:
  263. raise newException(ResourceExhaustedError, "cannot create thread")
  264. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  265. ## Pins a thread to a `CPU`:idx:.
  266. ##
  267. ## In other words sets a thread's `affinity`:idx:.
  268. ## If you don't know what this means, you shouldn't use this proc.
  269. setThreadAffinityMask(t.sys, uint(1 shl cpu))
  270. elif defined(genode):
  271. var affinityOffset: cuint = 1
  272. ## CPU affinity offset for next thread, safe to roll-over.
  273. proc createThread*[TArg](t: var Thread[TArg],
  274. tp: proc (arg: TArg) {.thread, nimcall.},
  275. param: TArg) =
  276. t.core = cast[PGcThread](allocShared0(sizeof(GcThread)))
  277. when TArg isnot void: t.data = param
  278. t.dataFn = tp
  279. when hasSharedHeap: t.stackSize = ThreadStackSize
  280. t.sys.initThread(
  281. runtimeEnv,
  282. ThreadStackSize.culonglong,
  283. threadProcWrapper[TArg], addr(t), affinityOffset)
  284. inc affinityOffset
  285. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  286. {.hint: "cannot change Genode thread CPU affinity after initialization".}
  287. discard
  288. else:
  289. proc createThread*[TArg](t: var Thread[TArg],
  290. tp: proc (arg: TArg) {.thread, nimcall.},
  291. param: TArg) =
  292. ## Creates a new thread `t` and starts its execution.
  293. ##
  294. ## Entry point is the proc `tp`. `param` is passed to `tp`.
  295. ## `TArg` can be `void` if you
  296. ## don't need to pass any data to the thread.
  297. t.core = cast[PGcThread](allocShared0(sizeof(GcThread)))
  298. when TArg isnot void: t.data = param
  299. t.dataFn = tp
  300. when hasSharedHeap: t.core.stackSize = ThreadStackSize
  301. var a {.noinit.}: Pthread_attr
  302. doAssert pthread_attr_init(a) == 0
  303. when hasAllocStack:
  304. var
  305. rawstk = allocShared0(ThreadStackSize + StackGuardSize)
  306. stk = cast[pointer](cast[uint](rawstk) + StackGuardSize)
  307. let setstacksizeResult = pthread_attr_setstack(addr a, stk, ThreadStackSize)
  308. t.rawStack = rawstk
  309. else:
  310. let setstacksizeResult = pthread_attr_setstacksize(a, ThreadStackSize)
  311. when not defined(ios):
  312. # This fails on iOS
  313. doAssert(setstacksizeResult == 0)
  314. if pthread_create(t.sys, a, threadProcWrapper[TArg], addr(t)) != 0:
  315. raise newException(ResourceExhaustedError, "cannot create thread")
  316. doAssert pthread_attr_destroy(a) == 0
  317. proc pinToCpu*[Arg](t: var Thread[Arg]; cpu: Natural) =
  318. ## Pins a thread to a `CPU`:idx:.
  319. ##
  320. ## In other words sets a thread's `affinity`:idx:.
  321. ## If you don't know what this means, you shouldn't use this proc.
  322. when not defined(macosx):
  323. var s {.noinit.}: CpuSet
  324. cpusetZero(s)
  325. cpusetIncl(cpu.cint, s)
  326. setAffinity(t.sys, csize_t(sizeof(s)), s)
  327. proc createThread*(t: var Thread[void], tp: proc () {.thread, nimcall.}) =
  328. createThread[void](t, tp)
  329. # we need to cache current threadId to not perform syscall all the time
  330. var threadId {.threadvar.}: int
  331. when defined(windows):
  332. proc getThreadId*(): int =
  333. ## Gets the ID of the currently running thread.
  334. if threadId == 0:
  335. threadId = int(getCurrentThreadId())
  336. result = threadId
  337. elif defined(linux):
  338. proc syscall(arg: clong): clong {.varargs, importc: "syscall", header: "<unistd.h>".}
  339. when defined(amd64):
  340. const NR_gettid = clong(186)
  341. else:
  342. var NR_gettid {.importc: "__NR_gettid", header: "<sys/syscall.h>".}: clong
  343. proc getThreadId*(): int =
  344. ## Gets the ID of the currently running thread.
  345. if threadId == 0:
  346. threadId = int(syscall(NR_gettid))
  347. result = threadId
  348. elif defined(dragonfly):
  349. proc lwp_gettid(): int32 {.importc, header: "unistd.h".}
  350. proc getThreadId*(): int =
  351. ## Gets the ID of the currently running thread.
  352. if threadId == 0:
  353. threadId = int(lwp_gettid())
  354. result = threadId
  355. elif defined(openbsd):
  356. proc getthrid(): int32 {.importc: "getthrid", header: "<unistd.h>".}
  357. proc getThreadId*(): int =
  358. ## get the ID of the currently running thread.
  359. if threadId == 0:
  360. threadId = int(getthrid())
  361. result = threadId
  362. elif defined(netbsd):
  363. proc lwp_self(): int32 {.importc: "_lwp_self", header: "<lwp.h>".}
  364. proc getThreadId*(): int =
  365. ## Gets the ID of the currently running thread.
  366. if threadId == 0:
  367. threadId = int(lwp_self())
  368. result = threadId
  369. elif defined(freebsd):
  370. proc syscall(arg: cint, arg0: ptr cint): cint {.varargs, importc: "syscall", header: "<unistd.h>".}
  371. var SYS_thr_self {.importc:"SYS_thr_self", header:"<sys/syscall.h>".}: cint
  372. proc getThreadId*(): int =
  373. ## Gets the ID of the currently running thread.
  374. var tid = 0.cint
  375. if threadId == 0:
  376. discard syscall(SYS_thr_self, addr tid)
  377. threadId = tid
  378. result = threadId
  379. elif defined(macosx):
  380. proc syscall(arg: cint): cint {.varargs, importc: "syscall", header: "<unistd.h>".}
  381. var SYS_thread_selfid {.importc:"SYS_thread_selfid", header:"<sys/syscall.h>".}: cint
  382. proc getThreadId*(): int =
  383. ## Gets the ID of the currently running thread.
  384. if threadId == 0:
  385. threadId = int(syscall(SYS_thread_selfid))
  386. result = threadId
  387. elif defined(solaris):
  388. type thread_t {.importc: "thread_t", header: "<thread.h>".} = distinct int
  389. proc thr_self(): thread_t {.importc, header: "<thread.h>".}
  390. proc getThreadId*(): int =
  391. ## Gets the ID of the currently running thread.
  392. if threadId == 0:
  393. threadId = int(thr_self())
  394. result = threadId
  395. elif defined(haiku):
  396. type thr_id {.importc: "thread_id", header: "<OS.h>".} = distinct int32
  397. proc find_thread(name: cstring): thr_id {.importc, header: "<OS.h>".}
  398. proc getThreadId*(): int =
  399. ## Gets the ID of the currently running thread.
  400. if threadId == 0:
  401. threadId = int(find_thread(nil))
  402. result = threadId