My personal data management layer
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)
5
(** Bushel sync orchestration

    Runs the Bushel sync pipeline: rsync of images, paper thumbnails,
    contact faces, video thumbnails, srcsetter and the Typesense upload.

    {1 Re-exported Modules}

    - {!Zotero} - DOI resolution via Zotero Translation Server
    - {!Immich} - Contact face thumbnails from Immich
    - {!Peertube} - Video metadata and thumbnails from PeerTube
    - {!Http} - Simple HTTP client using curl
*)
15
module Zotero = Bushel_zotero
(** DOI resolution via Zotero Translation Server *)

module Immich = Bushel_immich
(** Contact face thumbnails from Immich *)

module Peertube = Bushel_peertube
(** Video metadata and thumbnails from PeerTube *)

module Http = Bushel_http
(** HTTP session support; {!run} uses it for the PeerTube fetches *)

(* Log source shared by every step in this module. *)
let src = Logs.Src.create ~doc:"Bushel sync pipeline" "bushel.sync"

module Log = (val Logs.src_log src : Logs.LOG)
30
(** {1 Sync Steps} *)

(** One stage of the sync pipeline. *)
type step =
  | Images  (** Rsync images from remote *)
  | Srcsetter  (** Run srcsetter on images *)
  | Thumbs  (** Generate paper thumbnails from PDFs *)
  | Faces  (** Copy contact face thumbnails from the Sortal store *)
  | Videos  (** Fetch video thumbnails from PeerTube *)
  | Typesense  (** Upload to Typesense *)

(** [string_of_step s] is the lowercase name of [s]; it round-trips through
    {!step_of_string}. *)
let string_of_step (s : step) : string =
  match s with
  | Images -> "images"
  | Srcsetter -> "srcsetter"
  | Thumbs -> "thumbs"
  | Faces -> "faces"
  | Videos -> "videos"
  | Typesense -> "typesense"

(** [step_of_string name] parses a name produced by {!string_of_step}.
    Matching is exact and case-sensitive; any other string yields [None]. *)
let step_of_string (name : string) : step option =
  match name with
  | "images" -> Some Images
  | "srcsetter" -> Some Srcsetter
  | "thumbs" -> Some Thumbs
  | "faces" -> Some Faces
  | "videos" -> Some Videos
  | "typesense" -> Some Typesense
  | _ -> None

(** Default local steps. [Srcsetter] runs after [Thumbs] and [Faces], which
    write their images into [local_source_dir] for it to process. *)
let all_steps = [ Images; Thumbs; Faces; Videos; Srcsetter ]

(** {!all_steps} followed by the remote [Typesense] upload. *)
let all_steps_with_remote = List.append all_steps [ Typesense ]
60
(** {1 Step Results} *)

(** Outcome of running (or dry-running) a single pipeline step. *)
type step_result = {
  step : step;  (** The step that produced this result *)
  success : bool;  (** Whether the step reports success *)
  message : string;  (** One-line summary *)
  details : string list;
      (** Additional lines (planned commands, per-item errors, ...) *)
}

(** [pp_result ppf r] prints ["[OK] <step>: <message>"] (or
    ["[FAILED] ..."]) followed, when [r.details] is non-empty, by one
    [" - <detail>"] line per detail.

    The output is wrapped in a vertical box. Every break between lines is
    therefore taken regardless of the box the caller prints into. No break
    is emitted after the last detail. *)
let pp_result ppf r =
  let status = if r.success then "OK" else "FAILED" in
  let pp_detail ppf d = Fmt.pf ppf " - %s" d in
  match r.details with
  | [] -> Fmt.pf ppf "[%s] %s: %s" status (string_of_step r.step) r.message
  | details ->
    Fmt.pf ppf "@[<v>[%s] %s: %s@,%a@]" status (string_of_step r.step)
      r.message
      (Fmt.list ~sep:Fmt.cut pp_detail)
      details
77
(** {1 Rsync Images} *)

(** [sync_images ~dry_run ~fs ~proc_mgr config] runs
    [rsync -avz <rsync_source>/ <local_source_dir>/], pulling
    [Bushel_config.rsync_source config] into [config.local_source_dir]. It
    creates the local directory first if needed.

    With [~dry_run:true] nothing is executed and the command line is returned
    in [details]. Failures, including failing to create the local directory,
    are returned as a result with [success = false] rather than raised. Eio
    cancellation is re-raised so the enclosing switch can shut down. *)
let sync_images ~dry_run ~fs ~proc_mgr config =
  Log.info (fun m -> m "Syncing images from remote...");
  let local_dir = config.Bushel_config.local_source_dir in
  let args =
    [ "rsync"; "-avz"; Bushel_config.rsync_source config ^ "/"; local_dir ^ "/" ]
  in
  let cmd = String.concat " " args in
  if dry_run then
    { step = Images; success = true;
      message = "Would run rsync";
      details = [ cmd ] }
  else begin
    Log.debug (fun m -> m "Running: %s" cmd);
    try
      (* Ensure local directory exists (recursive). Inside the handler so a
         failure here is reported as a failed step, not an aborted pipeline. *)
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / local_dir);
      Eio.Process.run proc_mgr args;
      { step = Images; success = true;
        message = "Images synced from remote";
        details = [] }
    with
    | Eio.Cancel.Cancelled _ as ex -> raise ex
    | e ->
      { step = Images; success = false;
        message = Printf.sprintf "Rsync failed: %s" (Printexc.to_string e);
        details = [] }
  end
109
(** {1 Srcsetter} *)

(** [run_srcsetter ~dry_run ~fs ~proc_mgr config] runs
    [Srcsetter_cmd.run] with [~preserve:true], reading from
    [config.local_source_dir] and writing to [config.local_output_dir]. It
    creates the output directory first if needed.

    With [~dry_run:true] only the equivalent command line is reported in
    [details]. Errors, including failure to create the output directory, are
    returned as a result with [success = false]. Eio cancellation is
    re-raised. *)
let run_srcsetter ~dry_run ~fs ~proc_mgr config =
  Log.info (fun m -> m "Running srcsetter...");
  let src_dir = config.Bushel_config.local_source_dir in
  let dst_dir = config.Bushel_config.local_output_dir in
  if dry_run then
    { step = Srcsetter; success = true;
      message = "Would run srcsetter";
      details = [ Printf.sprintf "srcsetter %s %s" src_dir dst_dir ] }
  else begin
    let src_path = Eio.Path.(fs / src_dir) in
    let dst_path = Eio.Path.(fs / dst_dir) in
    try
      (* Ensure output directory exists (recursive) *)
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 dst_path;
      let entries =
        Srcsetter_cmd.run ~proc_mgr ~src_dir:src_path ~dst_dir:dst_path
          ~preserve:true ()
      in
      { step = Srcsetter; success = true;
        message =
          Printf.sprintf "Srcsetter completed: %d images processed"
            (List.length entries);
        details = [] }
    with
    | Eio.Cancel.Cancelled _ as ex -> raise ex
    | e ->
      { step = Srcsetter; success = false;
        message = Printf.sprintf "Srcsetter failed: %s" (Printexc.to_string e);
        details = [] }
  end
144
(** {1 Paper Thumbnails} *)

(** [generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config] renders a PNG
    thumbnail for every [*.pdf] file in [config.paper_pdfs_dir]. Each one is
    written to [local_source_dir/papers/<slug>.png], where [<slug>] is the
    file name without its [.pdf] suffix. Existing thumbnails are never
    regenerated.

    Each thumbnail is the top half of the PDF's first page, rendered by
    ImageMagick at 600 DPI and resized to 2048px wide. The PNGs go under
    [local_source_dir] so that srcsetter picks them up (and converts them
    to webp).

    A missing PDFs directory is not an error. PDFs are processed in sorted
    file-name order. With [~dry_run:true], [details] lists the [magick]
    command lines that would run. Otherwise [details] lists the slugs that
    failed, and [success] is [false] if any failed. Eio cancellation is
    re-raised. *)
let generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config =
  Log.info (fun m -> m "Generating paper thumbnails...");
  let pdfs_dir = config.Bushel_config.paper_pdfs_dir in
  (* Output to local_source_dir/papers/ so srcsetter processes them *)
  let output_dir =
    Filename.concat config.Bushel_config.local_source_dir "papers"
  in
  (* ImageMagick: render page 0 at 600 DPI, crop the top 50%, resize to
     2048px wide. Output as PNG - srcsetter will convert to webp. Shared by
     the dry-run and real paths so they cannot drift apart. *)
  let magick_args ~pdf_path ~output_path =
    [ "magick"; "-density"; "600"; "-quality"; "100";
      pdf_path ^ "[0]";
      "-gravity"; "North";
      "-crop"; "100%x50%+0+0";
      "-resize"; "2048x";
      output_path ]
  in
  (* (slug, source PDF path, target PNG path) for one PDF file name. *)
  let plan pdf_file =
    let slug = Filename.chop_suffix pdf_file ".pdf" in
    ( slug,
      Filename.concat pdfs_dir pdf_file,
      Filename.concat output_dir (slug ^ ".png") )
  in
  if not (Sys.file_exists pdfs_dir && Sys.is_directory pdfs_dir) then begin
    Log.warn (fun m -> m "PDFs directory does not exist: %s" pdfs_dir);
    { step = Thumbs; success = true;
      message = "No PDFs directory";
      details = [] }
  end
  else begin
    (* A file named exactly ".pdf" has no extension, so the previous use of
       [Filename.chop_extension] raised on it; such names are excluded and
       [chop_suffix] is used. [Sys.readdir] order is unspecified, so sort. *)
    let plans =
      Sys.readdir pdfs_dir |> Array.to_list
      |> List.filter (fun f -> f <> ".pdf" && Filename.check_suffix f ".pdf")
      |> List.sort String.compare
      |> List.map plan
    in
    if dry_run then begin
      let would_run =
        List.filter_map
          (fun (_slug, pdf_path, output_path) ->
            if Sys.file_exists output_path then None
            else Some (String.concat " " (magick_args ~pdf_path ~output_path)))
          plans
      in
      let n = List.length would_run in
      { step = Thumbs; success = true;
        message =
          Printf.sprintf "Would generate %d thumbnails (%d already exist)" n
            (List.length plans - n);
        details = would_run }
    end
    else
      (* Ensure output directory exists (recursive) *)
      match
        Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / output_dir)
      with
      | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
      | exception e ->
        { step = Thumbs; success = false;
          message =
            Printf.sprintf "Cannot create %s: %s" output_dir
              (Printexc.to_string e);
          details = [] }
      | () ->
        let generate (slug, pdf_path, output_path) =
          if Sys.file_exists output_path then begin
            Log.debug (fun m -> m "Skipping %s: thumbnail exists" slug);
            `Skipped slug
          end
          else begin
            Log.info (fun m -> m "Generating thumbnail for %s" slug);
            match Eio.Process.run proc_mgr (magick_args ~pdf_path ~output_path) with
            | () -> `Ok slug
            | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
            | exception e ->
              Log.err (fun m ->
                m "Failed to generate thumbnail for %s: %s" slug
                  (Printexc.to_string e));
              `Error slug
          end
        in
        let results = List.map generate plans in
        let ok_count, skipped_count, error_count =
          List.fold_left
            (fun (ok, sk, er) -> function
              | `Ok _ -> (ok + 1, sk, er)
              | `Skipped _ -> (ok, sk + 1, er)
              | `Error _ -> (ok, sk, er + 1))
            (0, 0, 0) results
        in
        { step = Thumbs; success = error_count = 0;
          message =
            Printf.sprintf "%d generated, %d skipped, %d errors" ok_count
              skipped_count error_count;
          details =
            List.filter_map (function `Error s -> Some s | _ -> None) results }
  end
230
(** {1 Contact Faces} *)

(** [sync_faces ~dry_run ~fs config entries] copies PNG thumbnails from the
    Sortal store. For every contact in [entries] that has one, the thumbnail
    is copied to [local_source_dir/faces/<handle>.png]. Handles whose output
    file already exists are skipped. Contacts without a thumbnail are only
    counted.

    With [~dry_run:true] nothing is written, and up to five planned copies
    are listed in [details]. Otherwise [details] holds ["<handle>: <error>"]
    for each failed copy. [success] is [false] if any copy, or creating the
    output directory, failed. Eio cancellation is re-raised. *)
let sync_faces ~dry_run ~fs config entries =
  Log.info (fun m -> m "Syncing contact faces from Sortal...");
  (* Output to local_source_dir/faces/ so srcsetter processes them *)
  let output_dir =
    Filename.concat config.Bushel_config.local_source_dir "faces"
  in
  let dst_path_of handle = Filename.concat output_dir (handle ^ ".png") in
  let contacts = Bushel.Entry.contacts entries in
  (* Load sortal store to get thumbnail paths *)
  let sortal_store = Sortal.Store.create fs "sortal" in
  (* Contacts that have a PNG thumbnail in Sortal, paired with its path *)
  let contacts_with_thumbs =
    List.filter_map
      (fun c ->
        Sortal.Store.png_thumbnail_path sortal_store c
        |> Option.map (fun path -> (c, path)))
      contacts
  in
  let no_thumb = List.length contacts - List.length contacts_with_thumbs in
  if dry_run then begin
    let would_copy =
      List.filter
        (fun (c, _src_path) ->
          not (Sys.file_exists (dst_path_of (Sortal_schema.Contact.handle c))))
        contacts_with_thumbs
    in
    let n = List.length would_copy in
    let describe (c, src_path) =
      Printf.sprintf "cp %s %s/%s.png"
        (Eio.Path.native_exn src_path)
        output_dir
        (Sortal_schema.Contact.handle c)
    in
    { step = Faces; success = true;
      message =
        Printf.sprintf
          "Would copy %d faces from Sortal (%d already exist, %d without thumbnails)"
          n (List.length contacts_with_thumbs - n) no_thumb;
      details =
        List.map describe (List.filteri (fun i _ -> i < 5) would_copy)
        @ (if n > 5 then [ "...and more" ] else []) }
  end
  else
    (* Ensure output directory exists *)
    match
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / output_dir)
    with
    | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
    | exception e ->
      { step = Faces; success = false;
        message =
          Printf.sprintf "Cannot create %s: %s" output_dir
            (Printexc.to_string e);
        details = [] }
    | () ->
      let copy_face (c, src_path) =
        let handle = Sortal_schema.Contact.handle c in
        let dst_path = dst_path_of handle in
        if Sys.file_exists dst_path then begin
          Log.debug (fun m -> m "Skipping %s: already exists" handle);
          (handle, `Skipped)
        end
        else begin
          Log.info (fun m -> m "Copying face for %s" handle);
          let copy () =
            let content = Eio.Path.load src_path in
            (* with_open_bin closes the channel even if the write raises *)
            Out_channel.with_open_bin dst_path (fun oc ->
              Out_channel.output_string oc content)
          in
          match copy () with
          | () -> (handle, `Ok)
          | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
          | exception e ->
            let msg = Printexc.to_string e in
            Log.err (fun m -> m "Failed to copy face for %s: %s" handle msg);
            (* Drop any partially written file; otherwise the next run would
               skip this handle as already present. *)
            (try Sys.remove dst_path with Sys_error _ -> ());
            (handle, `Error msg)
        end
      in
      let results = List.map copy_face contacts_with_thumbs in
      let ok_count, skipped_count, error_count =
        List.fold_left
          (fun (ok, sk, er) (_, r) ->
            match r with
            | `Ok -> (ok + 1, sk, er)
            | `Skipped -> (ok, sk + 1, er)
            | `Error _ -> (ok, sk, er + 1))
          (0, 0, 0) results
      in
      { step = Faces; success = error_count = 0;
        message =
          Printf.sprintf "%d copied, %d skipped, %d errors, %d without thumbnails"
            ok_count skipped_count error_count no_thumb;
        details =
          List.filter_map
            (fun (h, r) ->
              match r with `Error e -> Some (h ^ ": " ^ e) | _ -> None)
            results }
303
(** {1 Video Thumbnails} *)

(** [sync_video_thumbnails ~dry_run ~http config entries] fetches PeerTube
    thumbnails for the videos in [entries] into
    [Bushel_config.video_thumbs_dir config], using
    {!Bushel_peertube.fetch_thumbnails}.

    The video index is loaded from [data_dir/videos.yml] before fetching and
    saved back afterwards, since fetching may discover new server mappings.
    With [~dry_run:true] the index is not read. Instead, the videos whose
    [<uuid>.jpg] is missing from the output directory are counted, and up to
    five are listed in [details].

    [success] is [false] if any video failed or if loading, fetching or
    saving raised. Eio cancellation is re-raised. *)
let sync_video_thumbnails ~dry_run ~http config entries =
  Log.info (fun m -> m "Syncing video thumbnails from PeerTube...");
  let output_dir = Bushel_config.video_thumbs_dir config in
  let videos_yml =
    Filename.concat config.Bushel_config.data_dir "videos.yml"
  in
  let videos = Bushel.Entry.videos entries in
  let count = List.length videos in
  if count = 0 then begin
    Log.info (fun m -> m "No videos found");
    { step = Videos; success = true;
      message = "No videos found";
      details = [] }
  end
  else if dry_run then begin
    let would_fetch =
      List.filter
        (fun video ->
          let uuid = Bushel.Video.uuid video in
          not (Sys.file_exists (Filename.concat output_dir (uuid ^ ".jpg"))))
        videos
    in
    let n = List.length would_fetch in
    let describe video =
      let uuid = Bushel.Video.uuid video in
      match Bushel.Video.url video with
      | "" -> Printf.sprintf "%s (will search servers)" uuid
      | url -> Printf.sprintf "%s (from URL: %s)" uuid url
    in
    { step = Videos; success = true;
      message =
        Printf.sprintf
          "Would fetch %d video thumbnails from PeerTube (%d already exist)" n
          (count - n);
      details =
        List.map describe (List.filteri (fun i _ -> i < 5) would_fetch)
        @ (if n > 5 then [ "...and more" ] else []) }
  end
  else begin
    let fetch () =
      let index = Bushel_peertube.VideoIndex.load_file videos_yml in
      let results =
        Bushel_peertube.fetch_thumbnails ~http
          ~servers:config.Bushel_config.peertube_servers ~output_dir ~videos
          ~index
      in
      (* Save updated index (may have discovered new server mappings) *)
      Bushel_peertube.VideoIndex.save_file videos_yml index;
      results
    in
    match fetch () with
    | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
    | exception e ->
      { step = Videos; success = false;
        message =
          Printf.sprintf "Video thumbnail sync failed: %s"
            (Printexc.to_string e);
        details = [] }
    | results ->
      let count_if p =
        List.fold_left (fun n (_, r) -> if p r then n + 1 else n) 0 results
      in
      let ok_count =
        count_if (function Bushel_peertube.Ok _ -> true | _ -> false)
      in
      let skipped_count =
        count_if (function Bushel_peertube.Skipped _ -> true | _ -> false)
      in
      let error_count =
        count_if (function Bushel_peertube.Error _ -> true | _ -> false)
      in
      (* Per-video failures count as step failure, matching Thumbs/Faces. *)
      { step = Videos; success = error_count = 0;
        message =
          Printf.sprintf "%d fetched, %d skipped, %d errors" ok_count
            skipped_count error_count;
        details =
          List.filter_map
            (fun (uuid, r) ->
              match r with
              | Bushel_peertube.Error e -> Some (uuid ^ ": " ^ e)
              | _ -> None)
            results }
  end
364
(** {1 Typesense Upload} *)

(** [upload_typesense ~dry_run ~sw ~env config entries] logs into the
    Typesense server at [config.typesense_endpoint] and runs
    {!Bushel_typesense.sync} over [entries], passing [~dry_run] through.

    [details] holds, for each collection, a summary line followed by that
    collection's own details. [success] is [false] in three cases: the API
    key is missing, login or sync raised, or [total_errors] is non-zero. Eio
    cancellation is re-raised rather than reported as a failed step. *)
let upload_typesense ~dry_run ~sw ~env config entries =
  Log.info (fun m ->
    m "%s Typesense..." (if dry_run then "Checking" else "Syncing"));
  match Bushel_config.typesense_api_key config with
  | Error e ->
    { step = Typesense; success = false;
      message = "Missing Typesense API key";
      details = [ e ] }
  | Ok api_key -> (
    try
      (* Create Typesense client *)
      let client =
        Typesense_auth.Client.login ~sw ~env
          ~server_url:config.Bushel_config.typesense_endpoint ~api_key ()
      in
      (* Run incremental sync *)
      let result = Bushel_typesense.sync ~dry_run ~client ~entries in
      (* Format details from each collection *)
      let details =
        List.concat_map
          (fun (r : Bushel_typesense.collection_sync_result) ->
            let stats = r.stats in
            let summary =
              Printf.sprintf "%s: %d created, %d updated, %d deleted"
                r.collection stats.created stats.updated stats.deleted
            in
            summary :: r.details)
          result.collections
      in
      { step = Typesense; success = result.total_errors = 0;
        message =
          Printf.sprintf "%s: %d created, %d updated, %d deleted, %d errors"
            (if dry_run then "Would sync" else "Synced")
            result.total_created result.total_updated result.total_deleted
            result.total_errors;
        details }
    with
    | Eio.Cancel.Cancelled _ as ex -> raise ex
    | e ->
      { step = Typesense; success = false;
        message =
          Printf.sprintf "Typesense sync failed: %s" (Printexc.to_string e);
        details = [] })
403
(** {1 Run Pipeline} *)

(** [run ~dry_run ~sw ~env ~config ~steps ~entries] runs each step in [steps]
    in list order and returns their results in the same order. A step that
    reports [success = false] does not prevent later steps from running.

    The process manager and filesystem come from [env]. Only the [Videos]
    step needs the HTTP session, so it is created lazily the first time
    that step runs rather than unconditionally up front. *)
let run ~dry_run ~sw ~env ~config ~steps ~entries =
  let proc_mgr = Eio.Stdenv.process_mgr env in
  let fs = Eio.Stdenv.fs env in
  (* Create HTTP session for network requests, only if a step needs it *)
  let http = lazy (Bushel_http.create ~sw env) in
  let mode = if dry_run then "Dry-run" else "Running" in
  let run_step step =
    Log.info (fun m -> m "%s step: %s" mode (string_of_step step));
    match step with
    | Images -> sync_images ~dry_run ~fs ~proc_mgr config
    | Srcsetter -> run_srcsetter ~dry_run ~fs ~proc_mgr config
    | Thumbs -> generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config
    | Faces -> sync_faces ~dry_run ~fs config entries
    | Videos ->
      sync_video_thumbnails ~dry_run ~http:(Lazy.force http) config entries
    | Typesense -> upload_typesense ~dry_run ~sw ~env config entries
  in
  let results = List.map run_step steps in
  (* Summary *)
  let success_count = List.length (List.filter (fun r -> r.success) results) in
  Log.info (fun m ->
    m "%s complete: %d/%d steps succeeded"
      (if dry_run then "Dry-run" else "Sync")
      success_count (List.length results));
  results
432 results