asyncfutures.nim 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. import os, tables, strutils, times, heapqueue, options, deques, cstrutils
  10. import system/stacktraces
  11. # TODO: This shouldn't need to be included, but should ideally be exported.
  12. type
  13. CallbackFunc = proc () {.closure, gcsafe.}
  14. CallbackList = object
  15. function: CallbackFunc
  16. next: owned(ref CallbackList)
  17. FutureBase* = ref object of RootObj ## Untyped future.
  18. callbacks: CallbackList
  19. finished: bool
  20. error*: ref Exception ## Stored exception
  21. errorStackTrace*: string
  22. when not defined(release):
  23. stackTrace: seq[StackTraceEntry] ## For debugging purposes only.
  24. id: int
  25. fromProc: string
  26. Future*[T] = ref object of FutureBase ## Typed future.
  27. value: T ## Stored value
  28. FutureVar*[T] = distinct Future[T]
  29. FutureError* = object of Defect
  30. cause*: FutureBase
  31. when not defined(release):
  32. var currentID = 0
  33. const isFutureLoggingEnabled* = defined(futureLogging)
  34. const
  35. NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use.
  36. when isFutureLoggingEnabled:
  37. import hashes
  38. type
  39. FutureInfo* = object
  40. stackTrace*: seq[StackTraceEntry]
  41. fromProc*: string
  42. var futuresInProgress {.threadvar.}: Table[FutureInfo, int]
  43. proc getFuturesInProgress*(): var Table[FutureInfo, int] =
  44. return futuresInProgress
  45. proc hash(s: StackTraceEntry): Hash =
  46. result = hash(s.procname) !& hash(s.line) !&
  47. hash(s.filename)
  48. result = !$result
  49. proc hash(fi: FutureInfo): Hash =
  50. result = hash(fi.stackTrace) !& hash(fi.fromProc)
  51. result = !$result
  52. proc getFutureInfo(fut: FutureBase): FutureInfo =
  53. let info = FutureInfo(
  54. stackTrace: fut.stackTrace,
  55. fromProc: fut.fromProc
  56. )
  57. return info
  58. proc logFutureStart(fut: FutureBase) =
  59. let info = getFutureInfo(fut)
  60. if info notin getFuturesInProgress():
  61. getFuturesInProgress()[info] = 0
  62. getFuturesInProgress()[info].inc()
  63. proc logFutureFinish(fut: FutureBase) =
  64. getFuturesInProgress()[getFutureInfo(fut)].dec()
  65. var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
  66. proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
  67. ## Get current implementation of `callSoon`.
  68. return callSoonProc
  69. proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
  70. ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized.
  71. callSoonProc = p
  72. proc callSoon*(cbproc: proc () {.gcsafe.}) =
  73. ## Call `cbproc` "soon".
  74. ##
  75. ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick.
  76. ##
  77. ## If async dispatcher is not running, `cbproc` will be executed immediately.
  78. if callSoonProc.isNil:
  79. # Loop not initialized yet. Call the function directly to allow setup code to use futures.
  80. cbproc()
  81. else:
  82. callSoonProc(cbproc)
  83. template setupFutureBase(fromProc: string) =
  84. new(result)
  85. result.finished = false
  86. when not defined(release):
  87. result.stackTrace = getStackTraceEntries()
  88. result.id = currentID
  89. result.fromProc = fromProc
  90. currentID.inc()
  91. proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) =
  92. ## Creates a new future.
  93. ##
  94. ## Specifying `fromProc`, which is a string specifying the name of the proc
  95. ## that this future belongs to, is a good habit as it helps with debugging.
  96. setupFutureBase(fromProc)
  97. when isFutureLoggingEnabled: logFutureStart(result)
  98. proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) =
  99. ## Create a new `FutureVar`. This Future type is ideally suited for
  100. ## situations where you want to avoid unnecessary allocations of Futures.
  101. ##
  102. ## Specifying `fromProc`, which is a string specifying the name of the proc
  103. ## that this future belongs to, is a good habit as it helps with debugging.
  104. let fo = newFuture[T](fromProc)
  105. result = typeof(result)(fo)
  106. when isFutureLoggingEnabled: logFutureStart(Future[T](result))
  107. proc clean*[T](future: FutureVar[T]) =
  108. ## Resets the `finished` status of `future`.
  109. Future[T](future).finished = false
  110. Future[T](future).error = nil
  111. proc checkFinished[T](future: Future[T]) =
  112. ## Checks whether `future` is finished. If it is then raises a
  113. ## `FutureError`.
  114. when not defined(release):
  115. if future.finished:
  116. var msg = ""
  117. msg.add("An attempt was made to complete a Future more than once. ")
  118. msg.add("Details:")
  119. msg.add("\n Future ID: " & $future.id)
  120. msg.add("\n Created in proc: " & future.fromProc)
  121. msg.add("\n Stack trace to moment of creation:")
  122. msg.add("\n" & indent(($future.stackTrace).strip(), 4))
  123. when T is string:
  124. msg.add("\n Contents (string): ")
  125. msg.add("\n" & indent($future.value, 4))
  126. msg.add("\n Stack trace to moment of secondary completion:")
  127. msg.add("\n" & indent(getStackTrace().strip(), 4))
  128. var err = newException(FutureError, msg)
  129. err.cause = future
  130. raise err
  131. proc call(callbacks: var CallbackList) =
  132. var current = callbacks
  133. while true:
  134. if not current.function.isNil:
  135. callSoon(current.function)
  136. if current.next.isNil:
  137. break
  138. else:
  139. current = current.next[]
  140. # callback will be called only once, let GC collect them now
  141. callbacks.next = nil
  142. callbacks.function = nil
  143. proc add(callbacks: var CallbackList, function: CallbackFunc) =
  144. if callbacks.function.isNil:
  145. callbacks.function = function
  146. assert callbacks.next == nil
  147. else:
  148. let newCallback = new(ref CallbackList)
  149. newCallback.function = function
  150. newCallback.next = nil
  151. if callbacks.next == nil:
  152. callbacks.next = newCallback
  153. else:
  154. var last = callbacks.next
  155. while last.next != nil:
  156. last = last.next
  157. last.next = newCallback
  158. proc completeImpl[T, U](future: Future[T], val: U, isVoid: static bool) =
  159. #assert(not future.finished, "Future already finished, cannot finish twice.")
  160. checkFinished(future)
  161. assert(future.error == nil)
  162. when not isVoid:
  163. future.value = val
  164. future.finished = true
  165. future.callbacks.call()
  166. when isFutureLoggingEnabled: logFutureFinish(future)
  167. proc complete*[T](future: Future[T], val: T) =
  168. ## Completes `future` with value `val`.
  169. completeImpl(future, val, false)
  170. proc complete*(future: Future[void], val = Future[void].default) =
  171. completeImpl(future, (), true)
  172. proc complete*[T](future: FutureVar[T]) =
  173. ## Completes a `FutureVar`.
  174. template fut: untyped = Future[T](future)
  175. checkFinished(fut)
  176. assert(fut.error == nil)
  177. fut.finished = true
  178. fut.callbacks.call()
  179. when isFutureLoggingEnabled: logFutureFinish(Future[T](future))
  180. proc complete*[T](future: FutureVar[T], val: T) =
  181. ## Completes a `FutureVar` with value `val`.
  182. ##
  183. ## Any previously stored value will be overwritten.
  184. template fut: untyped = Future[T](future)
  185. checkFinished(fut)
  186. assert(fut.error.isNil())
  187. fut.finished = true
  188. fut.value = val
  189. fut.callbacks.call()
  190. when isFutureLoggingEnabled: logFutureFinish(future)
  191. proc fail*[T](future: Future[T], error: ref Exception) =
  192. ## Completes `future` with `error`.
  193. #assert(not future.finished, "Future already finished, cannot finish twice.")
  194. checkFinished(future)
  195. future.finished = true
  196. future.error = error
  197. future.errorStackTrace =
  198. if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
  199. future.callbacks.call()
  200. when isFutureLoggingEnabled: logFutureFinish(future)
  201. proc clearCallbacks*(future: FutureBase) =
  202. future.callbacks.function = nil
  203. future.callbacks.next = nil
  204. proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) =
  205. ## Adds the callbacks proc to be called when the future completes.
  206. ##
  207. ## If future has already completed then `cb` will be called immediately.
  208. assert cb != nil
  209. if future.finished:
  210. callSoon(cb)
  211. else:
  212. future.callbacks.add cb
  213. proc addCallback*[T](future: Future[T],
  214. cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  215. ## Adds the callbacks proc to be called when the future completes.
  216. ##
  217. ## If future has already completed then `cb` will be called immediately.
  218. future.addCallback(
  219. proc() =
  220. cb(future)
  221. )
  222. proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) =
  223. ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
  224. ##
  225. ## If future has already completed then `cb` will be called immediately.
  226. ##
  227. ## It's recommended to use `addCallback` or `then` instead.
  228. future.clearCallbacks
  229. future.addCallback cb
  230. proc `callback=`*[T](future: Future[T],
  231. cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  232. ## Sets the callback proc to be called when the future completes.
  233. ##
  234. ## If future has already completed then `cb` will be called immediately.
  235. future.callback = proc () = cb(future)
  236. template getFilenameProcname(entry: StackTraceEntry): (string, string) =
  237. when compiles(entry.filenameStr) and compiles(entry.procnameStr):
  238. # We can't rely on "entry.filename" and "entry.procname" still being valid
  239. # cstring pointers, because the "string.data" buffers they pointed to might
  240. # be already garbage collected (this entry being a non-shallow copy,
  241. # "entry.filename" no longer points to "entry.filenameStr.data", but to the
  242. # buffer of the original object).
  243. (entry.filenameStr, entry.procnameStr)
  244. else:
  245. ($entry.filename, $entry.procname)
  246. proc getHint(entry: StackTraceEntry): string =
  247. ## We try to provide some hints about stack trace entries that the user
  248. ## may not be familiar with, in particular calls inside the stdlib.
  249. let (filename, procname) = getFilenameProcname(entry)
  250. result = ""
  251. if procname == "processPendingCallbacks":
  252. if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
  253. return "Executes pending callbacks"
  254. elif procname == "poll":
  255. if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
  256. return "Processes asynchronous completion events"
  257. if procname.endsWith(NimAsyncContinueSuffix):
  258. if cmpIgnoreStyle(filename, "asyncmacro.nim") == 0:
  259. return "Resumes an async procedure"
  260. proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string =
  261. when defined(nimStackTraceOverride):
  262. let entries = addDebuggingInfo(stackTraceEntries)
  263. else:
  264. let entries = stackTraceEntries
  265. result = ""
  266. # Find longest filename & line number combo for alignment purposes.
  267. var longestLeft = 0
  268. for entry in entries:
  269. let (filename, procname) = getFilenameProcname(entry)
  270. if procname == "": continue
  271. let leftLen = filename.len + len($entry.line)
  272. if leftLen > longestLeft:
  273. longestLeft = leftLen
  274. var indent = 2
  275. # Format the entries.
  276. for entry in entries:
  277. let (filename, procname) = getFilenameProcname(entry)
  278. if procname == "":
  279. if entry.line == reraisedFromBegin:
  280. result.add(spaces(indent) & "#[\n")
  281. indent.inc(2)
  282. elif entry.line == reraisedFromEnd:
  283. indent.dec(2)
  284. result.add(spaces(indent) & "]#\n")
  285. continue
  286. let left = "$#($#)" % [filename, $entry.line]
  287. result.add((spaces(indent) & "$#$# $#\n") % [
  288. left,
  289. spaces(longestLeft - left.len + 2),
  290. procname
  291. ])
  292. let hint = getHint(entry)
  293. if hint.len > 0:
  294. result.add(spaces(indent+2) & "## " & hint & "\n")
  295. proc injectStacktrace[T](future: Future[T]) =
  296. when not defined(release):
  297. const header = "\nAsync traceback:\n"
  298. var exceptionMsg = future.error.msg
  299. if header in exceptionMsg:
  300. # This is messy: extract the original exception message from the msg
  301. # containing the async traceback.
  302. let start = exceptionMsg.find(header)
  303. exceptionMsg = exceptionMsg[0..<start]
  304. var newMsg = exceptionMsg & header
  305. let entries = getStackTraceEntries(future.error)
  306. newMsg.add($entries)
  307. newMsg.add("Exception message: " & exceptionMsg & "\n")
  308. # # For debugging purposes
  309. # newMsg.add("Exception type:")
  310. # for entry in getStackTraceEntries(future.error):
  311. # newMsg.add "\n" & $entry
  312. future.error.msg = newMsg
  313. proc read*[T](future: Future[T] | FutureVar[T]): T =
  314. ## Retrieves the value of `future`. Future must be finished otherwise
  315. ## this function will fail with a `ValueError` exception.
  316. ##
  317. ## If the result of the future is an error then that error will be raised.
  318. when future is Future[T]:
  319. let fut {.cursor.} = future
  320. else:
  321. let fut {.cursor.} = Future[T](future)
  322. if fut.finished:
  323. if fut.error != nil:
  324. injectStacktrace(fut)
  325. raise fut.error
  326. when T isnot void:
  327. result = fut.value
  328. else:
  329. # TODO: Make a custom exception type for this?
  330. raise newException(ValueError, "Future still in progress.")
  331. proc readError*[T](future: Future[T]): ref Exception =
  332. ## Retrieves the exception stored in `future`.
  333. ##
  334. ## An `ValueError` exception will be thrown if no exception exists
  335. ## in the specified Future.
  336. if future.error != nil: return future.error
  337. else:
  338. raise newException(ValueError, "No error in future.")
  339. proc mget*[T](future: FutureVar[T]): var T =
  340. ## Returns a mutable value stored in `future`.
  341. ##
  342. ## Unlike `read`, this function will not raise an exception if the
  343. ## Future has not been finished.
  344. result = Future[T](future).value
  345. proc finished*(future: FutureBase | FutureVar): bool =
  346. ## Determines whether `future` has completed.
  347. ##
  348. ## `True` may indicate an error or a value. Use `failed` to distinguish.
  349. when future is FutureVar:
  350. result = (FutureBase(future)).finished
  351. else:
  352. result = future.finished
  353. proc failed*(future: FutureBase): bool =
  354. ## Determines whether `future` completed with an error.
  355. return future.error != nil
  356. proc asyncCheck*[T](future: Future[T]) =
  357. ## Sets a callback on `future` which raises an exception if the future
  358. ## finished with an error.
  359. ##
  360. ## This should be used instead of `discard` to discard void futures,
  361. ## or use `waitFor` if you need to wait for the future's completion.
  362. assert(not future.isNil, "Future is nil")
  363. # TODO: We can likely look at the stack trace here and inject the location
  364. # where the `asyncCheck` was called to give a better error stack message.
  365. proc asyncCheckCallback() =
  366. if future.failed:
  367. injectStacktrace(future)
  368. raise future.error
  369. future.callback = asyncCheckCallback
  370. proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  371. ## Returns a future which will complete once both `fut1` and `fut2`
  372. ## complete.
  373. var retFuture = newFuture[void]("asyncdispatch.`and`")
  374. fut1.callback =
  375. proc () =
  376. if not retFuture.finished:
  377. if fut1.failed: retFuture.fail(fut1.error)
  378. elif fut2.finished: retFuture.complete()
  379. fut2.callback =
  380. proc () =
  381. if not retFuture.finished:
  382. if fut2.failed: retFuture.fail(fut2.error)
  383. elif fut1.finished: retFuture.complete()
  384. return retFuture
  385. proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  386. ## Returns a future which will complete once either `fut1` or `fut2`
  387. ## complete.
  388. var retFuture = newFuture[void]("asyncdispatch.`or`")
  389. proc cb[X](fut: Future[X]) =
  390. if not retFuture.finished:
  391. if fut.failed: retFuture.fail(fut.error)
  392. else: retFuture.complete()
  393. fut1.callback = cb[T]
  394. fut2.callback = cb[Y]
  395. return retFuture
  396. proc all*[T](futs: varargs[Future[T]]): auto =
  397. ## Returns a future which will complete once
  398. ## all futures in `futs` complete.
  399. ## If the argument is empty, the returned future completes immediately.
  400. ##
  401. ## If the awaited futures are not `Future[void]`, the returned future
  402. ## will hold the values of all awaited futures in a sequence.
  403. ##
  404. ## If the awaited futures *are* `Future[void]`,
  405. ## this proc returns `Future[void]`.
  406. when T is void:
  407. var
  408. retFuture = newFuture[void]("asyncdispatch.all")
  409. completedFutures = 0
  410. let totalFutures = len(futs)
  411. for fut in futs:
  412. fut.addCallback proc (f: Future[T]) =
  413. inc(completedFutures)
  414. if not retFuture.finished:
  415. if f.failed:
  416. retFuture.fail(f.error)
  417. else:
  418. if completedFutures == totalFutures:
  419. retFuture.complete()
  420. if totalFutures == 0:
  421. retFuture.complete()
  422. return retFuture
  423. else:
  424. var
  425. retFuture = newFuture[seq[T]]("asyncdispatch.all")
  426. retValues = newSeq[T](len(futs))
  427. completedFutures = 0
  428. for i, fut in futs:
  429. proc setCallback(i: int) =
  430. fut.addCallback proc (f: Future[T]) =
  431. inc(completedFutures)
  432. if not retFuture.finished:
  433. if f.failed:
  434. retFuture.fail(f.error)
  435. else:
  436. retValues[i] = f.read()
  437. if completedFutures == len(retValues):
  438. retFuture.complete(retValues)
  439. setCallback(i)
  440. if retValues.len == 0:
  441. retFuture.complete(retValues)
  442. return retFuture