mq.scm 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972
  1. ;; This file is part of GNUnet.
  2. ;; Copyright (C) 2012, 2018, 2021, 2022 GNUnet e.V.
  3. ;;
  4. ;; GNUnet is free software: you can redistribute it and/or modify it
  5. ;; under the terms of the GNU Affero General Public License as published
  6. ;; by the Free Software Foundation, either version 3 of the License,
  7. ;; or (at your option) any later version.
  8. ;;
  9. ;; GNUnet is distributed in the hope that it will be useful, but
  10. ;; WITHOUT ANY WARRANTY; without even the implied warranty of
  11. ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. ;; Affero General Public License for more details.
  13. ;;
  14. ;; You should have received a copy of the GNU Affero General Public License
  15. ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. ;;
  17. ;; SPDX-License-Identifier: AGPL-3.0-or-later
  18. ;; Author: Florian Dold
  19. ;; Author: Christian Grothoff
  20. ;; Author: Maxime Devos
  21. (define-module (tests mq))
  22. (use-modules (ice-9 control)
  23. (tests utils) ; for conservative-gc?
  24. (fibers conditions)
  25. (fibers)
  26. (srfi srfi-1)
  27. (srfi srfi-26)
  28. (srfi srfi-39)
  29. (srfi srfi-43)
  30. (srfi srfi-64)
  31. (srfi srfi-111)
  32. (ice-9 match)
  33. ((rnrs base) #:select (assert mod))
  34. ((rnrs exceptions) #:select (guard))
  35. ((rnrs conditions) #:select (condition-who))
  36. ((rnrs arithmetic bitwise)
  37. #:select (bitwise-ior))
  38. (gnu gnunet netstruct syntactic)
  39. ((gnu gnunet netstruct procedural)
  40. #:select (u32/big))
  41. (gnu gnunet mq prio-prefs)
  42. (gnu gnunet mq prio-prefs2)
  43. (gnu gnunet util struct)
  44. (gnu gnunet utils bv-slice)
  45. (gnu gnunet utils hat-let)
  46. (gnu gnunet utils cut-syntax)
  47. ((gnu extractor enum)
  48. #:select (symbol-value value->index))
  49. (gnu gnunet message protocols)
  50. (gnu gnunet mq)
  51. (gnu gnunet mq envelope)
  52. (gnu gnunet mq handler)
  53. (quickcheck property)
  54. (quickcheck)
  55. (quickcheck arbitrary))
  56. ;; The client code sends the numbers 0 to
  57. ;; NUM_TRANSMISSIONS-1 over the message queue.
  58. ;; The notify-sent callback verifies whether
  59. ;; messages were sent in-order. The fake
  60. ;; ‘sender’ procedure verifies whether it received
  61. ;; the messages in order.
  62. ;;
  63. ;; Note that in more realistic situations, some
  64. ;; queueing can happen! A very special case
  65. ;; is being tested here.
  66. (define NUM_TRANSMISSIONS 100)
  ;; Layout of the messages used by the in-order tests: a message header
  ;; followed by a single u32/big index field.
  (eval-when (expand load eval)
    (define-type /:msg:our-test:dummy
      (structure/packed
       (synopsis "A test message, containing an index")
       (documentation
        "The first time, a message with index 0 is sent.
Then each time the index is increased.")
       (field (header /:message-header))
       (field (index u32/big)))))

  ;; Return a fresh read-write slice holding a /:msg:our-test:dummy
  ;; message whose header is filled in and whose index field is I.
  (define (index->dummy i)
    (let* ((size (sizeof /:msg:our-test:dummy '()))
           (message (make-slice/read-write size)))
      ;; (put! FIELD VALUE) stores VALUE in FIELD of MESSAGE.
      (define-syntax put!
        (cut-syntax set%! /:msg:our-test:dummy <> message <>))
      (put! '(header type)
            (value->index (symbol-value message-type msg:util:dummy)))
      (put! '(header size) size)
      (put! '(index) i)
      message))

  ;; Read the index field back out of the dummy message in slice S.
  (define (dummy->index s)
    (read% /:msg:our-test:dummy '(index) s))
  ;; Send the indices 0 .. NUM_TRANSMISSIONS-1 over MQ, one message each.
  ;; Every message gets a notify-sent callback that checks that the
  ;; callbacks run in index order, using NOTIFY-SENT-BOX as the counter
  ;; of callbacks seen so far.  SENT-BOX is only read for error reports.
  (define (client mq notify-sent-box sent-box)
    (define (make-sent-notifier index)
      (lambda ()
        (unless (= index (unbox notify-sent-box))
          (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
                 index
                 (unbox notify-sent-box)
                 (unbox sent-box)))
        (set-box! notify-sent-box (+ 1 index))))
    (let send-next ((index 0))
      (when (< index NUM_TRANSMISSIONS)
        (send-message! mq (index->dummy index)
                       #:notify-sent! (make-sent-notifier index))
        (send-next (+ 1 index)))))
  ;; Fake ‘send’ procedure for a single ENVELOPE.  It checks that the
  ;; notify-sent counter is already one past this message's index and
  ;; that the message is the next one expected according to SENT-BOX,
  ;; then advances SENT-BOX.  A cancelled or already-sent envelope is
  ;; reported as an error.
  (define (send-proc notify-sent-box sent-box envelope)
    (attempt-irrevocable-sent!
     envelope
     ((go message priority)
      (let* ((index (dummy->index message))
             (notified (unbox notify-sent-box))
             (sent (unbox sent-box)))
        (cond ((not (= (+ index 1) notified))
               (error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
                      index
                      notified
                      sent))
              ((not (= index sent))
               (error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
                      index
                      notified
                      sent)))
        (set-box! sent-box (+ 1 index))
        (values)))
     ((cancelled)
      (error "how did this cancelling happen?"))
     ((already-sent)
      (error "forgot to remove envelope from queue"))))
  ;; An empty set of message handlers, for queues that are only used
  ;; for sending in these tests.
  (define no-handlers (message-handlers))

  ;; Error handler for queues on which no error is expected: whatever
  ;; arguments it is called with, it raises an error.
  (define (no-error-handler . what)
    (error "where did this error come from?"))
  ;; Every message is handed to `send-proc' as soon as it is sent, so
  ;; both counters must end up at NUM_TRANSMISSIONS.
  (test-equal "in-order, no queuing"
    (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
    (let* ((notified (box 0))
           (sent (box 0))
           (sender (make-one-by-one-sender
                    (lambda (envelope)
                      (send-proc notified sent envelope))))
           (queue (make-message-queue no-handlers
                                      no-error-handler
                                      sender)))
      (client queue notified sent)
      (map unbox (list notified sent))))
  133. ;; Simulate buffering, by only ‘truly’ sending after each three messages.
  134. ;; This does _not_ test the queueing code! See the next test for that.
  135. ;; Make sure messages aren't lost, and that they are still sent in-order!
  136. ;;
  137. ;; (Assuming the sender is well-implemented. A buggy sender could send
  138. ;; things out-of-order.)
  ;; Buffering variant of `send-proc'.  Envelopes are parked in the
  ;; vector STASHED; once every slot is taken, the next call passes all
  ;; parked envelopes to `send-proc', clears the slots and parks the new
  ;; envelope in slot 0.  MOD-BOX holds the number of slots expected to
  ;; be occupied (modulo the stash size) and is used as a sanity check.
  (define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
    (define free-slot (vector-index not stashed))
    (define filled (unbox mod-box))
    (when (not (= (or free-slot 0) filled))
      (error "did we lose a message?"))
    (set-box! mod-box (mod (+ 1 filled) (vector-length stashed)))
    (cond (free-slot
           ;; @var{stashed} is not yet full; send the envelope later!
           (vector-set! stashed free-slot envelope))
          (else
           ;; Flush everything that was parked; each slot becomes #f again.
           (vector-map!
            (lambda (slot parked)
              (send-proc notify-sent-box sent-box parked)
              #f)
            stashed)
           (vector-set! stashed 0 envelope)))
    (values))
  ;; Number of messages actually passed on after N sends through a
  ;; stash of K slots: N minus what is still parked, where a remainder
  ;; of zero means the stash is completely full (K parked).
  (define (expected-sent n k)
    (let ((leftover (mod n k)))
      (- n (if (zero? leftover) k leftover))))
  ;; Number of slots in the stash used by the buffering test.
  (define k 3)

  ;; `send-proc2' parks up to K envelopes before passing them on, so the
  ;; expected counts are derived from the same K that sizes the stash
  ;; (the expectation previously hard-coded 3 independently of K).
  (test-equal "in-order, some buffering"
    (map (cut expected-sent <> k)
         (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
    (let* ((notify-sent-box (box 0))
           (sent-box (box 0))
           (mod-box (box 0))
           (stashed (make-vector k #f))
           (mq (make-message-queue
                no-handlers
                no-error-handler
                (make-one-by-one-sender
                 (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
      (client mq notify-sent-box sent-box)
      (list (unbox notify-sent-box) (unbox sent-box))))
  176. ;; Test the queueing code by only flushing
  177. ;; the queue every N messages. Also check,
  178. ;; using flushing-allowed?, that sending
  179. ;; only happens when we expect it to happen.
  ;; #t only while a sender made by `make-every-n' (or its per-thread
  ;; variant) is running the wrapped sender.
  (define flushing-allowed? (make-parameter #f))

  ;; Like `send-proc', but first assert that sending is currently
  ;; permitted according to `flushing-allowed?'.
  (define (send-proc/check notify-sent-box sent-box envelope)
    (let ((allowed? (flushing-allowed?)))
      (assert allowed?)
      (send-proc notify-sent-box sent-box envelope)))
  (define (make-every-n proc k)
    "Return a sender that calls @var{proc} on every @var{k}-th invocation and does nothing on the other invocations."
    ;; The counter is plain mutable state: the tests using this run on a
    ;; single thread, so no atomics are used.
    (let ((calls 0))
      (lambda (mq)
        (assert (not (flushing-allowed?)))
        (set! calls (+ calls 1))
        (unless (< calls k)
          (set! calls 0)
          ;; Let `send-proc/check' know that sending is expected now.
          (parameterize ((flushing-allowed? #t))
            (proc mq))))))
  ;; The sender only runs on every third `send-message!', and it is
  ;; expected to empty the queue when it runs.  Hence the number of
  ;; messages sent is the largest multiple of 3 that does not exceed
  ;; NUM_TRANSMISSIONS.  (The stash formula `expected-sent' differs from
  ;; this whenever NUM_TRANSMISSIONS is itself a multiple of 3, so it is
  ;; not used here.)  The two literal 3s below must stay in agreement.
  (test-equal "in-order, some queueing"
    (let ((flushed (* 3 (quotient NUM_TRANSMISSIONS 3))))
      (list flushed flushed))
    (let* ((notify-sent-box (box 0))
           (sent-box (box 0))
           (mq (make-message-queue
                no-handlers
                no-error-handler
                (make-every-n
                 (make-one-by-one-sender
                  (cut send-proc/check notify-sent-box sent-box <>))
                 3))))
      (client mq notify-sent-box sent-box)
      (list (unbox notify-sent-box) (unbox sent-box))))
  211. ;; Test that concurrency interacts well with queueing.
  212. ;;
  213. ;; The situation we consider, is a number
  214. ;; of different threads concurrently sending messages.
  215. ;; The test verifies whether all messages were, in fact, sent.
  216. ;;
  217. ;; To make things complicated, some queueing is introduced.
  218. ;; The sender will only proceed each time the current thread
  219. ;; has tried to send @var{k/thread} messages, and the sender
  220. ;; will only try to send at most @code{(+ k/thread e)}, where
  221. ;; @var{e} is a random number from @var{e/min} to @var{e/max}.
  222. ;; The tests detect the following potential problems in the code
  223. ;; by crashing (but not always, so you may need to re-run a few
  224. ;; times, three times tends to be enough in practice for me):
  225. ;;
  226. ;; * Replacing 'old' with 'queue' in
  227. ;; unless (pfds:queue-empty? old)
  228. ;; * Replacing 'old' with 'queue' in
  229. ;; receive (envelope new) (pfds:dequeue old)
  230. ;; * Replacing the first 'old' with 'queue' in
  231. ;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
  232. ;; * Replacing the second 'old' with 'queue' in
  233. ;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
  234. ;; * Replacing 'old' by 'queue' in
  235. ;; (pfds:enqueue old envelope)
  236. ;; (only detected infrequently, odds 1 to 7 or so)
  237. ;; * Replacing the first 'old' by 'queue' in
  238. ;; (eq? old (swap-queue! old new))
  239. ;; in 'send-message!'
  240. ;; * Replacing the second 'old' by 'queue' in
  241. ;; (eq? old (swap-queue! old new))
  242. ;; in 'send-message!'
  243. ;;
  244. ;; The following problems cause a hang when testing:
  245. ;; * Replacing 'queue' by 'old' in (spin queue)
  246. ;; in 'make-one-by-one-sender'
  247. ;; * Replacing 'queue' by 'old' in (spin queue)
  248. ;; in 'send-message!'.
  249. ;;
  250. ;; The following problems cause a hang in a preceding
  251. ;; test:
  252. ;;
  253. ;; * Replacing the first 'old' by 'new' in
  254. ;; (eq? old (swap-queue! old new))
  255. ;; in 'send-message!'
  256. ;; * Replacing 'queue' by 'old' in
  257. ;; (spin queue)
  258. ;; in 'send-message!'
  259. ;; * Replacing 'queue' by 'new' in
  260. ;; (spin queue)
  261. ;; in 'send-message!'
  262. ;;
  263. ;; Some potential problems currently remain undetected:
  264. ;; * Replacing the 'new' by 'queue' in
  265. ;; (pfds:queue-length new)
  266. ;;
  267. ;; However, it is only for printing a warning
  268. ;; when the queue is rather full. Being slightly
  269. ;; off in queue length shouldn't be a problem
  270. ;; there, as the 'maximum reasonable bound'
  271. ;; is just a wild guess and not some exact
  272. ;; cut-off.
  273. ;;
  274. ;; Cancellation will be tested separately.
  ;; Tuning knobs of the concurrency test.
  (define k/thread 12)
  (define e/min 2)
  (define e/max 7)
  (define N_MESSAGES 1000)
  (define N_THREAD 40)

  ;; Random state used when choosing how many messages to send at once;
  ;; unbound until `thread' parameterizes it.
  (define random/thread
    (fluid->parameter (make-unbound-fluid)))

  ;; List of (thread-index . message-index) pairs received by the
  ;; current thread.
  (define received/thread
    (fluid->parameter (make-unbound-fluid)))

  ;; Counter used by `make-every-n/thread'.
  (define i/thread
    (fluid->parameter (make-unbound-fluid)))

  ;; The sending happens concurrently, so in-order delivery cannot be
  ;; guaranteed; requesting it would be silly.  Hence background
  ;; priority combined with the out-of-order preference.
  (define prio
    (let ((background (prio->integer 'prio:background))
          (out-of-order
           (value->index (symbol-value priority-preference
                                       pref:out-of-order))))
      (bitwise-ior background out-of-order)))
  ;; Layout of the messages used by the concurrency test: a message
  ;; header followed by a message index and the index of the sending
  ;; thread, both u32/big.
  (eval-when (expand load eval)
    (define-type /:msg:our-test:concurrency
      (structure/packed
       (synopsis "A test message, containing a thread and message index")
       (documentation
        "The first time, a message with index 0 is sent.
Then each time the index is increased.")
       (field (header /:message-header))
       (field (index u32/big))
       (field (thread u32/big)))))

  ;; Return a fresh read-write slice holding a
  ;; /:msg:our-test:concurrency message for message number I of thread
  ;; THREAD-INDEX.
  (define (make-thread-message thread-index i)
    (let ((s (make-slice/read-write
              (sizeof /:msg:our-test:concurrency '()))))
      (define-syntax set%!/concurrency
        (cut-syntax set%! /:msg:our-test:concurrency <> s <>))
      (set%!/concurrency
       '(header type) (value->index (symbol-value message-type msg:util:dummy)))
      (set%!/concurrency '(header size) (sizeof /:msg:our-test:concurrency '()))
      (set%!/concurrency '(index) i)
      (set%!/concurrency '(thread) thread-index)
      s))

  ;; Decode slice S into a (thread-index . message-index) pair.
  (define (decode-thread-message s)
    (cons (read% /:msg:our-test:concurrency '(thread) s)
          (read% /:msg:our-test:concurrency '(index) s)))
  (define (make-every-n/thread proc k)
    "Return a sender that calls @var{proc} on every @var{k}-th invocation and does nothing otherwise.  The invocation counter is kept in the parameter @code{i/thread}."
    (lambda (mq)
      (assert (not (flushing-allowed?)))
      (let ((calls (+ 1 (i/thread))))
        (i/thread calls)
        (unless (< calls k)
          (i/thread 0)
          (parameterize ((flushing-allowed? #t))
            (proc mq))))))
  ;; Body of one sending thread: send N_MESSAGES messages tagged with
  ;; THREAD-INDEX over MQ and return the list of messages that the
  ;; sender recorded in `received/thread' while this thread was running.
  ;; The random state is seeded from THREAD-INDEX.
  (define (thread mq thread-index)
    (parameterize ((received/thread '())
                   (i/thread 0)
                   (random/thread
                    (seed->random-state thread-index)))
      (let send-next ((message-index 0))
        (when (< message-index N_MESSAGES)
          (send-message! mq (make-thread-message thread-index message-index)
                         #:priority prio)
          (send-next (+ 1 message-index))))
      (received/thread)))
  (define (make-restricted-sender how-many make-sender inner-proc)
    "Return a sender that, each time it is called, passes at most @code{(how-many)} envelopes to @var{inner-proc}, using a sender built with @var{make-sender}."
    (define (unbound-parameter)
      (fluid->parameter (make-unbound-fluid)))
    ;; Limit for the current invocation, envelopes handled so far, and
    ;; the escape procedure that ends the current invocation.
    (define budget (unbound-parameter))
    (define used (unbound-parameter))
    (define bail-out (unbound-parameter))
    (define (send-and-count! envelope)
      (inner-proc envelope)
      ;; Only count AFTER the envelope has been handed over; escaping
      ;; before that would lose the message.
      (used (+ 1 (used)))
      (when (= (used) (budget))
        (used 0)
        ((bail-out)))
      (values))
    (lambda (mq)
      (let/ec leave
        (parameterize ((budget (how-many))
                       (used 0)
                       (bail-out leave))
          ((make-sender send-and-count!) mq)))))
  370. ;; After all threads have exited, we'll ‘drain’ out
  371. ;; the left-overs.
  ;; Set to #t while the left-over messages are being drained.
  (define drain? (make-parameter #f))

  (define (make-sender/choice y? x y)
    "Return a sender that forwards to @var{y} whenever @code{(y?)} is true and to @var{x} otherwise."
    (lambda (mq)
      ((if (y?) y x) mq)))
  ;; ‘Send’ ENVELOPE by decoding its message and pushing the resulting
  ;; (thread-index . message-index) pair onto `received/thread'.
  ;; Cancelled or already-sent envelopes are reported as errors.
  (define (inner-send envelope)
    (attempt-irrevocable-sent!
     envelope
     ((go message priority)
      (let ((decoded (decode-thread-message message)))
        (received/thread (cons decoded (received/thread))))
      (values))
     ((cancelled) (error "what/cancelled"))
     ((already-sent) (error "what/already-sent"))))
  ;; Sender used by the concurrency test.  While `drain?' is false, it
  ;; only acts on every K/THREAD-th call and then handles between
  ;; (+ k/thread e/min) and (+ k/thread e/max) envelopes.  While
  ;; `drain?' is true, a plain one-by-one sender is used instead.
  (define sender/thread
    (let* ((extra-range (- e/max e/min -1))
           (how-many
            (lambda ()
              (+ k/thread e/min
                 (random extra-range (random/thread)))))
           (throttled
            (make-every-n/thread
             (make-restricted-sender how-many
                                     make-one-by-one-sender
                                     inner-send)
             k/thread))
           (unthrottled (make-one-by-one-sender inner-send)))
      (make-sender/choice drain? throttled unthrottled)))
  ;; Turn PER-THREAD-SENT, a vector of lists of
  ;; (thread-index . message-index) pairs, into an N_MESSAGES by
  ;; N_THREAD bit array whose entry (message-index, thread-index) is #t
  ;; exactly when that message appears in one of the lists.
  (define (results->array per-thread-sent)
    (let ((seen (make-typed-array 'b #f N_MESSAGES N_THREAD)))
      (vector-for-each
       (lambda (_ messages)
         (for-each
          (match-lambda
            ((thread-index . message-index)
             (array-set! seen #t message-index thread-index)))
          messages))
       per-thread-sent)
      seen))
  ;; Return the list of index pairs (I . J) at which the two-dimensional
  ;; array A holds #f.  `array-index-map!' is only used to walk the
  ;; indices: every element is written back unchanged.
  (define (array-missing a)
    (let ((absent '()))
      (array-index-map!
       a
       (lambda (i j)
         (let ((present? (array-ref a i j)))
           (when (not present?)
             (set! absent (cons (cons i j) absent)))
           present?)))
      absent))
  422. ;; But possibly out-of-order!
  ;; N_THREAD fibers each send N_MESSAGES messages over one shared
  ;; queue.  Afterwards the queue is drained, and every
  ;; (message, thread) combination must have been seen by the sender.
  (test-equal "nothing lost when sending concurrently"
    '()
    (let* ((queue (make-message-queue no-handlers
                                      no-error-handler
                                      sender/thread))
           ;; One slot per fiber; the ‘drained-out’ messages go into
           ;; the extra slot at index N_THREAD.
           (sent-by (make-vector (+ 1 N_THREAD)))
           (finished (vector-unfold (lambda (_) (make-condition)) N_THREAD))
           (start (make-condition)))
      (define (spawn-sender! index)
        (spawn-fiber
         (lambda ()
           (wait start)
           (vector-set! sent-by index (thread queue index))
           (signal-condition! (vector-ref finished index)))))
      (run-fibers
       (lambda ()
         (for-each spawn-sender! (iota N_THREAD))
         ;; Try to start every thread at the same time!
         (signal-condition! start)
         ;; #:drain? #t with parallelism is broken,
         ;; see <https://github.com/wingo/fibers/issues/47>.
         ;; So explicitly wait on each fiber.
         (vector-for-each (lambda (_ done) (wait done)) finished))
       #:drain? #t
       ;; No need
       #:install-suspendable-ports? #f
       ;; More interrupts --> more switches --> more test coverage.
       ;; At least, that's the idea.  Not really tested.
       #:hz 700)
      ;; Drain the left-overs.
      (parameterize ((drain? #t)
                     (received/thread '()))
        (try-send-again! queue)
        (vector-set! sent-by N_THREAD (received/thread)))
      (array-missing (results->array sent-by))))
  463. ;; Test message injection / handling (no exceptions).
  ;; For each message type 0..3: a parameter holding the procedure used
  ;; as handler (mhp) and one holding the procedure used as
  ;; well-formedness check (mhv).  Tests install them with parameterize.
  (define mhp
    (list->vector (map (lambda (_) (make-parameter #f)) (iota 4))))
  (define mhv
    (list->vector (map (lambda (_) (make-parameter #f)) (iota 4))))

  ;; Message handlers that delegate to whatever procedures are
  ;; currently installed in mhv / mhp for their type.
  (define mh
    (apply message-handlers
           (map (lambda (i)
                  (let ((verifier (vector-ref mhv i))
                        (handler (vector-ref mhp i)))
                    (message-handler
                     (type i)
                     ((interpose code) code)
                     ((well-formed? slice)
                      ((verifier) slice))
                     ((handle! slice)
                      ((handler) slice)))))
                (iota (vector-length mhp)))))

  ;; FWIW, passing #f is not really allowed.
  (define mq (make-message-queue mh #f #f))
  ;; The slice passed to the verifier and to the handler must be the
  ;; very object that was injected.
  (test-eq "when injecting, handled message is eq?"
    #t
    (let* ((size 40) ; could as well have been 20
           (message (make-slice/read-write size))
           (header (slice-slice message 0 (sizeof /:message-header '()))))
      (set%! /:message-header '(size) header size)
      (call/ec
       (lambda (return)
         (parameterize (((vector-ref mhp 0)
                         (lambda (handled)
                           (return (eq? handled message))))
                        ((vector-ref mhv 0)
                         (lambda (checked)
                           (assert (eq? checked message))
                           #t)))
           (inject-message! mq message)
           'unreachable)))))
  ;; Same check as for type 0, but with a header-only message of type 3.
  (test-eq "non-zero types ok"
    #t
    (let* ((header-size (sizeof /:message-header '()))
           (message (make-slice/read-write header-size)))
      (set%! /:message-header '(type) message 3)
      (set%! /:message-header '(size) message header-size)
      (call/ec
       (lambda (return)
         (parameterize (((vector-ref mhp 3)
                         (lambda (handled)
                           (return (eq? handled message))))
                        ((vector-ref mhv 3)
                         (lambda (checked)
                           (assert (eq? message checked))
                           #t)))
           (inject-message! mq message)
           'unreachable)))))
  ;; Count how often the handler and the verifier run for one injected
  ;; message; the result is (handler-calls . verifier-calls).
  (test-equal "verifier & handler only called once"
    '(1 . 1)
    (let* ((header-size (sizeof /:message-header '()))
           (message (make-slice/read-write header-size))
           (handled 0)
           (verified 0))
      (set%! /:message-header '(size) message header-size)
      (parameterize (((vector-ref mhp 0)
                      (lambda (x)
                        (set! handled (+ 1 handled))
                        (assert (eq? x message))
                        (values)))
                     ((vector-ref mhv 0)
                      (lambda (x)
                        (set! verified (+ 1 verified))
                        (assert (eq? x message))
                        #t)))
        (inject-message! mq message))
      (cons handled verified)))
  526. ;; Test message injection (exceptions)
  ;; Injecting a slice shorter than a message header must raise a
  ;; missing-header error that reports the received size and names
  ;; inject-message! as the culprit -- for every too-short length.
  (test-equal "missing header error"
    (map (lambda (size)
           (list 'missing-header-error
                 (cons 'size size)
                 (cons 'who 'inject-message!)))
         (iota (sizeof /:message-header '())))
    (map (lambda (size)
           (guard (e ((missing-header-error? e)
                      (list 'missing-header-error
                            (cons 'size (missing-header-error-received-size e))
                            (cons 'who (condition-who e)))))
             (inject-message! mq (make-slice/read-write size))
             'unreachable))
         (iota (sizeof /:message-header '()))))
  ;; Property: when the size field of the header disagrees with the
  ;; actual length of the slice, inject-message! raises a size-mismatch
  ;; error carrying both sizes.  If the two generated lengths happen to
  ;; be equal, the supposed length is bumped by one to force a mismatch.
  (test-assert "[prop] wrong header size error"
    (quickcheck
     (property ((%real-length $natural)
                (supposed-length $natural))
       (let* ((header-size (sizeof /:message-header '()))
              (real-length (+ header-size %real-length))
              (supposed-length (if (= real-length supposed-length)
                                   (+ 1 supposed-length)
                                   supposed-length))
              (s (make-slice/read-write real-length))
              ;; The header part of S.  It was previously bound but
              ;; never used, with the expression repeated below.
              (sheader (slice-slice s 0 header-size)))
         (set%! /:message-header '(size) sheader supposed-length)
         (guard (e ((size-mismatch-error? e)
                    (equal? `(,(size-mismatch-error-expected-size e)
                              ,(size-mismatch-error-received-size e)
                              ,(condition-who e))
                            `(,supposed-length
                              ,real-length
                              inject-message!))))
           (inject-message! mq s)
           ;; No exception at all means the property failed.
           #f)))))
  ;; A message of type 0 injected into a queue without handlers must be
  ;; reported exactly once to the error handler as (logic:no-handler 0).
  (test-assert "no applicable message handler error"
    (let^ ((! reported? #f)
           (! message (bv-slice/read-write #vu8(0 4 0 0)))
           (! error-handler
              (match-lambda*
                ('(logic:no-handler 0)
                 (assert (not reported?))
                 (set! reported? #t)
                 (values))))
           (! queue (make-message-queue no-handlers error-handler #f)))
      (inject-message! queue message)
      reported?))
  ;; When the well-formedness check rejects a message, the handler must
  ;; not run and the error handler must be told (logic:ill-formed 0).
  (test-assert "ill-formed message error"
    (let^ ((! reported? #f)
           (! message (bv-slice/read-write #vu8(0 4 0 0)))
           (! handlers
              (message-handlers
               (message-handler
                (type 0)
                ((interpose code) code)
                ((well-formed? candidate)
                 (assert (eq? candidate message))
                 #f)
                ((handle! unused)
                 (error "unreachable")))))
           (! error-handler
              (match-lambda*
                ;; Note: it theoretically may have some unspecified
                ;; rest arguments.  In ‘real code’, use
                ;; (logic:ill-formed 0 . rest) instead.
                ('(logic:ill-formed 0)
                 (assert (not reported?))
                 (set! reported? #t))))
           (! queue (make-message-queue handlers error-handler #f)))
      (inject-message! queue message)
      reported?))
  599. ;; Test the following part of the send-message! docstring:
  600. ;; ‘After normal execution, the message envelope is returned,
  601. ;; but in case of an exception (for example, an out-of-memory exception
  602. ;; during the handling of a @code{&overly-full-queue-warning}), it is
  603. ;; possible the envelope isn't returned even though it has been enqueued
  604. ;; and it might perhaps be sent.’
  ;; After a normal `send-message!', the single value it returns must
  ;; be the same envelope that was handed to the sender.
  (test-assert "returned envelope and sent envelope are equal"
    (let* ((sent #f)
           (sender
            (make-one-by-one-sender
             (lambda envelope-arguments
               ;; The sender procedure may only run once.
               (assert (not sent))
               (set! sent envelope-arguments)
               (values))))
           (queue (make-message-queue #f #f sender))
           (returned
            (call-with-values
                (lambda ()
                  (send-message! queue (index->dummy #xdeadbeef)))
              list)))
      (and (equal? sent returned)
           (= 1 (length sent))
           (every envelope? sent))))
  623. ;; Strictly speaking, this test is allowed to fail
  624. ;; (as it is only ‘might’, not ‘it must be possible’),
  625. ;; but it seems a good idea to check our understanding is correct.
  ;; With %suspicious-length set to 0, `send-message!' is expected to
  ;; raise an overly-full-queue warning.  The warning handler flushes
  ;; the queue (so the envelope is really sent) and then simulates an
  ;; out-of-memory condition, so `send-message!' never gets to return
  ;; the envelope.
  (test-assert "message might be enqueued & sent but not returned"
    (let* ((sent-envelope #f)
           (flush? (make-parameter #f))
           (flushing-sender
            (make-one-by-one-sender
             (lambda (envelope)
               (set! sent-envelope envelope)
               (values))))
           (holding-sender
            (lambda _ (values)))
           (sender (make-sender/choice flush? holding-sender
                                       flushing-sender))
           (queue (make-message-queue #f #f sender))
           (message (index->dummy 0))
           (warned? #f)
           (returned #f))
      ;; Inner handler: runs in the dynamic context of the warning.
      (define (simulate-out-of-memory e)
        (cond ((overly-full-queue-warning? e)
               (set! warned? #t)
               (parameterize ((flush? #t))
                 (try-send-again! queue)
                 ;; At least in the current implementation, this holds.
                 ;;
                 ;; In a different implementation, the envelope could
                 ;; be enqueued after checking the queue length.
                 (assert sent-envelope))
               (throw 'out-of-memory))
              (else
               (raise-exception e #:continuable? #t))))
      ;; Outer handler: runs after unwinding for the simulated error.
      (define (check-unwound _)
        (assert warned?)
        (assert (envelope? sent-envelope))
        (assert (not returned)))
      (define (send-with-zero-limit)
        (call-with-values
            (lambda ()
              (parameterize ((%suspicious-length 0))
                (send-message! queue message)))
          (lambda results (set! returned results))))
      (with-exception-handler
          check-unwound
        (lambda ()
          (with-exception-handler
              simulate-out-of-memory
            send-with-zero-limit
            #:unwind? #f))
        #:unwind? #t
        #:unwind-for-type 'out-of-memory)
      (and sent-envelope warned?
           (not returned))))
  675. ;; Message cancellation.
  676. ;;
  677. ;; Cancellation is already tested in tests/envelope.scm.
  678. ;; However, the interaction with message queues has not
  679. ;; yet been tested.
  680. ;; This test detected (not detected by previous tests):
  681. ;; * the cdr of the contents of messages+garbage/box
  682. ;; being initialised incorrectly in make-message-queue
  683. ;; * using car instead of cdr in increment-garbage&maybe-cleanup
  ;; Register the queue with a guardian, cancel its only envelope and
  ;; run the collector: the guardian must then hand the queue back,
  ;; which shows the envelope did not keep it alive.
  (test-assert "envelopes do not keep a strong reference to the message queue"
    (let* ((mq (make-message-queue #f #f (lambda _ (values))))
           (queue-guardian (make-guardian))
           (pending (send-message! mq (index->dummy 0))))
      (queue-guardian mq)
      (attempt-cancel!
       pending
       ((now-cancelled)
        (gc)
        (if (queue-guardian) #t #f))
       ((already-cancelled) (error "what/cancelled"))
       ((already-sent) (error "what/sent")))))
  (define (count-guardian/cancelled guardian)
    "Return the number of objects that @var{guardian} hands back, raising an error if one of them is not a cancelled envelope."
    (let next ((count 0)
               (item (guardian)))
      (if item
          (begin
            (unless (envelope-peek-cancelled? item)
              (error "a not-cancelled envelope was freed!"))
            (next (+ count 1) (guardian)))
          ;; The guardian returned #f: nothing left to retrieve.
          count)))
  (define (count-guardian/uncancelled guardian)
    "Return the number of objects that @var{guardian} hands back, raising an error if one of them is a cancelled envelope."
    (let next ((count 0)
               (item (guardian)))
      (if item
          (begin
            (when (envelope-peek-cancelled? item)
              (error "a cancelled envelope was freed!"))
            (next (+ count 1) (guardian)))
          ;; The guardian returned #f: nothing left to retrieve.
          count)))
  712. ;; This is a variant of
  713. ;; "the one-by-one message sender removes cancelled envelopes",
  714. ;; using guardians, and purely testing the cancelling code, and
  715. ;; not the sending code.
  716. ;;
  717. ;; It detects the following mutations:
  718. ;; * removing (spin queue+garbage) after swap! in the 'envelope-peek-cancelled?'
  719. ;; branch of 'make-one-by-one-sender'
  ;; Enqueue 50 messages on a queue whose sender never sends anything,
  ;; cancel the first 40 and check with guardians which envelopes the
  ;; collector reclaims.
  (test-assert "cancelling envelopes eventually frees memory even if message sender is dead"
    (let* ((mq (make-message-queue #f #f (lambda _ (values))))
           (cancelled-guard (make-guardian))
           (uncancelled-guard (make-guardian)))
      ;; Add a bunch of messages.
      (let ((messages
             (map (lambda (i)
                    (send-message! mq (index->dummy i)))
                  (iota 50))))
        ;; Watch the envelopes that stay uncancelled.  Nothing was ever
        ;; registered with this guardian before, which made the
        ;; freed/uncancelled assertion below trivially true.
        (for-each (lambda (e) (uncancelled-guard e))
                  (list-tail messages 40))
        ;; Cancel most of them.  This should trigger collection of
        ;; cancelled envelopes.
        (for-each
         (lambda (e)
           (cancelled-guard e)
           (attempt-cancel!
            e
            ((now-cancelled) (values))
            ((already-cancelled) (error "what/cancelled"))
            ((already-sent) (error "what/sent"))))
         (list-head messages 40)))
      ;; Move freed envelopes to the guardian.
      (gc)
      ;; How many were freed?
      (let ((freed/cancelled (count-guardian/cancelled cancelled-guard))
            (freed/uncancelled (count-guardian/uncancelled uncancelled-guard))
            (cancelled 40)
            (total 50))
        (pk 'total total 'cancelled cancelled 'freed/cancelled freed/cancelled
            'freed/uncancelled freed/uncancelled
            'queue-length (message-queue-length mq))
        ;; Only cancelled messages were supposed to be freed.
        (assert (= freed/uncancelled 0))
        (assert (<= freed/cancelled cancelled))
        ;; A large fraction of cancelled messages should be freed.
        (assert (>= (/ freed/cancelled cancelled) 7/8))
        ;; If the GC is exact, all messages removed from the message
        ;; queue (due to cancelling) should be removed.
        (unless (conservative-gc?)
          (assert (= freed/cancelled (- total (message-queue-length mq)))))
        #t)))
  760. (define sender/no-cancelled
  761. (make-one-by-one-sender
  762. (lambda (e)
  763. (pk 'ee e)
  764. (assert (not (envelope-peek-cancelled? e)))
  765. (values))))
  766. ;; Not strictly necessary (and also undocumented), but this should
  767. ;; improve the accuracy of the garbage counter. Maybe not trying
  768. ;; to send useless (cancelled) envelopes could help with performance
  769. ;; as well (untested)?
  770. ;;
  771. ;; Also, this caught a bug in (gnu gnunet mq) -- the procedure returned
  772. ;; by 'make-one-by-one-sender' went into an infinite loop if it encountered
  773. ;; a cancelled envelope.
  774. ;;
775. ;; This test detects negating the test
  776. ;; (eq? old (swap! old (cons old-queue incremented-garbage)))
  777. ;; in increment-garbage&maybe-cleanup.
  778. (test-assert "the one-by-one message sender removes cancelled envelopes"
  779. (let* ((flush? (make-parameter #f))
  780. (sender (make-sender/choice flush? (lambda _ (values))
  781. sender/no-cancelled))
  782. (mq (make-message-queue #f #f sender)))
  783. ;; Fill the queue with many uncancelled messages, such that
  784. ;; the logic for collecting cancelled envelopes doesn't kick in too early.
  785. (do ((i 0 (+ i 1)))
  786. ((>= i 30))
  787. (send-message! mq (index->dummy i)))
  788. (assert (= (message-queue-length mq) 30))
  789. ;; Now add some envelopes to the queue & cancel them.
  790. (do ((i 0 (+ i 1)))
  791. ((>= i 4))
  792. (attempt-cancel!
  793. (send-message! mq (index->dummy (+ 30 i)))
  794. ((now-cancelled) (values))
  795. ((already-cancelled) (error "what / cancelled"))
  796. ((already-sent) (error "what / sent"))))
  797. (assert (= (message-queue-length mq) 34))
  798. (parameterize ((flush? #t))
  799. (try-send-again! mq))
  800. (assert (= (message-queue-length mq) 0))
  801. (assert (= (%message-queue-garbagitude mq) 0))
  802. #t))
  803. ;; This is a variation of "nothing lost when sending concurrently",
  804. ;; but for cancelation.
  805. ;;
  806. ;; This test fails in case of the following mutations:
807. ;; * replace 0 with 1 (or some other number) in
  808. ;; (swap! old (cons filtered 0))
  809. ;; in increment-garbage&maybe-cleanup
  810. (test-assert "the (approximate) cancellation count is accurate, when not sending, even when cancelling concurrently (also, uncancelled messages are not lost)"
  811. (let* ((messages/cancellation 10000)
  812. (n/not-cancelled #f)
  813. (flush? (make-parameter #f))
  814. (sender/check (lambda (e)
  815. (unless (envelope-peek-cancelled? e)
  816. (set! n/not-cancelled (+ 1 n/not-cancelled)))
  817. (values)))
  818. (sender (make-sender/choice flush?
  819. (lambda _ (values))
  820. (make-one-by-one-sender sender/check)))
  821. (mq (make-message-queue #f #f sender))
  822. (ready? (make-condition))
  823. (done? (vector-unfold
  824. (lambda (_) (make-condition))
  825. (/ messages/cancellation 2)))
  826. (messages
  827. (with-exception-handler
  828. (lambda (e)
  829. (if (overly-full-queue-warning? e)
  830. (values)
  831. (raise-exception e #:continuable? #t)))
  832. (lambda ()
  833. (vector-unfold (compose (cut send-message! mq <>)
  834. index->dummy)
  835. messages/cancellation)))))
  836. (run-fibers
  837. (lambda ()
  838. ;; Cancel half of the messages, concurrently.
  839. ;; Only half of all the messages are cancelled,
  840. ;; to avoid resetting the garbage counter.
  841. (vector-for-each
  842. (lambda (i done? message)
  843. (when (< i (/ messages/cancellation 2))
  844. (spawn-fiber
  845. (lambda ()
  846. (wait ready?)
  847. (attempt-cancel!
  848. message
  849. ((now-cancelled)
  850. (signal-condition! done?)
  851. (values))
  852. ((already-cancelled)
  853. (signal-condition! done?)
  854. (error "what/cancelled"))
  855. ((already-sent)
  856. (signal-condition! done?)
  857. (error "what/sent")))))))
  858. done? messages)
  859. (signal-condition! ready?)
  860. (vector-for-each (lambda (_ c) (wait c)) done?))
  861. #:hz 4000)
  862. ;; Verify the estimate is accurate, at least in this
  863. ;; situation.
  864. (assert (= (pk 'garbagitude (%message-queue-garbagitude mq))
  865. (pk 'expected (/ messages/cancellation 2))))
  866. ;; Cancel more messages (until 7/8 are cancelled),
  867. ;; to trigger collection. While we're at, verify
  868. ;; the estimate is still correct.
  869. (do ((i (/ messages/cancellation 2) (+ i 1)))
  870. ((>= (/ i messages/cancellation) 7/8))
  871. (attempt-cancel!
  872. (vector-ref messages i)
  873. ((now-cancelled)
  874. ;; 3/4 is the (arbitrary) ratio at which
  875. ;; the garbage is thrown out
  876. (if (< (* 4 i) (* 3 messages/cancellation))
  877. (assert (= (%message-queue-garbagitude mq)
  878. (+ i 1)))
  879. (assert (= (%message-queue-garbagitude mq)
  880. (- i (* 3/4 messages/cancellation))))))
  881. ((already-cancelled) (error "what/cancelled2"))
  882. ((already-sent) (error "what/sent2"))))
  883. ;; Now send the envelopes, to verify uncancelled messages
  884. ;; are still in the queue.
  885. (parameterize ((flush? #t))
  886. (set! n/not-cancelled 0)
  887. (try-send-again! mq))
  888. (assert (= n/not-cancelled (* 1/8 messages/cancellation)))
  889. ;; As everything has been removed from the queue,
  890. ;; the estimate should now be zero.
  891. (assert (= (pk 'final-garbagitude (%message-queue-garbagitude mq))
  892. 0))
  893. #t))