main.ml 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. (*
  2. * _ _ ____ _
  3. * _| || |_/ ___| ___ _ __ _ __ ___ | |
  4. * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
  5. * |_ _|___) | __/ |_) | |_) | (_) |_|
  6. * |_||_| |____/ \___| .__/| .__/ \___/(_)
  7. * |_| |_|
  8. *
  9. * Personal Social Web.
  10. *
  11. * Copyright (C) The #Seppo contributors. All rights reserved.
  12. *
  13. * This program is free software: you can redistribute it and/or modify
  14. * it under the terms of the GNU General Public License as published by
  15. * the Free Software Foundation, either version 3 of the License, or
  16. * (at your option) any later version.
  17. *
  18. * This program is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. *)
  (** Monadic bind on [result] in infix form: [r >>= f] is [Result.bind r f]. *)
  let ( >>= ) = Result.bind

  (** Binding operator on [result]: [let* x = r in e] evaluates [e] only when
      [r] is [Ok x] and otherwise passes the [Error] through. *)
  let ( let* ) = Result.bind

  (** Mapping operator on [result]: [let+ x = r in e] is
      [Result.map (fun x -> e) r].

      A binding operator receives the bound value first and the continuation
      second, whereas [Result.map] takes the function first; the arguments
      therefore have to be flipped. *)
  let ( let+ ) r f = Result.map f r
  (** [lwt_err e] is an already resolved promise holding [Error e]. *)
  let lwt_err e = Error e |> Lwt.return
  (** [a ^/ b] is [a] with [b] appended verbatim to its path. No separator is
      inserted between the existing path and [b]. *)
  let ( ^/ ) a b = Uri.with_path a (Uri.path a ^ b)
  (** HTTP POST [body] to [uri] with signature headers added.

      The signature headers are built by [Http.signed_headers] from a key made
      of [key_id], [pk] and [date], plus the digest of [body]; they are added to
      [headers] before posting.

      [key_id] must end in ["/actor.jsa#main-key"] (checked with [assert]).
      [uuid] is used for logging only.

      May go to where PubKeyPem is: As2. *)
  let post_signed
      ?(date = Ptime_clock.now ())
      ?(headers = [ Http.H.ct_jlda; Http.H.acc_app_jlda ])
      ~uuid
      ~key_id
      ~pk
      body
      uri =
    Logr.debug (fun m -> m "%s.%s %a key_id: %a" "Main" "post_signed" Uuidm.pp uuid Uri.pp key_id);
    assert (St.is_suffix ~affix:"/actor.jsa#main-key" (Uri.to_string key_id));
    let key = Http.Signature.mkey key_id pk date in
    let digest = Ap.PubKeyPem.digest_base64' body in
    let signed = Http.signed_headers key digest uri in
    Http.post ~headers:(Cohttp.Header.add_list signed headers) body uri
  (** Lift a http status to an [Error] carrying its textual form, which
      triggers a retry. *)
  let http_to_err sta = Error (Cohttp.Code.string_of_status sta)
  (** A plain (signed) http post of [body] to [uri].

      - A transport error is returned unchanged.
      - A response with a success status is handed to [fkt] (default:
        [Lwt.return]).
      - Any other status becomes an [Error] via [http_to_err].

      [msg_id] and [uuid] are used for logging only. *)
  let send_http_post ?(fkt = Lwt.return) ~uuid ~key_id ~pk (msg_id, uri, body) =
    Logr.debug (fun m -> m "%s.%s %a / %a %a" "Main" "send_http_post" Uri.pp msg_id Uri.pp uri Uuidm.pp uuid);
    let%lwt posted = post_signed ~uuid ~pk ~key_id body uri in
    match posted with
    | Error _ -> Lwt.return posted
    | Ok (rsp, _) ->
      (match rsp.status with
       | #Cohttp.Code.success_status ->
         (* may leak memory for unconsumed body *)
         fkt posted
       | failed ->
         Lwt.return (http_to_err failed))
  (** Asynchronous, queueable task.
      ActivityPub delivery https://www.w3.org/TR/activitypub/#delivery *)
  module Apjob = struct
    module Notify = struct
      (** Prepare a job to queue. Must correspond to dispatch_job.

          The job is the canonical s-expression
          [("2" msg_id "notify" (ibox dst_actor_id json))] with the uris and
          the json rendered as strings. *)
      let encode msg_id (ibox, dst_actor_id) json =
        let uri_atom u = Csexp.Atom (Uri.to_string u) in
        let json_atom = Csexp.Atom (Ezjsonm.value_to_string json) in
        Csexp.List [
          Csexp.Atom "2";
          uri_atom msg_id;
          Csexp.Atom "notify";
          Csexp.List [ uri_atom ibox; uri_atom dst_actor_id; json_atom ];
        ]

      (** Inverse of [encode]: [Ok (msg_id, (ibox, dst_actor_id), json)], or
          [Error ()] if the s-expression has another shape or the json does
          not parse. *)
      let decode = function
        | Csexp.(List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom dst_actor_id; Atom json]]) ->
          json
          |> Ezjsonm.value_from_string_result
          |> Result.map_error (fun _ -> ())
          |> Result.map (fun json ->
              ( Uri.of_string msg_id,
                (Uri.of_string ibox, Uri.of_string dst_actor_id),
                json ))
        | _ ->
          Error ()
    end
  end
  (** Process one job, typically doing http post requests or signed
      ActivityPub delivery.

      [payload] is the job's s-expression, [j] its name (used for logging).
      The sender is the local actor resolved against [base]; requests are
      signed with [pk]. Two payload shapes are understood:
      - ["http.post"]: a signed post of the body to the uri (logged as legacy),
      - ["notify"]: a signed post of the json to the inbox, logging the
        response.

      Anything else yields [Error "invalid job format"]. *)
  let dispatch_job
      ?(uuid = Uuidm.v4_gen (Random.State.make_self_init ()) ())
      ~base ~pk j payload =
    let sndr = Http.reso ~base (Uri.make ~path:Ap.proj ()) in
    let key_id = Ap.Person.my_key_id sndr in
    Logr.debug (fun m -> m "%s.%s %s %a" "Main" "dispatch_job" j Uuidm.pp uuid);
    assert (St.is_suffix ~affix:"/actor.jsa" (Uri.to_string sndr));
    assert (St.is_suffix ~affix:"/actor.jsa#main-key" (Uri.to_string key_id));
    (* log the outcome of a delivery to ibox and pass it on unchanged *)
    let fkt ibox outcome =
      match outcome with
      | Error e ->
        Logr.debug (fun m -> m "%s.%s %a %s Error: %s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox e);
        Lwt.return outcome
      | Ok (rsp, bod) ->
        let%lwt b = Cohttp_lwt.Body.to_string bod in
        Logr.debug (fun m -> m "%s.%s %a %s Response: %a\n\n%s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox Cohttp.Response.pp_hum rsp b);
        Lwt.return outcome
    in
    let open Csexp in
    match payload with
    | List [Atom "2"; Atom msg_id; Atom "http.post"; List [Atom uri; Atom body]] ->
      Logr.warn (fun m -> m "%s.%s legacy (maybe future?)" "Main" "dispatch_job");
      send_http_post ~uuid ~key_id ~pk (Uri.of_string msg_id, Uri.of_string uri, body)
    | List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom _dst_actor_id; Atom json]] ->
      (* Apjob.Notify.encode *)
      send_http_post ~fkt:(fkt ibox) ~uuid ~key_id ~pk (Uri.of_string msg_id, Uri.of_string ibox, json)
    | _ ->
      (* must correspond to Apjob.Notify.encode *)
      Logr.err (fun m -> m "%s %s.%s invalid job format %s" E.e1016 "Main" "dispatch_job" j);
      Lwt.return (Error "invalid job format")
  (** Simple, file-based, scheduled job queue.
      Inspired by http://cr.yp.to/proto/maildir.html
  *)
  module Queue = struct
    (** Seconds (3 days) a file may stay in [cur] before [process_new_and_due]
        unlinks it. *)
    let keep_cur_s = 3 * 24 * 60 * 60

    (** Move due tasks from wait to new and loop all new.

        Steps, in order:
        + unlink files in [cur] older than [keep_cur_s] before [due],
        + move every job due at [due] from [wait] to [new_],
        + for each job in [new_]: move it to [run], read it and hand it to
          [dispatch_job ~base ~pk]; on success move it to [cur], otherwise
          hand it to [Job.wait_or_err].

        Returns [Ok que], or [Error msg] if an exception escaped. *)
    let process_new_and_due
        ?(due = Ptime_clock.now ())
        ?(wait = Job.wait)
        ?(new_ = Job.new_)
        ?(run = Job.run)
        ?(cur = Job.cur)
        ~base
        ~pk
        que =
      let t0 = Sys.time () in
      (* Logr.debug (fun m -> m "%s.%s" "Main" "process_queue"); *)
      (* unlink old from dn *)
      let dir_clean tmin (Job.Queue qn) dn =
        Logr.debug (fun m -> m "%s.%s unlink old from '%s->%s'" "Main.Queue" "process_new_and_due.dir_clean" qn dn);
        let tmin = tmin |> Ptime.to_float_s in
        let open Astring in
        assert (qn |> String.is_suffix ~affix:"/");
        assert (dn |> String.is_suffix ~affix:"/");
        let dn = qn ^ dn in
        File.fold_dir (fun init fn ->
            if String.length fn > 12 (* keep README.txt etc. *)
            then
              (let fn = dn ^ fn in
               try
                 let st = fn |> Unix.stat in
                 if st.st_mtime < tmin
                 then
                   (Unix.unlink fn;
                    Logr.debug (fun m -> m "%s.%s unlinked '%s' %f < %f" "Main.Queue" "process_new_and_due.dir_clean" fn st.st_mtime tmin))
               with exn ->
                 (* cleaning is best effort: never fail the run, but leave a trace *)
                 Logr.debug (fun m -> m "%s.%s ignoring '%s': %s" "Main.Queue" "process_new_and_due.dir_clean" fn (Printexc.to_string exn)));
            (init, true)) () dn
      in
      (* move those due from wait into new *)
      let rec move_due_wait_new ~wait ~new_ ~due =
        match Job.(any_due ~due ~wait que) with
        | None -> ()
        | Some j ->
          Job.(move que j wait new_);
          move_due_wait_new ~wait ~new_ ~due
      in
      let Job.Queue que' = que in
      (* process jobs from new_ one by one, i counts the jobs handled so far *)
      let rec loop (i : int) : int Lwt.t =
        match Job.any new_ que with
        | None -> Lwt.return i
        | Some j ->
          let%lwt _ =
            Job.move que j new_ run;
            let fn = que' ^ run ^ j
            and error s =
              Job.wait_or_err ~wait que run j;
              Logr.info (fun m -> m "%s.%s job postponed/cancelled: %s reason: %s" "Main.Queue" "process_new_and_due" j s)
            and ok _ =
              Job.move que j run cur;
              Logr.debug (fun m -> m "%s.%s job done: %s" "Main.Queue" "process_new_and_due" j)
            in
            match fn |> File.in_channel Csexp.input with
            | Error s ->
              s
              |> error
              |> Lwt.return
            | Ok p ->
              let%lwt r = try%lwt
                  dispatch_job ~base ~pk j p
                with
                | Failure s ->
                  Lwt.return (Error s)
                | exn ->
                  let e = exn |> Printexc.to_string in
                  Logr.warn (fun m -> m "%s.%s Uncaught Exception job:%s %s" "Main.Queue" "process_new_and_due" j e);
                  Lwt.return (Error e)
              in
              r
              |> Result.fold ~error ~ok
              |> Lwt.return in
          loop (i+1)
      in
      try%lwt
        (* NOTE(review): cleaning targets Job.qn rather than que — confirm this is intended *)
        dir_clean (keep_cur_s |> Ptime.Span.of_int_s |> Ptime.sub_span due |> Option.value ~default:Ptime.epoch) Job.qn cur;
        move_due_wait_new ~wait ~new_ ~due;
        let%lwt i = loop 0 in
        Logr.info (fun m -> m "%s.%s finished, %i jobs processed in dt=%.3fs." "Main.Queue" "process_new_and_due" i (Sys.time() -. t0));
        Lwt.return (Ok que)
      with | exn ->
        let msg = Printexc.to_string exn in
        Logr.err (fun m -> m "%s %s.%s processing failed %s" E.e1017 "Main.Queue" "process_new_and_due" msg);
        Lwt.return (Error msg)

    (** Do one http request, fire and forget.

        Requests the cgi's [/ping?loop=<run_delay_s>s] resolved against [base]
        with a 0.5 s limit, ignores the outcome and answers a plain-text
        ["ok"]. *)
    let ping_and_forget ~base ~run_delay_s =
      Logr.debug (fun m -> m "%s.%s %is" "Main.Queue" "ping_and_forget" run_delay_s);
      let path = Cfg.seppo_cgi ^ "/ping"
      and query = [("loop",[Printf.sprintf "%is" run_delay_s])] in
      let uri = Uri.make ~path ~query ()
                |> Http.reso ~base in
      let%lwt f = uri
                  |> Http.get ~seconds:0.5 (* fire and forget *) in
      let _ = Sys.opaque_identity f in
      Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
      |> Lwt.return

    (** Default pid/lock file of [loop]. *)
    let run_fn = "app/var/run/queue.pid"

    (** Synchronously, sequentially run fkt for all jobs in new.

        Takes an exclusive, non-blocking lock on [lock], writes the pid into
        it, runs [fkt Job.qn], sleeps [run_delay_s] (clamped to 3..1900
        seconds), removes the lock file and re-triggers itself through
        [ping_and_forget].

        If the lock is held by someone else, or [lock] cannot be opened for
        lack of permission, the answer is a plain-text ["noop"]. Any other
        exception is logged and answered with ["ok"]. *)
    let loop
        ?(lock = run_fn)
        ~base
        ~run_delay_s
        fkt =
      Logr.debug (fun m -> m "%s.%s" "Main.Queue" "loop");
      let noop () =
        Logr.debug (fun m -> m "%s.%s don't race, noop" "Main.Queue" "loop");
        Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "noop")
        |> Lwt.return
      in
      try%lwt
        (* no O_TRUNC here: truncating before the lock is held would wipe the
           pid written by the process currently holding it. *)
        let fd = Unix.openfile lock [Unix.O_CLOEXEC; Unix.O_CREAT; Unix.O_WRONLY; Unix.O_SYNC] 0o644 in
        (* https://git.radicallyopensecurity.com/nlnet/ngie-seppo/-/issues/14#note_129407 *)
        (try Unix.lockf fd Unix.F_TLOCK 0
         with exn ->
           (* do not leak the descriptor when somebody else holds the lock *)
           (try Unix.close fd with Unix.Unix_error _ -> ());
           raise exn);
        Unix.ftruncate fd 0;
        let oc = fd |> Unix.out_channel_of_descr in
        Printf.fprintf oc "%i" (Unix.getpid ());
        flush oc;
        let%lwt _ = fkt Job.qn
        in
        Logr.debug (fun m -> m "%s.%s sleep %is" "Main.Queue" "loop" run_delay_s);
        Unix.sleep (run_delay_s |> max 3 |> min 1900);
        close_out oc;
        Unix.unlink lock; (* do not loop if someone unlinked the lock *)
        ping_and_forget ~base ~run_delay_s
      with
      (* POSIX permits either EAGAIN or EACCES for an already locked section *)
      | Unix.Unix_error ((Unix.EAGAIN | Unix.EACCES), "lockf", _) ->
        noop ()
      (* compare against the lock actually used, not only the default path *)
      | Unix.Unix_error (Unix.EACCES, "open", fn) when String.equal fn lock ->
        noop ()
      | exn ->
        (* @TODO Error number *)
        Logr.warn (fun m -> m "%s.%s Uncaught exception %a" "Main.Queue" "loop" St.pp_exc exn);
        Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
        |> Lwt.return
  end
  (** Monitor outgoing url and add to <link>?
      Not implemented: logs that fact and returns the entry unchanged. *)
  let sift_urls (e : Rfc4287.Entry.t) =
    let () = Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_urls") in
    Result.ok e
  (** Extract tags from a post into a list.
      Needs the post and a tag store. Modifies both.

      The entry's categories are turned into ["#label"] tags, normalised
      together with title and content by [Tag.cdb_normalise] against [cdb],
      and the resulting tags become the categories of the returned entry
      (leading ['#'] stripped). Always returns [Ok]. *)
  let sift_tags cdb (e : Rfc4287.Entry.t) =
    Logr.debug (fun m -> m "%s.%s" "Main" "sift_tags");
    let open Rfc4287 in
    (* category -> tag, prefixing the label with '#' *)
    let c2t init ((Label (Single l),_,_) : Rfc4287.Category.t) =
      (Tag.Tag ("#" ^ l)) :: init
    in
    (* tag -> category, dropping a leading '#' *)
    let t2c init (Tag.Tag t) =
      Logr.debug (fun m -> m "%s.%s %s" "Main" "sift_tags" t);
      let le = t |> String.length in
      assert (1 < le);
      (* compare characters by value, not by physical identity *)
      let t = if Char.equal '#' t.[0]
        then String.sub t 1 (le-1)
        else t in
      let t = Single t in
      let l = Category.Label t in
      let te = Category.Term t in
      (l, te, Rfc4287.tagu) :: init
    in
    let ti = e.title in
    let co = e.content in
    let tl = e.categories |> List.fold_left c2t [] in
    let ti,co,tl = Tag.cdb_normalise ti co tl cdb in
    Ok {e with
        title = ti;
        content = co;
        categories = tl |> List.fold_left t2c []}
  (** Run the [Plain2handle.handle] lexer over [s], starting from an empty
      accumulator. *)
  let find_handles s =
    let lexbuf = Lexing.from_string s in
    Plain2handle.handle [] lexbuf
  (** Find mentions.
      Not implemented: logs that fact and returns the entry unchanged. *)
  let sift_handles (e : Rfc4287.Entry.t) =
    let () = Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_handles") in
    (* Ok ((e.title |> find_handles) @ (e.content |> find_handles)) *)
    Result.ok e
  (** Queue json for destination reac,ibox into que.

      Shaped as a fold step: the accumulator [init] is returned untouched and
      the result of enqueueing is discarded. *)
  let fldbl_notify ~due ~que msg_id json init (reac,ibox) =
    Logr.debug (fun m -> m "%s.%s %a -> %a" "Main" "fldbl_notify" Uri.pp reac Uri.pp ibox);
    let job =
      Bytes.of_string
        (Csexp.to_string
           (Apjob.Notify.encode msg_id (ibox, reac) json)) in
    let _ = Job.enqueue ~due que 0 job in
    init
  (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
  module Note = struct
    (** Load base url, profile and user id from their configuration files.

        Returns [Ok (base, prof, auth)] where [auth] is an atom person named
        after the profile title, with a uri made of the user id and the host
        of [base] (["-"] if [base] has none). The first failing load is
        returned as [Error]. *)
    let load_basics () =
      let* base = Cfg.Base.(from_file fn) in
      let* prof = Cfg.Profile.(from_file fn) in
      let* Auth.Uid userinfo,_ = Auth.(from_file fn) in
      let host = base |> Uri.host |> Option.value ~default:"-" in
      let auth = {Rfc4287.Person.empty with
                  name = prof.title;
                  uri = Some (Uri.make ~userinfo ~host ())} in
      Ok (base,prof,auth)

    (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
    module Atom = struct
      (** Rebuild a single atom page plus evtl. the softlink.

          Reads the index page [pag], loads the entries it points to from the
          storage file, renders them as an atom feed and writes the result to
          the page's [index.xml]. Returns [Ok fn] with the file written, or
          the first [Error] met while reading. Entries that fail to decode are
          logged and skipped. *)
      let page_to_atom ~base ~title ~updated ~lang ~(author : Rfc4287.Person.t) (a,b as pag) =
        Logr.debug (fun m -> m "%s.%s %s/%d" "Main.Note.Atom" "page_to_atom" a b);
        (* fold ix range into entry. *)
        let hydrate sc init (p0,_) =
          let* init = init
                      |> Result.map_error
                        (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.a" e);
                          e) in
          seek_in sc p0;
          let* item = Csexp.input sc
                      |> Result.map_error
                        (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.b" e);
                          e) in
          match Rfc4287.Entry.decode item with
          | Ok item ->
            Logr.debug (fun m -> m "%s.%s 0x%x %a" "Main.Note.Atom" "page_to_atom.hydrate.0" p0 Uri.pp item.id);
            Ok (item :: init)
          | Error "deleted" ->
            Logr.warn (fun m -> m "%s.%s found a stale index entry 0x%x" "Main.Note.Atom" "page_to_atom.hydrate.1" p0);
            Ok init
          | Error e ->
            Logr.err (fun m -> m "%s.%s 0x%x ignoring: %s" "Main.Note.Atom" "page_to_atom.hydrate.2" p0 e);
            Ok init in
        let pn = pag |> Storage.Page.to_fn in
        let* es = Storage.fn |> File.in_channel (fun sc ->
            Logr.debug (fun m -> m "%s.%s %s" "Main.Note.Atom" "page_to_atom.hydrate" pn);
            pn |> File.in_channel (fun ic ->
                match Csexp.input_many ic with
                | Error e' as e ->
                  Logr.err (fun m -> m "%s.%s %s/%d: %s" "Main.Note.Atom" "page_to_atom.hydrate" a b e');
                  e
                | Ok l -> l
                          |> Storage.TwoPad10.decode_many
                          |> List.rev
                          |> List.fold_left (hydrate sc) (Ok [])
              )) |> Result.map_error (fun e ->
            Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom" e);
            e) in
        (* this used to be an assert, but I hit one case of non-understood failure *)
        if not (es |> St.is_monotonous Rfc4287.Entry.compare_published_desc)
        then Logr.warn (fun m -> m "%s soft assertion failed: %s" __LOC__ pn);
        let self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max:7000 ~base:Uri.empty in
        (* atom index.xml *)
        let j_xml = "%-%/index.xml" |> Make.Jig.make in
        let fn = [a;b|> string_of_int] |> Make.Jig.paste j_xml |> Option.get in
        Logr.debug (fun m -> m "%s.%s %s/%d -> %s (%d entries)" "Main.Note.Atom" "page_to_atom" a b fn (es |> List.length));
        let x = es |> Rfc4287.Feed.to_atom
                  ~base
                  ~self
                  ~prev
                  ~next
                  ~first
                  ~last
                  ~title
                  ~updated
                  ~lang
                  ~author in
        let _ = fn |> Filename.dirname |> File.mkdir_p File.pDir in
        fn |> File.out_channel_replace (x |> Xml.to_chan ~xsl:(Rfc4287.xsl "posts.xsl" fn));
        let _,_ = Storage.make_feed_syml pag fn in
        Ok fn

      (** Make rule building a page's [index.xml] from its index file
          [app/var/db/%/%.s], the base and the profile configuration.

          The command fails with [Failure] if the target does not yield
          exactly two jig values, and [int_of_string] may raise on the
          second. *)
      let rule = ({
          target = "%-%/index.xml";
          prerequisites = "app/var/db/%/%.s" :: Cfg.Base.fn :: Cfg.Profile.fn :: [];
          fresh = Make.Outdated;
          command = (fun p _rz r t ->
              let* base,prof,auth = load_basics () in
              assert ("%-%/index.xml" |> String.equal r.target);
              assert ("app/var/db/%/%.s" |> String.equal (r.prerequisites |> List.hd));
              let src,_,v = t |> Make.src_from r in
              Logr.debug (fun m -> m "%s.%s %s %s -> %s" "Main.Note.Atom" "rule" p src t);
              let pag = match v with
                | [a;b] -> (a,b |> int_of_string)
                | _ -> failwith __LOC__ in
              let max = Storage.Page.( pag |> find_max |> to_int )
              and now = Ptime_clock.now () in
              let author = auth
              and lang = prof.language
              and title = prof.title
              and tz = prof.timezone
              and self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max ~base:Uri.empty in
              let updated = now |> Rfc3339.of_ptime ~tz in
              let* pag = src |> File.in_channel Csexp.input_many in
              let r = pag
                      |> List.fold_left Storage.fold_of_twopad10 []
                      |> Rfc4287.Feed.to_atom_
                        ~base
                        ~self
                        ~prev
                        ~next
                        ~first
                        ~last
                        ~title
                        ~updated
                        ~lang
                        ~author
                        t
                      |> Rfc4287.Feed.to_file t in
              (* HOW to (re-)create the symlink in case *)
              (* let _,_ = mk_unnumbered_syml (depth,unn,p) fn in *)
              r
            );
        } : Make.t)

      (** The jig made from the target pattern of [rule]. *)
      let jig = rule.target |> Make.Jig.make

      (** File name of the atom page for an index page. The page name must
          start with ["o/"] (checked with [assert]).
          @raise Invalid_argument if the jig cannot be pasted ([Option.get]). *)
      let page_to_fn (a,i : Storage.Page.t) =
        assert (a |> St.is_prefix ~affix:"o/");
        [a;i |> string_of_int]
        |> Make.Jig.paste jig
        |> Option.get
    end

    (** Atom, local.

        Store the entry, append it to the other index pages and rebuild the
        atom page of every index touched. Returns [Ok] with the entry as
        returned by [Storage.save]. The results of rebuilding the pages are
        only counted, not inspected. *)
    let publish ~base ~(profile : Cfg.Profile.t) ~(author : Rfc4287.Person.t) (n : Rfc4287.Entry.t) =
      Logr.debug (fun m -> m "%s.%s '%s'" "Main.Note" "publish" n.title);
      (* determine id and do store app/var/db/o/p.s *)
      (* add to indices (p,d,t) *)
      (* (2,"o/p",4) app/var/db/o/p.s app/var/db/o/p/4.s -> o/p-4/index.xml *)
      (* (3,"o/d/2023-10-13",4) app/var/db/o/d/2023-10-13/4.s -> o/d/2023-10-13-4/index.xml *)
      (* (3,"o/t/tag",4) app/var/db/o/t/tag/4.s -> o/t/tag-4/index.xml *)
      (* add to storage and indices (main,date,tags)) *)
      let items_per_page = profile.posts_per_page in
      let n,(a,_b as ix),pos = n |> Storage.save ~items_per_page in
      assert (a |> String.equal "o/p");
      let append_to_page pos init pa = let _ = (pos |> Storage.Page.apnd pa) in
        pa :: init in
      let ix_other : Storage.Page.t list = n
                                           |> Storage.Page.next_other_pages ~items_per_page
                                           |> List.fold_left (append_to_page pos) [] in
      (* refresh feeds, outbox etc. *)
      let lang = profile.language in
      let title = profile.title in
      let updated = n.updated (* more precisely would be: now *) in
      let mater init ix = (ix |> Atom.page_to_atom ~base ~title ~updated ~lang ~author) :: init in
      let l = ix :: ix_other
              |> List.fold_left mater [] in
      (* one main page, one date page and one page per category;
         compare the counts by value rather than by physical identity *)
      assert (Int.equal (1 + 1 + (n.categories |> List.length)) (l |> List.length));
      Ok n

    module Create = struct
      (** Enqueue jobs.
          https://www.w3.org/TR/activitypub/#delivery says "Servers MUST de-duplicate
          the final recipient list." which implies each actor profile / inbox lookup
          can lag delivery for all.
          How long could such a consolidated inbox list be cached? In theory not at
          all because each inbox target url may change without further notice.
          In practice, we will use the inbox as long as it works and redo the
          webfinger/actor lookup otherwise.
          1. get all actor profiles (limit redirects) and extract inbox url
          2. de-duplicate
          3. deliver to all
          4. retry temporary failures
          5. handle permanent failures to clean link rot
      *)
      let notify_subscribers
          ?(due = Ptime_clock.now ())
          ?(que = Job.qn)
          ?(cdb = Ap.Followers.cdb)
          ~base
          (n : Rfc4287.Entry.t) =
        let json = n |> Ap.Note.Create.to_json ~base in
        cdb |> Ap.Followers.(fold_left (State.ibox' (fldbl_notify ~due ~que n.id json))) (Ok n)
    end

    (** application logic around delete. *)
    module Delete = struct
      (** Find dirty o/t/foo-1/index.xml and o/d/2024-03-12-7/index.xml pages.

          Returns the primary page of the entry followed by the pages of its
          other feeds that contain it; the empty list if the id cannot be
          mapped to a page. *)
      let dirty (n : Rfc4287.Entry.t) : Storage.Page.t list =
        (* the primary o/p/0.s *)
        match n.id |> Storage.Id.to_page_i with
        | Error _ -> []
        | Ok ((p : Storage.Page.t),_ as id') ->
          p
          :: match id' |> Storage.TwoPad10.from_page_i with
          | Error _ -> []
          | Ok pos ->
            n
            |> Storage.Page.other_feeds
            |> List.fold_left (fun init (bas,_) ->
                match Storage.Page.find pos bas with
                | None -> init
                | Some v ->
                  let p,i = v in
                  Logr.debug (fun m -> m "%s.%s and %s-%i/index.xml" "Main.Note.Delete" "find" p i);
                  v :: init ) []

      (** - remove from storage
          - refresh dirty feeds

          Each dirty page file is unlinked (a missing file is fine) and then
          rebuilt with [Atom.rule]; the results of rebuilding are discarded.
          Returns [Ok] with the deleted entry, or the [Error] of
          [Storage.delete].

          todo? rather keep a tombstone? https://www.w3.org/TR/activitypub/#delete-activity-outbox *)
      let delete (id : Uri.t) =
        Logr.debug (fun m -> m "%s.%s '%a'" "Main.Note.Delete" "delete" Uri.pp id);
        let* n = Storage.delete id in
        let rz = [Atom.rule] in
        let _ = n
                |> dirty
                |> List.fold_left (fun init pag ->
                    let fn = pag |> Atom.page_to_fn in
                    (try fn |> Unix.unlink; (* rather than touch .s *)
                     with Unix.(Unix_error(ENOENT, "unlink", _)) -> ());
                    (fn |> Make.M2.make rz)
                    :: init) [] in
        Ok n

      (** make Ap.Note.Delete.to_json and queue it via fldbl_notify for each in cdb. *)
      let notify_subscribers
          ?(due = Ptime_clock.now ())
          ?(que = Job.qn)
          ?(cdb = Ap.Followers.cdb)
          ~base
          (n : Rfc4287.Entry.t) =
        let json = n |> Ap.Note.Delete.to_json ~base in
        cdb |> Ap.Followers.(fold_left (State.ibox' (fldbl_notify ~due ~que n.id json))) (Ok n)
    end
  end