coro.nim 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Rokas Kupstys
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Nim coroutines implementation, supports several context switching methods:
  10. ## ======== ============
  11. ## ucontext available on unix and alike (default)
  12. ## setjmp available on unix and alike (x86/64 only)
  13. ## fibers available and required on windows.
  14. ## ======== ============
  15. ##
  16. ## -d:nimCoroutines Required to build this module.
  17. ## -d:nimCoroutinesUcontext Use ucontext backend.
  18. ## -d:nimCoroutinesSetjmp Use setjmp backend.
  19. ## -d:nimCoroutinesSetjmpBundled Use bundled setjmp implementation.
  20. ##
  21. ## Unstable API.
  22. import system/coro_detection
  23. when not nimCoroutines and not defined(nimdoc):
  24. when defined(noNimCoroutines):
  25. {.error: "Coroutines can not be used with -d:noNimCoroutines".}
  26. else:
  27. {.error: "Coroutines require -d:nimCoroutines".}
  28. import std/[os, lists]
  29. include system/timers
  30. when defined(nimPreviewSlimSystem):
  31. import std/assertions
  32. const defaultStackSize = 512 * 1024
  33. const useOrcArc = defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc)
  34. when useOrcArc:
  35. proc nimGC_setStackBottom*(theStackBottom: pointer) = discard
  36. proc GC_addStack(bottom: pointer) {.cdecl, importc.}
  37. proc GC_removeStack(bottom: pointer) {.cdecl, importc.}
  38. proc GC_setActiveStack(bottom: pointer) {.cdecl, importc.}
  39. proc GC_getActiveStack() : pointer {.cdecl, importc.}
  40. const
  41. CORO_BACKEND_UCONTEXT = 0
  42. CORO_BACKEND_SETJMP = 1
  43. CORO_BACKEND_FIBERS = 2
  44. when defined(windows):
  45. const coroBackend = CORO_BACKEND_FIBERS
  46. when defined(nimCoroutinesUcontext):
  47. {.warning: "ucontext coroutine backend is not available on windows, defaulting to fibers.".}
  48. when defined(nimCoroutinesSetjmp):
  49. {.warning: "setjmp coroutine backend is not available on windows, defaulting to fibers.".}
  50. elif defined(haiku) or defined(openbsd):
  51. const coroBackend = CORO_BACKEND_SETJMP
  52. when defined(nimCoroutinesUcontext):
  53. {.warning: "ucontext coroutine backend is not available on haiku, defaulting to setjmp".}
  54. elif defined(nimCoroutinesSetjmp) or defined(nimCoroutinesSetjmpBundled):
  55. const coroBackend = CORO_BACKEND_SETJMP
  56. else:
  57. const coroBackend = CORO_BACKEND_UCONTEXT
  58. when coroBackend == CORO_BACKEND_FIBERS:
  59. import std/winlean
  60. type
  61. Context = pointer
  62. elif coroBackend == CORO_BACKEND_UCONTEXT:
  63. type
  64. stack_t {.importc, header: "<ucontext.h>".} = object
  65. ss_sp: pointer
  66. ss_flags: int
  67. ss_size: int
  68. ucontext_t {.importc, header: "<ucontext.h>".} = object
  69. uc_link: ptr ucontext_t
  70. uc_stack: stack_t
  71. Context = ucontext_t
  72. proc getcontext(context: var ucontext_t): int32 {.importc,
  73. header: "<ucontext.h>".}
  74. proc setcontext(context: var ucontext_t): int32 {.importc,
  75. header: "<ucontext.h>".}
  76. proc swapcontext(fromCtx, toCtx: var ucontext_t): int32 {.importc,
  77. header: "<ucontext.h>".}
  78. proc makecontext(context: var ucontext_t, fn: pointer, argc: int32) {.importc,
  79. header: "<ucontext.h>", varargs.}
  80. elif coroBackend == CORO_BACKEND_SETJMP:
  81. proc coroExecWithStack*(fn: pointer, stack: pointer) {.noreturn,
  82. importc: "narch_$1", fastcall.}
  83. when defined(amd64):
  84. {.compile: "../arch/x86/amd64.S".}
  85. elif defined(i386):
  86. {.compile: "../arch/x86/i386.S".}
  87. else:
  88. # coroExecWithStack is defined in assembly. To support other platforms
  89. # please provide implementation of this procedure.
  90. {.error: "Unsupported architecture.".}
  91. when defined(nimCoroutinesSetjmpBundled):
  92. # Use setjmp/longjmp implementation shipped with compiler.
  93. when defined(amd64):
  94. type
  95. JmpBuf = array[0x50 + 0x10, uint8]
  96. elif defined(i386):
  97. type
  98. JmpBuf = array[0x1C, uint8]
  99. else:
  100. # Bundled setjmp/longjmp are defined in assembly. To support other
  101. # platforms please provide implementations of these procedures.
  102. {.error: "Unsupported architecture.".}
  103. proc setjmp(ctx: var JmpBuf): int {.importc: "narch_$1".}
  104. proc longjmp(ctx: JmpBuf, ret = 1) {.importc: "narch_$1".}
  105. else:
  106. # Use setjmp/longjmp implementation provided by the system.
  107. type
  108. JmpBuf {.importc: "jmp_buf", header: "<setjmp.h>".} = object
  109. proc setjmp(ctx: var JmpBuf): int {.importc, header: "<setjmp.h>".}
  110. proc longjmp(ctx: JmpBuf, ret = 1) {.importc, header: "<setjmp.h>".}
  111. type
  112. Context = JmpBuf
  113. when defined(unix):
  114. # GLibc fails with "*** longjmp causes uninitialized stack frame ***" because
  115. # our custom stacks are not initialized to a magic value.
  116. when defined(osx):
  117. # workaround: error: The deprecated ucontext routines require _XOPEN_SOURCE to be defined
  118. const extra = " -D_XOPEN_SOURCE"
  119. else:
  120. const extra = ""
  121. {.passc: "-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=0" & extra.}
  122. const
  123. CORO_CREATED = 0
  124. CORO_EXECUTING = 1
  125. CORO_FINISHED = 2
  126. type
  127. Stack {.pure.} = object
  128. top: pointer # Top of the stack. Pointer used for deallocating stack if we own it.
  129. bottom: pointer # Very bottom of the stack, acts as unique stack identifier.
  130. size: int
  131. Coroutine {.pure.} = object
  132. execContext: Context
  133. fn: proc()
  134. state: int
  135. lastRun: Ticks
  136. sleepTime: float
  137. stack: Stack
  138. reference: CoroutineRef
  139. CoroutinePtr = ptr Coroutine
  140. CoroutineRef* = ref object
  141. ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns
  142. ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine
  143. ## object while it can be safely deallocated by coroutine scheduler loop. In this case
  144. ## Coroutine.reference.coro is set to nil. Public API checks for it being nil and
  145. ## gracefully fails if it is nil.
  146. coro: CoroutinePtr
  147. CoroutineLoopContext = ref object
  148. coroutines: DoublyLinkedList[CoroutinePtr]
  149. current: DoublyLinkedNode[CoroutinePtr]
  150. loop: Coroutine
  151. ncbottom: pointer # non coroutine stack botttom
  152. var ctx {.threadvar.}: CoroutineLoopContext
  153. proc getCurrent(): CoroutinePtr =
  154. ## Returns current executing coroutine object.
  155. var node = ctx.current
  156. if node != nil:
  157. return node.value
  158. return nil
  159. proc initialize() =
  160. ## Initializes coroutine state of current thread.
  161. if ctx == nil:
  162. ctx = CoroutineLoopContext()
  163. ctx.coroutines = initDoublyLinkedList[CoroutinePtr]()
  164. ctx.loop = Coroutine()
  165. ctx.loop.state = CORO_EXECUTING
  166. when not useOrcArc:
  167. ctx.ncbottom = GC_getActiveStack()
  168. when coroBackend == CORO_BACKEND_FIBERS:
  169. ctx.loop.execContext = ConvertThreadToFiberEx(nil, FIBER_FLAG_FLOAT_SWITCH)
  170. proc runCurrentTask()
  171. proc switchTo(current, to: CoroutinePtr) =
  172. ## Switches execution from `current` into `to` context.
  173. to.lastRun = getTicks()
  174. # Update position of current stack so gc invoked from another stack knows how much to scan.
  175. when not useOrcArc:
  176. GC_setActiveStack(current.stack.bottom)
  177. nimGC_setStackBottom(current.stack.bottom)
  178. var frame = getFrameState()
  179. block:
  180. # Execution will switch to another fiber now. We do not need to update current stack
  181. when coroBackend == CORO_BACKEND_FIBERS:
  182. SwitchToFiber(to.execContext)
  183. elif coroBackend == CORO_BACKEND_UCONTEXT:
  184. discard swapcontext(current.execContext, to.execContext)
  185. elif coroBackend == CORO_BACKEND_SETJMP:
  186. var res = setjmp(current.execContext)
  187. if res == 0:
  188. if to.state == CORO_EXECUTING:
  189. # Coroutine is resumed.
  190. longjmp(to.execContext, 1)
  191. elif to.state == CORO_CREATED:
  192. # Coroutine is started.
  193. coroExecWithStack(runCurrentTask, to.stack.bottom)
  194. #raiseAssert "unreachable"
  195. else:
  196. {.error: "Invalid coroutine backend set.".}
  197. # Execution was just resumed. Restore frame information and set active stack.
  198. setFrameState(frame)
  199. when not useOrcArc:
  200. GC_setActiveStack(current.stack.bottom)
  201. nimGC_setStackBottom(ctx.ncbottom)
  202. proc suspend*(sleepTime: float = 0) =
  203. ## Stops coroutine execution and resumes no sooner than after `sleeptime` seconds.
  204. ## Until then other coroutines are executed.
  205. var current = getCurrent()
  206. current.sleepTime = sleepTime
  207. nimGC_setStackBottom(ctx.ncbottom)
  208. switchTo(current, addr(ctx.loop))
  209. proc runCurrentTask() =
  210. ## Starts execution of current coroutine and updates it's state through coroutine's life.
  211. var sp {.volatile.}: pointer
  212. sp = addr(sp)
  213. block:
  214. var current = getCurrent()
  215. current.stack.bottom = sp
  216. nimGC_setStackBottom(current.stack.bottom)
  217. # Execution of new fiber just started. Since it was entered not through `switchTo` we
  218. # have to set active stack here as well. GC_removeStack() has to be called in main loop
  219. # because we still need stack available in final suspend(0) call from which we will not
  220. # return.
  221. when not useOrcArc:
  222. GC_addStack(sp)
  223. # Activate current stack because we are executing in a new coroutine.
  224. GC_setActiveStack(sp)
  225. current.state = CORO_EXECUTING
  226. try:
  227. current.fn() # Start coroutine execution
  228. except:
  229. echo "Unhandled exception in coroutine."
  230. writeStackTrace()
  231. current.state = CORO_FINISHED
  232. nimGC_setStackBottom(ctx.ncbottom)
  233. suspend(0) # Exit coroutine without returning from coroExecWithStack()
  234. raiseAssert "unreachable"
  235. proc start*(c: proc(), stacksize: int = defaultStackSize): CoroutineRef {.discardable.} =
  236. ## Schedule coroutine for execution. It does not run immediately.
  237. if ctx == nil:
  238. initialize()
  239. var coro: CoroutinePtr
  240. when coroBackend == CORO_BACKEND_FIBERS:
  241. coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine)))
  242. coro.execContext = CreateFiberEx(stacksize, stacksize,
  243. FIBER_FLAG_FLOAT_SWITCH,
  244. (proc(p: pointer) {.stdcall.} = runCurrentTask()), nil)
  245. else:
  246. coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize))
  247. coro.stack.top = cast[pointer](cast[int](coro) + sizeof(Coroutine))
  248. coro.stack.bottom = cast[pointer](cast[int](coro.stack.top) + stacksize)
  249. when coroBackend == CORO_BACKEND_UCONTEXT:
  250. discard getcontext(coro.execContext)
  251. coro.execContext.uc_stack.ss_sp = coro.stack.top
  252. coro.execContext.uc_stack.ss_size = stacksize
  253. coro.execContext.uc_link = addr(ctx.loop.execContext)
  254. makecontext(coro.execContext, runCurrentTask, 0)
  255. coro.fn = c
  256. coro.stack.size = stacksize
  257. coro.state = CORO_CREATED
  258. coro.reference = CoroutineRef(coro: coro)
  259. ctx.coroutines.append(coro)
  260. return coro.reference
  261. proc run*() =
  262. ## Starts main coroutine scheduler loop which exits when all coroutines exit.
  263. ## Calling this proc starts execution of first coroutine.
  264. initialize()
  265. ctx.current = ctx.coroutines.head
  266. var minDelay: float = 0
  267. while ctx.current != nil:
  268. var current = getCurrent()
  269. var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000)
  270. if remaining <= 0:
  271. # Save main loop context. Suspending coroutine will resume after this statement with
  272. switchTo(addr(ctx.loop), current)
  273. else:
  274. if minDelay > 0 and remaining > 0:
  275. minDelay = min(remaining, minDelay)
  276. else:
  277. minDelay = remaining
  278. if current.state == CORO_FINISHED:
  279. var next = ctx.current.prev
  280. if next == nil:
  281. # If first coroutine ends then `prev` is nil even if more coroutines
  282. # are to be scheduled.
  283. next = ctx.current.next
  284. current.reference.coro = nil
  285. ctx.coroutines.remove(ctx.current)
  286. when not useOrcArc:
  287. GC_removeStack(current.stack.bottom)
  288. when coroBackend == CORO_BACKEND_FIBERS:
  289. DeleteFiber(current.execContext)
  290. else:
  291. dealloc(current.stack.top)
  292. dealloc(current)
  293. ctx.current = next
  294. elif ctx.current == nil or ctx.current.next == nil:
  295. ctx.current = ctx.coroutines.head
  296. os.sleep(int(minDelay * 1000))
  297. else:
  298. ctx.current = ctx.current.next
  299. proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED
  300. ## Returns `true` if coroutine has not returned, `false` otherwise.
  301. proc wait*(c: CoroutineRef, interval = 0.01) =
  302. ## Returns only after coroutine `c` has returned. `interval` is time in seconds how often.
  303. while alive(c):
  304. suspend(interval)