asyncstreams.nim 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## Unstable API.
  10. import asyncfutures
  11. import deques
  12. type
  13. FutureStream*[T] = ref object ## Special future that acts as
  14. ## a queue. Its API is still
  15. ## experimental and so is
  16. ## subject to change.
  17. queue: Deque[T]
  18. finished: bool
  19. cb: proc () {.closure, gcsafe.}
  20. proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
  21. ## Create a new ``FutureStream``. This future's callback is activated when
  22. ## two events occur:
  23. ##
  24. ## * New data is written into the future stream.
  25. ## * The future stream is completed (this means that no more data will be
  26. ## written).
  27. ##
  28. ## Specifying ``fromProc``, which is a string specifying the name of the proc
  29. ## that this future belongs to, is a good habit as it helps with debugging.
  30. ##
  31. ## **Note:** The API of FutureStream is still new and so has a higher
  32. ## likelihood of changing in the future.
  33. result = FutureStream[T](finished: false, cb: nil)
  34. result.queue = initDeque[T]()
  35. proc complete*[T](future: FutureStream[T]) =
  36. ## Completes a ``FutureStream`` signalling the end of data.
  37. future.finished = true
  38. if not future.cb.isNil:
  39. future.cb()
  40. proc `callback=`*[T](future: FutureStream[T],
  41. cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
  42. ## Sets the callback proc to be called when data was placed inside the
  43. ## future stream.
  44. ##
  45. ## The callback is also called when the future is completed. So you should
  46. ## use ``finished`` to check whether data is available.
  47. ##
  48. ## If the future stream already has data or is finished then ``cb`` will be
  49. ## called immediately.
  50. future.cb = proc () = cb(future)
  51. if future.queue.len > 0 or future.finished:
  52. callSoon(future.cb)
  53. proc finished*[T](future: FutureStream[T]): bool =
  54. ## Check if a ``FutureStream`` is finished. ``true`` value means that
  55. ## no more data will be placed inside the stream *and* that there is
  56. ## no data waiting to be retrieved.
  57. result = future.finished and future.queue.len == 0
  58. proc write*[T](future: FutureStream[T], value: T): Future[void] =
  59. ## Writes the specified value inside the specified future stream.
  60. ##
  61. ## This will raise ``ValueError`` if ``future`` is finished.
  62. result = newFuture[void]("FutureStream.put")
  63. if future.finished:
  64. let msg = "FutureStream is finished and so no longer accepts new data."
  65. result.fail(newException(ValueError, msg))
  66. return
  67. # TODO: Implement limiting of the streams storage to prevent it growing
  68. # infinitely when no reads are occurring.
  69. future.queue.addLast(value)
  70. if not future.cb.isNil: future.cb()
  71. result.complete()
  72. proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
  73. ## Returns a future that will complete when the ``FutureStream`` has data
  74. ## placed into it. The future will be completed with the oldest
  75. ## value stored inside the stream. The return value will also determine
  76. ## whether data was retrieved, ``false`` means that the future stream was
  77. ## completed and no data was retrieved.
  78. ##
  79. ## This function will remove the data that was returned from the underlying
  80. ## ``FutureStream``.
  81. var resFut = newFuture[(bool, T)]("FutureStream.take")
  82. let savedCb = future.cb
  83. var newCb =
  84. proc (fs: FutureStream[T]) =
  85. # Exit early if `resFut` is already complete. (See #8994).
  86. if resFut.finished: return
  87. # We don't want this callback called again.
  88. future.cb = nil
  89. # The return value depends on whether the FutureStream has finished.
  90. var res: (bool, T)
  91. if finished(fs):
  92. # Remember, this callback is called when the FutureStream is completed.
  93. res[0] = false
  94. else:
  95. res[0] = true
  96. res[1] = fs.queue.popFirst()
  97. resFut.complete(res)
  98. # If the saved callback isn't nil then let's call it.
  99. if not savedCb.isNil: savedCb()
  100. if future.queue.len > 0 or future.finished:
  101. newCb(future)
  102. else:
  103. future.callback = newCb
  104. return resFut
  105. proc len*[T](future: FutureStream[T]): int =
  106. ## Returns the amount of data pieces inside the stream.
  107. future.queue.len