123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- (*
- * _ _ ____ _
- * _| || |_/ ___| ___ _ __ _ __ ___ | |
- * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
- * |_ _|___) | __/ |_) | |_) | (_) |_|
- * |_||_| |____/ \___| .__/| .__/ \___/(_)
- * |_| |_|
- *
- * Personal Social Web.
- *
- * Copyright (C) The #Seppo contributors. All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *)
- (* streamline wording with https://v2.ocaml.org/api/Queue.html *)
(** A job queue, wrapping the path prefix of its spool directory. *)
type queue =
  | Queue of string

(** A slot (sub-directory) of a queue, wrapping its relative path prefix. *)
type slot =
  | Slot of string

(** A single job, wrapping its file name inside a slot. *)
type task =
  | Task of string

(** A function from a queue to a promise of either a queue or an error
    message. *)
type f = queue -> (queue, string) result Lwt.t
(** The queue used for jobs; its path is relative and ends in a slash. *)
let qn = Queue "app/var/spool/job/"

(** The slots of a queue. Each path ends in a slash so that {!fn_} can
    build a file name by plain concatenation. *)
let cur = Slot "cur/"
and err = Slot "err/"
and new_ = Slot "new/"
and run = Slot "run/"
and tmp = Slot "tmp/"
and wait = Slot "wait/"
(** The slot paths as plain strings, unwrapped from their [Slot]
    constructor. *)
module P = struct
  let cur', err', new_', run', tmp', wait' =
    let path (Slot s) = s in
    (path cur, path err, path new_, path run, path tmp, path wait)
end
(** [fn_ queue slot task] is the file name of [task] inside [slot] of
    [queue]: the three wrapped strings concatenated in that order, with no
    separator added. *)
let fn_ (Queue q) (Slot s) (Task j) = String.concat "" [q; s; j]
(** Formatter for a slot: prints the wrapped path as is. *)
let pp_s ppf = function
  | Slot s -> Format.pp_print_string ppf s
(** Formatter for a task: prints the wrapped file name as is. *)
let pp_t ppf = function
  | Task j -> Format.pp_print_string ppf j
(** [do_wait ?now ?jitter i] is the instant at which attempt number [i]
    becomes due: [now] plus an exponentially growing delay of
    [60 * (2^i - 1)] seconds, scaled by [1 + jitter]. For [i = 0] the delay
    is zero.

    [now] defaults to the current clock reading. [jitter] defaults to a
    random value drawn via [Random.float 0.1 -. 0.05]; see
    https://encore.dev/blog/retries#jitter for the rationale.

    If the delay cannot be represented as a [Ptime.span], or adding it
    leaves the range of [Ptime.t], [now] is returned unchanged.

    Asserts that [jitter] is neither below -0.05 nor above 0.05 and that
    [i >= 0]. *)
let do_wait ?(now = Ptime_clock.now ()) ?(jitter = (Random.float 0.1) -. 0.05) i =
  assert (jitter >= -0.05);
  assert (jitter <= 0.05);
  assert (i >= 0);
  (* base delay in seconds: 0, 60, 180, 420, ... *)
  let f = 60 * (i |> Int.shift_left 1 |> pred)
          |> float_of_int in
  Logr.debug (fun m -> m "%s.%s jitter %f %% = %.2fs" "Job" "do_wait" jitter (f *. jitter));
  let f = f *. (1.0 +. jitter) in
  (* Both conversions are partial. Chain them so that either failure falls
     back to [now] instead of raising from [Option.get]. *)
  Option.bind (Ptime.Span.of_float_s f) (Ptime.add_span now)
  |> Option.value ~default:now
(** [rfc3339 t] renders [t] as [YYYY-MM-DDThhmmssZ]. Note that the time part
    is written without colons. Asserts that the time zone offset reported
    by [Ptime.to_date_time] is zero. *)
let rfc3339 t =
  match Ptime.to_date_time t with
  | (year, month, day), ((hour, minute, second), offset) ->
    assert (offset = 0);
    Printf.sprintf "%04d-%02d-%02dT%02d%02d%02dZ" year month day hour minute second
(** [move que job src dst] logs the transition and renames the file of [job]
    from slot [src] to slot [dst] of queue [que], keeping its name.
    @raise Unix.Unix_error if the rename fails. *)
let move que job src dst =
  Logr.debug (fun m -> m "%s.%s %a -> %a %a" "Job" "move" pp_s src pp_s dst pp_t job);
  let from_path = fn_ que src job in
  let to_path = fn_ que dst job in
  Unix.rename from_path to_path
(** [compute_nonce byt] is the [Mcdb.hash32_byt] hash of [byt], rendered as
    a string with [Optint.to_string]. *)
let compute_nonce byt =
  Optint.to_string (Mcdb.hash32_byt byt)
(** [compute_fn due n nonce] is the task whose file name consists of the
    dot-separated fields: [due] formatted by {!rfc3339}, the integer [n],
    [nonce], and the fixed suffix [s]. *)
let compute_fn due n nonce =
  let stamp = rfc3339 due in
  Task (String.concat "." [stamp; string_of_int n; nonce; "s"])
(** Similar to Queue.add: [enqueue ?due q' n byt] stores the payload [byt]
    as a new job in queue [q'].

    The file name is derived from [due] (default: the current time), the
    number [n] and a nonce computed from [byt]. The payload is first
    written to the [tmp] slot and then renamed into the [new_] slot.

    Returns [Ok new_], i.e. the slot, not the file name. *)
let enqueue ?(due = Ptime_clock.now ()) q' n byt =
  Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" (rfc3339 due));
  let task = byt |> compute_nonce |> compute_fn due n in
  let staged = fn_ q' tmp task in
  let final = fn_ q' new_ task in
  Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" final);
  (* @TODO rather an exclusive create *)
  staged |> File.out_channel_append ~perm:0o444 (fun oc -> output_bytes oc byt);
  move q' task tmp new_;
  Ok new_
(** Predicate that accepts every argument. *)
let p_true = fun _ -> true
(** [any ?pred slot queue] asks [File.any] for a file name in the directory
    of [slot] inside [queue] that ends in [.s] and satisfies [pred]
    (default: accept everything), and wraps a hit as a task. *)
let any ?(pred = p_true) (Slot qn) (Queue qb) =
  (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_first" qn); *)
  let accept name = St.is_suffix ~affix:".s" name && pred name in
  File.any accept (qb ^ qn)
  |> Option.map (fun name -> Task name)
(** [any_due ?due ?wait q] looks in slot [wait] of queue [q] for a job whose
    file name has exactly four dot-separated fields, the last being [s], and
    whose first field compares (as a string) less than or equal to [due]
    formatted by {!rfc3339}. [due] defaults to the current time. *)
let any_due ?(due = Ptime_clock.now ()) ?(wait = wait) q =
  let limit = rfc3339 due in
  let is_due fn =
    match String.split_on_char '.' fn with
    | [stamp; _; _; "s"] -> String.compare stamp limit <= 0
    | _ -> false
  in
  (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_any_due" due); *)
  any ~pred:is_due wait q
(** [wait_or_err ?wait q s j] reschedules job [j], currently in slot [s] of
    queue [q], or gives up on it.

    The file name of [j] is expected to have four dot-separated fields:
    due time, retry counter, nonce and the suffix [s]. The counter is
    incremented. While it does not exceed 13, the file is renamed into slot
    [wait] under a new name carrying the due time computed by {!do_wait};
    beyond that it is moved to {!err}.

    The delay is counted from the due time in the old name if
    [Ptime.of_rfc3339] accepts it, otherwise from the current time.

    A name of any other shape, or one whose counter is not an integer with
    a non-negative successor, is logged as an error and moved to {!err}.

    Asserts that [s] contains exactly one slash and [j] none.
    @raise Unix.Unix_error if a rename fails. *)
let wait_or_err ?(wait = wait) q s j =
  let maxtries = 13 in
  let Slot s' = s in
  let Task j' = j in
  Logr.debug (fun m -> m "%s.%s %s %s" "Job" "wait_or_err" s' j');
  assert (2 = (s' |> String.split_on_char '/' |> List.length));
  assert (1 = (j' |> String.split_on_char '/' |> List.length));
  (* shared handling of every malformed job name *)
  let invalid () =
    Logr.err (fun m -> m "%s invalid job '%s'" E.e1015 j');
    move q j s err
  in
  match j' |> String.split_on_char '.' with
  | [t0; n; nonce; "s"] ->
    (* Parse without raising; a counter that would make do_wait's
       assertion fail counts as malformed, too. *)
    (match n |> int_of_string_opt |> Option.map succ with
     | Some n when n >= 0 ->
       if n > maxtries
       then move q j s err
       else
         let now = match t0 |> Ptime.of_rfc3339 with
           | Ok (t, _, _) -> t
           | Error _ -> Ptime_clock.now () in
         let t = n |> do_wait ~now in
         let jn' = compute_fn t n nonce in
         Unix.rename (fn_ q s j) (fn_ q wait jn')
     | Some _ | None -> invalid ())
  | _ -> invalid ()
|