tfuturestream.nim 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. discard """
  2. file: "tfuturestream.nim"
  3. exitcode: 0
  4. output: '''
  5. 0
  6. 1
  7. 2
  8. 3
  9. 4
  10. 5
  11. Done
  12. Finished
  13. '''
  14. """
  15. import asyncdispatch
  16. var fs = newFutureStream[int]()
  17. proc alpha() {.async.} =
  18. for i in 0 .. 5:
  19. await fs.write(i)
  20. await sleepAsync(200)
  21. echo("Done")
  22. fs.complete()
  23. proc beta() {.async.} =
  24. while not fs.finished:
  25. let (hasValue, value) = await fs.read()
  26. if hasValue:
  27. echo(value)
  28. echo("Finished")
  29. asyncCheck alpha()
  30. waitFor beta()
  31. template ensureCallbacksAreScheduled =
  32. # callbacks are called directly if the dispatcher is not running
  33. discard getGlobalDispatcher()
  34. proc testCompletion() {.async.} =
  35. ensureCallbacksAreScheduled
  36. var stream = newFutureStream[string]()
  37. for i in 1..5:
  38. await stream.write($i)
  39. var readFuture = stream.readAll()
  40. stream.complete()
  41. yield readFuture
  42. let data = readFuture.read()
  43. doAssert(data.len == 5, "actual data len = " & $data.len)
  44. waitFor testCompletion()
# TODO: Something like this should work eventually.
# proc delta(): FutureStream[string] {.async.} =
#   for i in 0 .. 5:
#     await sleepAsync(1000)
#     result.put($i)
#
#   return ""
#
# proc omega() {.async.} =
#   let fut = delta()
#   while not fut.finished():
#     echo(await fs.takeAsync())
#
#   echo("Finished")
#
# waitFor omega()
  56. # waitFor omega()