asyncstreams.nim 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Unstable API.
  10. import asyncfutures
  11. when defined(nimPreviewSlimSystem):
  12. import std/assertions
  13. import deques
  14. type
  15. FutureStream*[T] = ref object ## Special future that acts as
  16. ## a queue. Its API is still
  17. ## experimental and so is
  18. ## subject to change.
  19. queue: Deque[T]
  20. finished: bool
  21. cb: proc () {.closure, gcsafe.}
  22. error*: ref Exception
  23. proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
  24. ## Create a new `FutureStream`. This future's callback is activated when
  25. ## two events occur:
  26. ##
  27. ## * New data is written into the future stream.
  28. ## * The future stream is completed (this means that no more data will be
  29. ## written).
  30. ##
  31. ## Specifying `fromProc`, which is a string specifying the name of the proc
  32. ## that this future belongs to, is a good habit as it helps with debugging.
  33. ##
  34. ## **Note:** The API of FutureStream is still new and so has a higher
  35. ## likelihood of changing in the future.
  36. result = FutureStream[T](finished: false, cb: nil)
  37. result.queue = initDeque[T]()
  38. proc complete*[T](future: FutureStream[T]) =
  39. ## Completes a `FutureStream` signalling the end of data.
  40. assert(future.error == nil, "Trying to complete failed stream")
  41. future.finished = true
  42. if not future.cb.isNil:
  43. future.cb()
  44. proc fail*[T](future: FutureStream[T], error: ref Exception) =
  45. ## Completes `future` with `error`.
  46. assert(not future.finished)
  47. future.finished = true
  48. future.error = error
  49. if not future.cb.isNil:
  50. future.cb()
  51. proc `callback=`*[T](future: FutureStream[T],
  52. cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
  53. ## Sets the callback proc to be called when data was placed inside the
  54. ## future stream.
  55. ##
  56. ## The callback is also called when the future is completed. So you should
  57. ## use `finished` to check whether data is available.
  58. ##
  59. ## If the future stream already has data or is finished then `cb` will be
  60. ## called immediately.
  61. proc named() = cb(future)
  62. future.cb = named
  63. if future.queue.len > 0 or future.finished:
  64. callSoon(future.cb)
  65. proc finished*[T](future: FutureStream[T]): bool =
  66. ## Check if a `FutureStream` is finished. `true` value means that
  67. ## no more data will be placed inside the stream *and* that there is
  68. ## no data waiting to be retrieved.
  69. result = future.finished and future.queue.len == 0
  70. proc failed*[T](future: FutureStream[T]): bool =
  71. ## Determines whether `future` completed with an error.
  72. return future.error != nil
  73. proc write*[T](future: FutureStream[T], value: T): Future[void] =
  74. ## Writes the specified value inside the specified future stream.
  75. ##
  76. ## This will raise `ValueError` if `future` is finished.
  77. result = newFuture[void]("FutureStream.put")
  78. if future.finished:
  79. let msg = "FutureStream is finished and so no longer accepts new data."
  80. result.fail(newException(ValueError, msg))
  81. return
  82. # TODO: Implement limiting of the streams storage to prevent it growing
  83. # infinitely when no reads are occurring.
  84. future.queue.addLast(value)
  85. if not future.cb.isNil: future.cb()
  86. result.complete()
  87. proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
  88. ## Returns a future that will complete when the `FutureStream` has data
  89. ## placed into it. The future will be completed with the oldest
  90. ## value stored inside the stream. The return value will also determine
  91. ## whether data was retrieved, `false` means that the future stream was
  92. ## completed and no data was retrieved.
  93. ##
  94. ## This function will remove the data that was returned from the underlying
  95. ## `FutureStream`.
  96. var resFut = newFuture[(bool, T)]("FutureStream.take")
  97. let savedCb = future.cb
  98. proc newCb(fs: FutureStream[T]) =
  99. # Exit early if `resFut` is already complete. (See #8994).
  100. if resFut.finished: return
  101. # We don't want this callback called again.
  102. #future.cb = nil
  103. # The return value depends on whether the FutureStream has finished.
  104. var res: (bool, T)
  105. if finished(fs):
  106. # Remember, this callback is called when the FutureStream is completed.
  107. res[0] = false
  108. else:
  109. res[0] = true
  110. res[1] = fs.queue.popFirst()
  111. if fs.failed:
  112. resFut.fail(fs.error)
  113. else:
  114. resFut.complete(res)
  115. # If the saved callback isn't nil then let's call it.
  116. if not savedCb.isNil:
  117. if fs.queue.len > 0:
  118. savedCb()
  119. else:
  120. future.cb = savedCb
  121. if future.queue.len > 0 or future.finished:
  122. newCb(future)
  123. else:
  124. future.callback = newCb
  125. return resFut
  126. proc len*[T](future: FutureStream[T]): int =
  127. ## Returns the amount of data pieces inside the stream.
  128. future.queue.len