main.ml 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. (*
  2. * _ _ ____ _
  3. * _| || |_/ ___| ___ _ __ _ __ ___ | |
  4. * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
  5. * |_ _|___) | __/ |_) | |_) | (_) |_|
  6. * |_||_| |____/ \___| .__/| .__/ \___/(_)
  7. * |_| |_|
  8. *
  9. * Personal Social Web.
  10. *
  11. * Copyright (C) The #Seppo contributors. All rights reserved.
  12. *
  13. * This program is free software: you can redistribute it and/or modify
  14. * it under the terms of the GNU General Public License as published by
  15. * the Free Software Foundation, either version 3 of the License, or
  16. * (at your option) any later version.
  17. *
  18. * This program is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. *)
  26. let ( >>= ) = Result.bind
  27. let ( let* ) = Result.bind
  28. let ( let+ ) = Result.map
  29. let lwt_err e = Lwt.return (Error e)
  30. let ( ^/ ) a b =
  31. let p = Uri.path a in
  32. let p' = p ^ b in
  33. Uri.with_path a p'
  34. (** may go to where PubKeyPem is: As2 *)
  35. let post_signed
  36. ?(headers = [ Http.H.ct_jlda; Http.H.acc_app_jlda ])
  37. ~uuid
  38. ~key
  39. body
  40. uri =
  41. let key_id,_,_ = key in
  42. Logr.debug (fun m -> m "%s.%s %a key_id: %a" "Main" "post_signed" Uuidm.pp uuid Uri.pp key_id);
  43. let he_sig = Http.signed_headers key (Ap.PubKeyPem.digest_base64' body) uri in
  44. let headers = Cohttp.Header.add_list he_sig headers in
  45. Http.post ~headers body uri
  46. (** lift http errors to errors triggering a retry. *)
  47. let http_to_err sta =
  48. sta
  49. |> Cohttp.Code.string_of_status
  50. |> Result.error
  51. (** a plain (signed) http post *)
  52. let send_http_post ?(fkt = Lwt.return) ~uuid ~key (msg_id, uri, body) =
  53. Logr.debug (fun m -> m "%s.%s %a / %a %a" "Main" "send_http_post" Uri.pp msg_id Uri.pp uri Uuidm.pp uuid);
  54. let%lwt r = post_signed ~uuid ~key body uri in
  55. match r with
  56. | Error _ as e -> Lwt.return e
  57. | Ok (re,_) as o ->
  58. (match re.status with
  59. | #Cohttp.Code.success_status ->
  60. (* may leak memory for unconsumed body *)
  61. fkt o
  62. | sta ->
  63. sta
  64. |> http_to_err
  65. |> Lwt.return)
  66. (** asynchronous, queueable task.
  67. ActivityPub delivery https://www.w3.org/TR/activitypub/#delivery *)
  68. module Apjob = struct
  69. module Notify = struct
  70. (** Prepare a job to queue. Must correspond to dispatch_job *)
  71. let encode msg_id (ibox, dst_actor_id) json =
  72. let msg_id = msg_id |> Uri.to_string
  73. and ibox = ibox |> Uri.to_string
  74. and id = dst_actor_id |> Uri.to_string
  75. and json = json |> Ezjsonm.value_to_string in
  76. Csexp.(List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom id; Atom json]])
  77. let decode = function
  78. | Csexp.(List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom dst_actor_id; Atom json]]) ->
  79. (match json |> Ezjsonm.value_from_string_result with
  80. | Error _ -> Error ()
  81. | Ok json ->
  82. Ok (
  83. msg_id |> Uri.of_string,
  84. (
  85. ibox |> Uri.of_string,
  86. dst_actor_id |> Uri.of_string
  87. ),
  88. json
  89. ))
  90. | _ ->
  91. Error ()
  92. end
  93. end
  94. (** process one job, typically doing http post requests or signed ActivityPub delivery. *)
  95. let dispatch_job
  96. ?(uuid = Uuidm.v4_gen (Random.State.make_self_init ()) ())
  97. ~base ~key (Job.Task j) payload =
  98. let key_id = Ap.Person.my_key_id ~base in
  99. assert (key_id |> Uri.to_string |> St.is_suffix ~affix:"/actor.jsa#main-key");
  100. Logr.debug (fun m -> m "%s.%s %s %a" "Main" "dispatch_job" j Uuidm.pp uuid);
  101. let fkt ibox = function
  102. | Error e as o ->
  103. Logr.debug (fun m -> m "%s.%s %a %s Error: %s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox e);
  104. Lwt.return o
  105. | Ok (rsp,bod) as o ->
  106. let%lwt b = bod |> Cohttp_lwt.Body.to_string in
  107. Logr.debug (fun m -> m "%s.%s %a %s Response: %a\n\n%s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox Cohttp.Response.pp_hum rsp b);
  108. Lwt.return o
  109. in
  110. let open Csexp in
  111. match payload with
  112. | List [Atom "2"; Atom msg_id; Atom "http.post"; List [Atom uri; Atom body]] ->
  113. Logr.warn (fun m -> m "%s.%s legacy (maybe future?)" "Main" "dispatch_job");
  114. send_http_post ~uuid ~key (msg_id |> Uri.of_string, uri |> Uri.of_string, body)
  115. | List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom _dst_actor_id; Atom json]] ->
  116. (* Apjob.Notify.encode *)
  117. send_http_post ~fkt:(fkt ibox) ~uuid ~key (msg_id |> Uri.of_string, ibox |> Uri.of_string, json)
  118. | _ ->
  119. (* must correspond to Apjob.Notify.encode *)
  120. Logr.err (fun m -> m "%s %s.%s invalid job format %s" E.e1016 "Main" "dispatch_job" j);
  121. Lwt.return (Error "invalid job format")
  122. (** Simple, file-based, scheduled job queue.
  123. Inspired by http://cr.yp.to/proto/maildir.html
  124. *)
  125. module Queue = struct
  126. let keep_cur_s = 8 * 24 * 60 * 60
  127. let qn = Job.Queue "app/var/spool/job/"
  128. let cur = Job.cur
  129. let err = Job.Slot "err/"
  130. let new_ = Job.new_
  131. let run = Job.Slot "run/"
  132. let tmp = Job.tmp
  133. let wait = Job.Slot "wait/"
  134. (** Move due tasks from wait to new and loop all new. *)
  135. let process_new_and_due
  136. ?(due = Ptime_clock.now ())
  137. ?(wait = wait)
  138. ?(new_ = new_)
  139. ?(run = run)
  140. ?(cur = cur)
  141. ?(err = err)
  142. ~base
  143. ~key
  144. que =
  145. let t0 = Sys.time () in
  146. (* Logr.debug (fun m -> m "%s.%s" "Main" "process_queue"); *)
  147. (** unlink old from dn *)
  148. let dir_clean tmin (Job.Queue qn) (Job.Slot dn) =
  149. Logr.debug (fun m -> m "%s.%s unlink old from '%s->%s'" "Main.Queue" "process_new_and_due.dir_clean" qn dn);
  150. let tmin = tmin |> Ptime.to_float_s in
  151. let open Astring in
  152. assert (qn |> String.is_suffix ~affix:"/");
  153. assert (dn |> String.is_suffix ~affix:"/");
  154. let dn = qn ^ dn in
  155. File.fold_dir (fun init fn ->
  156. if fn |> String.length > 12 (* keep README.txt etc. *)
  157. then
  158. (try
  159. let fn = dn ^ fn in
  160. let st = fn |> Unix.stat in
  161. if st.st_mtime < tmin
  162. then
  163. (Unix.unlink fn;
  164. Logr.debug (fun m -> m "%s.%s unlinked '%s' %f < %f" "Main.Queue" "process_new_and_due.dir_ćlean" fn st.st_mtime tmin))
  165. with _ -> ());
  166. init,true) () dn
  167. in
  168. (** move those due from wait into new *)
  169. let rec move_due_wait_new ~wait ~new_ ~due =
  170. match Job.(any_due ~due wait que) with
  171. | None -> ()
  172. | Some j ->
  173. Job.(move que j wait new_);
  174. move_due_wait_new ~wait ~new_ ~due
  175. in
  176. let rec loop (i : int) : int Lwt.t =
  177. match Job.any new_ que with
  178. | None -> Lwt.return i
  179. | Some j ->
  180. let%lwt _ =
  181. Job.move que j new_ run;
  182. let fn = Job.fn_ que run j
  183. and error s =
  184. Job.wait_or_err ~wait ~err que run j;
  185. Logr.info (fun m -> m "%s.%s job postponed/cancelled: %a reason: %s" "Main.Queue" "process_new_and_due" Job.pp_t j s)
  186. and ok _ =
  187. Job.move que j run cur;
  188. Logr.debug (fun m -> m "%s.%s job done: %a" "Main.Queue" "process_new_and_due" Job.pp_t j)
  189. in
  190. match fn |> File.in_channel Csexp.input with
  191. | Error s ->
  192. s
  193. |> error
  194. |> Lwt.return
  195. | Ok p ->
  196. let%lwt r = try%lwt
  197. dispatch_job ~base ~key j p
  198. with
  199. | Failure s ->
  200. Lwt.return (Error s)
  201. | exn ->
  202. let e = exn |> Printexc.to_string in
  203. Logr.warn (fun m -> m "%s.%s Uncaught Exception job:%a %s" "Main.Queue" "process_new_and_due" Job.pp_t j e);
  204. Lwt.return (Error e)
  205. in
  206. r
  207. |> Result.fold ~error ~ok
  208. |> Lwt.return in
  209. loop (i+1)
  210. in
  211. try%lwt
  212. let oldest = keep_cur_s |> Ptime.Span.of_int_s |> Ptime.sub_span due |> Option.value ~default:Ptime.epoch in
  213. dir_clean oldest qn cur;
  214. dir_clean oldest qn err;
  215. move_due_wait_new ~wait ~new_ ~due;
  216. let%lwt i = loop 0 in
  217. Logr.info (fun m -> m "%s.%s finished, %i jobs processed in dt=%.3fs." "Main.Queue" "process_new_and_due" i (Sys.time() -. t0));
  218. Lwt.return (Ok que)
  219. with | exn ->
  220. let msg = Printexc.to_string exn in
  221. Logr.err (fun m -> m "%s %s.%s processing failed %s" E.e1017 "Main.Queue" "process_new_and_due" msg);
  222. Lwt.return (Error msg)
  223. (** do one http request, fire and forget *)
  224. let http_ping_and_forget ~base ~key ~run_delay_s : Cgi.Response.t' Lwt.t =
  225. Logr.debug (fun m -> m "%s.%s %is" "Main.Queue" "http_ping_and_forget" run_delay_s);
  226. let path = Cfg.seppo_cgi ^ "/ping"
  227. and query = [("loop",[Printf.sprintf "%is" run_delay_s])] in
  228. let uri = Uri.make ~path ~query ()
  229. |> Http.reso ~base in
  230. let key = Some key in
  231. let%lwt f = uri
  232. |> Http.get ~key ~seconds:0.5 (* fire and forget *) in
  233. let _ = Sys.opaque_identity f in
  234. Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok") (* discard GET response *)
  235. |> Lwt.return
  236. let run_fn = "app/var/run/queue.pid"
  237. let once
  238. ?(lock = run_fn)
  239. ~run_delay_s
  240. (fkt : Job.f) =
  241. Logr.debug (fun m -> m "%s.%s" "Main.Queue" "once");
  242. try%lwt
  243. let fd = Unix.openfile lock [O_CLOEXEC; O_CREAT; O_TRUNC; O_WRONLY; O_SYNC] 0o644 in
  244. (* https://git.radicallyopensecurity.com/nlnet/ngie-seppo/-/issues/14#note_129407 *)
  245. Unix.lockf fd F_TLOCK 0;
  246. let oc = fd |> Unix.out_channel_of_descr in
  247. Printf.fprintf oc "%i" (Unix.getpid ());
  248. flush oc;
  249. assert ((lock |> Unix.lstat).st_size > 0);
  250. let%lwt _ = fkt qn in
  251. Logr.debug (fun m -> m "%s.%s sleep %is" "Main.Queue" "once" run_delay_s);
  252. Unix.sleep run_delay_s;
  253. 0 |> Unix.ftruncate fd;
  254. close_out oc;
  255. Ok (`OK, [Http.H.ct_plain], Cgi.Response.nobody)
  256. |> Lwt.return
  257. with
  258. | Unix.(Unix_error(EAGAIN, "lockf", ""))
  259. | Unix.(Unix_error(EACCES, "open", "app/var/run/queue.pid")) ->
  260. Logr.debug (fun m -> m "%s.%s don't race, noop" "Main.Queue" "once");
  261. Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "noop")
  262. |> Lwt.return
  263. | exn ->
  264. (* @TODO Error number *)
  265. Logr.warn (fun m -> m "%s.%s Uncaught exception %a" "Main.Queue" "once" St.pp_exc exn);
  266. Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
  267. |> Lwt.return
  268. (** synchronously, sequentially run fkt for all jobs in new. *)
  269. let loop
  270. ?(lock = run_fn)
  271. ~base
  272. ~key
  273. ~run_delay_s
  274. (fkt : Job.f) =
  275. Logr.debug (fun m -> m "%s.%s" "Main.Queue" "loop");
  276. try%lwt
  277. let fd = Unix.openfile lock [O_CLOEXEC; O_CREAT; O_WRONLY; O_SYNC] 0o644 in
  278. (* https://git.radicallyopensecurity.com/nlnet/ngie-seppo/-/issues/14#note_129407 *)
  279. Unix.lockf fd F_TLOCK 0;
  280. let oc = fd |> Unix.out_channel_of_descr in
  281. Printf.fprintf oc "%i" (Unix.getpid ());
  282. flush oc;
  283. let%lwt _ = fkt qn in
  284. Logr.debug (fun m -> m "%s.%s sleep %is" "Main.Queue" "loop" run_delay_s);
  285. Unix.sleep (run_delay_s |> max 3 |> min 1900);
  286. 0 |> Unix.ftruncate fd;
  287. close_out oc;
  288. http_ping_and_forget ~base ~key ~run_delay_s
  289. with
  290. | Unix.(Unix_error(EAGAIN, "lockf", ""))
  291. | Unix.(Unix_error(EACCES, "open", "app/var/run/queue.pid")) ->
  292. Logr.debug (fun m -> m "%s.%s don't race, noop" "Main.Queue" "loop");
  293. Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "noop")
  294. |> Lwt.return
  295. | exn ->
  296. (* @TODO Error number *)
  297. Logr.warn (fun m -> m "%s.%s Uncaught exception %a" "Main.Queue" "loop" St.pp_exc exn);
  298. Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
  299. |> Lwt.return
  300. (** statistics
  301. queue slot job dst_actor note
  302. e.g.
  303. app/var/spool/job/ wait 2025-03-26T090507Z.2.1923969385 https://outerheaven.club/users/mro o/p-52/#17
  304. *)
  305. let stats stderr oc (Job.Queue qn) =
  306. let has_ws = Astring.Char.Ascii.is_white in
  307. assert (qn |> String.exists has_ws = false);
  308. assert (qn |> Astring.String.is_suffix ~affix:"/");
  309. let print oc qn sn jn dst_actor_id msg_id =
  310. Printf.fprintf oc "%s %s %s %s %s\n" qn sn jn dst_actor_id msg_id in
  311. let _ = File.fold_dir (fun oc sn -> (
  312. assert (sn |> String.exists has_ws = false);
  313. assert (sn |> Astring.String.is_suffix ~affix:"/" = false);
  314. match sn with
  315. | "." | ".." -> ()
  316. | sn -> let _ = File.fold_dir (fun oc jn -> (match jn with
  317. | "." | ".." | "README.txt" -> ()
  318. | jn ->
  319. assert (jn |> String.exists has_ws = false);
  320. match qn ^ sn ^ "/" ^ jn |> File.in_channel Csexp.input with
  321. | Error e -> Printf.fprintf stderr "ERROR reading job %s %s: %s" qn sn e
  322. | Ok job -> match job with
  323. | List [Atom "2"; Atom msg_id; Atom "http.post"; List [Atom _uri; Atom _body]] ->
  324. assert (msg_id |> String.exists has_ws = false);
  325. print oc qn sn jn "?" msg_id
  326. | List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom _ibox; Atom dst_actor_id; Atom _json]] ->
  327. assert (msg_id |> String.exists has_ws = false);
  328. print oc qn sn jn dst_actor_id msg_id
  329. | _ ->
  330. Printf.fprintf stderr "ERROR parsing job %s %s %s\n" qn sn jn
  331. );
  332. oc,true) oc (qn ^ sn) in
  333. ());
  334. oc,true) oc qn in
  335. Ok ()
  336. end
  337. (** monitor outgoing url and add to <link>? *)
  338. let sift_urls (e : Rfc4287.Entry.t) =
  339. Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_urls");
  340. Ok e
  341. (** Extract tags from a post into a list.
  342. Needs the post and a tag store. Modifies both.
  343. *)
  344. let sift_tags cdb (e : Rfc4287.Entry.t) =
  345. Logr.debug (fun m -> m "%s.%s" "Main" "sift_tags");
  346. let open Rfc4287 in
  347. let c2t init ((Label (Single l),_,_) : Rfc4287.Category.t) =
  348. (Tag.Tag ("#" ^ l)) :: init
  349. in
  350. let t2c init (Tag.Tag t) =
  351. Logr.debug (fun m -> m "%s.%s %s" "Main" "sift_tags" t);
  352. let le = t |> String.length in
  353. assert (1 < le);
  354. let t = if '#' == t.[0]
  355. then String.sub t 1 (le-1)
  356. else t in
  357. let t = Single t in
  358. let l = Category.Label t in
  359. let te = Category.Term t in
  360. (l, te, Rfc4287.tagu) :: init
  361. in
  362. let ti = e.title in
  363. let co = e.content in
  364. let tl = e.categories |> List.fold_left c2t [] in
  365. let ti,co,tl = Tag.cdb_normalise ti co tl cdb in
  366. Ok {e with
  367. title = ti;
  368. content = co;
  369. categories = tl |> List.fold_left t2c []}
  370. let find_handles s =
  371. s
  372. |> Lexing.from_string
  373. |> Plain2handle.handle []
  374. (** find mentions *)
  375. let sift_handles (e : Rfc4287.Entry.t) =
  376. Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_handles");
  377. (* Ok ((e.title |> find_handles) @ (e.content |> find_handles)) *)
  378. Ok e
  379. (** queue json for destination reac,ibox into que. *)
  380. let fldbl_notify ~due ~que msg_id json init (reac,ibox) =
  381. Logr.debug (fun m -> m "%s.%s %a -> %a" "Main" "fldbl_notify" Uri.pp reac Uri.pp ibox);
  382. let _ = Apjob.Notify.encode msg_id (ibox, reac) json
  383. |> Csexp.to_string
  384. |> Bytes.of_string
  385. |> Job.enqueue ~due que 0 in
  386. init
  387. (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
  388. module Note = struct
  389. let load_basics () =
  390. let* base = Cfg.Base.(from_file fn) in
  391. let* prof = Cfg.Profile.(from_file fn) in
  392. let* Auth.Uid userinfo,_ = Auth.(from_file fn) in
  393. let host = base |> Uri.host |> Option.value ~default:"-" in
  394. let auth = {Rfc4287.Person.empty with
  395. name = prof.title;
  396. uri = Some (Uri.make ~userinfo ~host ())} in
  397. Ok (base,prof,auth)
  398. (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
  399. module Atom = struct
  400. (** rebuild a single atom page plus evtl. the softlink *)
  401. let page_to_atom ~base ~title ~updated ~lang ~(author : Rfc4287.Person.t) (a,b as pag) =
  402. Logr.debug (fun m -> m "%s.%s %s/%d" "Main.Note.Atom" "page_to_atom" a b);
  403. (** fold ix range into entry. *)
  404. let hydrate sc init (p0,_) =
  405. let* init = init
  406. |> Result.map_error
  407. (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.a" e);
  408. e) in
  409. seek_in sc p0;
  410. let* item = Csexp.input sc
  411. |> Result.map_error
  412. (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.b" e);
  413. e) in
  414. match Rfc4287.Entry.decode item with
  415. | Ok item ->
  416. Logr.debug (fun m -> m "%s.%s 0x%x %a" "Main.Note.Atom" "page_to_atom.hydrate.0" p0 Uri.pp item.id);
  417. Ok (item :: init)
  418. | Error "deleted" ->
  419. Logr.warn (fun m -> m "%s.%s found a stale index entry 0x%x" "Main.Note.Atom" "page_to_atom.hydrate.1" p0);
  420. Ok init
  421. | Error e ->
  422. Logr.err (fun m -> m "%s.%s 0x%x ignoring: %s" "Main.Note.Atom" "page_to_atom.hydrate.2" p0 e);
  423. Ok init in
  424. let pn = pag |> Storage.Page.to_fn in
  425. let* es = Storage.fn |> File.in_channel (fun sc ->
  426. Logr.debug (fun m -> m "%s.%s %s" "Main.Note.Atom" "page_to_atom.hydrate" pn);
  427. pn |> File.in_channel (fun ic ->
  428. match Csexp.input_many ic with
  429. | Error e' as e ->
  430. Logr.err (fun m -> m "%s.%s %s/%d: %s" "Main.Note.Atom" "page_to_atom.hydrate" a b e');
  431. e
  432. | Ok l -> l
  433. |> Storage.TwoPad10.decode_many
  434. |> List.rev
  435. |> List.fold_left (hydrate sc) (Ok [])
  436. )) |> Result.map_error (fun e ->
  437. Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom" e);
  438. e) in
  439. (* this used to be an assert, but I hit one case of non-understood failure *)
  440. if not (es |> St.is_monotonous Rfc4287.Entry.compare_published_desc)
  441. then Logr.warn (fun m -> m "%s soft assertion failed: %s" __LOC__ pn);
  442. let self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max:7000 ~base:Uri.empty in
  443. (* atom index.xml *)
  444. let j_xml = "%-%/index.xml" |> Make.Jig.make in
  445. let fn = [a;b|> string_of_int] |> Make.Jig.paste j_xml |> Option.get in
  446. Logr.debug (fun m -> m "%s.%s %s/%d -> %s (%d entries)" "Main.Note.Atom" "page_to_atom" a b fn (es |> List.length));
  447. let x = es |> Rfc4287.Feed.to_atom
  448. ~base
  449. ~self
  450. ~prev
  451. ~next
  452. ~first
  453. ~last
  454. ~title
  455. ~updated
  456. ~lang
  457. ~author in
  458. let _ = fn |> Filename.dirname |> File.mkdir_p File.pDir in
  459. fn |> File.out_channel_replace (x |> Xml.to_chan ~xsl:(Rfc4287.xsl "posts.xsl" fn));
  460. let _,_ = Storage.make_feed_syml pag fn in
  461. Ok fn
  462. let rule = ({
  463. target = "%-%/index.xml";
  464. prerequisites = "app/var/db/%/%.s" :: Cfg.Base.fn :: Cfg.Profile.fn :: [];
  465. fresh = Make.Outdated;
  466. command = (fun p _rz r t ->
  467. let* base,prof,auth = load_basics () in
  468. assert ("%-%/index.xml" |> String.equal r.target);
  469. assert ("app/var/db/%/%.s" |> String.equal (r.prerequisites |> List.hd));
  470. let src,_,v = t |> Make.src_from r in
  471. Logr.debug (fun m -> m "%s.%s %s %s -> %s" "Main.Note.Atom" "rule" p src t);
  472. let pag = match v with
  473. | [a;b] -> (a,b |> int_of_string)
  474. | _ -> failwith __LOC__ in
  475. let max = Storage.Page.( pag |> find_max |> to_int )
  476. and now = Ptime_clock.now () in
  477. let author = auth
  478. and lang = prof.language
  479. and title = prof.title
  480. and tz = prof.timezone
  481. and self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max ~base:Uri.empty in
  482. let updated = now |> Rfc3339.of_ptime ~tz in
  483. let* pag = src |> File.in_channel Csexp.input_many in
  484. let r = pag
  485. |> List.fold_left Storage.fold_of_twopad10 []
  486. |> Rfc4287.Feed.to_atom_
  487. ~base
  488. ~self
  489. ~prev
  490. ~next
  491. ~first
  492. ~last
  493. ~title
  494. ~updated
  495. ~lang
  496. ~author
  497. t
  498. |> Rfc4287.Feed.to_file t in
  499. (* HOW to (re-)create the symlink in case *)
  500. (* let _,_ = mk_unnumbered_syml (depth,unn,p) fn in *)
  501. r
  502. );
  503. } : Make.t)
  504. let jig = rule.target |> Make.Jig.make
  505. let page_to_fn (a,i : Storage.Page.t) =
  506. assert (a |> St.is_prefix ~affix:"o/");
  507. [a;i |> string_of_int]
  508. |> Make.Jig.paste jig
  509. |> Option.get
  510. end
  511. (** Atom, local *)
  512. let publish ~base ~(profile : Cfg.Profile.t) ~(author : Rfc4287.Person.t) (n : Rfc4287.Entry.t) =
  513. Logr.debug (fun m -> m "%s.%s '%s'" "Main.Note" "publish" n.title);
  514. (* determine id and do store app/var/db/o/p.s *)
  515. (* add to indices (p,d,t) *)
  516. (* (2,"o/p",4) app/var/db/o/p.s app/var/db/o/p/4.s -> o/p-4/index.xml *)
  517. (* (3,"o/d/2023-20-13",4) app/var/db/o/d/2023-10-13/4.s -> o/d/2023-10-13-4/index.xml *)
  518. (* (3,"o/t/tag",4) app/var/db/o/t/tag/4.s -> o/t/tag-4/index.xml *)
  519. (* add to storage and indices (main,date,tags)) *)
  520. let items_per_page = profile.posts_per_page in
  521. let n,(a,_b as ix),pos = n |> Storage.save ~items_per_page in
  522. assert (a |> String.equal "o/p");
  523. let append_to_page pos init pa = let _ = (pos |> Storage.Page.apnd pa) in
  524. pa :: init in
  525. let ix_other : Storage.Page.t list = n
  526. |> Storage.Page.next_other_pages ~items_per_page
  527. |> List.fold_left (append_to_page pos) [] in
  528. (* refresh feeds, outbox etc. *)
  529. let lang = profile.language in
  530. let title = profile.title in
  531. let updated = n.updated (* more precisely would be: now *) in
  532. let mater init ix = (ix |> Atom.page_to_atom ~base ~title ~updated ~lang ~author) :: init in
  533. let l = ix :: ix_other
  534. |> List.fold_left mater [] in
  535. assert ( 1 + 1 + (n.categories |> List.length) == (l |> List.length));
  536. Ok n
  537. module Create = struct
  538. (** Enqueue jobs.
  539. https://www.w3.org/TR/activitypub/#delivery says "Servers MUST de-duplicate
  540. the final recipient list." which implies each actor profile / inbox lookup
  541. can lag delivery for all.
  542. How long could such a consolidated inbox list be cached? In theory not at
  543. all because each inbox target url may change without further notice.
  544. In pratice, we will use the inbox as long as it works and redo the
  545. webfinger/actor lookup otherwise.
  546. 1. get all actor profiles (limit redirects) and extract inbox url
  547. 2. de-duplicate
  548. 3. deliver to all
  549. 4. retry temporary failures
  550. 5. handle permanent failures to clean link rot
  551. *)
  552. let notify_subscribers
  553. ?(due = Ptime_clock.now ())
  554. ?(que = Queue.qn)
  555. ?(cdb = Ap.Followers.cdb)
  556. ~base
  557. (n : Rfc4287.Entry.t) =
  558. let json = n |> Ap.Note.Create.to_json ~base in
  559. cdb |> Ap.Followers.(fold_left (State.ibox' (fldbl_notify ~due ~que n.id json))) (Ok n)
  560. end
  561. (** application logic around delete. *)
  562. module Delete = struct
  563. (* find dirty o/t/foo-1/index.xml and o/d/2024-03-12-7/index.xml pages *)
  564. let dirty (n : Rfc4287.Entry.t) : Storage.Page.t list =
  565. (* the primary o/p/0.s *)
  566. match n.id |> Storage.Id.to_page_i with
  567. | Error _ -> []
  568. | Ok ((p : Storage.Page.t),_ as id') ->
  569. p
  570. :: match id' |> Storage.TwoPad10.from_page_i with
  571. | Error _ -> []
  572. | Ok pos ->
  573. n
  574. |> Storage.Page.other_feeds
  575. |> List.fold_left (fun init (bas,_) ->
  576. match Storage.Page.find pos bas with
  577. | None -> init
  578. | Some v ->
  579. let p,i = v in
  580. Logr.debug (fun m -> m "%s.%s and %s-%i/index.xml" "Main.Note.Delete" "find" p i);
  581. v :: init ) []
  582. (** - remove from storage
  583. - refresh dirty feeds
  584. todo? rather keep a tombstone? https://www.w3.org/TR/activitypub/#delete-activity-outbox *)
  585. let delete (id : Uri.t) =
  586. Logr.debug (fun m -> m "%s.%s '%a'" "Main.Note.Delete" "delete" Uri.pp id);
  587. let* n = Storage.delete id in
  588. let rz = [Atom.rule] in
  589. let _ = n
  590. |> dirty
  591. |> List.fold_left (fun init pag ->
  592. let fn = pag |> Atom.page_to_fn in
  593. (try fn |> Unix.unlink; (* rather than touch .s *)
  594. with Unix.(Unix_error(ENOENT, "unlink", _)) -> ());
  595. (fn |> Make.M2.make rz)
  596. :: init) [] in
  597. Ok n
  598. (** make Ap.Note.Delete.to_json and queue it via fldbl_notify for each in cdb. *)
  599. let notify_subscribers
  600. ?(due = Ptime_clock.now ())
  601. ?(que = Queue.qn)
  602. ?(cdb = Ap.Followers.cdb)
  603. ~base
  604. (n : Rfc4287.Entry.t) =
  605. let json = n |> Ap.Note.Delete.to_json ~base in
  606. cdb |> Ap.Followers.(fold_left (State.ibox' (fldbl_notify ~due ~que n.id json))) (Ok n)
  607. end
  608. end