asyncfutures.nim 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. import std/[os, sets, tables, strutils, times, heapqueue, options, deques, cstrutils, typetraits]
  10. import system/stacktraces
  11. when defined(nimPreviewSlimSystem):
  12. import std/objectdollar # for StackTraceEntry
  13. import std/assertions
  14. # TODO: This shouldn't need to be included, but should ideally be exported.
  15. type
  16. CallbackFunc = proc () {.closure, gcsafe.}
  17. CallbackList = object
  18. function: CallbackFunc
  19. next: owned(ref CallbackList)
  20. FutureBase* = ref object of RootObj ## Untyped future.
  21. callbacks: CallbackList
  22. finished: bool
  23. error*: ref Exception ## Stored exception
  24. errorStackTrace*: string
  25. when not defined(release) or defined(futureLogging):
  26. stackTrace: seq[StackTraceEntry] ## For debugging purposes only.
  27. id: int
  28. fromProc: string
  29. Future*[T] = ref object of FutureBase ## Typed future.
  30. value: T ## Stored value
  31. FutureVar*[T] = distinct Future[T]
  32. FutureError* = object of Defect
  33. cause*: FutureBase
  34. when not defined(release):
  35. var currentID = 0
  36. const isFutureLoggingEnabled* = defined(futureLogging)
  37. const
  38. NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use.
  39. when isFutureLoggingEnabled:
  40. import std/hashes
  41. type
  42. FutureInfo* = object
  43. stackTrace*: seq[StackTraceEntry]
  44. fromProc*: string
  45. var futuresInProgress {.threadvar.}: Table[FutureInfo, int]
  46. proc getFuturesInProgress*(): var Table[FutureInfo, int] =
  47. return futuresInProgress
  48. proc hash(s: StackTraceEntry): Hash =
  49. result = hash(s.procname) !& hash(s.line) !&
  50. hash(s.filename)
  51. result = !$result
  52. proc hash(fi: FutureInfo): Hash =
  53. result = hash(fi.stackTrace) !& hash(fi.fromProc)
  54. result = !$result
  55. proc getFutureInfo(fut: FutureBase): FutureInfo =
  56. let info = FutureInfo(
  57. stackTrace: fut.stackTrace,
  58. fromProc: fut.fromProc
  59. )
  60. return info
  61. proc logFutureStart(fut: FutureBase) =
  62. let info = getFutureInfo(fut)
  63. if info notin getFuturesInProgress():
  64. getFuturesInProgress()[info] = 0
  65. getFuturesInProgress()[info].inc()
  66. proc logFutureFinish(fut: FutureBase) =
  67. getFuturesInProgress()[getFutureInfo(fut)].dec()
  68. var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
  69. proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
  70. ## Get current implementation of `callSoon`.
  71. return callSoonProc
  72. proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
  73. ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized.
  74. callSoonProc = p
  75. proc callSoon*(cbproc: proc () {.gcsafe.}) =
  76. ## Call `cbproc` "soon".
  77. ##
  78. ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick.
  79. ##
  80. ## If async dispatcher is not running, `cbproc` will be executed immediately.
  81. if callSoonProc.isNil:
  82. # Loop not initialized yet. Call the function directly to allow setup code to use futures.
  83. cbproc()
  84. else:
  85. callSoonProc(cbproc)
  86. template setupFutureBase(fromProc: string) =
  87. new(result)
  88. result.finished = false
  89. when not defined(release):
  90. result.stackTrace = getStackTraceEntries()
  91. result.id = currentID
  92. result.fromProc = fromProc
  93. currentID.inc()
  94. proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) =
  95. ## Creates a new future.
  96. ##
  97. ## Specifying `fromProc`, which is a string specifying the name of the proc
  98. ## that this future belongs to, is a good habit as it helps with debugging.
  99. setupFutureBase(fromProc)
  100. when isFutureLoggingEnabled: logFutureStart(result)
  101. proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) =
  102. ## Create a new `FutureVar`. This Future type is ideally suited for
  103. ## situations where you want to avoid unnecessary allocations of Futures.
  104. ##
  105. ## Specifying `fromProc`, which is a string specifying the name of the proc
  106. ## that this future belongs to, is a good habit as it helps with debugging.
  107. let fo = newFuture[T](fromProc)
  108. result = typeof(result)(fo)
  109. when isFutureLoggingEnabled: logFutureStart(Future[T](result))
  110. proc clean*[T](future: FutureVar[T]) =
  111. ## Resets the `finished` status of `future`.
  112. Future[T](future).finished = false
  113. Future[T](future).error = nil
  114. proc checkFinished[T](future: Future[T]) =
  115. ## Checks whether `future` is finished. If it is then raises a
  116. ## `FutureError`.
  117. when not defined(release):
  118. if future.finished:
  119. var msg = ""
  120. msg.add("An attempt was made to complete a Future more than once. ")
  121. msg.add("Details:")
  122. msg.add("\n Future ID: " & $future.id)
  123. msg.add("\n Created in proc: " & future.fromProc)
  124. msg.add("\n Stack trace to moment of creation:")
  125. msg.add("\n" & indent(($future.stackTrace).strip(), 4))
  126. when T is string:
  127. msg.add("\n Contents (string): ")
  128. msg.add("\n" & indent($future.value, 4))
  129. msg.add("\n Stack trace to moment of secondary completion:")
  130. msg.add("\n" & indent(getStackTrace().strip(), 4))
  131. var err = newException(FutureError, msg)
  132. err.cause = future
  133. raise err
  134. proc call(callbacks: var CallbackList) =
  135. var current = callbacks
  136. while true:
  137. if not current.function.isNil:
  138. callSoon(current.function)
  139. if current.next.isNil:
  140. break
  141. else:
  142. current = current.next[]
  143. # callback will be called only once, let GC collect them now
  144. callbacks.next = nil
  145. callbacks.function = nil
  146. proc add(callbacks: var CallbackList, function: CallbackFunc) =
  147. if callbacks.function.isNil:
  148. callbacks.function = function
  149. assert callbacks.next == nil
  150. else:
  151. let newCallback = new(ref CallbackList)
  152. newCallback.function = function
  153. newCallback.next = nil
  154. if callbacks.next == nil:
  155. callbacks.next = newCallback
  156. else:
  157. var last = callbacks.next
  158. while last.next != nil:
  159. last = last.next
  160. last.next = newCallback
  161. proc completeImpl[T, U](future: Future[T], val: sink U, isVoid: static bool) =
  162. #assert(not future.finished, "Future already finished, cannot finish twice.")
  163. checkFinished(future)
  164. assert(future.error == nil)
  165. when not isVoid:
  166. future.value = val
  167. future.finished = true
  168. future.callbacks.call()
  169. when isFutureLoggingEnabled: logFutureFinish(future)
  170. proc complete*[T](future: Future[T], val: sink T) =
  171. ## Completes `future` with value `val`.
  172. completeImpl(future, val, false)
  173. proc complete*(future: Future[void], val = Future[void].default) =
  174. completeImpl(future, (), true)
  175. proc complete*[T](future: FutureVar[T]) =
  176. ## Completes a `FutureVar`.
  177. template fut: untyped = Future[T](future)
  178. checkFinished(fut)
  179. assert(fut.error == nil)
  180. fut.finished = true
  181. fut.callbacks.call()
  182. when isFutureLoggingEnabled: logFutureFinish(Future[T](future))
  183. proc complete*[T](future: FutureVar[T], val: sink T) =
  184. ## Completes a `FutureVar` with value `val`.
  185. ##
  186. ## Any previously stored value will be overwritten.
  187. template fut: untyped = Future[T](future)
  188. checkFinished(fut)
  189. assert(fut.error.isNil())
  190. fut.finished = true
  191. fut.value = val
  192. fut.callbacks.call()
  193. when isFutureLoggingEnabled: logFutureFinish(fut)
  194. proc fail*[T](future: Future[T], error: ref Exception) =
  195. ## Completes `future` with `error`.
  196. #assert(not future.finished, "Future already finished, cannot finish twice.")
  197. checkFinished(future)
  198. future.finished = true
  199. future.error = error
  200. future.errorStackTrace =
  201. if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
  202. future.callbacks.call()
  203. when isFutureLoggingEnabled: logFutureFinish(future)
  204. proc clearCallbacks*(future: FutureBase) =
  205. future.callbacks.function = nil
  206. future.callbacks.next = nil
  207. proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) =
  208. ## Adds the callbacks proc to be called when the future completes.
  209. ##
  210. ## If future has already completed then `cb` will be called immediately.
  211. assert cb != nil
  212. if future.finished:
  213. callSoon(cb)
  214. else:
  215. future.callbacks.add cb
  216. proc addCallback*[T](future: Future[T],
  217. cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  218. ## Adds the callbacks proc to be called when the future completes.
  219. ##
  220. ## If future has already completed then `cb` will be called immediately.
  221. future.addCallback(
  222. proc() =
  223. cb(future)
  224. )
  225. proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) =
  226. ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
  227. ##
  228. ## If future has already completed then `cb` will be called immediately.
  229. ##
  230. ## It's recommended to use `addCallback` or `then` instead.
  231. future.clearCallbacks
  232. future.addCallback cb
  233. proc `callback=`*[T](future: Future[T],
  234. cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  235. ## Sets the callback proc to be called when the future completes.
  236. ##
  237. ## If future has already completed then `cb` will be called immediately.
  238. future.callback = proc () = cb(future)
  239. template getFilenameProcname(entry: StackTraceEntry): (string, string) =
  240. when compiles(entry.filenameStr) and compiles(entry.procnameStr):
  241. # We can't rely on "entry.filename" and "entry.procname" still being valid
  242. # cstring pointers, because the "string.data" buffers they pointed to might
  243. # be already garbage collected (this entry being a non-shallow copy,
  244. # "entry.filename" no longer points to "entry.filenameStr.data", but to the
  245. # buffer of the original object).
  246. (entry.filenameStr, entry.procnameStr)
  247. else:
  248. ($entry.filename, $entry.procname)
  249. proc format(entry: StackTraceEntry): string =
  250. let (filename, procname) = getFilenameProcname(entry)
  251. let left = "$#($#)" % [filename, $entry.line]
  252. result = spaces(2) & "$# $#\n" % [left, procname]
  253. proc isInternal(entry: StackTraceEntry): bool =
  254. # --excessiveStackTrace:off
  255. const internals = [
  256. "asyncdispatch.nim",
  257. "asyncfutures.nim",
  258. "threadimpl.nim", # XXX ?
  259. ]
  260. let (filename, procname) = getFilenameProcname(entry)
  261. for line in internals:
  262. if filename.endsWith line:
  263. return true
  264. return false
  265. proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string =
  266. result = ""
  267. when defined(nimStackTraceOverride):
  268. let entries = addDebuggingInfo(stackTraceEntries)
  269. else:
  270. let entries = stackTraceEntries
  271. var seenEntries = initHashSet[StackTraceEntry]()
  272. let L = entries.len-1
  273. var i = L
  274. var j = 0
  275. while i >= 0:
  276. if entries[i].line == reraisedFromBegin or i == 0:
  277. j = i + int(i != 0)
  278. while j < L:
  279. if entries[j].line == reraisedFromBegin:
  280. break
  281. if entries[j].line >= 0 and not isInternal(entries[j]):
  282. # this skips recursive calls sadly
  283. if entries[j] notin seenEntries:
  284. result.add format(entries[j])
  285. seenEntries.incl entries[j]
  286. inc j
  287. dec i
  288. proc injectStacktrace[T](future: Future[T]) =
  289. when not defined(release):
  290. const header = "\nAsync traceback:\n"
  291. var exceptionMsg = future.error.msg
  292. if header in exceptionMsg:
  293. # This is messy: extract the original exception message from the msg
  294. # containing the async traceback.
  295. let start = exceptionMsg.find(header)
  296. exceptionMsg = exceptionMsg[0..<start]
  297. var newMsg = exceptionMsg & header
  298. let entries = getStackTraceEntries(future.error)
  299. newMsg.add($entries)
  300. newMsg.add("Exception message: " & exceptionMsg & "\n")
  301. # # For debugging purposes
  302. # newMsg.add("Exception type:")
  303. # for entry in getStackTraceEntries(future.error):
  304. # newMsg.add "\n" & $entry
  305. future.error.msg = newMsg
  306. template readImpl(future, T) =
  307. when future is Future[T]:
  308. let fut {.cursor.} = future
  309. else:
  310. let fut {.cursor.} = Future[T](future)
  311. if fut.finished:
  312. if fut.error != nil:
  313. injectStacktrace(fut)
  314. raise fut.error
  315. when T isnot void:
  316. result = distinctBase(future).value
  317. else:
  318. # TODO: Make a custom exception type for this?
  319. raise newException(ValueError, "Future still in progress.")
  320. proc read*[T](future: Future[T] | FutureVar[T]): lent T =
  321. ## Retrieves the value of `future`. Future must be finished otherwise
  322. ## this function will fail with a `ValueError` exception.
  323. ##
  324. ## If the result of the future is an error then that error will be raised.
  325. readImpl(future, T)
  326. proc read*(future: Future[void] | FutureVar[void]) =
  327. readImpl(future, void)
  328. proc readError*[T](future: Future[T]): ref Exception =
  329. ## Retrieves the exception stored in `future`.
  330. ##
  331. ## An `ValueError` exception will be thrown if no exception exists
  332. ## in the specified Future.
  333. if future.error != nil: return future.error
  334. else:
  335. raise newException(ValueError, "No error in future.")
  336. proc mget*[T](future: FutureVar[T]): var T =
  337. ## Returns a mutable value stored in `future`.
  338. ##
  339. ## Unlike `read`, this function will not raise an exception if the
  340. ## Future has not been finished.
  341. result = Future[T](future).value
  342. proc finished*(future: FutureBase | FutureVar): bool =
  343. ## Determines whether `future` has completed.
  344. ##
  345. ## `True` may indicate an error or a value. Use `failed` to distinguish.
  346. when future is FutureVar:
  347. result = (FutureBase(future)).finished
  348. else:
  349. result = future.finished
  350. proc failed*(future: FutureBase): bool =
  351. ## Determines whether `future` completed with an error.
  352. return future.error != nil
  353. proc asyncCheck*[T](future: Future[T]) =
  354. ## Sets a callback on `future` which raises an exception if the future
  355. ## finished with an error.
  356. ##
  357. ## This should be used instead of `discard` to discard void futures,
  358. ## or use `waitFor` if you need to wait for the future's completion.
  359. assert(not future.isNil, "Future is nil")
  360. # TODO: We can likely look at the stack trace here and inject the location
  361. # where the `asyncCheck` was called to give a better error stack message.
  362. proc asyncCheckCallback() =
  363. if future.failed:
  364. injectStacktrace(future)
  365. raise future.error
  366. future.callback = asyncCheckCallback
  367. proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  368. ## Returns a future which will complete once both `fut1` and `fut2`
  369. ## complete.
  370. var retFuture = newFuture[void]("asyncdispatch.`and`")
  371. fut1.callback =
  372. proc () =
  373. if not retFuture.finished:
  374. if fut1.failed: retFuture.fail(fut1.error)
  375. elif fut2.finished: retFuture.complete()
  376. fut2.callback =
  377. proc () =
  378. if not retFuture.finished:
  379. if fut2.failed: retFuture.fail(fut2.error)
  380. elif fut1.finished: retFuture.complete()
  381. return retFuture
  382. proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  383. ## Returns a future which will complete once either `fut1` or `fut2`
  384. ## complete.
  385. var retFuture = newFuture[void]("asyncdispatch.`or`")
  386. proc cb[X](fut: Future[X]) =
  387. if not retFuture.finished:
  388. if fut.failed: retFuture.fail(fut.error)
  389. else: retFuture.complete()
  390. fut1.callback = cb[T]
  391. fut2.callback = cb[Y]
  392. return retFuture
  393. proc all*[T](futs: varargs[Future[T]]): auto =
  394. ## Returns a future which will complete once
  395. ## all futures in `futs` complete.
  396. ## If the argument is empty, the returned future completes immediately.
  397. ##
  398. ## If the awaited futures are not `Future[void]`, the returned future
  399. ## will hold the values of all awaited futures in a sequence.
  400. ##
  401. ## If the awaited futures *are* `Future[void]`,
  402. ## this proc returns `Future[void]`.
  403. when T is void:
  404. var
  405. retFuture = newFuture[void]("asyncdispatch.all")
  406. completedFutures = 0
  407. let totalFutures = len(futs)
  408. for fut in futs:
  409. fut.addCallback proc (f: Future[T]) =
  410. inc(completedFutures)
  411. if not retFuture.finished:
  412. if f.failed:
  413. retFuture.fail(f.error)
  414. else:
  415. if completedFutures == totalFutures:
  416. retFuture.complete()
  417. if totalFutures == 0:
  418. retFuture.complete()
  419. return retFuture
  420. else:
  421. var
  422. retFuture = newFuture[seq[T]]("asyncdispatch.all")
  423. retValues = newSeq[T](len(futs))
  424. completedFutures = 0
  425. for i, fut in futs:
  426. proc setCallback(i: int) =
  427. fut.addCallback proc (f: Future[T]) =
  428. inc(completedFutures)
  429. if not retFuture.finished:
  430. if f.failed:
  431. retFuture.fail(f.error)
  432. else:
  433. retValues[i] = f.read()
  434. if completedFutures == len(retValues):
  435. retFuture.complete(retValues)
  436. setCallback(i)
  437. if retValues.len == 0:
  438. retFuture.complete(retValues)
  439. return retFuture