asyncfutures.nim 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. import os, tables, strutils, times, heapqueue, options, deques, cstrutils
  2. # TODO: This shouldn't need to be included, but should ideally be exported.
  3. type
  4. CallbackFunc = proc () {.closure, gcsafe.}
  5. CallbackList = object
  6. function: CallbackFunc
  7. next: ref CallbackList
  8. FutureBase* = ref object of RootObj ## Untyped future.
  9. callbacks: CallbackList
  10. finished: bool
  11. error*: ref Exception ## Stored exception
  12. errorStackTrace*: string
  13. when not defined(release):
  14. stackTrace: string ## For debugging purposes only.
  15. id: int
  16. fromProc: string
  17. Future*[T] = ref object of FutureBase ## Typed future.
  18. value: T ## Stored value
  19. FutureVar*[T] = distinct Future[T]
  20. FutureError* = object of Exception
  21. cause*: FutureBase
  22. when not defined(release):
  23. var currentID = 0
  24. var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
  25. proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
  26. ## Get current implementation of ``callSoon``.
  27. return callSoonProc
  28. proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
  29. ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
  30. callSoonProc = p
  31. proc callSoon*(cbproc: proc ()) =
  32. ## Call ``cbproc`` "soon".
  33. ##
  34. ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
  35. ##
  36. ## If async dispatcher is not running, ``cbproc`` will be executed immediately.
  37. if callSoonProc.isNil:
  38. # Loop not initialized yet. Call the function directly to allow setup code to use futures.
  39. cbproc()
  40. else:
  41. callSoonProc(cbproc)
  42. template setupFutureBase(fromProc: string) =
  43. new(result)
  44. result.finished = false
  45. when not defined(release):
  46. result.stackTrace = getStackTrace()
  47. result.id = currentID
  48. result.fromProc = fromProc
  49. currentID.inc()
  50. proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
  51. ## Creates a new future.
  52. ##
  53. ## Specifying ``fromProc``, which is a string specifying the name of the proc
  54. ## that this future belongs to, is a good habit as it helps with debugging.
  55. setupFutureBase(fromProc)
  56. proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
  57. ## Create a new ``FutureVar``. This Future type is ideally suited for
  58. ## situations where you want to avoid unnecessary allocations of Futures.
  59. ##
  60. ## Specifying ``fromProc``, which is a string specifying the name of the proc
  61. ## that this future belongs to, is a good habit as it helps with debugging.
  62. result = FutureVar[T](newFuture[T](fromProc))
  63. proc clean*[T](future: FutureVar[T]) =
  64. ## Resets the ``finished`` status of ``future``.
  65. Future[T](future).finished = false
  66. Future[T](future).error = nil
  67. proc checkFinished[T](future: Future[T]) =
  68. ## Checks whether `future` is finished. If it is then raises a
  69. ## ``FutureError``.
  70. when not defined(release):
  71. if future.finished:
  72. var msg = ""
  73. msg.add("An attempt was made to complete a Future more than once. ")
  74. msg.add("Details:")
  75. msg.add("\n Future ID: " & $future.id)
  76. msg.add("\n Created in proc: " & future.fromProc)
  77. msg.add("\n Stack trace to moment of creation:")
  78. msg.add("\n" & indent(future.stackTrace.strip(), 4))
  79. when T is string:
  80. msg.add("\n Contents (string): ")
  81. msg.add("\n" & indent(future.value.repr, 4))
  82. msg.add("\n Stack trace to moment of secondary completion:")
  83. msg.add("\n" & indent(getStackTrace().strip(), 4))
  84. var err = newException(FutureError, msg)
  85. err.cause = future
  86. raise err
  87. proc call(callbacks: var CallbackList) =
  88. var current = callbacks
  89. while true:
  90. if not current.function.isNil:
  91. callSoon(current.function)
  92. if current.next.isNil:
  93. break
  94. else:
  95. current = current.next[]
  96. # callback will be called only once, let GC collect them now
  97. callbacks.next = nil
  98. callbacks.function = nil
  99. proc add(callbacks: var CallbackList, function: CallbackFunc) =
  100. if callbacks.function.isNil:
  101. callbacks.function = function
  102. assert callbacks.next == nil
  103. else:
  104. let newNext = new(ref CallbackList)
  105. newNext.function = callbacks.function
  106. newNext.next = callbacks.next
  107. callbacks.next = newNext
  108. callbacks.function = function
  109. proc complete*[T](future: Future[T], val: T) =
  110. ## Completes ``future`` with value ``val``.
  111. #assert(not future.finished, "Future already finished, cannot finish twice.")
  112. checkFinished(future)
  113. assert(future.error == nil)
  114. future.value = val
  115. future.finished = true
  116. future.callbacks.call()
  117. proc complete*(future: Future[void]) =
  118. ## Completes a void ``future``.
  119. #assert(not future.finished, "Future already finished, cannot finish twice.")
  120. checkFinished(future)
  121. assert(future.error == nil)
  122. future.finished = true
  123. future.callbacks.call()
  124. proc complete*[T](future: FutureVar[T]) =
  125. ## Completes a ``FutureVar``.
  126. template fut: untyped = Future[T](future)
  127. checkFinished(fut)
  128. assert(fut.error == nil)
  129. fut.finished = true
  130. fut.callbacks.call()
  131. proc complete*[T](future: FutureVar[T], val: T) =
  132. ## Completes a ``FutureVar`` with value ``val``.
  133. ##
  134. ## Any previously stored value will be overwritten.
  135. template fut: untyped = Future[T](future)
  136. checkFinished(fut)
  137. assert(fut.error.isNil())
  138. fut.finished = true
  139. fut.value = val
  140. fut.callbacks.call()
  141. proc fail*[T](future: Future[T], error: ref Exception) =
  142. ## Completes ``future`` with ``error``.
  143. #assert(not future.finished, "Future already finished, cannot finish twice.")
  144. checkFinished(future)
  145. future.finished = true
  146. future.error = error
  147. future.errorStackTrace =
  148. if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
  149. future.callbacks.call()
  150. proc clearCallbacks*(future: FutureBase) =
  151. future.callbacks.function = nil
  152. future.callbacks.next = nil
  153. proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
  154. ## Adds the callbacks proc to be called when the future completes.
  155. ##
  156. ## If future has already completed then ``cb`` will be called immediately.
  157. assert cb != nil
  158. if future.finished:
  159. callSoon(cb)
  160. else:
  161. future.callbacks.add cb
  162. proc addCallback*[T](future: Future[T],
  163. cb: proc (future: Future[T]) {.closure,gcsafe.}) =
  164. ## Adds the callbacks proc to be called when the future completes.
  165. ##
  166. ## If future has already completed then ``cb`` will be called immediately.
  167. future.addCallback(
  168. proc() =
  169. cb(future)
  170. )
  171. proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
  172. ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
  173. ##
  174. ## If future has already completed then ``cb`` will be called immediately.
  175. ##
  176. ## It's recommended to use ``addCallback`` or ``then`` instead.
  177. future.clearCallbacks
  178. future.addCallback cb
  179. proc `callback=`*[T](future: Future[T],
  180. cb: proc (future: Future[T]) {.closure,gcsafe.}) =
  181. ## Sets the callback proc to be called when the future completes.
  182. ##
  183. ## If future has already completed then ``cb`` will be called immediately.
  184. future.callback = proc () = cb(future)
  185. proc getHint(entry: StackTraceEntry): string =
  186. ## We try to provide some hints about stack trace entries that the user
  187. ## may not be familiar with, in particular calls inside the stdlib.
  188. result = ""
  189. if entry.procname == cstring"processPendingCallbacks":
  190. if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
  191. return "Executes pending callbacks"
  192. elif entry.procname == cstring"poll":
  193. if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
  194. return "Processes asynchronous completion events"
  195. if entry.procname.endsWith("_continue"):
  196. if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0:
  197. return "Resumes an async procedure"
  198. proc `$`*(entries: seq[StackTraceEntry]): string =
  199. result = ""
  200. # Find longest filename & line number combo for alignment purposes.
  201. var longestLeft = 0
  202. for entry in entries:
  203. if entry.procName.isNil: continue
  204. let left = $entry.filename & $entry.line
  205. if left.len > longestLeft:
  206. longestLeft = left.len
  207. var indent = 2
  208. # Format the entries.
  209. for entry in entries:
  210. if entry.procName.isNil:
  211. if entry.line == -10:
  212. result.add(spaces(indent) & "#[\n")
  213. indent.inc(2)
  214. else:
  215. indent.dec(2)
  216. result.add(spaces(indent) & "]#\n")
  217. continue
  218. let left = "$#($#)" % [$entry.filename, $entry.line]
  219. result.add((spaces(indent) & "$#$# $#\n") % [
  220. left,
  221. spaces(longestLeft - left.len + 2),
  222. $entry.procName
  223. ])
  224. let hint = getHint(entry)
  225. if hint.len > 0:
  226. result.add(spaces(indent+2) & "## " & hint & "\n")
  227. proc injectStacktrace[T](future: Future[T]) =
  228. when not defined(release):
  229. const header = "\nAsync traceback:\n"
  230. var exceptionMsg = future.error.msg
  231. if header in exceptionMsg:
  232. # This is messy: extract the original exception message from the msg
  233. # containing the async traceback.
  234. let start = exceptionMsg.find(header)
  235. exceptionMsg = exceptionMsg[0..<start]
  236. var newMsg = exceptionMsg & header
  237. let entries = getStackTraceEntries(future.error)
  238. newMsg.add($entries)
  239. newMsg.add("Exception message: " & exceptionMsg & "\n")
  240. newMsg.add("Exception type:")
  241. # # For debugging purposes
  242. # for entry in getStackTraceEntries(future.error):
  243. # newMsg.add "\n" & $entry
  244. future.error.msg = newMsg
  245. proc read*[T](future: Future[T] | FutureVar[T]): T =
  246. ## Retrieves the value of ``future``. Future must be finished otherwise
  247. ## this function will fail with a ``ValueError`` exception.
  248. ##
  249. ## If the result of the future is an error then that error will be raised.
  250. {.push hint[ConvFromXtoItselfNotNeeded]: off.}
  251. let fut = Future[T](future)
  252. {.pop.}
  253. if fut.finished:
  254. if fut.error != nil:
  255. injectStacktrace(fut)
  256. raise fut.error
  257. when T isnot void:
  258. return fut.value
  259. else:
  260. # TODO: Make a custom exception type for this?
  261. raise newException(ValueError, "Future still in progress.")
  262. proc readError*[T](future: Future[T]): ref Exception =
  263. ## Retrieves the exception stored in ``future``.
  264. ##
  265. ## An ``ValueError`` exception will be thrown if no exception exists
  266. ## in the specified Future.
  267. if future.error != nil: return future.error
  268. else:
  269. raise newException(ValueError, "No error in future.")
  270. proc mget*[T](future: FutureVar[T]): var T =
  271. ## Returns a mutable value stored in ``future``.
  272. ##
  273. ## Unlike ``read``, this function will not raise an exception if the
  274. ## Future has not been finished.
  275. result = Future[T](future).value
  276. proc finished*(future: FutureBase | FutureVar): bool =
  277. ## Determines whether ``future`` has completed.
  278. ##
  279. ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
  280. when future is FutureVar:
  281. result = (FutureBase(future)).finished
  282. else:
  283. result = future.finished
  284. proc failed*(future: FutureBase): bool =
  285. ## Determines whether ``future`` completed with an error.
  286. return future.error != nil
  287. proc asyncCheck*[T](future: Future[T]) =
  288. ## Sets a callback on ``future`` which raises an exception if the future
  289. ## finished with an error.
  290. ##
  291. ## This should be used instead of ``discard`` to discard void futures,
  292. ## or use ``waitFor`` if you need to wait for the future's completion.
  293. assert(not future.isNil, "Future is nil")
  294. future.callback =
  295. proc () =
  296. if future.failed:
  297. injectStacktrace(future)
  298. raise future.error
  299. proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  300. ## Returns a future which will complete once both ``fut1`` and ``fut2``
  301. ## complete.
  302. var retFuture = newFuture[void]("asyncdispatch.`and`")
  303. fut1.callback =
  304. proc () =
  305. if not retFuture.finished:
  306. if fut1.failed: retFuture.fail(fut1.error)
  307. elif fut2.finished: retFuture.complete()
  308. fut2.callback =
  309. proc () =
  310. if not retFuture.finished:
  311. if fut2.failed: retFuture.fail(fut2.error)
  312. elif fut1.finished: retFuture.complete()
  313. return retFuture
  314. proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  315. ## Returns a future which will complete once either ``fut1`` or ``fut2``
  316. ## complete.
  317. var retFuture = newFuture[void]("asyncdispatch.`or`")
  318. proc cb[X](fut: Future[X]) =
  319. if fut.failed: retFuture.fail(fut.error)
  320. if not retFuture.finished: retFuture.complete()
  321. fut1.callback = cb[T]
  322. fut2.callback = cb[Y]
  323. return retFuture
  324. proc all*[T](futs: varargs[Future[T]]): auto =
  325. ## Returns a future which will complete once
  326. ## all futures in ``futs`` complete.
  327. ## If the argument is empty, the returned future completes immediately.
  328. ##
  329. ## If the awaited futures are not ``Future[void]``, the returned future
  330. ## will hold the values of all awaited futures in a sequence.
  331. ##
  332. ## If the awaited futures *are* ``Future[void]``,
  333. ## this proc returns ``Future[void]``.
  334. when T is void:
  335. var
  336. retFuture = newFuture[void]("asyncdispatch.all")
  337. completedFutures = 0
  338. let totalFutures = len(futs)
  339. for fut in futs:
  340. fut.addCallback proc (f: Future[T]) =
  341. inc(completedFutures)
  342. if not retFuture.finished:
  343. if f.failed:
  344. retFuture.fail(f.error)
  345. else:
  346. if completedFutures == totalFutures:
  347. retFuture.complete()
  348. if totalFutures == 0:
  349. retFuture.complete()
  350. return retFuture
  351. else:
  352. var
  353. retFuture = newFuture[seq[T]]("asyncdispatch.all")
  354. retValues = newSeq[T](len(futs))
  355. completedFutures = 0
  356. for i, fut in futs:
  357. proc setCallback(i: int) =
  358. fut.addCallback proc (f: Future[T]) =
  359. inc(completedFutures)
  360. if not retFuture.finished:
  361. if f.failed:
  362. retFuture.fail(f.error)
  363. else:
  364. retValues[i] = f.read()
  365. if completedFutures == len(retValues):
  366. retFuture.complete(retValues)
  367. setCallback(i)
  368. if retValues.len == 0:
  369. retFuture.complete(retValues)
  370. return retFuture