asyncfutures.nim 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. import os, tables, strutils, times, heapqueue, options, deques, cstrutils
  10. # TODO: This shouldn't need to be included, but should ideally be exported.
  11. type
  12. CallbackFunc = proc () {.closure, gcsafe.}
  13. CallbackList = object
  14. function: CallbackFunc
  15. next: owned(ref CallbackList)
  16. FutureBase* = ref object of RootObj ## Untyped future.
  17. callbacks: CallbackList
  18. finished: bool
  19. error*: ref Exception ## Stored exception
  20. errorStackTrace*: string
  21. when not defined(release):
  22. stackTrace: seq[StackTraceEntry] ## For debugging purposes only.
  23. id: int
  24. fromProc: string
  25. Future*[T] = ref object of FutureBase ## Typed future.
  26. value: T ## Stored value
  27. FutureVar*[T] = distinct Future[T]
  28. FutureError* = object of Exception
  29. cause*: FutureBase
  30. when not defined(release):
  31. var currentID = 0
  32. const isFutureLoggingEnabled* = defined(futureLogging)
  33. const
  34. NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use.
  35. when isFutureLoggingEnabled:
  36. import hashes
  37. type
  38. FutureInfo* = object
  39. stackTrace*: seq[StackTraceEntry]
  40. fromProc*: string
  41. var futuresInProgress {.threadvar.}: Table[FutureInfo, int]
  42. proc getFuturesInProgress*(): var Table[FutureInfo, int] =
  43. return futuresInProgress
  44. proc hash(s: StackTraceEntry): Hash =
  45. result = hash(s.procname) !& hash(s.line) !&
  46. hash(s.filename)
  47. result = !$result
  48. proc hash(fi: FutureInfo): Hash =
  49. result = hash(fi.stackTrace) !& hash(fi.fromProc)
  50. result = !$result
  51. proc getFutureInfo(fut: FutureBase): FutureInfo =
  52. let info = FutureInfo(
  53. stackTrace: fut.stackTrace,
  54. fromProc: fut.fromProc
  55. )
  56. return info
  57. proc logFutureStart(fut: FutureBase) =
  58. let info = getFutureInfo(fut)
  59. if info notin getFuturesInProgress():
  60. getFuturesInProgress()[info] = 0
  61. getFuturesInProgress()[info].inc()
  62. proc logFutureFinish(fut: FutureBase) =
  63. getFuturesInProgress()[getFutureInfo(fut)].dec()
  64. var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
  65. proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
  66. ## Get current implementation of ``callSoon``.
  67. return callSoonProc
  68. proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
  69. ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
  70. callSoonProc = p
  71. proc callSoon*(cbproc: proc ()) =
  72. ## Call ``cbproc`` "soon".
  73. ##
  74. ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
  75. ##
  76. ## If async dispatcher is not running, ``cbproc`` will be executed immediately.
  77. if callSoonProc.isNil:
  78. # Loop not initialized yet. Call the function directly to allow setup code to use futures.
  79. cbproc()
  80. else:
  81. callSoonProc(cbproc)
  82. template setupFutureBase(fromProc: string) =
  83. new(result)
  84. result.finished = false
  85. when not defined(release):
  86. result.stackTrace = getStackTraceEntries()
  87. result.id = currentID
  88. result.fromProc = fromProc
  89. currentID.inc()
  90. proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) =
  91. ## Creates a new future.
  92. ##
  93. ## Specifying ``fromProc``, which is a string specifying the name of the proc
  94. ## that this future belongs to, is a good habit as it helps with debugging.
  95. setupFutureBase(fromProc)
  96. when isFutureLoggingEnabled: logFutureStart(result)
  97. proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) =
  98. ## Create a new ``FutureVar``. This Future type is ideally suited for
  99. ## situations where you want to avoid unnecessary allocations of Futures.
  100. ##
  101. ## Specifying ``fromProc``, which is a string specifying the name of the proc
  102. ## that this future belongs to, is a good habit as it helps with debugging.
  103. let fo = newFuture[T](fromProc)
  104. result = typeof(result)(fo)
  105. when isFutureLoggingEnabled: logFutureStart(Future[T](result))
  106. proc clean*[T](future: FutureVar[T]) =
  107. ## Resets the ``finished`` status of ``future``.
  108. Future[T](future).finished = false
  109. Future[T](future).error = nil
  110. proc checkFinished[T](future: Future[T]) =
  111. ## Checks whether `future` is finished. If it is then raises a
  112. ## ``FutureError``.
  113. when not defined(release):
  114. if future.finished:
  115. var msg = ""
  116. msg.add("An attempt was made to complete a Future more than once. ")
  117. msg.add("Details:")
  118. msg.add("\n Future ID: " & $future.id)
  119. msg.add("\n Created in proc: " & future.fromProc)
  120. msg.add("\n Stack trace to moment of creation:")
  121. msg.add("\n" & indent(($future.stackTrace).strip(), 4))
  122. when T is string:
  123. msg.add("\n Contents (string): ")
  124. msg.add("\n" & indent(future.value.repr, 4))
  125. msg.add("\n Stack trace to moment of secondary completion:")
  126. msg.add("\n" & indent(getStackTrace().strip(), 4))
  127. var err = newException(FutureError, msg)
  128. err.cause = future
  129. raise err
  130. proc call(callbacks: var CallbackList) =
  131. when not defined(nimV2):
  132. # strictly speaking a little code duplication here, but we strive
  133. # to minimize regressions and I'm not sure I got the 'nimV2' logic
  134. # right:
  135. var current = callbacks
  136. while true:
  137. if not current.function.isNil:
  138. callSoon(current.function)
  139. if current.next.isNil:
  140. break
  141. else:
  142. current = current.next[]
  143. else:
  144. var currentFunc = unown callbacks.function
  145. var currentNext = unown callbacks.next
  146. while true:
  147. if not currentFunc.isNil:
  148. callSoon(currentFunc)
  149. if currentNext.isNil:
  150. break
  151. else:
  152. currentFunc = currentNext.function
  153. currentNext = unown currentNext.next
  154. # callback will be called only once, let GC collect them now
  155. callbacks.next = nil
  156. callbacks.function = nil
  157. proc add(callbacks: var CallbackList, function: CallbackFunc) =
  158. if callbacks.function.isNil:
  159. callbacks.function = function
  160. assert callbacks.next == nil
  161. else:
  162. let newCallback = new(ref CallbackList)
  163. newCallback.function = function
  164. newCallback.next = nil
  165. if callbacks.next == nil:
  166. callbacks.next = newCallback
  167. else:
  168. var last = callbacks.next
  169. while last.next != nil:
  170. last = last.next
  171. last.next = newCallback
  172. proc complete*[T](future: Future[T], val: T) =
  173. ## Completes ``future`` with value ``val``.
  174. #assert(not future.finished, "Future already finished, cannot finish twice.")
  175. checkFinished(future)
  176. assert(future.error == nil)
  177. future.value = val
  178. future.finished = true
  179. future.callbacks.call()
  180. when isFutureLoggingEnabled: logFutureFinish(future)
  181. proc complete*(future: Future[void]) =
  182. ## Completes a void ``future``.
  183. #assert(not future.finished, "Future already finished, cannot finish twice.")
  184. checkFinished(future)
  185. assert(future.error == nil)
  186. future.finished = true
  187. future.callbacks.call()
  188. when isFutureLoggingEnabled: logFutureFinish(future)
  189. proc complete*[T](future: FutureVar[T]) =
  190. ## Completes a ``FutureVar``.
  191. template fut: untyped = Future[T](future)
  192. checkFinished(fut)
  193. assert(fut.error == nil)
  194. fut.finished = true
  195. fut.callbacks.call()
  196. when isFutureLoggingEnabled: logFutureFinish(Future[T](future))
  197. proc complete*[T](future: FutureVar[T], val: T) =
  198. ## Completes a ``FutureVar`` with value ``val``.
  199. ##
  200. ## Any previously stored value will be overwritten.
  201. template fut: untyped = Future[T](future)
  202. checkFinished(fut)
  203. assert(fut.error.isNil())
  204. fut.finished = true
  205. fut.value = val
  206. fut.callbacks.call()
  207. when isFutureLoggingEnabled: logFutureFinish(future)
  208. proc fail*[T](future: Future[T], error: ref Exception) =
  209. ## Completes ``future`` with ``error``.
  210. #assert(not future.finished, "Future already finished, cannot finish twice.")
  211. checkFinished(future)
  212. future.finished = true
  213. future.error = error
  214. future.errorStackTrace =
  215. if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
  216. future.callbacks.call()
  217. when isFutureLoggingEnabled: logFutureFinish(future)
  218. proc clearCallbacks*(future: FutureBase) =
  219. future.callbacks.function = nil
  220. future.callbacks.next = nil
  221. proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) =
  222. ## Adds the callbacks proc to be called when the future completes.
  223. ##
  224. ## If future has already completed then ``cb`` will be called immediately.
  225. assert cb != nil
  226. if future.finished:
  227. callSoon(cb)
  228. else:
  229. future.callbacks.add cb
  230. proc addCallback*[T](future: Future[T],
  231. cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  232. ## Adds the callbacks proc to be called when the future completes.
  233. ##
  234. ## If future has already completed then ``cb`` will be called immediately.
  235. future.addCallback(
  236. proc() =
  237. cb(future)
  238. )
  239. proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) =
  240. ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
  241. ##
  242. ## If future has already completed then ``cb`` will be called immediately.
  243. ##
  244. ## It's recommended to use ``addCallback`` or ``then`` instead.
  245. future.clearCallbacks
  246. future.addCallback cb
  247. proc `callback=`*[T](future: Future[T],
  248. cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  249. ## Sets the callback proc to be called when the future completes.
  250. ##
  251. ## If future has already completed then ``cb`` will be called immediately.
  252. future.callback = proc () = cb(future)
  253. proc getHint(entry: StackTraceEntry): string =
  254. ## We try to provide some hints about stack trace entries that the user
  255. ## may not be familiar with, in particular calls inside the stdlib.
  256. result = ""
  257. if entry.procname == cstring"processPendingCallbacks":
  258. if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
  259. return "Executes pending callbacks"
  260. elif entry.procname == cstring"poll":
  261. if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
  262. return "Processes asynchronous completion events"
  263. if entry.procname.endsWith(NimAsyncContinueSuffix):
  264. if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0:
  265. return "Resumes an async procedure"
  266. proc `$`*(entries: seq[StackTraceEntry]): string =
  267. result = ""
  268. # Find longest filename & line number combo for alignment purposes.
  269. var longestLeft = 0
  270. for entry in entries:
  271. if entry.procname.isNil: continue
  272. let left = $entry.filename & $entry.line
  273. if left.len > longestLeft:
  274. longestLeft = left.len
  275. var indent = 2
  276. # Format the entries.
  277. for entry in entries:
  278. if entry.procname.isNil:
  279. if entry.line == -10:
  280. result.add(spaces(indent) & "#[\n")
  281. indent.inc(2)
  282. else:
  283. indent.dec(2)
  284. result.add(spaces(indent) & "]#\n")
  285. continue
  286. let left = "$#($#)" % [$entry.filename, $entry.line]
  287. result.add((spaces(indent) & "$#$# $#\n") % [
  288. left,
  289. spaces(longestLeft - left.len + 2),
  290. $entry.procname
  291. ])
  292. let hint = getHint(entry)
  293. if hint.len > 0:
  294. result.add(spaces(indent+2) & "## " & hint & "\n")
  295. proc injectStacktrace[T](future: Future[T]) =
  296. when not defined(release):
  297. const header = "\nAsync traceback:\n"
  298. var exceptionMsg = future.error.msg
  299. if header in exceptionMsg:
  300. # This is messy: extract the original exception message from the msg
  301. # containing the async traceback.
  302. let start = exceptionMsg.find(header)
  303. exceptionMsg = exceptionMsg[0..<start]
  304. var newMsg = exceptionMsg & header
  305. let entries = getStackTraceEntries(future.error)
  306. newMsg.add($entries)
  307. newMsg.add("Exception message: " & exceptionMsg & "\n")
  308. newMsg.add("Exception type:")
  309. # # For debugging purposes
  310. # for entry in getStackTraceEntries(future.error):
  311. # newMsg.add "\n" & $entry
  312. future.error.msg = newMsg
  313. proc read*[T](future: Future[T] | FutureVar[T]): T =
  314. ## Retrieves the value of ``future``. Future must be finished otherwise
  315. ## this function will fail with a ``ValueError`` exception.
  316. ##
  317. ## If the result of the future is an error then that error will be raised.
  318. {.push hint[ConvFromXtoItselfNotNeeded]: off.}
  319. let fut = Future[T](future)
  320. {.pop.}
  321. if fut.finished:
  322. if fut.error != nil:
  323. injectStacktrace(fut)
  324. raise fut.error
  325. when T isnot void:
  326. return fut.value
  327. else:
  328. # TODO: Make a custom exception type for this?
  329. raise newException(ValueError, "Future still in progress.")
  330. proc readError*[T](future: Future[T]): ref Exception =
  331. ## Retrieves the exception stored in ``future``.
  332. ##
  333. ## An ``ValueError`` exception will be thrown if no exception exists
  334. ## in the specified Future.
  335. if future.error != nil: return future.error
  336. else:
  337. raise newException(ValueError, "No error in future.")
  338. proc mget*[T](future: FutureVar[T]): var T =
  339. ## Returns a mutable value stored in ``future``.
  340. ##
  341. ## Unlike ``read``, this function will not raise an exception if the
  342. ## Future has not been finished.
  343. result = Future[T](future).value
  344. proc finished*(future: FutureBase | FutureVar): bool =
  345. ## Determines whether ``future`` has completed.
  346. ##
  347. ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
  348. when future is FutureVar:
  349. result = (FutureBase(future)).finished
  350. else:
  351. result = future.finished
  352. proc failed*(future: FutureBase): bool =
  353. ## Determines whether ``future`` completed with an error.
  354. return future.error != nil
  355. proc asyncCheck*[T](future: Future[T]) =
  356. ## Sets a callback on ``future`` which raises an exception if the future
  357. ## finished with an error.
  358. ##
  359. ## This should be used instead of ``discard`` to discard void futures,
  360. ## or use ``waitFor`` if you need to wait for the future's completion.
  361. assert(not future.isNil, "Future is nil")
  362. # TODO: We can likely look at the stack trace here and inject the location
  363. # where the `asyncCheck` was called to give a better error stack message.
  364. proc asyncCheckCallback() =
  365. if future.failed:
  366. injectStacktrace(future)
  367. raise future.error
  368. future.callback = asyncCheckCallback
  369. proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  370. ## Returns a future which will complete once both ``fut1`` and ``fut2``
  371. ## complete.
  372. var retFuture = newFuture[void]("asyncdispatch.`and`")
  373. fut1.callback =
  374. proc () =
  375. if not retFuture.finished:
  376. if fut1.failed: retFuture.fail(fut1.error)
  377. elif fut2.finished: retFuture.complete()
  378. fut2.callback =
  379. proc () =
  380. if not retFuture.finished:
  381. if fut2.failed: retFuture.fail(fut2.error)
  382. elif fut1.finished: retFuture.complete()
  383. return retFuture
  384. proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  385. ## Returns a future which will complete once either ``fut1`` or ``fut2``
  386. ## complete.
  387. var retFuture = newFuture[void]("asyncdispatch.`or`")
  388. proc cb[X](fut: Future[X]) =
  389. if not retFuture.finished:
  390. if fut.failed: retFuture.fail(fut.error)
  391. else: retFuture.complete()
  392. fut1.callback = cb[T]
  393. fut2.callback = cb[Y]
  394. return retFuture
  395. proc all*[T](futs: varargs[Future[T]]): auto =
  396. ## Returns a future which will complete once
  397. ## all futures in ``futs`` complete.
  398. ## If the argument is empty, the returned future completes immediately.
  399. ##
  400. ## If the awaited futures are not ``Future[void]``, the returned future
  401. ## will hold the values of all awaited futures in a sequence.
  402. ##
  403. ## If the awaited futures *are* ``Future[void]``,
  404. ## this proc returns ``Future[void]``.
  405. when T is void:
  406. var
  407. retFuture = newFuture[void]("asyncdispatch.all")
  408. completedFutures = 0
  409. let totalFutures = len(futs)
  410. for fut in futs:
  411. fut.addCallback proc (f: Future[T]) =
  412. inc(completedFutures)
  413. if not retFuture.finished:
  414. if f.failed:
  415. retFuture.fail(f.error)
  416. else:
  417. if completedFutures == totalFutures:
  418. retFuture.complete()
  419. if totalFutures == 0:
  420. retFuture.complete()
  421. return retFuture
  422. else:
  423. var
  424. retFuture = newFuture[seq[T]]("asyncdispatch.all")
  425. retValues = newSeq[T](len(futs))
  426. completedFutures = 0
  427. for i, fut in futs:
  428. proc setCallback(i: int) =
  429. fut.addCallback proc (f: Future[T]) =
  430. inc(completedFutures)
  431. if not retFuture.finished:
  432. if f.failed:
  433. retFuture.fail(f.error)
  434. else:
  435. retValues[i] = f.read()
  436. if completedFutures == len(retValues):
  437. retFuture.complete(retValues)
  438. setCallback(i)
  439. if retValues.len == 0:
  440. retFuture.complete(retValues)
  441. return retFuture