12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972 |
- ;; This file is part of GNUnet.
- ;; Copyright (C) 2012, 2018, 2021, 2022 GNUnet e.V.
- ;;
- ;; GNUnet is free software: you can redistribute it and/or modify it
- ;; under the terms of the GNU Affero General Public License as published
- ;; by the Free Software Foundation, either version 3 of the License,
- ;; or (at your option) any later version.
- ;;
- ;; GNUnet is distributed in the hope that it will be useful, but
- ;; WITHOUT ANY WARRANTY; without even the implied warranty of
- ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- ;; Affero General Public License for more details.
- ;;
- ;; You should have received a copy of the GNU Affero General Public License
- ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
- ;;
- ;; SPDX-License-Identifier: AGPL-3.0-or-later
- ;; Author: Florian Dold
- ;; Author: Christian Grothoff
- ;; Author: Maxime Devos
- (define-module (tests mq))
- (use-modules (ice-9 control)
- (tests utils) ; for conservative-gc?
- (fibers conditions)
- (fibers)
- (srfi srfi-1)
- (srfi srfi-26)
- (srfi srfi-39)
- (srfi srfi-43)
- (srfi srfi-64)
- (srfi srfi-111)
- (ice-9 match)
- ((rnrs base) #:select (assert mod))
- ((rnrs exceptions) #:select (guard))
- ((rnrs conditions) #:select (condition-who))
- ((rnrs arithmetic bitwise)
- #:select (bitwise-ior))
- (gnu gnunet netstruct syntactic)
- ((gnu gnunet netstruct procedural)
- #:select (u32/big))
- (gnu gnunet mq prio-prefs)
- (gnu gnunet mq prio-prefs2)
- (gnu gnunet util struct)
- (gnu gnunet utils bv-slice)
- (gnu gnunet utils hat-let)
- (gnu gnunet utils cut-syntax)
- ((gnu extractor enum)
- #:select (symbol-value value->index))
- (gnu gnunet message protocols)
- (gnu gnunet mq)
- (gnu gnunet mq envelope)
- (gnu gnunet mq handler)
- (quickcheck property)
- (quickcheck)
- (quickcheck arbitrary))
- ;; The client code sends the numbers 0 to
- ;; NUM_TRANSMISSIONS-1 over the message queue.
- ;; The notify-sent callback verifies whether
- ;; messages were sent in-order. The fake
- ;; ‘sender’ procedure verifies whether it received
- ;; the messages in order.
- ;;
- ;; Note that in more realistic situations, some
- ;; queueing can happen! A very special case
- ;; is being tested here.
;; How many messages the client pushes through the message queue.
(define NUM_TRANSMISSIONS 100)

;; The message type is defined in all three phases (expand, load, eval);
;; presumably because the netstruct macros used below need it at
;; expansion time -- TODO confirm.
(eval-when (expand load eval)
  (define-type /:msg:our-test:dummy
    (structure/packed
     (synopsis "A test message, containing an index")
     (documentation
      "The first time, a message with index 0 is sent.
Then each time the index is increased.")
     (field (header /:message-header))
     (field (index u32/big)))))
(define (index->dummy i)
  "Return a fresh read-write slice holding a @code{/:msg:our-test:dummy}
message: the header type is @code{msg:util:dummy}, the header size is
the size of the structure and the index field is @var{i}."
  (let ((message (make-slice/read-write
                  (sizeof /:msg:our-test:dummy '()))))
    ;; (fill! FIELD VALUE): presumably shorthand for
    ;; (set%! /:msg:our-test:dummy FIELD message VALUE).
    (define-syntax fill!
      (cut-syntax set%! /:msg:our-test:dummy <> message <>))
    (fill! '(header size) (sizeof /:msg:our-test:dummy '()))
    (fill! '(header type)
           (value->index (symbol-value message-type msg:util:dummy)))
    (fill! '(index) i)
    message))
(define (dummy->index s)
  "Read the index field of the @code{/:msg:our-test:dummy} message
stored in the slice @var{s}."
  (read% /:msg:our-test:dummy
         '(index)
         s))
(define (client mq notify-sent-box sent-box)
  "Send the indices 0 up to (excluding) NUM_TRANSMISSIONS over @var{mq}.
Each message gets a notify-sent callback that checks that
@var{notify-sent-box} holds exactly the index of that message, and
then stores the next index in it.  @var{sent-box} is only read, for
the error message."
  (define (see i)
    ;; Callbacks must arrive in the same order the messages were sent.
    (unless (= i (unbox notify-sent-box))
      (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
             i
             (unbox notify-sent-box)
             (unbox sent-box)))
    (set-box! notify-sent-box (+ 1 i)))
  (let loop ((i 0))
    (when (< i NUM_TRANSMISSIONS)
      (send-message! mq (index->dummy i)
                     #:notify-sent! (lambda () (see i)))
      (loop (+ 1 i)))))
(define (send-proc notify-sent-box sent-box envelope)
  "Fake ‘send’ procedure: mark @var{envelope} as sent and check the
index of its message against the two counters.  @var{notify-sent-box}
is expected to hold the index plus one already, and @var{sent-box} the
index itself; afterwards @var{sent-box} is set to the index plus one.
A cancelled or already-sent envelope is an error."
  (define (fail template index)
    ;; All diagnostics report the same three numbers.
    (error template index (unbox notify-sent-box) (unbox sent-box)))
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let ((index (dummy->index message)))
      (unless (= (unbox notify-sent-box) (+ index 1))
        (fail "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
              index))
      (unless (= (unbox sent-box) index)
        (fail "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
              index))
      (set-box! sent-box (+ 1 index))
      (values)))
   ((cancelled)
    (error "how did this cancelling happen?"))
   ((already-sent)
    (error "forgot to remove envelope from queue"))))
;; A message handler collection without any handlers.
(define no-handlers (message-handlers))

(define (no-error-handler . what)
  "Error handler for message queues that are not expected to report
any error: whatever the arguments @var{what} are, raise an error."
  (error "where did this error come from?"))
;; Both counters must have reached NUM_TRANSMISSIONS once the client
;; is done: every message was handed to the sender and every
;; notify-sent callback ran.
(test-equal "in-order, no queuing"
  (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
  (let* ((notified (box 0))
         (sent (box 0))
         (sender (make-one-by-one-sender
                  (lambda (envelope)
                    (send-proc notified sent envelope))))
         (mq (make-message-queue no-handlers
                                 no-error-handler
                                 sender)))
    (client mq notified sent)
    (map unbox (list notified sent))))
- ;; Simulate buffering, by only ‘truly’ sending after each three messages.
- ;; This does _not_ test the queueing code! See the next test for that.
- ;; Make sure messages aren't lost, and that they are still sent in-order!
- ;;
- ;; (Assuming the sender is well-implemented. A buggy sender could send
- ;; things out-of-order.)
(define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
  "Buffering variant of @code{send-proc}.  @var{stashed} is a vector of
held-back envelopes (@code{#f} marks a free slot) and @var{mod-box}
holds the number of slots expected to be filled.  While a slot is
free, @var{envelope} is merely stored.  When the vector is full, all
stored envelopes are passed to @code{send-proc}, the slots are cleared
and @var{envelope} goes into slot 0."
  (let ((free-slot (vector-index not stashed))
        (fill-level (unbox mod-box)))
    ;; A full vector (no free slot) corresponds to an expected level of 0.
    (unless (= (or free-slot 0) fill-level)
      (error "did we lose a message?"))
    (set-box! mod-box (mod (+ 1 fill-level) (vector-length stashed)))
    (cond (free-slot
           ;; Not yet full; send the envelope later.
           (vector-set! stashed free-slot envelope))
          (else
           ;; Flush everything that was held back, emptying each slot.
           (vector-map! (lambda (i held)
                          (send-proc notify-sent-box sent-box held)
                          #f)
                        stashed)
           (vector-set! stashed 0 envelope)))
    (values)))
(define (expected-sent n k)
  "Return @var{n} minus the number of messages still held back:
@code{(mod n k)} of them, or @var{k} when @var{n} is a multiple of
@var{k}."
  (let ((leftover (mod n k)))
    (- n
       (if (zero? leftover)
           k
           leftover))))
;; Number of envelopes the simulated buffer of 'send-proc2' holds.
(define k 3)

;; The expected counts are derived from the same K that sizes the
;; buffer, so changing K cannot silently desynchronise the test.
(test-equal "in-order, some buffering"
  (map (cut expected-sent <> k)
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notify-sent-box (box 0))
         (sent-box (box 0))
         (mod-box (box 0))
         (stashed (make-vector k #f))
         (mq (make-message-queue
              no-handlers
              no-error-handler
              (make-one-by-one-sender
               (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))
- ;; Test the queueing code by only flushing
- ;; the queue every N messages. Also check,
- ;; using flushing-allowed?, that sending
- ;; only happens when we expect it to happen.
;; #t only while a sender made by 'make-every-n' (or
;; 'make-every-n/thread') is running the wrapped sender.
(define flushing-allowed? (make-parameter #f))

(define (send-proc/check notify-sent-box sent-box envelope)
  "Like @code{send-proc}, but first assert that flushing is currently
allowed according to @code{flushing-allowed?}."
  (assert (flushing-allowed?))
  (send-proc notify-sent-box
             sent-box
             envelope))
(define (make-every-n proc k)
  "Return a sender that does nothing on most invocations and, on every
@var{k}-th invocation, calls the sender @var{proc} with
@code{flushing-allowed?} set to @code{#t}."
  ;; The counter is not updated atomically; the test using this is
  ;; single-threaded, so that does not matter.
  (let ((calls 0))
    (lambda (mq)
      (assert (not (flushing-allowed?)))
      (set! calls (+ 1 calls))
      (when (>= calls k)
        (set! calls 0)
        (parameterize ((flushing-allowed? #t))
          (proc mq))))))
;; The queue is only flushed on every third send, so the last
;; message(s) remain queued when the client is done.
(test-equal "in-order, some queueing"
  (map (lambda (n) (expected-sent n 3))
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notified (box 0))
         (sent (box 0))
         (checked-sender
          (make-one-by-one-sender
           (lambda (envelope)
             (send-proc/check notified sent envelope))))
         (mq (make-message-queue no-handlers
                                 no-error-handler
                                 (make-every-n checked-sender 3))))
    (client mq notified sent)
    (map unbox (list notified sent))))
- ;; Test that concurrency interacts well with queueing.
- ;;
- ;; The situation we consider, is a number
- ;; of different threads concurrently sending messages.
- ;; The test verifies whether all messages were, in fact, sent.
- ;;
- ;; To make things complicated, some queueing is introduced.
- ;; The sender will only proceed each time the current thread
- ;; has tried to send @var{k/thread} messages, and the sender
- ;; will only try to send at most @code{(+ k/thread e)}, where
- ;; @var{e} is a random number from @var{e/min} to @var{e/max}.
- ;; The tests detect the following potential problems in the code
- ;; by crashing (but not always, so you may need to re-run a few
- ;; times, three times tends to be enough in practice for me):
- ;;
- ;; * Replacing 'old' with 'queue' in
- ;; unless (pfds:queue-empty? old)
- ;; * Replacing 'old' with 'queue' in
- ;; receive (envelope new) (pfds:dequeue old)
- ;; * Replacing the first 'old' with 'queue' in
- ;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
- ;; * Replacing the second 'old' with 'queue' in
- ;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
- ;; * Replacing 'old' by 'queue' in
- ;; (pfds:enqueue old envelope)
- ;; (only detected infrequently, odds 1 to 7 or so)
- ;; * Replacing the first 'old' by 'queue' in
- ;; (eq? old (swap-queue! old new))
- ;; in 'send-message!'
- ;; * Replacing the second 'old' by 'queue' in
- ;; (eq? old (swap-queue! old new))
- ;; in 'send-message!'
- ;;
- ;; The following problems cause a hang when testing:
- ;; * Replacing 'queue' by 'old' in (spin queue)
- ;; in 'make-one-by-one-sender'
- ;; * Replacing 'queue' by 'old' in (spin queue)
- ;; in 'send-message!'.
- ;;
- ;; The following problems cause a hang in a preceding
- ;; test:
- ;;
- ;; * Replacing the first 'old' by 'new' in
- ;; (eq? old (swap-queue! old new))
- ;; in 'send-message!'
- ;; * Replacing 'queue' by 'old' in
- ;; (spin queue)
- ;; in 'send-message!'
- ;; * Replacing 'queue' by 'new' in
- ;; (spin queue)
- ;; in 'send-message!'
- ;;
- ;; Some potential problems currently remain undetected:
- ;; * Replacing the 'new' by 'queue' in
- ;; (pfds:queue-length new)
- ;;
- ;; However, it is only for printing a warning
- ;; when the queue is rather full. Being slightly
- ;; off in queue length shouldn't be a problem
- ;; there, as the 'maximum reasonable bound'
- ;; is just a wild guess and not some exact
- ;; cut-off.
- ;;
- ;; Cancellation will be tested separately.
;; Sizes of the concurrency test.
(define N_THREAD 40)     ; number of concurrently sending fibers
(define N_MESSAGES 1000) ; number of messages sent by each of them
(define k/thread 12)     ; the sender proceeds every K/THREAD sends
(define e/min 2)         ; lower bound of the random extra E
(define e/max 7)         ; upper bound of the random extra E

;; Per-thread state.  These are unbound at the top level; 'thread'
;; binds random/thread, received/thread and i/thread for each fiber.

;; Random state used for choosing E.
(define random/thread
  (fluid->parameter (make-unbound-fluid)))

;; List of (thread-index . message-index) pairs received by the
;; current thread.
(define received/thread
  (fluid->parameter (make-unbound-fluid)))

;; Invocation counter used by 'make-every-n/thread'.
(define i/thread
  (fluid->parameter (make-unbound-fluid)))
;; Messages are sent concurrently, so in-order delivery cannot be
;; guaranteed anyway.  Hence, request background priority and allow
;; out-of-order delivery instead of asking for something silly.
(define prio
  (let ((background (prio->integer 'prio:background))
        (out-of-order
         (value->index
          (symbol-value priority-preference pref:out-of-order))))
    (bitwise-ior background out-of-order)))
;; Message type of the concurrency test.  Unlike the dummy message it
;; also records which thread sent the message.
(eval-when (expand load eval)
  (define-type /:msg:our-test:concurrency
    (structure/packed
     (synopsis "A test message, containing a thread and message index")
     (documentation
      "Each sending thread numbers its own messages, starting at index 0
and increasing the index for each message.  The thread field holds the
index of the sending thread.")
     (field (header /:message-header))
     (field (index u32/big))
     (field (thread u32/big)))))
(define (make-thread-message thread-index i)
  "Return a fresh read-write slice holding a
@code{/:msg:our-test:concurrency} message with message index @var{i}
and thread index @var{thread-index}.  The header type is
@code{msg:util:dummy} and the header size is the size of the
structure."
  (let ((message (make-slice/read-write
                  (sizeof /:msg:our-test:concurrency '()))))
    ;; (fill! FIELD VALUE): presumably shorthand for
    ;; (set%! /:msg:our-test:concurrency FIELD message VALUE).
    (define-syntax fill!
      (cut-syntax set%! /:msg:our-test:concurrency <> message <>))
    (fill! '(header size) (sizeof /:msg:our-test:concurrency '()))
    (fill! '(header type)
           (value->index (symbol-value message-type msg:util:dummy)))
    (fill! '(thread) thread-index)
    (fill! '(index) i)
    message))
(define (decode-thread-message s)
  "Return the pair @code{(thread-index . message-index)} read from the
@code{/:msg:our-test:concurrency} message in the slice @var{s}."
  (let ((thread-index (read% /:msg:our-test:concurrency '(thread) s))
        (message-index (read% /:msg:our-test:concurrency '(index) s)))
    (cons thread-index message-index)))
(define (make-every-n/thread proc k)
  "Return a sender that calls the sender @var{proc} (with
@code{flushing-allowed?} set to @code{#t}) on every @var{k}-th
invocation and does nothing otherwise.  The invocation counter is kept
in the parameter @code{i/thread}."
  (lambda (mq)
    (assert (not (flushing-allowed?)))
    (let ((calls (+ 1 (i/thread))))
      (i/thread calls)
      (when (>= calls k)
        (i/thread 0)
        (parameterize ((flushing-allowed? #t))
          (proc mq))))))
(define (thread mq thread-index)
  "Send N_MESSAGES messages, tagged with @var{thread-index}, over
@var{mq} with priority @code{prio}.  Return the value of
@code{received/thread} afterwards, which starts out as the empty list.
The random state is seeded with @var{thread-index}."
  (parameterize ((received/thread '())
                 (i/thread 0)
                 (random/thread (seed->random-state thread-index)))
    (let loop ((i 0))
      (when (< i N_MESSAGES)
        (send-message! mq (make-thread-message thread-index i)
                       #:priority prio)
        (loop (+ 1 i))))
    (received/thread)))
(define (make-restricted-sender how-many make-sender inner-proc)
  "Return a sender that, each time it is called, passes at most
@code{(how-many)} envelopes to @var{inner-proc}.  The actual sender is
constructed with @var{make-sender}; once the limit is reached, it is
left through an escape continuation."
  (define escape (fluid->parameter (make-unbound-fluid)))
  (define handled (fluid->parameter (make-unbound-fluid)))
  (define limit (fluid->parameter (make-unbound-fluid)))
  (define (tally!)
    (handled (+ 1 (handled)))
    (when (= (handled) (limit))
      (handled 0)
      ((escape))))
  (define (send-and-tally envelope)
    (inner-proc envelope)
    ;; Only count AFTER the envelope has been passed on.  Escaping
    ;; before that would lose the message.
    (tally!)
    (values))
  (lambda (mq)
    (let/ec ec
      (parameterize ((limit (how-many))
                     (handled 0)
                     (escape ec))
        ((make-sender send-and-tally) mq)))))
- ;; After all threads have exited, we'll ‘drain’ out
- ;; the left-overs.
;; Set to #t to flush whatever is left in the queue.
(define drain? (make-parameter #f))

(define (make-sender/choice y? x y)
  "Return a sender that consults the thunk @var{y?} on each call and
delegates to the sender @var{y} when it returns true, and to the
sender @var{x} otherwise."
  (lambda (mq)
    ((if (y?) y x) mq)))
(define (inner-send envelope)
  "Mark @var{envelope} as sent and prepend its decoded
@code{(thread-index . message-index)} pair to @code{received/thread}.
A cancelled or already-sent envelope is an error."
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let ((decoded (decode-thread-message message)))
      (received/thread (cons decoded (received/thread))))
    (values))
   ((cancelled)
    (error "what/cancelled"))
   ((already-sent)
    (error "what/already-sent"))))
;; The sender of the concurrency test.  When 'drain?' is true,
;; everything in the queue is simply sent.  Otherwise sending only
;; happens on every K/THREAD-th invocation (counted per thread), and
;; then a bounded number of envelopes is sent: K/THREAD plus a random
;; number from E/MIN to E/MAX (inclusive).
(define sender/thread
  (let* ((bound (lambda ()
                  (+ k/thread
                     e/min
                     (random (- e/max e/min -1) (random/thread)))))
         (restricted (make-restricted-sender bound
                                             make-one-by-one-sender
                                             inner-send))
         (throttled (make-every-n/thread restricted k/thread))
         (unrestricted (make-one-by-one-sender inner-send)))
    (make-sender/choice drain? throttled unrestricted)))
(define (results->array per-thread-sent)
  "Return a bit array of N_MESSAGES by N_THREAD entries, in which the
entry at (message-index, thread-index) is set for every
@code{(thread-index . message-index)} pair found in the lists of the
vector @var{per-thread-sent}."
  (let ((seen (make-typed-array 'b #f N_MESSAGES N_THREAD)))
    (vector-for-each
     (lambda (_ messages)
       (for-each (match-lambda
                   ((thread-index . message-index)
                    (array-set! seen #t message-index thread-index)))
                 messages))
     per-thread-sent)
    seen))
(define (array-missing a)
  "Return the list of index pairs @code{(i . j)} at which the
two-dimensional array @var{a} holds @code{#f}.  Every entry is written
back with the value it already had."
  (let ((absent '()))
    (array-index-map!
     a
     (lambda (i j)
       (let ((present? (array-ref a i j)))
         (unless present?
           (set! absent (cons (cons i j) absent)))
         present?)))
    absent))
- ;; But possibly out-of-order!
(test-equal "nothing lost when sending concurrently"
  '()
  (let* ((mq (make-message-queue no-handlers
                                 no-error-handler
                                 sender/thread))
         (thread-indices (iota N_THREAD))
         ;; One slot per thread, and a final slot (index N_THREAD)
         ;; for the ‘drained-out’ messages.
         (results (make-vector (+ 1 N_THREAD)))
         (finished (vector-unfold (lambda (_) (make-condition)) N_THREAD))
         (start (make-condition)))
    (run-fibers
     (lambda ()
       (for-each
        (lambda (thread-index)
          (spawn-fiber
           (lambda ()
             (wait start)
             (vector-set! results thread-index
                          (thread mq thread-index))
             (signal-condition! (vector-ref finished thread-index)))))
        thread-indices)
       ;; Try to start every thread at the same time!
       (signal-condition! start)
       ;; #:drain? #t with parallelism is broken,
       ;; see <https://github.com/wingo/fibers/issues/47>.
       ;; So explicitly wait on each fiber.
       (vector-for-each (lambda (_ condition) (wait condition))
                        finished))
     #:drain? #t
     ;; Not needed.
     #:install-suspendable-ports? #f
     ;; More interrupts --> more switches --> more test coverage.
     ;; At least, that's the idea.  Not really tested.
     #:hz 700)
    ;; Drain the left-overs.
    (parameterize ((drain? #t)
                   (received/thread '()))
      (try-send-again! mq)
      (vector-set! results N_THREAD (received/thread)))
    (array-missing (results->array results))))
- ;; Test message injection / handling (no exceptions).
;; For each of the message types 0 to 3, a parameter holding the
;; handler procedure (mhp) and one holding the verifier procedure
;; (mhv).  The tests install procedures with 'parameterize'.
(define mhp (vector-unfold (lambda (_) (make-parameter #f)) 4))
(define mhv (vector-unfold (lambda (_) (make-parameter #f)) 4))

;; Message handlers for types 0 to 3 that delegate to whatever is
;; currently installed in mhv / mhp.
(define mh
  (apply message-handlers
         (map (lambda (i)
                (let ((verifier (vector-ref mhv i))
                      (handler (vector-ref mhp i)))
                  (message-handler
                   (type i)
                   ((interpose code) code)
                   ((well-formed? slice)
                    ((verifier) slice))
                   ((handle! slice)
                    ((handler) slice)))))
              (iota (vector-length mhp)))))

;; FWIW, passing #f (as error handler and as sender) is not really
;; allowed.
(define mq (make-message-queue mh #f #f))
;; The verifier and the handler must receive the very same slice that
;; was injected, not a copy.
(test-eq "when injecting, handled message is eq?"
  #t
  (let* ((m (make-slice/read-write 40)) ; could as well have been 20
         (header (slice-slice m 0 (sizeof /:message-header '()))))
    (set%! /:message-header '(size) header 40)
    (let/ec ec
      (parameterize (((vector-ref mhv 0)
                      (lambda (x)
                        (assert (eq? x m))
                        #t))
                     ((vector-ref mhp 0)
                      (lambda (x)
                        (ec (eq? x m)))))
        (inject-message! mq m)
        'unreachable))))
;; A header-only message of type 3 must end up at the handler for
;; type 3.
(test-eq "non-zero types ok"
  #t
  (let* ((header-size (sizeof /:message-header '()))
         (s (make-slice/read-write header-size)))
    (set%! /:message-header '(type) s 3)
    (set%! /:message-header '(size) s header-size)
    (let/ec ec
      (parameterize (((vector-ref mhv 3)
                      (lambda (x)
                        (assert (eq? s x))
                        #t))
                     ((vector-ref mhp 3)
                      (lambda (x)
                        (ec (eq? x s)))))
        (inject-message! mq s)
        'unreachable))))
;; Count the invocations of the handler and of the verifier for a
;; single injected message.
(test-equal "verifier & handler only called once"
  '(1 . 1)
  (let* ((handled 0)
         (verified 0)
         (header-size (sizeof /:message-header '()))
         (s (make-slice/read-write header-size)))
    (set%! /:message-header '(size) s header-size)
    (parameterize (((vector-ref mhv 0)
                    (lambda (x)
                      (set! verified (+ 1 verified))
                      (assert (eq? x s))
                      #t))
                   ((vector-ref mhp 0)
                    (lambda (x)
                      (set! handled (+ 1 handled))
                      (assert (eq? x s))
                      (values))))
      (inject-message! mq s)
      (cons handled verified))))
- ;; Test message injection (exceptions)
;; Every slice too short to contain a message header must be rejected
;; with a missing-header error that reports the received size and
;; names inject-message! as the culprit.
(test-equal "missing header error"
  (map (lambda (i)
         (list 'missing-header-error
               (cons 'size i)
               (cons 'who 'inject-message!)))
       (iota (sizeof /:message-header '())))
  (map (lambda (i)
         (guard (e ((missing-header-error? e)
                    (list 'missing-header-error
                          (cons 'size (missing-header-error-received-size e))
                          (cons 'who (condition-who e)))))
           (inject-message! mq (make-slice/read-write i))
           'unreachable))
       (iota (sizeof /:message-header '()))))
;; Property: when the size recorded in the header differs from the
;; actual size of the slice, inject-message! raises a size-mismatch
;; error reporting both sizes.
(test-assert "[prop] wrong header size error"
  (quickcheck
   (property ((%real-length $natural)
              (supposed-length $natural))
     (let* (;; The slice always has room for a header.
            (real-length (+ (sizeof /:message-header '())
                            %real-length))
            ;; Make sure the advertised size is wrong.
            (supposed-length (if (= real-length supposed-length)
                                 (+ 1 supposed-length)
                                 supposed-length))
            (s (make-slice/read-write real-length))
            (sheader (slice-slice s 0 (sizeof /:message-header '()))))
       (set%! /:message-header '(size) sheader supposed-length)
       (guard (e ((size-mismatch-error? e)
                  (equal? `(,(size-mismatch-error-expected-size e)
                            ,(size-mismatch-error-received-size e)
                            ,(condition-who e))
                          `(,supposed-length
                            ,real-length
                            inject-message!))))
         (inject-message! mq s)
         ;; No error at all: the property fails.
         #f)))))
;; Injecting a message of type 0 into a queue without handlers must
;; call the error handler exactly once, with (logic:no-handler 0).
(test-assert "no applicable message handler error"
  (let^ ((! reported? #f)
         (! message (bv-slice/read-write #vu8(0 4 0 0)))
         (! (error-handler . e)
            (match e
              ('(logic:no-handler 0)
               (assert (not reported?))
               (set! reported? #t)
               (values))))
         (! mq (make-message-queue no-handlers error-handler #f)))
    (inject-message! mq message)
    reported?))
;; When the verifier rejects a message, the handler must not run and
;; the error handler must be called exactly once, with
;; (logic:ill-formed 0).
(test-assert "ill-formed message error"
  (let^ ((! reported? #f)
         (! message (bv-slice/read-write #vu8(0 4 0 0)))
         (! handlers
            (message-handlers
             (message-handler
              (type 0)
              ((interpose code) code)
              ((well-formed? s)
               (assert (eq? s message))
               #f)
              ((handle! slice)
               (error "unreachable")))))
         (! (error-handler . e)
            (match e
              ;; Note: it theoretically may have some unspecified
              ;; rest arguments.  In ‘real code’, use
              ;; (logic:ill-formed 0 . rest) instead.
              ('(logic:ill-formed 0)
               (assert (not reported?))
               (set! reported? #t))))
         (! mq (make-message-queue handlers error-handler #f)))
    (inject-message! mq message)
    reported?))
- ;; Test the following part of the send-message! docstring:
- ;; ‘After normal execution, the message envelope is returned,
- ;; but in case of an exception (for example, an out-of-memory exception
- ;; during the handling of a @code{&overly-full-queue-warning}), it is
- ;; possible the envelope isn't returned even though it has been enqueued
- ;; and it might perhaps be sent.’
;; The sender must be called exactly once, with a single envelope, and
;; send-message! must return that same envelope.
(test-assert "returned envelope and sent envelope are equal"
  (let* ((returned-values #f)
         (sent-values #f)
         (sender
          (make-one-by-one-sender
           (lambda envelope-arguments
             ;; The sender may only run once.
             (assert (not sent-values))
             (set! sent-values envelope-arguments)
             (values))))
         (mq (make-message-queue #f #f sender))
         (msg (index->dummy #xdeadbeef)))
    (set! returned-values
          (call-with-values
              (lambda () (send-message! mq msg))
            list))
    (and (equal? sent-values returned-values)
         (= (length sent-values) 1)
         (every envelope? sent-values))))
- ;; Strictly speaking, this test is allowed to fail
- ;; (as it is only ‘might’, not ‘it must be possible’),
- ;; but it seems a good idea to check our understanding is correct.
(test-assert "message might be enqueued & sent but not returned"
  (let* ((enqueued? #f) ; set to the envelope once it has been sent
         (flush? (make-parameter #f))
         (sender/flush
          (make-one-by-one-sender
           (lambda (envelope)
             (set! enqueued? envelope)
             (values))))
         (sender/hold
          (lambda _ (values)))
         (sender (make-sender/choice flush? sender/hold sender/flush))
         (mq (make-message-queue #f #f sender))
         (msg (index->dummy 0))
         (exceptional #f)
         (enveloped #f))
    ;; Outer handler: runs after unwinding from the simulated
    ;; out-of-memory exception.
    (define (after-out-of-memory _)
      (assert exceptional)
      (assert (envelope? enqueued?))
      (assert (not enveloped)))
    ;; Inner handler: runs in the dynamic context of send-message!.
    (define (during-send e)
      (cond ((overly-full-queue-warning? e)
             (set! exceptional #t)
             (parameterize ((flush? #t))
               (try-send-again! mq)
               ;; At least in the current implementation, this holds.
               ;;
               ;; In a different implementation, the envelope could be
               ;; enqueued after checking the queue length.
               (assert enqueued?))
             (throw 'out-of-memory))
            (else
             (raise-exception e #:continuable? #t))))
    ;; With a suspicious length of 0, sending one message is expected
    ;; to trigger the warning.
    (define (send-and-record)
      (call-with-values
          (lambda ()
            (parameterize ((%suspicious-length 0))
              (send-message! mq msg)))
        (lambda args (set! enveloped args))))
    (with-exception-handler
        after-out-of-memory
      (lambda ()
        (with-exception-handler
            during-send
          send-and-record
          #:unwind? #f))
      #:unwind? #t
      #:unwind-for-type 'out-of-memory)
    (and enqueued?
         exceptional
         (not enveloped))))
- ;; Message cancellation.
- ;;
- ;; Cancellation is already tested in tests/envelope.scm.
- ;; However, the interaction with message queues has not
- ;; yet been tested.
- ;; This test detected (not detected by previous tests):
- ;; * the cdr of the contents of messages+garbage/box
- ;; being initialised incorrectly in make-message-queue
- ;; * using car instead of cdr in increment-garbage&maybe-cleanup
;; The message queue is registered with a guardian.  After cancelling
;; the only envelope and collecting garbage, the guardian is expected
;; to hand the queue back, i.e. the envelope did not keep it alive.
(test-assert "envelopes do not keep a strong reference to the message queue"
  (let* ((mq (make-message-queue #f #f (lambda _ (values))))
         (watcher (make-guardian))
         (envelope (send-message! mq (index->dummy 0))))
    (watcher mq)
    (attempt-cancel!
     envelope
     ((now-cancelled)
      (gc)
      (->bool (watcher)))
     ((already-cancelled)
      (error "what/cancelled"))
     ((already-sent)
      (error "what/sent")))))
(define (count-guardian/cancelled guardian)
  "Retrieve objects from @var{guardian} until it returns @code{#f} and
return how many there were.  Each of them must be a cancelled
envelope; otherwise an error is raised."
  (let loop ((count 0)
             (e (guardian)))
    (cond ((not e)
           count)
          ((envelope-peek-cancelled? e)
           (loop (+ count 1) (guardian)))
          (else
           (error "a not-cancelled envelope was freed!")))))
(define (count-guardian/uncancelled guardian)
  "Retrieve objects from @var{guardian} until it returns @code{#f} and
return how many there were.  Each of them must be an envelope that has
not been cancelled; otherwise an error is raised."
  (let loop ((count 0)
             (e (guardian)))
    (cond ((not e)
           count)
          ((envelope-peek-cancelled? e)
           (error "a cancelled envelope was freed!"))
          (else
           (loop (+ count 1) (guardian))))))
- ;; This is a variant of
- ;; "the one-by-one message sender removes cancelled envelopes",
- ;; using guardians, and purely testing the cancelling code, and
- ;; not the sending code.
- ;;
- ;; It detects the following mutations:
- ;; * removing (spin queue+garbage) after swap! in the 'envelope-peek-cancelled?'
- ;; branch of 'make-one-by-one-sender'
(test-assert "cancelling envelopes eventually frees memory even if message sender is dead"
  (let* ((total 50)     ; number of messages put in the queue
         (cancelled 40) ; number of those that are cancelled
         ;; The sender never sends anything, so envelopes only leave
         ;; the queue because of cancellation.
         (mq (make-message-queue #f #f (lambda _ (values))))
         (cancelled-guard (make-guardian))
         (uncancelled-guard (make-guardian)))
    ;; Add a bunch of messages.
    (let ((messages
           (map (lambda (i)
                  (send-message! mq (index->dummy i)))
                (iota total))))
      ;; Watch the envelopes that stay uncancelled.  Without this,
      ;; nothing is ever registered with UNCANCELLED-GUARD and the
      ;; ‘only cancelled messages were freed’ check below cannot fail.
      (for-each (lambda (e) (uncancelled-guard e))
                (list-tail messages cancelled))
      ;; Cancel most of them.  This should trigger collection of
      ;; cancelled envelopes.
      (for-each
       (lambda (e)
         (cancelled-guard e)
         (attempt-cancel!
          e
          ((now-cancelled) (values))
          ((already-cancelled) (error "what/cancelled"))
          ((already-sent) (error "what/sent"))))
       (list-head messages cancelled)))
    ;; Move freed envelopes to the guardians.
    (gc)
    ;; How many were freed?
    (let ((freed/cancelled (count-guardian/cancelled cancelled-guard))
          (freed/uncancelled (count-guardian/uncancelled uncancelled-guard)))
      (pk 'total total 'cancelled cancelled 'freed/cancelled freed/cancelled
          'freed/uncancelled freed/uncancelled
          'queue-length (message-queue-length mq))
      ;; Only cancelled messages were supposed to be freed.
      (assert (= freed/uncancelled 0))
      (assert (<= freed/cancelled cancelled))
      ;; A large fraction of cancelled messages should be freed.
      (assert (>= (/ freed/cancelled cancelled) 7/8))
      ;; If the GC is exact, all messages removed from the message
      ;; queue (due to cancelling) should be freed.
      (unless (conservative-gc?)
        (assert (= freed/cancelled (- total (message-queue-length mq)))))
      #t)))
;; A one-by-one sender whose send procedure prints each envelope it is
;; given (with 'pk') and asserts that the envelope is not cancelled.
(define sender/no-cancelled
  (let ((check-not-cancelled
         (lambda (e)
           (pk 'ee e)
           (assert (not (envelope-peek-cancelled? e)))
           (values))))
    (make-one-by-one-sender check-not-cancelled)))
- ;; Not strictly necessary (and also undocumented), but this should
- ;; improve the accuracy of the garbage counter. Maybe not trying
- ;; to send useless (cancelled) envelopes could help with performance
- ;; as well (untested)?
- ;;
- ;; Also, this caught a bug in (gnu gnunet mq) -- the procedure returned
- ;; by 'make-one-by-one-sender' went into an infinite loop if it encountered
- ;; a cancelled envelope.
- ;;
- ;; This test detects negating the test
- ;; (eq? old (swap! old (cons old-queue incremented-garbage)))
- ;; in increment-garbage&maybe-cleanup.
(test-assert "the one-by-one message sender removes cancelled envelopes"
  (let* ((flush? (make-parameter #f))
         ;; Hold everything until FLUSH? is set; then send with a
         ;; sender that refuses cancelled envelopes.
         (sender (make-sender/choice flush?
                                     (lambda _ (values))
                                     sender/no-cancelled))
         (mq (make-message-queue #f #f sender)))
    ;; Fill the queue with many uncancelled messages, such that the
    ;; logic for collecting cancelled envelopes doesn't kick in too
    ;; early.
    (let fill ((i 0))
      (when (< i 30)
        (send-message! mq (index->dummy i))
        (fill (+ i 1))))
    (assert (= (message-queue-length mq) 30))
    ;; Now add some envelopes to the queue & cancel them.
    (let add&cancel ((i 0))
      (when (< i 4)
        (attempt-cancel!
         (send-message! mq (index->dummy (+ 30 i)))
         ((now-cancelled) (values))
         ((already-cancelled) (error "what / cancelled"))
         ((already-sent) (error "what / sent")))
        (add&cancel (+ i 1))))
    (assert (= (message-queue-length mq) 34))
    ;; Flush: the queue must end up empty, without any garbage left.
    (parameterize ((flush? #t))
      (try-send-again! mq))
    (assert (= (message-queue-length mq) 0))
    (assert (= (%message-queue-garbagitude mq) 0))
    #t))
- ;; This is a variation of "nothing lost when sending concurrently",
- ;; but for cancelation.
- ;;
- ;; This test fails in case of the following mutations:
- ;; * replace 0 with 1 (or some other number) in
- ;; (swap! old (cons filtered 0))
- ;; in increment-garbage&maybe-cleanup
(test-assert "the (approximate) cancellation count is accurate, when not sending, even when cancelling concurrently (also, uncancelled messages are not lost)"
  (let* ((messages/cancellation 10000)
         (half (/ messages/cancellation 2))
         ;; Number of uncancelled envelopes seen by the sender; reset
         ;; to 0 right before flushing.
         (n/not-cancelled #f)
         (flush? (make-parameter #f))
         (sender/check
          (lambda (e)
            (unless (envelope-peek-cancelled? e)
              (set! n/not-cancelled (+ 1 n/not-cancelled)))
            (values)))
         (sender (make-sender/choice flush?
                                     (lambda _ (values))
                                     (make-one-by-one-sender sender/check)))
         (mq (make-message-queue #f #f sender))
         (ready? (make-condition))
         ;; One condition per concurrently cancelled message.
         (done? (vector-unfold (lambda (_) (make-condition)) half))
         ;; Enqueue all messages, ignoring the warnings about the
         ;; queue being overly full.
         (messages
          (with-exception-handler
              (lambda (e)
                (if (overly-full-queue-warning? e)
                    (values)
                    (raise-exception e #:continuable? #t)))
            (lambda ()
              (vector-unfold (lambda (i)
                               (send-message! mq (index->dummy i)))
                             messages/cancellation)))))
    (run-fibers
     (lambda ()
       ;; Cancel half of the messages, concurrently.  Only half of
       ;; all the messages are cancelled, to avoid resetting the
       ;; garbage counter.
       (vector-for-each
        (lambda (i finished message)
          (when (< i half)
            (spawn-fiber
             (lambda ()
               (wait ready?)
               (attempt-cancel!
                message
                ((now-cancelled)
                 (signal-condition! finished)
                 (values))
                ((already-cancelled)
                 (signal-condition! finished)
                 (error "what/cancelled"))
                ((already-sent)
                 (signal-condition! finished)
                 (error "what/sent")))))))
        done? messages)
       (signal-condition! ready?)
       (vector-for-each (lambda (_ c) (wait c)) done?))
     #:hz 4000)
    ;; Verify the estimate is accurate, at least in this situation.
    (assert (= (pk 'garbagitude (%message-queue-garbagitude mq))
               (pk 'expected half)))
    ;; Cancel more messages (until 7/8 are cancelled), to trigger
    ;; collection.  While we're at it, verify the estimate is still
    ;; correct.
    (do ((i half (+ i 1)))
        ((>= (/ i messages/cancellation) 7/8))
      (attempt-cancel!
       (vector-ref messages i)
       ((now-cancelled)
        ;; 3/4 is the (arbitrary) ratio at which the garbage is
        ;; thrown out.
        (if (< (* 4 i) (* 3 messages/cancellation))
            (assert (= (%message-queue-garbagitude mq)
                       (+ i 1)))
            (assert (= (%message-queue-garbagitude mq)
                       (- i (* 3/4 messages/cancellation))))))
       ((already-cancelled) (error "what/cancelled2"))
       ((already-sent) (error "what/sent2"))))
    ;; Now send the envelopes, to verify uncancelled messages are
    ;; still in the queue.
    (parameterize ((flush? #t))
      (set! n/not-cancelled 0)
      (try-send-again! mq))
    (assert (= n/not-cancelled (* 1/8 messages/cancellation)))
    ;; As everything has been removed from the queue, the estimate
    ;; should now be zero.
    (assert (= (pk 'final-garbagitude (%message-queue-garbagitude mq))
               0))
    #t))
|