job.ml 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. (*
  2. * _ _ ____ _
  3. * _| || |_/ ___| ___ _ __ _ __ ___ | |
  4. * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
  5. * |_ _|___) | __/ |_) | |_) | (_) |_|
  6. * |_||_| |____/ \___| .__/| .__/ \___/(_)
  7. * |_| |_|
  8. *
  9. * Personal Social Web.
  10. *
  11. * Copyright (C) The #Seppo contributors. All rights reserved.
  12. *
  13. * This program is free software: you can redistribute it and/or modify
  14. * it under the terms of the GNU General Public License as published by
  15. * the Free Software Foundation, either version 3 of the License, or
  16. * (at your option) any later version.
  17. *
  18. * This program is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. *)
  26. (* streamline wording with https://v2.ocaml.org/api/Queue.html *)
  (** A job queue, identified by the base directory path (with trailing
      slash) under which its slot directories live. *)
  type queue =
    | Queue of string

  (** The default job queue location. *)
  let qn : queue = Queue "app/var/spool/job/"
  (* Slots: sub-directory names (each with trailing slash) that are appended
     to a queue's base path. *)

  (** Slot name [cur/] (not used by the functions visible in this file). *)
  let cur = "cur/"

  (** Slot that jobs are moved to when they are given up on or have an
      invalid file name. *)
  let err = "err/"

  (** Slot that freshly enqueued jobs are moved into. *)
  let new_ = "new/"

  (** Slot name [run/] (not used by the functions visible in this file). *)
  let run = "run/"

  (** Staging slot where a job file is written before being moved to [new_]. *)
  let tmp = "tmp/"

  (** Slot holding jobs that wait for their due time. *)
  let wait = "wait/"
  (** [do_wait ?now ?jitter i] is the instant at which attempt number [i]
      should be retried: [now] plus an exponentially growing delay.

      The base delay is [60 * (2^i - 1)] seconds, i.e. 0 for [i = 0], then
      60, 180, 420, ... and it is scaled by [1 + jitter].
      Has https://encore.dev/blog/retries#jitter ?

      @param now instant the delay is added to (default: current time).
      @param jitter relative deviation, asserted to be within [-0.05, 0.05]
             (default: random within that range).
      @param i attempt number, asserted to be non-negative.
      @return the delayed instant, or [now] if the delay can neither be
              represented as a span nor added to [now]. *)
  let do_wait ?(now = Ptime_clock.now ()) ?(jitter = (Random.float 0.1) -. 0.05) i =
    assert (jitter >= -0.05);
    assert (jitter <= 0.05);
    assert (i >= 0);
    (* 60 s * (2^i - 1) *)
    let base = float_of_int (60 * (Int.shift_left 1 i - 1)) in
    Logr.debug (fun m -> m "%s.%s jitter %f %% = %.2fs" "Job" "do_wait" jitter (base *. jitter));
    let delay = base *. (1.0 +. jitter) in
    (* Both steps may fail; in either case fall back to [now] instead of
       raising from a partial [Option.get]. *)
    match Option.bind (Ptime.Span.of_float_s delay) (Ptime.add_span now) with
    | None -> now
    | Some t -> t
  (** [rfc3339 t] renders [t] as [YYYY-MM-DDThhmmssZ]: an RFC 3339-like
      timestamp whose time fields are written without separators.
      Asserts that the time zone offset reported for [t] is zero. *)
  let rfc3339 t =
    match Ptime.to_date_time t with
    | (year, month, day), ((hour, minute, second), offset) ->
      assert (offset = 0);
      Printf.sprintf "%04d-%02d-%02dT%02d%02d%02dZ"
        year month day hour minute second
  (** [move q job src dst] renames the file [job] from slot [src] to slot
      [dst] inside queue [q], logging the transition at debug level.
      Errors from [Unix.rename] are not caught here. *)
  let move (Queue que) job src dst =
    Logr.debug (fun m -> m "%s.%s %s -> %s %s" "Job" "move" src dst job);
    let in_slot slot = que ^ slot ^ job in
    Unix.rename (in_slot src) (in_slot dst)
  (** [compute_nonce byt] is the textual form of the 32-bit hash
      ([Mcdb.hash32_byt]) of the payload [byt]. *)
  let compute_nonce byt =
    Optint.to_string (Mcdb.hash32_byt byt)
  (** [compute_fn due n nonce] builds a job file name of the form
      [<due>.<n>.<nonce>.s] from the due timestamp, the attempt counter and
      the nonce. *)
  let compute_fn due n nonce =
    String.concat "." [ due; string_of_int n; nonce; "s" ]
  (** Similar to Queue.add: [enqueue ?due q' n byt] stores payload [byt] as a
      new job in queue [q'].

      The file name is derived from [due] (default: current time), the
      attempt counter [n] and a nonce computed from [byt]. The payload is
      first written into the [tmp] slot and then moved into the [new_] slot.

      @return [Ok path] where [path] is the job's location in the [new_]
              slot. *)
  let enqueue ?(due = Ptime_clock.now ()) q' n byt =
    Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" (due |> rfc3339));
    let Queue base = q' in
    let stamp = rfc3339 due in
    let nonce = compute_nonce byt in
    let fn = compute_fn stamp n nonce in
    let staged = base ^ tmp ^ fn in
    let final = base ^ new_ ^ fn in
    Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" final);
    (* @TODO rather an exclusive create *)
    let write oc = output_bytes oc byt in
    staged |> File.out_channel_append ~perm:0o444 write;
    move q' fn tmp new_;
    Ok final
  (** Predicate accepting every argument. *)
  let p_true : 'a -> bool = fun _ -> true
  (** [any ?pred qn q] asks [File.any] for an entry of slot [qn] in queue [q]
      whose name ends in [.s] and additionally satisfies [pred]
      (default: accept everything). The result is whatever [File.any]
      returns. *)
  let any ?(pred = p_true) qn (Queue qb) =
    (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_first" qn); *)
    let is_job name = St.is_suffix ~affix:".s" name && pred name in
    let dir = qb ^ qn in
    File.any is_job dir
  (** [any_due ?due ?wait q] looks in slot [wait] of queue [q] for a job
      whose due timestamp (the first dot-separated field of its file name)
      is not after [due] (default: current time).

      Timestamps are compared as strings, which relies on the fixed-width
      format produced by [rfc3339]. Names not of the shape [t.n.nonce.s]
      never match. *)
  let any_due ?(due = Ptime_clock.now ()) ?(wait = wait) q =
    let limit = rfc3339 due in
    let is_due name =
      match String.split_on_char '.' name with
      | [ stamp; _; _; "s" ] -> String.compare stamp limit <= 0
      | _ -> false
    in
    (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_any_due" due); *)
    any ~pred:is_due wait q
  (** [wait_or_err ?wait q' s j] reschedules job file [j], currently in slot
      [s] of queue [q'], for a further attempt, or gives up on it.

      [j] is expected to be named [<due>.<n>.<nonce>.s]. The attempt counter
      [n] is incremented; if it then exceeds 13 the job is moved to the [err]
      slot. Otherwise the job is renamed into the [wait] slot under a new
      name carrying the incremented counter and a due time computed by
      [do_wait].

      A name of any other shape, or whose counter is not an integer, is
      logged as invalid and the file is moved to the [err] slot.

      Asserts that [s] contains exactly one slash and [j] none.
      Errors from [Unix.rename] are not caught here. *)
  let wait_or_err ?(wait = wait) q' s j =
    let maxtries = 13 in
    let (Queue q) = q' in
    Logr.debug (fun m -> m "%s.%s %s %s" "Job" "wait_or_err" s j);
    (* structural, not physical, equality *)
    assert (2 = (s |> String.split_on_char '/' |> List.length));
    assert (1 = (j |> String.split_on_char '/' |> List.length));
    let invalid () =
      Logr.err (fun m -> m "%s invalid job '%s'" E.e1015 j);
      move q' j s err
    in
    match j |> String.split_on_char '.' with
    | [t0; n; nonce; "s"] ->
      (* a non-numeric counter is an invalid name, not a reason to raise *)
      (match int_of_string_opt n with
       | None -> invalid ()
       | Some n ->
         let n = succ n in
         if n > maxtries
         then move q' j s err
         else
           (* NOTE(review): [rfc3339] in this file writes the time fields
              without colon separators; confirm that [Ptime.of_rfc3339]
              accepts that form. If it does not, the delay is always
              computed from the current time. *)
           let now = match Ptime.of_rfc3339 t0 with
             | Ok (t,_,_) -> t
             | Error _ -> Ptime_clock.now () in
           let t = n |> do_wait ~now |> rfc3339 in
           let jn' = compute_fn t n nonce in
           Unix.rename (q ^ s ^ j) (q ^ wait ^ jn'))
    | _ -> invalid ()