streams.scm 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. ;;; Streams
  2. ;;; Copyright (C) 2024 Igalia, S.L.
  3. ;;;
  4. ;;; Licensed under the Apache License, Version 2.0 (the "License");
  5. ;;; you may not use this file except in compliance with the License.
  6. ;;; You may obtain a copy of the License at
  7. ;;;
  8. ;;; http://www.apache.org/licenses/LICENSE-2.0
  9. ;;;
  10. ;;; Unless required by applicable law or agreed to in writing, software
  11. ;;; distributed under the License is distributed on an "AS IS" BASIS,
  12. ;;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. ;;; See the License for the specific language governing permissions and
  14. ;;; limitations under the License.
  15. ;;; Commentary:
  16. ;;;
  17. ;;; Streams are host objects that are asynchronous sources or sinks of
  18. ;;; data. This module wraps streams in the Hoot port interface, as
  19. ;;; input or output ports. Although the interface with the host is
  20. ;;; somewhat abstract, it is modelled after the WhatWG Streams
  21. ;;; specification (https://streams.spec.whatwg.org/).
  22. ;;;
  23. ;;; The Streams API is somewhat vague as to what constitutes "data".
  24. ;;; For our purposes, we assume that each chunk of data is a byte array.
  25. ;;; Encoding/decoding text is the host's responsibility, if that is what
  26. ;;; is wanted.
  27. ;;;
  28. ;;; Code:
  29. (define-module (fibers streams)
  30. #:use-module (hoot ports)
  31. #:use-module (hoot ffi)
  32. #:use-module (fibers promises)
  33. #:use-module (hoot bytevectors)
  34. #:export (open-input-stream
  35. open-output-stream
  36. standard-input-stream
  37. standard-output-stream
  38. standard-error-stream))
  39. ;; length -> Uint8Array
  40. (define-foreign stream:make-chunk
  41. "rt" "stream_make_chunk" i32 -> (ref extern))
  42. ;; Uint8Array -> length
  43. (define-foreign stream:chunk-length
  44. "rt" "stream_chunk_length" (ref extern) -> i32)
  45. ;; Uint8Array, idx -> byte
  46. (define-foreign stream:chunk-ref
  47. "rt" "stream_chunk_ref" (ref extern) i32 -> i32)
  48. ;; Uint8Array, idx, byte -> ()
  49. (define-foreign stream:chunk-set!
  50. "rt" "stream_chunk_set" (ref extern) i32 i32 -> none)
  51. ;; ReadableStream -> ReadableStreamDefaultReader
  52. (define-foreign stream:get-reader
  53. "rt" "stream_get_reader" (ref extern) -> (ref extern))
  54. ;; ReadableStreamDefaultReader -> Promise<Result<Uint8Array>>
  55. (define-foreign stream:read
  56. "rt" "stream_read" (ref extern) -> (ref extern))
  57. ;; Result<Uint8Array> -> Uint8Array
  58. (define-foreign stream:result-chunk
  59. "rt" "stream_result_chunk" (ref extern) -> (ref extern))
  60. ;; Result<Uint8Array> -> 1 if done, 0 otherwise
  61. (define-foreign stream:result-done?
  62. "rt" "stream_result_done" (ref extern) -> i32)
  63. ;; WritableStream -> WritableStreamDefaultWriter
  64. (define-foreign stream:get-writer
  65. "rt" "stream_get_writer" (ref extern) -> (ref extern))
  66. ;; WritableStreamDefaultWriter, Uint8Array -> Promise<undefined>
  67. (define-foreign stream:write
  68. "rt" "stream_write" (ref extern) (ref extern) -> (ref extern))
  69. ;; WritableStreamDefaultWriter -> Promise<undefined>
  70. (define-foreign stream:close-writer
  71. "rt" "stream_close_writer" (ref extern) -> (ref extern))
  72. ;; -> ReadableStream
  73. (define-foreign stream:stdin
  74. "io" "stream_stdin" -> (ref extern))
  75. ;; -> WritableStream
  76. (define-foreign stream:stdout
  77. "io" "stream_stdout" -> (ref extern))
  78. ;; -> WritableStream
  79. (define-foreign stream:stderr
  80. "io" "stream_stderr" -> (ref extern))
  81. (define (open-input-stream stream)
  82. (define reader (stream:get-reader stream))
  83. (define did-read 0)
  84. (define pos 0)
  85. (define done? #f)
  86. (define chunk #f)
  87. (define chunk-len 0)
  88. (define default-buffer-size 1024)
  89. (define (read dst start count)
  90. (cond
  91. ((eq? pos chunk-len)
  92. (if done?
  93. 0
  94. (let ((result (await (stream:read reader))))
  95. (set! done? (not (zero? (stream:result-done? result))))
  96. (set! did-read (+ did-read chunk-len))
  97. (set! chunk (stream:result-chunk result))
  98. (set! pos 0)
  99. (set! chunk-len (stream:chunk-length chunk))
  100. (read dst start count))))
  101. (else
  102. (let ((to-copy (min count (- chunk-len pos))))
  103. (let lp ((i 0))
  104. (when (< i to-copy)
  105. (bytevector-u8-set! dst (+ start i)
  106. (stream:chunk-ref chunk (+ pos i)))
  107. (lp (1+ i))))
  108. (set! pos (+ pos to-copy))
  109. to-copy))))
  110. (define (seek offset whence) ; seek
  111. (if (and (zero? offset)
  112. (eq? whence 'cur))
  113. (+ did-read pos)
  114. (error "unreachable; stream ports are not seekable")))
  115. (make-port read
  116. #f ; write
  117. #f ; input-waiting?
  118. seek
  119. #f ; close
  120. #f ; truncate
  121. "stream" ; repr
  122. #f ; filename
  123. default-buffer-size ; read-buf-size
  124. #f ; write-buf-size
  125. #f ; r/w-random-access
  126. #f ; fold-case?
  127. #f ; private data
  128. ))
  129. (define (open-output-stream stream)
  130. (define writer (stream:get-writer stream))
  131. (define pos 0)
  132. (define default-buffer-size 1024)
  133. (define (write bv start count)
  134. (unless (zero? count)
  135. (let ((chunk (stream:make-chunk count)))
  136. (let lp ((i 0))
  137. (when (< i count)
  138. (stream:chunk-set! chunk i (bytevector-u8-ref bv (+ start i)))
  139. (lp (1+ i))))
  140. (await (stream:write writer chunk))))
  141. (set! pos (+ pos count))
  142. count)
  143. (define (seek offset whence)
  144. (if (and (zero? offset)
  145. (eq? whence 'cur))
  146. pos
  147. (error "unreachable; stream ports are not seekable")))
  148. (define (close)
  149. (stream:close-writer writer))
  150. (make-port #f
  151. write ; write
  152. #f ; input-waiting?
  153. seek
  154. close ; close
  155. #f ; truncate
  156. "stream" ; repr
  157. #f ; filename
  158. #f ; read-buf-size
  159. default-buffer-size ; write-buf-size
  160. #f ; r/w-random-access
  161. #f ; fold-case?
  162. #f ; private data
  163. ))
  164. (define (standard-input-stream)
  165. (open-input-stream (stream:stdin)))
  166. (define (standard-output-stream)
  167. (open-output-stream (stream:stdout)))
  168. (define (standard-error-stream)
  169. (open-output-stream (stream:stderr)))