|
- #!/usr/bin/env bash
- # -*- scheme -*-
- exec -a "${0}" guile -L $(dirname $(realpath "$0")) -e '(fetchpull-standalone)' -c '' "${@}"
- ; !#
- ;; for emacs (defun test-this-file () (interactive) (save-current-buffer) (async-shell-command (concat (buffer-file-name (current-buffer)) " --test")))
- (define-module (fetchpull-standalone)
- #:export (main))
- ;; support inline math
- #!curly-infix
- ;; Version string; printed by the --version command line option.
- (define version "0.0.0 just-do-it")
- ;; Informal design notes kept as quoted data. Nothing in the code visible
- ;; here reads this list; it serves as in-file documentation of the experiment.
- (define design
- '(
- (keys are KSK@<prefix>--DATE-uploaded-xxx-days-before-using-MODE)
- (process:
- (get the current date)
- (for realtime or bulk as MODE
- (for each power of two up to 128 as iii
- (insert a random chunk (without compression)
- (to {DATE + iii days}-uploaded-iii-days-before-using-MODE))
- (request the key DATE-uploaded-iii-days-before-using-MODE)
- (write the times along with the keys (without the prefix)
- (into insert-times.csv and request-times.csv))
- (format as
- (DATE-as-seconds-since-epoch duration iii MODE)))))
- (prefix is generated from securepassword.w and stored in the file fetchpull-prefix.txt)))
- (import
- (only (srfi srfi-19) current-date date->string string->date date->time-utc time-utc->date
- make-time time-utc time-duration add-duration current-time)
- (only (srfi srfi-9) define-record-type)
- (only (ice-9 pretty-print) pretty-print)
- (only (ice-9 rdelim) read-line read-delimited)
- (only (ice-9 format) format)
- (only (srfi srfi-27) random-integer)
- (only (srfi srfi-1) first second third alist-cons assoc lset<= lset-intersection lset-difference)
- (only (rnrs bytevectors) make-bytevector bytevector-length string->utf8)
- (only (rnrs io ports) get-bytevector-all get-bytevector-n
- put-bytevector bytevector->string port-eof?)
- (only (ice-9 popen) open-output-pipe)
- (only (ice-9 expect) expect-strings ;; for quick experimentation. Expect needs additional functions and variables available:
- expect expect-regexec expect-timeout expect-select expect-timeout-proc
- expect-char-proc expect-eof-proc expect-strings-compile-flags)
- (only (ice-9 regex) string-match match:substring)
- (ice-9 threads)
- (ice-9 atomic)
- (only (ice-9 q) make-q enq! deq! q-empty?)
- (sxml simple))
- ;; Reference date of this run as an SRFI-19 time-utc. main overwrites it when a
- ;; date is given on the command line; it feeds the key names and the CSV day column.
- (define today (current-time time-utc))
- ;; Render an SRFI-19 time-utc as a date string via the SRFI-19 "~1"
- ;; (ISO-8601 year-month-day) directive.
- (define (time->iso time)
-   (let ((date (time-utc->date time)))
-     (date->string date "~1")))
- ;; Parse a year/month/day date string into an SRFI-19 time-utc.
- (define (iso->time string)
-   (let ((date (string->date string "~Y~m~d")))
-     (date->time-utc date)))
- ;; Return time shifted forward by number-of-days whole days (24 * 3600 seconds each).
- (define (add-days time number-of-days)
-   (let* ((seconds (* 3600 24 number-of-days))
-          (offset (make-time time-duration 0 seconds)))
-     (add-duration time offset)))
- ;; Replace every occurrence of the character char in s by replacement-string.
- (define (string-replace-string s char replacement-string)
-   (let ((pieces (string-split s char)))
-     (string-join pieces replacement-string)))
- ;; Replace the characters + and = in s by - .
- (define (replace-KSK-escaped s)
-   (let* ((without-plus (string-replace-string s #\+ "-"))
-          (without-equals (string-replace-string without-plus #\= "-")))
-     without-equals))
-
- ;; Return a fresh identifier string of ten random decimal digits.
- ;; The earlier version handed a *list* of strings to replace-KSK-escaped (which
- ;; splits a string) and asked random-integer for a number below 0, so it could
- ;; not produce an identifier.
- ;; NOTE(review): the default SRFI-27 random source is not randomized in the
- ;; code visible here, so the digit sequence may repeat between runs -- confirm
- ;; whether the prefix needs to be unpredictable.
- (define (task-id)
-   (let ((digits (map (λ (_) (number->string (random-integer 10)))
-                      (iota 10))))
-     ;; digits never contain + or =, the escaping only guards later format changes
-     (replace-KSK-escaped (string-join digits ""))))
- ;; File in which the key prefix is persisted between runs.
- (define prefix-filename "fetchpull-prefix.txt")
- ;; In-memory copy of the prefix; #f until prefix has been called once.
- (define prefix-cache #f)
- ;; Return the key prefix as a string.
- ;; The prefix is read from prefix-filename when that file holds a non-empty
- ;; first line; otherwise a new one is created with task-id and written to the
- ;; file. The result is cached in prefix-cache so the file is read at most once
- ;; per process, and every port is closed again (call-with-*-file).
- (define (prefix)
-   (define (read-stored)
-     (and (file-exists? prefix-filename)
-          (let ((line (call-with-input-file prefix-filename read-line)))
-            ;; read-line yields an EOF object for an empty file: treat as missing
-            (and (string? line)
-                 (not (string-null? line))
-                 line))))
-   (define (create-and-store)
-     (let ((pw (task-id)))
-       (call-with-output-file prefix-filename
-         (λ (port) (display pw port)))
-       pw))
-   (when (not prefix-cache)
-     (set! prefix-cache (or (read-stored) (create-and-store))))
-   prefix-cache)
- ;; Build the KSK to insert today so that it can be requested days-before days
- ;; later: the date in the key is today shifted forward by days-before days.
- (define (KSK-for-insert prefix today days-before mode)
-   (let ((request-date (time->iso (add-days today days-before))))
-     (format #f "KSK@~a--~a-uploaded-~3,'0d-days-before-using-~a"
-             prefix request-date days-before mode)))
- ;; Build the KSK to request at time: the key carries the date of time itself
- ;; plus the zero-padded number of days it was uploaded in advance.
- (define (KSK-for-request prefix time days-before mode)
-   #(
-     (tests
-       (test-equal
-         "KSK@WwL6-UXTu-sa5n.fAk2-s7kj.5Kp6--2018-11-23-uploaded-005-days-before-using-realtime"
-         (KSK-for-request "WwL6-UXTu-sa5n.fAk2-s7kj.5Kp6" (iso->time "2018-11-23") 5 'realtime))))
-   (let ((request-date (time->iso time)))
-     (format #f "KSK@~a--~a-uploaded-~3,'0d-days-before-using-~a"
-             prefix request-date days-before mode)))
- ;; the shared FCP socket; #f until a connection has been opened
- (define sock #f)
- ;; Open and return a socket connected to 127.0.0.1 port 9482, using the first
- ;; address getaddrinfo reports.
- (define (fcp-socket-create)
-   (let* ((addr (first (getaddrinfo "127.0.0.1" "9482")))
-          (s (socket (addrinfo:fam addr)
-                     (addrinfo:socktype addr)
-                     (addrinfo:protocol addr))))
-     (connect s (addrinfo:addr addr))
-     s))
- ;; An FCP message: task is the Identifier (or #f), type the message name as a
- ;; symbol, data an optional bytevector payload (or #f) and fields an alist of
- ;; (name . value) pairs.
- (define-record-type <message>
- (message-create task type data fields )
- message?
- (task message-task)
- (type message-type)
- (data message-data)
- (fields message-fields ));; avoid duplicates: fred joins duplicate fields with ";" to a single value
- ;; Render one (name . value) pair as the string name=value.
- (define (format-field field)
-   (let ((name (car field))
-         (value (cdr field)))
-     (format #f "~a=~a" name value)))
- ;; Render an alist of fields as name=value lines separated by newlines
- ;; (no trailing newline).
- (define (join-fields fields)
-   #((tests (test-equal "A=B\nX=V" (join-fields (list (cons 'A "B") (cons 'X 'V))))))
-   (let ((rendered (map format-field fields)))
-     (string-join rendered "\n")))
- ;; Accessors for a field pair as produced by field-split: (key . value).
- (define field-key car)
- (define field-value cdr)
- ;; Split a line at its first = into (name-symbol . value-string).
- ;; A line without = is returned as (line . "") with the line kept as a string.
- (define (field-split s)
-   (let ((eq-pos (string-index s #\=)))
-     (cond
-       ((not eq-pos)
-        (cons s ""))
-       (else
-        (let ((name (substring/shared s 0 eq-pos))
-              (value (substring/shared s (+ eq-pos 1) (string-length s))))
-          (cons (string->symbol name) value))))))
- ;; Serialize message to the port sock: the type line, the Identifier line when
- ;; the message has a task, all fields, and then either DataLength/Data followed
- ;; by the raw payload bytes, or the EndMessage terminator.
- (define (write-message message sock)
-   (let ((task (message-task message))
-         (fields (message-fields message))
-         (data (message-data message)))
-     (display (message-type message) sock)
-     (newline sock)
-     (when task
-       (format sock "Identifier=~a\n" task))
-     (unless (null? fields)
-       (display (join-fields fields) sock)
-       (newline sock))
-     (if data
-         (begin
-           (format sock "~a\n"
-                   (format-field (cons 'DataLength (bytevector-length data))))
-           (format sock "Data\n")
-           (put-bytevector sock data))
-         (begin
-           (display 'EndMessage sock)
-           (newline sock)))))
- ;; ClientHello message announcing the client name and the expected protocol version.
- (define (message-client-hello)
-   (let ((fields '((Name . "FetchpullClient")
-                   (ExpectedVersion . "2.0"))))
-     (message-create #f 'ClientHello #f fields)))
- ;; WatchGlobal message enabling notifications with VerbosityMask 1 (simple progress).
- (define (message-watch-global)
-   (let ((fields '((Enabled . "true")
-                   (VerbosityMask . 1))))
-     (message-create #f 'WatchGlobal #f fields)))
- ;; Disconnect message without task, payload or fields.
- (define (message-disconnect)
-   (message-create #f 'Disconnect #f '()))
- ;; ClientGet message for URI under the identifier task. The URI field comes
- ;; first, then the fixed defaults, then custom-fields.
- (define (message-client-get task URI custom-fields)
-   (let ((defaults
-           '((Verbosity . 1) ;; get SimpleProgress messages for the tasks
-             (ReturnType . direct)
-             (MaxRetries . 1) ;; -1 means: try indefinitely, with ULPR, essentially long polling
-             (Global . true)
-             (Persistence . reboot))))
-     (message-create task 'ClientGet #f
-                     (cons (cons 'URI URI)
-                           (append defaults custom-fields)))))
- ;; ClientGet with the realtime settings: priority class 2, realtime flag set,
- ;; content filtering switched off.
- (define (message-client-get-realtime task URI)
-   (let ((realtime-fields '((PriorityClass . 2)
-                            (RealTimeFlag . true)
-                            (FilterData . false))))
-     (message-client-get task URI realtime-fields)))
- ;; ClientPut message inserting the bytevector data at URI under the identifier
- ;; task. The URI field comes first, then the fixed defaults, then custom-fields.
- (define (message-client-put task URI data custom-fields)
-   (let ((defaults
-           '((Verbosity . 1) ;; get SimpleProgress messages for the tasks
-             (MaxRetries . 1) ;; default: 10
-             (Global . true)
-             (Persistence . reboot)
-             (UploadFrom . direct))))
-     (message-create task 'ClientPut data
-                     (cons (cons 'URI URI)
-                           (append defaults custom-fields)))))
- ;; ClientPut with the realtime settings: priority class 2, realtime flag set,
- ;; no compression, no extra inserts, content type application/octet-stream.
- (define (message-client-put-realtime task URI data)
-   (let ((realtime-fields
-           '((PriorityClass . 2)
-             (RealTimeFlag . true)
-             (DontCompress . true)
-             (ExtraInsertsSingleBlock . 0)
-             (ExtraInsertsSplitfileHeaderBlock . 0)
-             (Metadata.ContentType . application/octet-stream))))
-     (message-client-put task URI data realtime-fields)))
- ;; RemoveRequest message for the global request with the identifier task.
- (define (message-remove-request task)
-   (let ((fields '((Global . true))))
-     (message-create task 'RemoveRequest #f fields)))
- ;; Message types read-message returns to its caller; every other type is
- ;; logged as a warning and skipped.
- (define supported-messages
- '(NodeHello GetFailed DataFound AllData PutSuccessful PutFailed))
- ;; Print "Warning: <message>: <things>" as one line on the current output port.
- (define (log-warning message things)
-   (let ((out (current-output-port)))
-     (format out "Warning: ~a: ~a\n" message things)))
- ;; Read the next message of a supported type from port.
- ;; Returns a <message> record, or #f when the port is at end-of-file before a
- ;; message starts. Messages whose type is not in supported-messages are
- ;; reported with log-warning and skipped.
- ;; NOTE(review): read-line is called without an EOF check inside a message; if
- ;; the connection ends mid-message the EOF object presumably reaches
- ;; string->symbol / string-index and raises -- confirm.
- ;; NOTE(review): the payload of a skipped (unsupported) message is not
- ;; consumed here -- confirm no unsupported type carries Data.
- (define (read-message port)
- (if (port-eof? port)
- #f
- (let loop ((type (string->symbol (read-line port))))
- ;; per-message state, reset for every message the outer loop starts
- (define DataLength #f)
- (define task #f)
- ;; lines is built in reverse: its head is always the line read last
- (let readlines ((lines (list (read-line port))))
- (define line (first lines))
- (define field (field-split line))
- (when (equal? 'DataLength (field-key field))
- (set! DataLength
- (field-value field)))
- (when (equal? 'Identifier (field-key field))
- (set! task
- (field-value field)))
- ;; pretty-print : list 'line line 'type type
- (cond
- ;; a name=value line: keep reading fields
- ((string-index line #\=)
- (readlines (cons (read-line port) lines)))
- ((member type supported-messages );; line is Data or EndMessage
- (let
- (
- (data ;; EndMessage has no Data
- (if (and DataLength (not (equal? "EndMessage" line)))
- (get-bytevector-n port (string->number DataLength))
- #f)))
- ;; (cdr lines) drops the terminator line; the rest are the field lines, newest first
- (message-create task type data
- (map field-split (cdr lines)))))
- (else
- (log-warning "unsupported message type" type)
- (if (port-eof? port)
- #f
- (loop (string->symbol (read-line port))))))))))
- ;; Single-slot mailbox between send-message and the write loop; #f means empty.
- (define next-message
-   (make-atomic-box #f))
- ;; Queue message for sending. The compare-and-swap stores the message only
- ;; while the slot holds #f and returns the previous content, so a non-#f
- ;; result means the slot was still occupied: sleep briefly and try again.
- ;; take-message-to-send empties the slot.
- (define (send-message message)
-   (let try ()
-     (when (atomic-box-compare-and-swap! next-message #f message)
-       (usleep 100)
-       (try))))
- ;; Return the queued message (or #f when none is queued) and atomically reset
- ;; the slot to #f so that send-message can store the next one.
- (define (take-message-to-send)
- ;; get the message and reset next-message to #f to allow taking another message
- (atomic-box-swap! next-message #f))
- ;; Chain of message processors, newest first. A processor takes a message and
- ;; returns it (possibly changed) to pass it on, or #f to consume it.
- (define message-processors
-   (make-atomic-box (list)))
- ;; Run message through the current processor chain. Returns #f as soon as a
- ;; processor consumes the message, otherwise whatever the last processor returned.
- (define (process message)
-   (let loop ((remaining (atomic-box-ref message-processors))
-              (current message))
-     (if (and current (not (null? remaining)))
-         (loop (cdr remaining)
-               ((first remaining) current))
-         current)))
- ;; Atomically add processor to the front of the processor chain.
- ;; atomic-box-compare-and-swap! compares with eq? and returns the previous
- ;; content, so the swap succeeded exactly when the result is eq? to the list
- ;; we expected. (Comparing with equal? would treat a structurally equal but
- ;; different list as success and silently lose this update.)
- (define (processor-put! processor)
-   (let loop ((old (atomic-box-ref message-processors)))
-     (let ((seen (atomic-box-compare-and-swap! message-processors old (cons processor old))))
-       (when (not (eq? seen old))
-         ;; another thread changed the chain in between: retry on its value
-         (loop seen)))))
- ;; Atomically remove processor from the processor chain.
- ;; atomic-box-compare-and-swap! compares with eq? and returns the previous
- ;; content, so the swap succeeded exactly when the result is eq? to the list
- ;; we expected. (Comparing with equal? would treat a structurally equal but
- ;; different list as success and silently lose this update.)
- (define (processor-delete! processor)
-   (let loop ((old (atomic-box-ref message-processors)))
-     (let ((seen (atomic-box-compare-and-swap! message-processors old (delete processor old))))
-       (when (not (eq? seen old))
-         ;; another thread changed the chain in between: retry on its value
-         (loop seen)))))
- ;; Read messages from sock until read-message returns #f, run each through the
- ;; processor chain and report whatever no processor consumed.
- (define (fcp-read-loop sock)
-   (let loop ()
-     (let ((message (read-message sock)))
-       (when message
-         (warn-unhandled (process message))
-         (loop)))))
- ;; Endless loop: write each queued message to sock; sleep 100 microseconds
- ;; whenever nothing is queued. This procedure never returns.
- (define (fcp-write-loop sock)
-   (let loop ()
-     (let ((message (take-message-to-send)))
-       (cond
-         (message
-          (write-message message sock))
-         (else
-          (usleep 100))))
-     (loop)))
- ;; Report a message that no processor consumed; does nothing for #f.
- ;; Always returns #f.
- (define (warn-unhandled message)
-   (if message
-       (begin
-         ;; avoid writing to the error port elsewhere, that causes multithreading problems. Use current-output-port instead
-         (format (current-error-port) "Unhandled message ~a\n" message)
-         #f)
-       #f))
- ;; Processor: pretty-print the message and pass it on unchanged.
- (define (printing-passthrough-processor message)
- (pretty-print message)
- message)
- ;; Processor: pretty-print the message and consume it (returns #f).
- (define (printing-discarding-processor message)
- (pretty-print message)
- #f)
- ;; Processor: consume every message silently (returns #f).
- (define (discarding-processor message)
- #f)
- ;; Print the usage text to the current output port; (first args) is shown as
- ;; the program name.
- ;; NOTE(review): final-action? also accepts --site, which this text does not mention.
- (define (help args)
- (format (current-output-port)
- "~a [-i] [--help | --version | --test | YYYY-mm-dd]
- Options:
- -i load the script and run an interactive REPL."
- (first args)))
- ;; timing information (alists)
- ;; Each entry is (task-identifier . completion-time-in-seconds); entries are
- ;; added at the front by the processor-record-* processors.
- (define get-successful (list))
- (define get-failed (list))
- (define put-successful (list))
- (define put-failed (list))
- ;; NOTE(review): nothing in the code visible here adds to get-alldata.
- (define get-alldata (list )); the actual data, for debugging
- ;; Processor: answer a DataFound message by queueing a GetRequestStatus for the
- ;; same task and consume it; every other message is passed on unchanged.
- (define (processor-record-datafound-getdata message)
-   (if (equal? 'DataFound (message-type message))
-       (let ((status-request
-               (message-create (message-task message)
-                               'GetRequestStatus #f
-                               (list (cons 'Global 'true)))))
-         (send-message status-request)
-         #f)
-       message))
- ;; Current time as the whole-seconds part of gettimeofday.
- (define (current-time-seconds)
-   (let ((now (gettimeofday)))
-     (car now)))
- ;; Processor: record the arrival time of an AllData message in get-successful
- ;; and consume it; every other message is passed on unchanged.
- (define (processor-record-alldata-time message)
-   (if (equal? 'AllData (message-type message))
-       (begin
-         (set! get-successful
-           (alist-cons (message-task message) (current-time-seconds) get-successful))
-         #f)
-       message))
- ;; Processor: record the arrival time of a GetFailed message in get-failed
- ;; and consume it; every other message is passed on unchanged.
- (define (processor-record-getfailed-time message)
-   (if (equal? 'GetFailed (message-type message))
-       (begin
-         (set! get-failed
-           (alist-cons (message-task message) (current-time-seconds) get-failed))
-         #f)
-       message))
- ;; Processor: record the arrival time of a PutFailed message in put-failed
- ;; and consume it; every other message is passed on unchanged.
- (define (processor-record-putfailed-time message)
-   (if (equal? 'PutFailed (message-type message))
-       (begin
-         (set! put-failed
-           (alist-cons (message-task message) (current-time-seconds) put-failed))
-         #f)
-       message))
- ;; Processor: record the arrival time of a PutSuccessful message in
- ;; put-successful and consume it; every other message is passed on unchanged.
- (define (processor-record-putsuccessful-time message)
-   (if (equal? 'PutSuccessful (message-type message))
-       (begin
-         (set! put-successful
-           (alist-cons (message-task message) (current-time-seconds) put-successful))
-         #f)
-       message))
- ;; Shared part of the IdentifierCollision processors: consume an
- ;; IdentifierCollision message and report it as failed; every other message is
- ;; passed on unchanged.
- ;; failed should be a one-argument procedure; it is called with the new
- ;; (task . time-in-seconds) entry. The earlier version did (set! failed ...),
- ;; which only rebound this local parameter, so collisions never reached
- ;; get-failed / put-failed. A non-procedure argument (the old calling
- ;; convention passed a list) is still accepted and, as before, records nothing.
- ;; NOTE(review): IdentifierCollision is not listed in supported-messages, so
- ;; read-message presumably never hands such a message to the processors -- confirm.
- (define (processor-record-identifier-collision-time message failed)
-   (cond
-     ((equal? 'IdentifierCollision (message-type message))
-      (when (procedure? failed)
-        (failed (cons (message-task message) (current-time-seconds))))
-      #f)
-     (else message)))
- ;; Processor: count an IdentifierCollision as a failed download (get-failed).
- (define (processor-record-identifier-collision-get-time message)
-   (processor-record-identifier-collision-time
-     message
-     (λ (entry) (set! get-failed (cons entry get-failed)))))
- ;; Processor: count an IdentifierCollision as a failed upload (put-failed).
- (define (processor-record-identifier-collision-put-time message)
-   (processor-record-identifier-collision-time
-     message
-     (λ (entry) (set! put-failed (cons entry put-failed)))))
- ;; One timing result: the key, how long the operation took in seconds, whether
- ;; it succeeded, and which operation and mode produced it.
- ;; Note that the type predicate is named timing-entry?, not duration-entry?.
- (define-record-type <duration-entry>
- (duration-entry key duration successful operation mode)
- timing-entry?
- (key duration-entry-key)
- (duration duration-entry-duration)
- (successful duration-entry-success)
- (operation duration-entry-operation );; get or put
- (mode duration-entry-mode ));; realtime bulk speehacks
-
- ;; Request every key in keys with the realtime settings and measure how long
- ;; each request takes. Blocks, polling once per second, until every key is
- ;; listed in get-successful or get-failed. Returns a list of duration-entry
- ;; records (operation 'GET, mode 'realtime) in reverse order of keys.
- ;; The record processors are installed for the duration of the call only.
- ;; NOTE: get-successful and get-failed are module-level and not cleared here,
- ;; so a key already recorded by an earlier call counts as finished at once.
- (define (time-get keys)
- (define start-times (list))
- (define (finished-tasks)
- (append
- (map car get-successful)
- (map car get-failed)))
- ;; setup a processing chain which saves the time information about the request
- (processor-put! processor-record-datafound-getdata)
- (processor-put! processor-record-alldata-time)
- (processor-put! processor-record-getfailed-time)
- (processor-put! processor-record-identifier-collision-get-time)
- ;; just use the keys as task-IDs (Identifiers)
- (let loop ((keys keys))
- (when (not (null? keys))
- ;; first remove requests which might still be in the upload or download queue
- (send-message
- (message-remove-request (first keys)))
- (set! start-times (alist-cons (first keys) (current-time-seconds) start-times))
- (send-message
- (message-client-get-realtime (first keys) (first keys)))
- (loop (cdr keys))))
- ;; wait for completion
- (let loop ((finished (finished-tasks)))
- (when (not (lset<= equal? keys finished))
- (let ((unfinished (lset-difference equal? keys (lset-intersection equal? keys finished))))
- (format (current-output-port)
- "~d download keys still in flight\n" (length unfinished)))
- (usleep 1000000)
- (loop (finished-tasks))))
- ;; all done: cleanup and take the timing
- (processor-delete! processor-record-identifier-collision-get-time)
- (processor-delete! processor-record-getfailed-time)
- (processor-delete! processor-record-alldata-time)
- (processor-delete! processor-record-datafound-getdata)
- (let loop ((keys keys) (times '()))
- (if (null? keys)
- times
- (let ()
- (define key (first keys))
- ;; assoc returns the first match, i.e. the entry added most recently
- (define (gettime L) (cdr (assoc key L)))
- (define start-time (gettime start-times))
- ;; successes are searched before failures
- (define finish-time (gettime (append get-successful get-failed)))
- (define successful (and (assoc key get-successful) #t ));; forces boolean
- ;; remove the finished request from the node's queue again
- (send-message
- (message-remove-request key))
- (loop (cdr keys)
- (cons (duration-entry (first keys) {finish-time - start-time} successful 'GET 'realtime)
- times))))))
- ;; Insert every key in keys with the realtime settings and measure how long
- ;; each insert takes; the inserted content is the key itself as UTF-8 bytes.
- ;; Blocks, polling once per second, until every key is listed in put-successful
- ;; or put-failed. Returns a list of duration-entry records (operation 'PUT,
- ;; mode 'realtime) in reverse order of keys.
- ;; The record processors are installed for the duration of the call only.
- ;; NOTE: put-successful and put-failed are module-level and not cleared here,
- ;; so a key already recorded by an earlier call counts as finished at once.
- (define (time-put keys)
- (define start-times (list))
- (define (finished-tasks)
- (append
- (map car put-successful)
- (map car put-failed)))
- ;; setup a processing chain which saves the time information about the request
- (processor-put! processor-record-putsuccessful-time)
- (processor-put! processor-record-putfailed-time)
- (processor-put! processor-record-identifier-collision-put-time)
- ;; just use the keys as task-IDs (Identifiers)
- (let loop ((keys keys))
- (when (not (null? keys))
- ;; first remove requests which might still be in the upload or download queue
- (send-message
- (message-remove-request (first keys)))
- (set! start-times (alist-cons (first keys) (current-time-seconds) start-times))
- (send-message
- (message-client-put-realtime (first keys) (first keys)
- (string->utf8 (first keys))))
- (loop (cdr keys))))
- ;; wait for completion
- (let loop ((finished (finished-tasks)))
- (when (not (lset<= equal? keys finished))
- (let ((unfinished (lset-difference equal? keys (lset-intersection equal? keys finished))))
- (format (current-output-port)
- "~d upload keys still in flight\n" (length unfinished)))
- (usleep 1000000)
- (loop (finished-tasks))))
- ;; all done: cleanup and take the timing
- (processor-delete! processor-record-identifier-collision-put-time)
- (processor-delete! processor-record-putfailed-time)
- (processor-delete! processor-record-putsuccessful-time)
- (let loop ((keys keys) (times '()))
- (if (null? keys)
- times
- (let ()
- (define key (first keys))
- ;; assoc returns the first match, i.e. the entry added most recently
- (define (gettime L) (cdr (assoc key L)))
- (define start-time (gettime start-times))
- ;; successes are searched before failures
- (define finish-time (gettime (append put-successful put-failed)))
- (define successful (and (assoc key put-successful) #t ));; forces boolean
- ;; remove the finished request from the node's queue again
- (send-message
- (message-remove-request key))
- (loop (cdr keys)
- (cons (duration-entry (first keys) {finish-time - start-time} successful 'PUT 'realtime)
- times))))))
- ;; The module object of this file; not referenced by the code visible here.
- (define %this-module (current-module))
- ;; Manual smoke test (--test): connect to the node, print every incoming
- ;; message, request one fixed USK, wait 30 seconds and disconnect.
- ;; The processors installed here are not removed again.
- (define (test)
- ;; processor-put! adds at the front, so messages are printed first and then discarded
- (processor-put! discarding-processor)
- (processor-put! printing-passthrough-processor)
- (set! sock (fcp-socket-create))
- (let
- (
- (fcp-read-thread
- (begin-thread
- (fcp-read-loop sock)))
- (fcp-write-thread
- (begin-thread
- (fcp-write-loop sock))))
- (send-message (message-client-hello))
- (send-message (message-watch-global))
- (send-message (message-client-get-realtime (task-id) "USK@N82omidQlapADLWIym1u4rXvEQhjoIFbMa5~p1SKoOY,LE3WlYKas1AIdoVX~9wahrTlV5oZYhvJ4AcYYGsBq-w,AQACAAE/irclogs/772/2018-11-23.weechatlog"))
- (sleep 30)
- (send-message (message-disconnect))
- ;; the write loop never returns by itself, so both joins are given a timeout
- ;; 30 seconds from now
- (join-thread fcp-write-thread (+ 30 (current-time-seconds)))
- (join-thread fcp-read-thread (+ 30 (current-time-seconds)))
- (close sock)))
- ;; Open the FCP connection, start the reader and writer threads, send
- ;; ClientHello and WatchGlobal, call thunk, then send Disconnect and close the
- ;; socket. The result of thunk is not returned.
- ;; NOTE(review): if thunk raises, the Disconnect and the close are skipped.
- (define (call-with-fcp-connection thunk)
- (set! sock (fcp-socket-create))
- (let
- (
- (fcp-read-thread
- (begin-thread
- (fcp-read-loop sock)))
- (fcp-write-thread
- (begin-thread
- (fcp-write-loop sock))))
- (send-message (message-client-hello))
- (send-message (message-watch-global))
- (thunk)
- (send-message (message-disconnect))
- ;; the write loop never returns by itself, so both joins are given a timeout
- ;; 3 seconds from now
- (join-thread fcp-write-thread (+ 3 (current-time-seconds)))
- (join-thread fcp-read-thread (+ 3 (current-time-seconds)))
- (close sock)))
- ;; Syntax: evaluate the body expressions inside call-with-fcp-connection.
- (define-syntax-rule (with-fcp-connection exp ...)
- (call-with-fcp-connection
- (λ () exp ...)))
- (define* (stats->csv stats #:key (target-filename #f))
- "Write all duration-entry records in stats as semicolon-separated rows.
- Rows are appended to target-filename, or written to the current output port
- when no target-filename is given. The header row is written unless
- target-filename already exists.
- example:
- day;key;duration;days-before;mode;success
- 2018-11-23;KSK@...;32.0;16;realtime;#f
- 2018-11-23;KSK@...;40.0;32;realtime;#t
- "
- ;; extract the number of days from the key name
- ;; NOTE(review): a key without uploaded-N-days-before makes string-match return #f
- (define (days-before key)
- (string->number
- (match:substring
- (string-match "uploaded-([0-9]*)-days-before" key)
- 1)))
- ;; new is also true when writing to the current output port
- (define new (not (and target-filename (file-exists? target-filename))))
- (define port
- (cond
- (target-filename
- (open-file target-filename "al"))
- (else
- (current-output-port))))
- (when new
- (display "day;key;duration;days-before;mode;success" port)
- (newline port ))
- (let loop ((stats stats))
- (when (not (null? stats))
- (let ((s (first stats)))
- ;; the day column is the module-level today, not a per-entry date
- (format port "~a;~a;~f;~d;~a;~a\n"
- (time->iso today)
- (duration-entry-key s)
- (duration-entry-duration s)
- (days-before (duration-entry-key s))
- (duration-entry-mode s)
- (duration-entry-success s)))
- (loop (cdr stats))))
- ;; only close ports opened here, never the current output port
- (when target-filename (close-port port)))
- ;; Write the statistics page (html element with head and body) as XML to port.
- (define (website-content port)
-   (let ((page-title "Fetch-Pull-Stats re-woven"))
-     (sxml->xml
-       `(*TOP*
-          (html
-            (head (title ,page-title)
-                  (meta (@ (charset "utf-8"))))
-            (body (h1 ,page-title)
-                  (p (img (@ (src "fetchpull.png") (alt "fetch-pull-statistics"))))
-                  (p "created with "
-                     (a (@ (href "https://bitbucket.org/ArneBab/freenet-guile/src/default/fetchpull.w") (title "link to project"))
-                        "fetchpull.w"))
-                  (p "plotted with "
-                     (a (@ (href "fetchpull-plot.gnuplot"))
-                        "fetchpull-plot.gnuplot")))))
-       port)))
- ;; Run gnuplot on the script fetchpull-plot.gnuplot and wait until it has
- ;; finished, then flush filesystem buffers with sync.
- ;; The earlier version fed the script through a pipe and closed it with close,
- ;; which does not wait for the child process, so the plot could still be
- ;; unwritten when the caller copied it. system* returns only after gnuplot exits.
- ;; Raises an error when the script file is missing; a failing gnuplot run is
- ;; reported as a warning on the current output port.
- (define (create-plot)
-   (let ((script "fetchpull-plot.gnuplot"))
-     (when (not (file-exists? script))
-       (error "gnuplot script not found:" script))
-     (let ((status (system* "gnuplot" script)))
-       (when (not (zero? status))
-         (format (current-output-port)
-                 "Warning: gnuplot exited with status ~a\n" status))))
-   (sync))
- ;; Copy the statistics, the plot and the plotting script into the directory
- ;; path; files that do not exist are skipped.
- ;; sed is started with system*, so it gets the filename as a separate argument
- ;; (no shell quoting problems with unusual paths) and has finished before this
- ;; procedure returns. The earlier version closed a shell pipe without waiting
- ;; for sed to exit.
- (define (copy-resources-to path)
-   (define (target-name file)
-     (string-append path file-name-separator-string file))
-   ;; remove all KSK information from the stats to prevent people from tampering with them
-   (for-each
-     (λ (file)
-       (when (file-exists? file)
-         (let ((new-filename (target-name file)))
-           (copy-file file new-filename)
-           (let ((status (system* "sed" "-i" "s/KSK@.*using-realtime/KEY/" new-filename)))
-             (when (not (zero? status))
-               (format (current-output-port)
-                       "Warning: sed exited with status ~a for ~a\n" status new-filename))))))
-     '("fetchpull-stats-get.csv" "fetchpull-stats-put.csv"))
-   ;; simply copy over the plot and plotting script
-   (for-each
-     (λ (file)
-       (when (file-exists? file)
-         (copy-file file (target-name file))))
-     '("fetchpull-plot.gnuplot" "fetchpull.png")))
- ;; Create the directory path when nothing exists there; raise an error when
- ;; path exists but is not a directory; otherwise return path.
- (define (ensure-directory-exists path)
-   (if (file-exists? path)
-       (if (file-is-directory? path)
-           path
-           (error 'system-error "Selected path ~A is no directory" path))
-       (mkdir path)))
- ;; Write index.html into the directory path: the doctype line followed by the
- ;; page produced by website-content.
- (define (write-site-to path)
-   (let* ((filepath (string-append path file-name-separator-string "index.html"))
-          (port (open-output-file filepath)))
-     (display "<!doctype html>\n" port)
-     (website-content port)
-     (close port)))
- ;; Build the statistics website in the directory path: make sure the directory
- ;; exists, create the plot, copy the resources and write index.html.
- (define (create-site path)
- (ensure-directory-exists path)
- (create-plot)
- (copy-resources-to path)
- (write-site-to path))
- ;; Handle the command line options which replace the normal run: --help,
- ;; --version, --test and --site [directory] (directory defaults to "site").
- ;; Returns #t when such an option was handled, #f otherwise.
- (define (final-action? args)
-   (and (> (length args) 1)
-        (let ((option (second args)))
-          (cond
-            ((equal? "--help" option)
-             (help args)
-             #t)
-            ((equal? "--version" option)
-             (format (current-output-port) "~a\n" version)
-             #t)
-            ((equal? "--test" option)
-             (test)
-             #t)
-            ((equal? "--site" option)
-             (create-site (if (> (length args) 2) (third args) "site"))
-             #t)
-            (else #f)))))
-
-
- ;; Entry point. Unless final-action? handled an option: take an optional date
- ;; from the command line, insert and then request the test keys over one FCP
- ;; connection, pretty-print the collected timings and append them to
- ;; fetchpull-stats-get.csv and fetchpull-stats-put.csv.
- (define (main args)
- (when (not (final-action? args))
- (when {(length args) > 1}
- (pretty-print (second args))
- (set! today (iso->time (second args))))
- (processor-put! printing-passthrough-processor)
- (let ((get-stats '()) (put-stats '()))
- ;; collect the entries returned by time-get / time-put
- (define (stats-get stat)
- (set! get-stats (append get-stats stat))
- stat)
- (define (stats-put stat)
- (set! put-stats (append put-stats stat))
- stat)
- (with-fcp-connection
- ;; loop is never called again, so the body runs once; modes only guards it
- (let loop
- ((modes '(realtime)))
- ;; 0 followed by the powers of two from 1 to 512
- (define days-before
- (cons 0
- (map (λ(x) (expt 2 x))
- (iota 10))))
- ;; the keyword argument append (a string) is appended to the prefix
- (define* (KSK-for-get days #:key (append ""))
- (KSK-for-request (string-append (prefix) append) today days 'realtime))
- (define* (KSK-for-put days #:key (append ""))
- (KSK-for-insert (string-append (prefix) append) today days 'realtime))
- (when (not (null? modes))
- ;; ten keys (prefix suffix 0 to 9) for every entry of days-before
- (stats-put
- (time-put
- (apply append
- (map (λ(x) (map (λ (y) (KSK-for-put y #:append (number->string x))) days-before ))
- (iota 10)))))
- (stats-get
- (time-get
- (apply append
- (map (λ(x) (map (λ (y) (KSK-for-get y #:append (number->string x))) days-before ))
- (iota 10))))))))
- (pretty-print get-stats)
- (pretty-print put-stats)
- (stats->csv get-stats #:target-filename "fetchpull-stats-get.csv")
- (stats->csv put-stats #:target-filename "fetchpull-stats-put.csv"))))
|