123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788 |
- ;; This file is part of scheme-GNUnet.
- ;; Copyright (C) 2021 GNUnet e.V.
- ;;
- ;; scheme-GNUnet is free software: you can redistribute it and/or modify it
- ;; under the terms of the GNU Affero General Public License as published
- ;; by the Free Software Foundation, either version 3 of the License,
- ;; or (at your option) any later version.
- ;;
- ;; scheme-GNUnet is distributed in the hope that it will be useful, but
- ;; WITHOUT ANY WARRANTY; without even the implied warranty of
- ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- ;; Affero General Public License for more details.
- ;;
- ;; You should have received a copy of the GNU Affero General Public License
- ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
- ;;
- ;; SPDX-License-Identifier: AGPL-3.0-or-later
- (use-modules (gnu gnunet mq-impl stream)
- (gnu gnunet mq)
- (gnu gnunet mq handler)
- (gnu gnunet utils hat-let)
- (gnu gnunet utils bv-slice)
- (gnu gnunet config db)
- (gnu gnunet concurrency repeated-condition)
- (fibers conditions)
- (fibers operations)
- (fibers)
- ((rnrs arithmetic bitwise) #:select (bitwise-ior))
- (rnrs bytevectors)
- ((rnrs io ports) #:select (open-bytevector-input-port))
- ((rnrs base) #:select (assert))
- (rnrs hashtables)
- ((rnrs exceptions) #:select (guard))
- (srfi srfi-26)
- (srfi srfi-43)
- (rnrs io ports)
- (ice-9 atomic)
- (ice-9 binary-ports)
- (ice-9 suspendable-ports)
- (ice-9 control)
- (ice-9 match)
- (ice-9 threads)
- (tests utils))
- (define (no-sender . _)
- (error "no sender!"))
- (define no-handlers (message-handlers))
- (define (no-error-handler . _)
- (error "no error handler!"))
- (test-begin "mq-stream")
- (define (check-slice-equal slice bv)
- (let^ ((!! (assert (= (slice-length slice)
- (bytevector-length bv))))
- (! slice-copy (make-bytevector (slice-length slice)))
- (! copy (bv-slice/read-write slice-copy))
- (<-- () (slice-copy! slice copy))
- (!! (bytevector=? slice-copy bv)))
- (values)))
- ;; Without interposition, and the verifier always
- ;; returns #t.
- (define (simple-handler the-type handle)
- (message-handler
- (type the-type)
- ((interpose code) code)
- ((well-formed? _) #t)
- ((handle! x) (handle x))))
- ;; Why isn't this the default? This stops the process from
- ;; exiting instead of raising an EPIPE system-error when
- ;; writing to a broken pipe.
- (sigaction SIGPIPE SIG_IGN)
- (define (handle-input*! mq input)
- (call-with-values (lambda () (handle-input! mq input))
- (lambda e
- (apply inject-error! mq e)
- (values))))
- (test-assert "messages + eof are injected in-order"
- (let^ ((! input/bv #vu8(0 4 0 1 ; Message type 1, size 4
- 0 5 0 2 1 ; Message type 2, size 6
- 0 6 0 3 2 1)) ; Message type 3, size 7
- (! input (open-bytevector-input-port input/bv))
- (! received 0)
- (! (make-handler type expected-received expected-bv)
- (simple-handler
- type
- (lambda (slice)
- (assert (equal? received expected-received))
- (check-slice-equal slice expected-bv)
- (set! received (+ 1 received)))))
- (! handler/1 (make-handler 1 0 #vu8(0 4 0 1)))
- (! handler/2 (make-handler 2 1 #vu8(0 5 0 2 1)))
- (! handler/3 (make-handler 3 2 #vu8(0 6 0 3 2 1)))
- (! handlers
- (message-handlers handler/1 handler/2 handler/3))
- (! (error-handler . arguments)
- (assert (equal? received 3))
- (assert (equal? arguments '(input:regular-end-of-file)))
- (set! received 'end-of-file))
- (! mq (make-message-queue handlers error-handler no-sender))
- (<-- () (handle-input*! mq input)))
- ;; TODO: should the port be closed?
- (assert (equal? received 'end-of-file))))
- (test-assert "overly small message is detected (--> stop)"
- (let^ ((! input/bv #vu8(0 4 0 0 ; Message type 0, size 4
- 0 3 9 ; Overly small message, size 3, type != 0
- 0 4 0 1)) ; Message type 1, size 4
- ;; The first message is well-formatted and should therefore
- ;; be injected. The second one isn't, so an appropriate error should
- ;; injected. Then the message stream is broken, so the third
- ;; message shouldn't be injected.
- (! input (open-bytevector-input-port input/bv))
- (! received 0)
- (! handler/0
- (simple-handler 0
- (lambda (slice)
- (assert (equal? received 0))
- (check-slice-equal slice #vu8(0 4 0 0))
- (set! received 1))))
- (! handlers
- (message-handlers handler/0))
- (! (error-handler . arguments)
- (assert (equal? received 1))
- ;; Whether this malformed even has a message type is dubious,
- ;; but if it has one, it will be (* 256 9).
- (assert (equal? arguments `(input:overly-small ,(* 256 9) 3)))
- (set! received 'overly-small))
- (! mq (make-message-queue handlers error-handler no-sender))
- (<-- () (handle-input*! mq input)))
- (assert (equal? received 'overly-small))))
- (test-assert "premature eof is detected (--> stop)"
- (let^ ((! input/bv #vu8(0 8 7 6 5 4))
- (! input (open-bytevector-input-port input/bv))
- (! received #f)
- (! (error-handler . arguments)
- (assert (eq? received #f))
- (assert (equal? arguments '(input:premature-end-of-file)))
- (set! received #t))
- (! mq (make-message-queue no-handlers error-handler no-sender))
- (<-- () (handle-input*! mq input)))
- (assert (equal? received #t))))
- (test-equal "envelopes are written (no blocking)"
- ;; Three messages
- #vu8(0 4 0 1
- 0 4 0 2
- 0 4 0 3)
- (let^ ((! messages #(#vu8(0 4 0 1)
- #vu8(0 4 0 2)
- #vu8(0 4 0 3)))
- (<-- (port get-bytevector) (open-bytevector-output-port))
- (! mq (make-message-queue no-handlers no-error-handler
- (lambda (_) (values))))
- (! (insert-message index message)
- (send-message! mq (slice/read-only (bv-slice/read-write message))))
- (<-- ()
- (begin
- (vector-for-each insert-message messages)
- (values)))
- (<-- ()
- ;; The implementation detail that 'send-round'
- ;; is called before 'wait!' is assumed here.
- (let/ec ec
- (handle-output! mq port ec)
- (error "unreachable"))))
- (get-bytevector)))
- (define (blocking-output-port port . block-positions)
- (define (close)
- (close-port port))
- (define (write! bv index length)
- (define p (port-position port))
- (if (or (null? block-positions)
- (< (+ p length) (car block-positions)))
- (begin (put-bytevector port bv index length) length)
- (let ((short (- (car block-positions) p)))
- (put-bytevector port bv index short)
- ((current-write-waiter) port/blocking)
- (set! block-positions (cdr block-positions))
- short)))
- (define port/blocking
- (make-custom-binary-output-port "" write! #f #f close))
- (setvbuf port/blocking 'none)
- port/blocking)
- ;; The ‘blocking’ is to make this test case more interesting.
- ;; It does not currently have any effect, but it is expected
- ;; that the implementation of handle-output! will be changed
- ;; to react to blocking, for implementing message queue
- ;; shutdown.
- (test-equal "repeatable conditions can be used (blocking)"
- '(#vu8(0 4 0 1 0 4 0 2) . 4) ; 4: number of times writing blocks
- (let^ ((! rcvar (make-repeated-condition))
- (! stop? (make-condition))
- (! stopped? (make-condition))
- (! (interrupt! mq)
- (trigger-condition! rcvar))
- (! escape/output (make-parameter #f))
- (<-- (out/internal get-bytevector)
- (open-bytevector-output-port))
- ;; block writing a few times
- (! out (blocking-output-port out/internal 0 1 3 7))
- (! (wait!)
- (perform-operation
- (apply choice-operation
- (prepare-await-trigger! rcvar)
- (if (>= 8 (port-position out/internal))
- (list (wrap-operation
- (wait-operation stop?)
- (lambda () ((escape/output)))))
- '()))))
- (! mq (make-message-queue no-handlers no-error-handler interrupt!))
- (! n/blocked 0)
- (! message/1 #vu8(0 4 0 1))
- (! message/2 #vu8(0 4 0 2)))
- (run-fibers
- (lambda ()
- (spawn-fiber
- (lambda ()
- (let/ec ec
- (parameterize ((escape/output ec)
- (current-write-waiter
- (lambda (port)
- (cond ((eq? port out)
- (set! n/blocked (+ n/blocked 1)))
- ((file-port? port)
- ;; XXX ‘Attempt to suspend fiber within
- ;; continuaton barrier’
- #;((@@ (fibers) wait-for-writable) port)
- (select '() (list port) '()))))))
- (handle-output! mq out wait!)))
- (signal-condition! stopped?)))
- (send-message! mq (bv-slice/read-write message/1))
- (sleep 0.001)
- (send-message! mq (bv-slice/read-write message/2))
- (sleep 0.001)
- (signal-condition! stop?)
- (wait stopped?)
- (cons (get-bytevector) n/blocked))
- #:parallelism 1
- #:hz 0)))
- (define (call-with-temporary-directory proc)
- (let ((file (mkdtemp (in-vicinity (or (getenv "TMPDIR") "/tmp")
- "test-XXXXXX"))))
- (with-exception-handler
- (lambda (e)
- (system* "rm" "-r" file)
- (raise-exception e))
- (lambda ()
- (call-with-values
- (lambda () (proc file))
- (lambda the-values
- (system* "rm" "-r" file)
- (apply values the-values)))))))
- (define (make-config where)
- (hash->configuration
- (alist->hash-table
- `((("service" . "UNIXPATH") . ,where)))))
- (define (call-with-socket-location proc)
- (call-with-temporary-directory
- (lambda (dir)
- (define where (in-vicinity dir "sock.et"))
- (define config (make-config where))
- (proc where config))))
- (define (connect/test config connected?)
- (define (error-handler . error)
- (match error
- ;; The connection is closed by 'test-connection'.
- ;; If 'test-connection' doesn't close the connection,
- ;; then the GC would. In both cases, this error would
- ;; happen.
- (('input:regular-end-of-file) (values))
- (('connection:connected) (signal-condition! connected?))))
- (connect/fibers config "service" no-handlers error-handler
- #:spawn call-with-new-thread))
- (define (alist->hash-table alist)
- (define h (make-hashtable (lambda (key) 0) equal?))
- (define (insert! key+value)
- (hashtable-set! h (car key+value) (cdr key+value)))
- (for-each insert! alist)
- h)
- (define (test-connection mq server-sock)
- (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
- (let ((client (car (accept server-sock))))
- (assert (equal? #vu8(0 4 0 0) (get-bytevector-n client 4)))
- (close-port client)
- #t))
- (define (yield-many)
- ;; Give the new threads some time to run before binding the socket.
- ;; This allowed a bug in the use of 'connect' to be detected.
- (let loop ((n (* 8 (+ 1 (length (all-threads))))))
- (when (> n 0)
- (yield)
- (loop (- n 1)))))
- (test-assert "connect-unix, can connect when socket is already listening"
- (call-with-socket-location
- (lambda (where config)
- (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
- (define connected? (make-condition))
- (bind listening-sock AF_UNIX where)
- (listen listening-sock 1)
- (define mq (connect/test config connected?))
- (wait connected?)
- (test-connection mq listening-sock))))
- ;; Consider the case where a service starts, has bound its socket
- ;; but is not yet listening, and a client connects.
- (test-assert "connect-unix, will connect when socket is listening"
- (call-with-socket-location
- (lambda (where config)
- (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
- (define connected? (make-condition))
- (bind listening-sock AF_UNIX where)
- (define mq (connect/test config connected?))
- (yield-many)
- (listen listening-sock 1)
- (wait connected?)
- (test-connection mq listening-sock))))
- ;; Consider the case where a client starts before a service.
- (test-assert "connect-unix, will connect when socket is bound (and listening)"
- (call-with-socket-location
- (lambda (where config)
- (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
- (define connected? (make-condition))
- (define mq (connect/test config connected?))
- (yield-many)
- (bind listening-sock AF_UNIX where)
- (listen listening-sock 1)
- (wait connected?)
- (test-connection mq listening-sock))))
- ;; Consider the case where a service starts and stops,
- ;; a client connects and the service restarts.
- (test-assert
- "connect-unix, will connect even if there's an old socket lying around"
- (call-with-socket-location
- (lambda (where config)
- (let ((old-sock (socket PF_UNIX SOCK_STREAM 0)))
- (bind old-sock AF_UNIX where)
- (close-port old-sock))
- (define connected? (make-condition))
- (define mq (connect/test config connected?))
- (yield-many)
- (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
- (yield-many)
- ;; Delete the old socket, otherwise the 'bind' below results in ‘address alreay in use’
- (delete-file where)
- (yield-many)
- (bind listening-sock AF_UNIX where)
- (yield-many)
- (listen listening-sock 1)
- (wait connected?)
- (test-connection mq listening-sock))))
- ;; Consider the case where GNUnet version N uses stream sockets,
- ;; GNUnet version M uses datagram sockets, the system initially
- ;; uses GNUnet version N, a client for version M is started
- ;; (initially failing to connect to the server), then the system
- ;; switches to GNUnet version M.
- (test-assert
- "connect-unix, will connect even if previous socket is different type"
- (call-with-socket-location
- (lambda (where config)
- (define old-sock (socket PF_UNIX SOCK_DGRAM 0))
- (bind old-sock AF_UNIX where)
- ;; Datagram sockets don't support 'listen', so don't
- ;; call 'listen' with 'old-sock'.
- (define connected? (make-condition))
- (define mq (connect/test config connected?))
- (yield-many)
- (close-port old-sock)
- (delete-file where)
- (define new-sock (socket PF_UNIX SOCK_STREAM 0))
- (bind new-sock AF_UNIX where)
- (listen new-sock 1)
- (wait connected?)
- (test-connection mq new-sock))))
- ;; Consider a system that creates directories and the socket
- ;; with world-unreadable, world-unexecutable permissions at
- ;; first and makes the permissions more permissive later.
- (test-assert
- "connect-unix, will connect even if permissions are temporarily wrong"
- (call-with-temporary-directory
- (lambda (tmpdir)
- ;; Permissions on sockets can be unreliable on some systems,
- ;; so modify the permissions of a directory instead.
- (define subdir (in-vicinity tmpdir "dir"))
- (mkdir subdir)
- (define where (in-vicinity subdir "sock.et"))
- (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
- (bind listening-sock AF_UNIX where)
- (listen listening-sock 1)
- (chmod subdir #o000) ; unreadable
- (define connected? (make-condition))
- (define mq (connect/test (make-config where) connected?))
- (yield-many)
- ;; make it readable again
- ;; (and writable such that 'tmpdir' can be deleted).
- (chmod subdir #o700)
- (wait connected?)
- (test-connection mq listening-sock))))
- (test-assert "port->message-queue, can send/receive between pairs"
- (run-fibers
- (lambda ()
- ;; Create two message queues connected to each other
- ;; over a socket pair. Send '1' over the first message queue
- ;; and expect to receive it from the second queue, and send '0'
- ;; over the second message queue and expect to receive it from
- ;; the first.
- (define pair (socketpair AF_UNIX SOCK_STREAM 0))
- ;; As 'fibers' is used instead of POSIX threads, set O_NONBLOCK.
- (make-nonblocking! (car pair))
- (make-nonblocking! (cdr pair))
- (define received/0 #f)
- (define received/1 #f)
- (define done/0 (make-condition))
- (define done/1 (make-condition))
- (define handlers/0
- (message-handlers
- (simple-handler 0
- (lambda (slice)
- (assert (not received/0))
- (check-slice-equal slice #vu8(0 4 0 0))
- (set! received/0 #t)
- (signal-condition! done/0)))))
- (define handlers/1
- (message-handlers
- (simple-handler 1
- (lambda (slice)
- (assert (not received/1))
- (check-slice-equal slice #vu8(0 4 0 1))
- (set! received/1 #t)
- (signal-condition! done/1)))))
- (define error-handler no-error-handler)
- (define mq/0 (port->message-queue (car pair) handlers/0 error-handler))
- (define mq/1 (port->message-queue (cdr pair) handlers/1 error-handler))
- (send-message! mq/0 (bv-slice/read-write #vu8(0 4 0 1)))
- (send-message! mq/1 (bv-slice/read-write #vu8(0 4 0 0)))
- (wait done/0)
- (wait done/1)
- #t)))
- (define (two-sockets)
- (define sp (socketpair AF_UNIX SOCK_STREAM 0))
- (make-nonblocking! (car sp))
- (make-nonblocking! (cdr sp))
- (values (car sp) (cdr sp)))
- (test-assert "input eof detected --> handle-input/output! stops (port->message-queue)"
- (call-with-spawner/wait
- (lambda (spawn)
- (define-values (alpha beta) (two-sockets))
- (define end-of-file (make-condition))
- (define (error-handler . e)
- (assert (equal? e '(input:regular-end-of-file)))
- ;; only one end-of-file notification
- (assert (signal-condition! end-of-file)))
- (define mq/alpha
- (port->message-queue alpha no-handlers error-handler
- #:spawn spawn))
- ;; Give the fibers started by 'port->message-queue' a chance to block.
- (yield-many)
- ;; Let 'beta' stop writing, such that 'alpha' receives an end-of-file.
- ;; But keep the 'write' end of 'alpha' / 'read' end of 'beta' open to
- ;; complicate matters.
- (shutdown beta 1)
- (wait end-of-file)
- (define sent? (make-atomic-box #f))
- ;; Attempt to write a message, even though the connection is (half-)closed.
- ;; It should not actually be sent.
- (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 0))
- #:notify-sent!
- (lambda ()
- ;; strictly speaking, this does not mean the message was
- ;; sent, but it's close enough for this test's purposes.
- (atomic-box-set! sent? #t)))
- ;; Give 'handle-output!' a chance to (faultively) sent the message.
- (yield-many)
- (sleep 0.1) ; the yield-many above is apparently insufficient
- (assert (not (atomic-box-ref sent?)))
- ;; If it didn't try to sent the message, that presumably means the
- ;; 'handle-output!' fiber has completed.
- #t)
- ;; Should make 'yield-many' less fragile.
- #:parallelism 1))
- (define (%false-if-broken-pipe thunk)
- "Call @var{thunk} in an environment where EPIPE system errors are caught.
- If an EPIPE system error is raised, return #f."
- (guard (c ((and (eq? 'system-error (exception-kind c))
- (= EPIPE (car (list-ref (exception-args c) 3))))
- #f))
- (thunk)))
- (define-syntax-rule (false-if-broken-pipe exp exp* ...)
- ;; See %false-if-broken-pipe
- (%false-if-broken-pipe
- (lambda ()
- exp exp* ...)))
- (test-assert "closed for writing --> handle-input! stops (port->message-queue)"
- (call-with-spawner/wait
- (lambda (spawn)
- (define-values (alpha beta) (two-sockets))
- (define received? (make-atomic-box #f))
- (define end-of-file (make-condition))
- (define (receive! slice)
- (assert (not (atomic-box-ref received?)))
- (atomic-box-set! received? #t)
- (error "shouldn't be received"))
- (define (error-handler . e)
- (pk 'e e)
- (assert (equal? e '(input:regular-end-of-file)))
- ;; only one end-of-file notification
- (assert (signal-condition! end-of-file)))
- (define mq/alpha
- (port->message-queue alpha
- (message-handlers
- (simple-handler 0 receive!))
- error-handler
- #:spawn spawn))
- ;; Give the new fibers a chance to block.
- (yield-many)
- ;; Let 'beta' stop reading, such that 'alpha' is closed for writing.
- ;; But keep the 'read' end of 'alpha' open to complicate matters.
- (shutdown beta 0)
- ;; TODO: fibers doesn't have an option for waiting for EPOLLRDHUP
- ;; or EPOLLERR, so the code cannot immediately detect that 'alpha'
- ;; cannot be written to anymore. Instead, 'handle-output!' will
- ;; detect the unwritability when it tries to write something.
- (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 9)))
- ;; The end-of-file error should be injected, even though the socket
- ;; is half-duplex and only the write end is closed, because message
- ;; queues do not have a notion of half-duplex connections.
- (pk 'waiting)
- (wait end-of-file)
- ;; Attempt to read a message (after buffering a message), even though
- ;; the connection is half-closed. Ignore broken pipe errors here:
- ;; if a ‘broken pipe’ error happens here, that means ALPHA was closed,
- ;; which is correct (tested in "port is closed at output").
- (false-if-broken-pipe (put-bytevector beta #vu8(0 4 0 0)))
- ;; As the 'handle-input!' fiber should have exited already, 'receive!'
- ;; shouldn't be called.
- (yield-many)
- (sleep 0.1) ; might not be necessary anymore
- #t)
- ;; Should make 'yield-many' less fragile.
- #:parallelism 1))
- ;; This detects the absence of the parametrisation of 'current-write-waiter'.
- (test-assert "writer blocking and closed for reading --> all fibers stop"
- (call-with-spawner/wait
- (lambda (spawn)
- (define-values (alpha beta) (two-sockets))
- ;; Fill the writing pipe, such that the writing fiber will block.
- #;(fcntl alpha SO_SNDBUF 1) ; doesn't work on sockets on Linux ..
- ;; Simply writing a byte isn't sufficient, as the kernel can
- ;; impose a minimum buffer size.
- (define old-waiter (current-write-waiter))
- (let/ec ec
- (parameterize ((current-write-waiter
- (lambda (port)
- (if (eq? port alpha)
- (ec)
- ;; maybe a backtrace
- (old-waiter port)))))
- (define bv (make-bytevector 4096))
- (let loop ()
- (put-bytevector alpha bv)
- (loop))))
- (define closed-condition (make-condition))
- (define (error-handler e)
- (assert (eq? e 'input:regular-end-of-file))
- (unless (signal-condition! closed-condition)
- (error "already saw end of file")))
- (define mq (port->message-queue alpha no-handlers error-handler
- #:spawn spawn))
- (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
- (define (notify-sent)
- ;; if the mq-stream implementation implemented buffering by itself
- ;; this would actually be possible.
- (error "impossible, should be blocking by now!"))
- (send-message! mq
- (bv-slice/read-write #vu8(0 4 0 0))
- #:notify-sent! notify-sent)
- (pk 'send2)
- ;; Give the write fiber a chance to block.
- (yield-many)
- (sleep 0.1)
- (shutdown alpha 0)
- #t)
- ;; Should make 'yield-many' less fragile.
- #:parallelism 1))
- ;; ^ if this test blocks, that means not all fibers have stopped
- (test-assert "output buffers are flushed"
- (let^ ((<-- (alpha beta) (two-sockets))
- ;; In Guile, by default, new sockets are unbuffered.
- ;; Add a buffer.
- (<-- _ (setvbuf alpha 'block 64))
- (<-- _ (setvbuf beta 'block 64))
- (! mq (make-message-queue no-handlers no-error-handler
- (lambda (_) (values))))
- ;; Send a message. As the message is smaller than the buffer size,
- ;; it will be buffered unless 'handle-output!' takes special action.
- (! _ (send-message! mq (slice/read-only (bv-slice/read-write #vu8(0 4 0 0)))))
- (! waited?
- (let/ec ec
- (let ((old-waiter (current-write-waiter)))
- (parameterize ((current-write-waiter
- (lambda (p)
- (if (eq? p alpha)
- (ec #t)
- (old-waiter p)))))
- (handle-output! mq alpha
- (lambda ()
- (pk 'waiting...)
- ((pk 'escaping ec) #f)
- (pk 'escaped!)))))))
- ;; If HANDLE-OUTPUT! blocked, that meant the underlying system call
- ;; was called, so the kernel got (some of the) data and all is well
- ;; -- except that the kernel buffer size of 4 bytes seems rather tiny.
- (? waited?
- (format (current-error-port) "≤4 bytes seems rather small~%")
- #t)
- (! old-read-waiter (current-read-waiter)) )
- ;; If waited? is false, that means HANDLE-OUTPUT! succeeded and now
- ;; the bytes are in Guile's or the kernel's buffers. Test if they
- ;; are in the kernel's.
- (let/ec ec
- (equal? #vu8(0 4 0 0)
- (parameterize ((current-read-waiter
- (lambda (p)
- (if (eq? p beta)
- (ec #f)
- (old-read-waiter p)))))
- (get-bytevector-some beta))))))
- (define (error-handler/regular . e)
- (match e
- ('(input:regular-end-of-file) (values))
- (_ (error "what ~a" e))))
- (test-assert "port is closed at input eof"
- (call-with-spawner/wait
- (lambda (spawn)
- (define-values (alpha beta) (two-sockets))
- (define q (port->message-queue alpha no-handlers error-handler/regular
- #:spawn spawn))
- (shutdown alpha 0)
- (yield-many)
- (sleep 0.05) ;; XXX yield-many above is unsufficient
- (port-closed? alpha))
- #:parallelism 1)) ; to make the use of yield-many less fragile
- (test-assert "port is closed at output eof"
- (call-with-spawner/wait
- (lambda (spawn)
- (define-values (alpha beta) (two-sockets))
- (define mq (port->message-queue alpha no-handlers error-handler/regular
- #:spawn spawn))
- (shutdown alpha 1)
- ;; XXX It's not possible for the output eof to be waited for currently,
- ;; so attempt to send a message to wake up the writing fiber.
- (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
- (yield-many)
- (sleep 0.05) ;; XXX yield-many above is unsufficient
- (port-closed? alpha))
- #:parallelism 1)) ; to make the use of yield-many less fragile
- (test-assert "port is closed at input/output eof"
- (call-with-spawner/wait
- (lambda (spawn)
- (define-values (alpha beta) (two-sockets))
- (define q (port->message-queue alpha no-handlers error-handler/regular
- #:spawn spawn))
- (shutdown alpha 2)
- (yield-many)
- (sleep 0.05) ;; XXX yield-many above is unsufficient
- (port-closed? alpha))
- #:parallelism 1)) ; to make the use of yield-many less fragile
- (test-assert "fibers stop and port closed after close! (directly after creation)"
- (let^ ((<-- (alpha beta) (two-sockets)))
- (call-with-spawner/wait
- (lambda (spawn)
- (define q (port->message-queue alpha no-handlers error-handler/regular
- #:spawn spawn))
- (close-queue! q))
- #:parallelism 1)
- (port-closed? alpha)))
- (test-assert "fibers stop and port closed after close! (some times passes)"
- (let^ ((<-- (alpha beta) (two-sockets)))
- (call-with-spawner/wait
- (lambda (spawn)
- (define q (port->message-queue alpha no-handlers error-handler/regular
- #:spawn spawn))
- (yield-many)
- (sleep 0.01)
- (close-queue! q))
- #:parallelism 1)
- (port-closed? alpha)))
- (test-assert "can close while still connecting (--> interrupted)"
- (call-with-socket-location
- (lambda (where config)
- (call-with-spawner/wait
- (lambda (spawn)
- (define interrupted? #f)
- (define cond (make-condition))
- (define (error-handler . e)
- (match e
- ('(connection:interrupted)
- (begin
- (pk 'interrupted)
- (assert (not interrupted?))
- (set! interrupted? #t)
- (signal-condition! cond)))
- (_ (error "what ~a" e))))
- (define mq (connect/fibers config "service" no-handlers error-handler
- #:spawn spawn))
- (close-queue! mq)
- (wait cond)
- #t)))))
- (test-assert "can close after being connected (--> regular-end-of-file)"
- (call-with-socket-location
- (lambda (where config)
- (call-with-spawner/wait
- (lambda (spawn)
- (define connected? #f)
- (define connected-condition (make-condition))
- (define disconnected? #f)
- (define disconnected-condition (make-condition))
- (define (error-handler . e)
- (match e
- ('(connection:connected)
- (pk 'connected)
- (assert (not connected?))
- (set! connected? #t)
- (signal-condition! connected-condition))
- ('(input:regular-end-of-file)
- (assert connected?)
- (assert (not disconnected?))
- (set! disconnected? #t)
- (signal-condition! disconnected-condition))
- (_ (error "what ~a" e))))
- (define mq (connect/fibers config "service" no-handlers error-handler
- #:spawn spawn))
- (spawn
- (lambda ()
- (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
- (bind listening-sock AF_UNIX where)
- (listen listening-sock 1)
- ;; Make it non-blocking (because guile-fibers is used)
- (make-nonblocking! listening-sock)
- ;; Not actually interested in the return value
- (accept listening-sock)))
- (wait connected-condition)
- (assert (not disconnected?))
- (close-queue! mq)
- (wait disconnected-condition)
- #t)))))
- (test-end "mq-stream")
|