coro.nim 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Rokas Kupstys
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Nim coroutines implementation supports several context switching methods:
  10. ## ucontext: available on unix and alike (default)
  11. ## setjmp: available on unix and alike (x86/64 only)
  12. ## Fibers: available and required on windows.
  13. ##
  14. ## -d:nimCoroutines Required to build this module.
  15. ## -d:nimCoroutinesUcontext Use ucontext backend.
  16. ## -d:nimCoroutinesSetjmp Use setjmp backend.
  17. ## -d:nimCoroutinesSetjmpBundled Use bundled setjmp implementation.
  18. when not nimCoroutines and not defined(nimdoc):
  19. when defined(noNimCoroutines):
  20. {.error: "Coroutines can not be used with -d:noNimCoroutines"}
  21. else:
  22. {.error: "Coroutines require -d:nimCoroutines".}
  23. import os
  24. import lists
  25. include system/timers
  26. const defaultStackSize = 512 * 1024
  27. proc GC_addStack(bottom: pointer) {.cdecl, importc.}
  28. proc GC_removeStack(bottom: pointer) {.cdecl, importc.}
  29. proc GC_setActiveStack(bottom: pointer) {.cdecl, importc.}
  30. const
  31. CORO_BACKEND_UCONTEXT = 0
  32. CORO_BACKEND_SETJMP = 1
  33. CORO_BACKEND_FIBERS = 2
  34. when defined(windows):
  35. const coroBackend = CORO_BACKEND_FIBERS
  36. when defined(nimCoroutinesUcontext):
  37. {.warning: "ucontext coroutine backend is not available on windows, defaulting to fibers.".}
  38. when defined(nimCoroutinesSetjmp):
  39. {.warning: "setjmp coroutine backend is not available on windows, defaulting to fibers.".}
  40. elif defined(haiku):
  41. const coroBackend = CORO_BACKEND_SETJMP
  42. when defined(nimCoroutinesUcontext):
  43. {.warning: "ucontext coroutine backend is not available on haiku, defaulting to setjmp".}
  44. elif defined(nimCoroutinesSetjmp) or defined(nimCoroutinesSetjmpBundled):
  45. const coroBackend = CORO_BACKEND_SETJMP
  46. else:
  47. const coroBackend = CORO_BACKEND_UCONTEXT
  48. when coroBackend == CORO_BACKEND_FIBERS:
  49. import windows.winlean
  50. type
  51. Context = pointer
  52. elif coroBackend == CORO_BACKEND_UCONTEXT:
  53. type
  54. stack_t {.importc, header: "<ucontext.h>".} = object
  55. ss_sp: pointer
  56. ss_flags: int
  57. ss_size: int
  58. ucontext_t {.importc, header: "<ucontext.h>".} = object
  59. uc_link: ptr ucontext_t
  60. uc_stack: stack_t
  61. Context = ucontext_t
  62. proc getcontext(context: var ucontext_t): int32 {.importc, header: "<ucontext.h>".}
  63. proc setcontext(context: var ucontext_t): int32 {.importc, header: "<ucontext.h>".}
  64. proc swapcontext(fromCtx, toCtx: var ucontext_t): int32 {.importc, header: "<ucontext.h>".}
  65. proc makecontext(context: var ucontext_t, fn: pointer, argc: int32) {.importc, header: "<ucontext.h>", varargs.}
  66. elif coroBackend == CORO_BACKEND_SETJMP:
  67. proc coroExecWithStack*(fn: pointer, stack: pointer) {.noreturn, importc: "narch_$1", fastcall.}
  68. when defined(amd64):
  69. {.compile: "../arch/x86/amd64.S".}
  70. elif defined(i386):
  71. {.compile: "../arch/x86/i386.S".}
  72. else:
  73. # coroExecWithStack is defined in assembly. To support other platforms
  74. # please provide implementation of this procedure.
  75. {.error: "Unsupported architecture.".}
  76. when defined(nimCoroutinesSetjmpBundled):
  77. # Use setjmp/longjmp implementation shipped with compiler.
  78. when defined(amd64):
  79. type
  80. JmpBuf = array[0x50 + 0x10, uint8]
  81. elif defined(i386):
  82. type
  83. JmpBuf = array[0x1C, uint8]
  84. else:
  85. # Bundled setjmp/longjmp are defined in assembly. To support other
  86. # platforms please provide implementations of these procedures.
  87. {.error: "Unsupported architecture.".}
  88. proc setjmp(ctx: var JmpBuf): int {.importc: "narch_$1".}
  89. proc longjmp(ctx: JmpBuf, ret=1) {.importc: "narch_$1".}
  90. else:
  91. # Use setjmp/longjmp implementation provided by the system.
  92. type
  93. JmpBuf {.importc: "jmp_buf", header: "<setjmp.h>".} = object
  94. proc setjmp(ctx: var JmpBuf): int {.importc, header: "<setjmp.h>".}
  95. proc longjmp(ctx: JmpBuf, ret=1) {.importc, header: "<setjmp.h>".}
  96. type
  97. Context = JmpBuf
  98. when defined(unix):
  99. # GLibc fails with "*** longjmp causes uninitialized stack frame ***" because
  100. # our custom stacks are not initialized to a magic value.
  101. when defined(osx):
  102. # workaround: error: The deprecated ucontext routines require _XOPEN_SOURCE to be defined
  103. const extra = " -D_XOPEN_SOURCE"
  104. else:
  105. const extra = ""
  106. {.passC: "-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=0" & extra.}
  107. const
  108. CORO_CREATED = 0
  109. CORO_EXECUTING = 1
  110. CORO_FINISHED = 2
  111. type
  112. Stack {.pure.} = object
  113. top: pointer # Top of the stack. Pointer used for deallocating stack if we own it.
  114. bottom: pointer # Very bottom of the stack, acts as unique stack identifier.
  115. size: int
  116. Coroutine {.pure.} = object
  117. execContext: Context
  118. fn: proc()
  119. state: int
  120. lastRun: Ticks
  121. sleepTime: float
  122. stack: Stack
  123. reference: CoroutineRef
  124. CoroutinePtr = ptr Coroutine
  125. CoroutineRef* = ref object
  126. ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns
  127. ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine
  128. ## object while it can be safely deallocated by coroutine scheduler loop. In this case
  129. ## Coroutine.reference.coro is set to nil. Public API checks for for it being nil and
  130. ## gracefully fails if it is nil.
  131. coro: CoroutinePtr
  132. CoroutineLoopContext = ref object
  133. coroutines: DoublyLinkedList[CoroutinePtr]
  134. current: DoublyLinkedNode[CoroutinePtr]
  135. loop: Coroutine
  136. var ctx {.threadvar.}: CoroutineLoopContext
  137. proc getCurrent(): CoroutinePtr =
  138. ## Returns current executing coroutine object.
  139. var node = ctx.current
  140. if node != nil:
  141. return node.value
  142. return nil
  143. proc initialize() =
  144. ## Initializes coroutine state of current thread.
  145. if ctx == nil:
  146. ctx = CoroutineLoopContext()
  147. ctx.coroutines = initDoublyLinkedList[CoroutinePtr]()
  148. ctx.loop = Coroutine()
  149. ctx.loop.state = CORO_EXECUTING
  150. when coroBackend == CORO_BACKEND_FIBERS:
  151. ctx.loop.execContext = ConvertThreadToFiberEx(nil, FIBER_FLAG_FLOAT_SWITCH)
  152. proc runCurrentTask()
  153. proc switchTo(current, to: CoroutinePtr) =
  154. ## Switches execution from `current` into `to` context.
  155. to.lastRun = getTicks()
  156. # Update position of current stack so gc invoked from another stack knows how much to scan.
  157. GC_setActiveStack(current.stack.bottom)
  158. var frame = getFrameState()
  159. block:
  160. # Execution will switch to another fiber now. We do not need to update current stack
  161. when coroBackend == CORO_BACKEND_FIBERS:
  162. SwitchToFiber(to.execContext)
  163. elif coroBackend == CORO_BACKEND_UCONTEXT:
  164. discard swapcontext(current.execContext, to.execContext)
  165. elif coroBackend == CORO_BACKEND_SETJMP:
  166. var res = setjmp(current.execContext)
  167. if res == 0:
  168. if to.state == CORO_EXECUTING:
  169. # Coroutine is resumed.
  170. longjmp(to.execContext, 1)
  171. elif to.state == CORO_CREATED:
  172. # Coroutine is started.
  173. coroExecWithStack(runCurrentTask, to.stack.bottom)
  174. #doAssert false
  175. else:
  176. {.error: "Invalid coroutine backend set.".}
  177. # Execution was just resumed. Restore frame information and set active stack.
  178. setFrameState(frame)
  179. GC_setActiveStack(current.stack.bottom)
  180. proc suspend*(sleepTime: float=0) =
  181. ## Stops coroutine execution and resumes no sooner than after ``sleeptime`` seconds.
  182. ## Until then other coroutines are executed.
  183. var current = getCurrent()
  184. current.sleepTime = sleepTime
  185. switchTo(current, addr(ctx.loop))
  186. proc runCurrentTask() =
  187. ## Starts execution of current coroutine and updates it's state through coroutine's life.
  188. var sp {.volatile.}: pointer
  189. sp = addr(sp)
  190. block:
  191. var current = getCurrent()
  192. current.stack.bottom = sp
  193. # Execution of new fiber just started. Since it was entered not through `switchTo` we
  194. # have to set active stack here as well. GC_removeStack() has to be called in main loop
  195. # because we still need stack available in final suspend(0) call from which we will not
  196. # return.
  197. GC_addStack(sp)
  198. # Activate current stack because we are executing in a new coroutine.
  199. GC_setActiveStack(sp)
  200. current.state = CORO_EXECUTING
  201. try:
  202. current.fn() # Start coroutine execution
  203. except:
  204. echo "Unhandled exception in coroutine."
  205. writeStackTrace()
  206. current.state = CORO_FINISHED
  207. suspend(0) # Exit coroutine without returning from coroExecWithStack()
  208. doAssert false
  209. proc start*(c: proc(), stacksize: int=defaultStackSize): CoroutineRef {.discardable.} =
  210. ## Schedule coroutine for execution. It does not run immediately.
  211. if ctx == nil:
  212. initialize()
  213. var coro: CoroutinePtr
  214. when coroBackend == CORO_BACKEND_FIBERS:
  215. coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine)))
  216. coro.execContext = CreateFiberEx(stacksize, stacksize,
  217. FIBER_FLAG_FLOAT_SWITCH, (proc(p: pointer): void {.stdcall.} = runCurrentTask()), nil)
  218. coro.stack.size = stacksize
  219. else:
  220. coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize))
  221. coro.stack.top = cast[pointer](cast[ByteAddress](coro) + sizeof(Coroutine))
  222. coro.stack.bottom = cast[pointer](cast[ByteAddress](coro.stack.top) + stacksize)
  223. when coroBackend == CORO_BACKEND_UCONTEXT:
  224. discard getcontext(coro.execContext)
  225. coro.execContext.uc_stack.ss_sp = coro.stack.top
  226. coro.execContext.uc_stack.ss_size = stacksize
  227. coro.execContext.uc_link = addr(ctx.loop.execContext)
  228. makecontext(coro.execContext, runCurrentTask, 0)
  229. coro.fn = c
  230. coro.stack.size = stacksize
  231. coro.state = CORO_CREATED
  232. coro.reference = CoroutineRef(coro: coro)
  233. ctx.coroutines.append(coro)
  234. return coro.reference
  235. proc run*() =
  236. initialize()
  237. ## Starts main coroutine scheduler loop which exits when all coroutines exit.
  238. ## Calling this proc starts execution of first coroutine.
  239. ctx.current = ctx.coroutines.head
  240. var minDelay: float = 0
  241. while ctx.current != nil:
  242. var current = getCurrent()
  243. var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000)
  244. if remaining <= 0:
  245. # Save main loop context. Suspending coroutine will resume after this statement with
  246. switchTo(addr(ctx.loop), current)
  247. else:
  248. if minDelay > 0 and remaining > 0:
  249. minDelay = min(remaining, minDelay)
  250. else:
  251. minDelay = remaining
  252. if current.state == CORO_FINISHED:
  253. var next = ctx.current.prev
  254. if next == nil:
  255. # If first coroutine ends then `prev` is nil even if more coroutines
  256. # are to be scheduled.
  257. next = ctx.current.next
  258. current.reference.coro = nil
  259. ctx.coroutines.remove(ctx.current)
  260. GC_removeStack(current.stack.bottom)
  261. when coroBackend == CORO_BACKEND_FIBERS:
  262. DeleteFiber(current.execContext)
  263. else:
  264. dealloc(current.stack.top)
  265. dealloc(current)
  266. ctx.current = next
  267. elif ctx.current == nil or ctx.current.next == nil:
  268. ctx.current = ctx.coroutines.head
  269. os.sleep(int(minDelay * 1000))
  270. else:
  271. ctx.current = ctx.current.next
  272. proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED
  273. ## Returns ``true`` if coroutine has not returned, ``false`` otherwise.
  274. proc wait*(c: CoroutineRef, interval=0.01) =
  275. ## Returns only after coroutine ``c`` has returned. ``interval`` is time in seconds how often.
  276. while alive(c):
  277. suspend(interval)