My personal data management layer
at main 432 lines 16 kB view raw
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
 ---------------------------------------------------------------------------*)

(** Bushel sync orchestration

    Runs a list of sync {!step}s one after another and reports one
    {!step_result} per step.

    {1 Re-exported Modules}

    - {!Zotero} - DOI resolution via Zotero Translation Server
    - {!Immich} - Contact face thumbnails from Immich
    - {!Peertube} - Video metadata and thumbnails from PeerTube
    - {!Http} - HTTP client
*)

(** DOI resolution via Zotero Translation Server *)
module Zotero = Bushel_zotero

(** Contact face thumbnails from Immich *)
module Immich = Bushel_immich

(** Video metadata and thumbnails from PeerTube *)
module Peertube = Bushel_peertube

(** HTTP client.
    NOTE(review): comments in this file have described it both as curl-based
    and as using the requests library; confirm against [Bushel_http]. *)
module Http = Bushel_http

let src = Logs.Src.create "bushel.sync" ~doc:"Bushel sync pipeline"
module Log = (val Logs.src_log src : Logs.LOG)

(** {1 Sync Steps} *)

type step =
  | Images     (** Rsync images from remote *)
  | Srcsetter  (** Run srcsetter on images *)
  | Thumbs     (** Generate paper thumbnails from PDFs *)
  | Faces      (** Copy contact face thumbnails from the Sortal store *)
  | Videos     (** Fetch video thumbnails from PeerTube *)
  | Typesense  (** Upload to Typesense *)

(** [string_of_step s] is the lowercase name of [s]; the inverse of
    {!step_of_string}. *)
let string_of_step = function
  | Images -> "images"
  | Srcsetter -> "srcsetter"
  | Thumbs -> "thumbs"
  | Faces -> "faces"
  | Videos -> "videos"
  | Typesense -> "typesense"

(** [step_of_string s] parses a name produced by {!string_of_step}, or
    returns [None] for any other string (matching is case-sensitive). *)
let step_of_string = function
  | "images" -> Some Images
  | "srcsetter" -> Some Srcsetter
  | "thumbs" -> Some Thumbs
  | "faces" -> Some Faces
  | "videos" -> Some Videos
  | "typesense" -> Some Typesense
  | _ -> None

(** Local steps in execution order. [Srcsetter] comes last because the
    [Thumbs] and [Faces] steps write their output below [local_source_dir],
    which is the directory srcsetter reads from. *)
let all_steps = [Images; Thumbs; Faces; Videos; Srcsetter]

(** {!all_steps} followed by the [Typesense] upload. *)
let all_steps_with_remote = all_steps @ [Typesense]

(** {1 Step Results} *)

(** Outcome of one step: which step ran, whether it succeeded, a one-line
    summary, and optional detail lines (commands in dry-run mode, per-item
    errors otherwise). *)
type step_result = {
  step : step;
  success : bool;
  message : string;
  details : string list;
}

(** [pp_result ppf r] prints ["[OK|FAILED] <step>: <message>"] followed by one
    [" - <detail>"] entry per detail, separated by cut hints. *)
let pp_result ppf r =
  let status = if r.success then "OK" else "FAILED" in
  Fmt.pf ppf "[%s] %s: %s" status (string_of_step r.step) r.message;
  if r.details <> [] then begin
    Fmt.pf ppf "@,";
    List.iter (fun d -> Fmt.pf ppf " - %s@," d) r.details
  end

(** {1 Internal Helpers} *)

(** [reraise_if_cancelled ex] re-raises [ex] when it is an Eio cancellation
    and returns [()] otherwise. The catch-all handlers below call it first so
    that a cancelled pipeline stops instead of recording the cancellation as
    an ordinary step failure and carrying on with the next step. *)
let reraise_if_cancelled = function
  | Eio.Cancel.Cancelled _ as ex -> raise ex
  | _ -> ()

(** [failed_result step ~what ex] is the failed {!step_result} for [step]
    with message ["<what> failed: <exception>"] and no details. *)
let failed_result step ~what ex =
  { step; success = false;
    message = Printf.sprintf "%s failed: %s" what (Printexc.to_string ex);
    details = [] }

(** Maximum number of items listed individually in dry-run details. *)
let preview_limit = 5

(** [preview_details describe items] describes at most {!preview_limit}
    leading [items] and appends ["...and more"] when some were left out. *)
let preview_details describe items =
  let shown =
    items
    |> List.filteri (fun i _ -> i < preview_limit)
    |> List.map describe
  in
  if List.length items > preview_limit then shown @ ["...and more"] else shown

(** [magick_thumbnail_args ~pdf_path ~output_path] is the ImageMagick command
    line that renders the first page of [pdf_path] at density 600, keeps the
    top half, resizes it to 2048 pixels wide and writes [output_path]. *)
let magick_thumbnail_args ~pdf_path ~output_path = [
  "magick";
  "-density"; "600";
  "-quality"; "100";
  pdf_path ^ "[0]";  (* First page only *)
  "-gravity"; "North";
  "-crop"; "100%x50%+0+0";
  "-resize"; "2048x";
  output_path;
]

(** [write_file_atomically path content] writes [content] to [path ^ ".tmp"]
    and renames it onto [path]. A failed write therefore never leaves a
    truncated [path] behind, which a later run would mistake for a finished
    file and skip.
    @raise Sys_error if writing or renaming fails; the temporary file is
    removed on a best-effort basis first. *)
let write_file_atomically path content =
  let tmp = path ^ ".tmp" in
  try
    Out_channel.with_open_bin tmp (fun oc ->
      Out_channel.output_string oc content);
    Sys.rename tmp path
  with ex ->
    (try Sys.remove tmp with Sys_error _ -> ());
    raise ex

(** {1 Rsync Images} *)

(** [sync_images ~dry_run ~fs ~proc_mgr config] runs
    [rsync -avz <rsync_source>/ <local_source_dir>/], creating the local
    directory first. In dry-run mode nothing is executed and the command is
    returned in [details]. Failures (including failing to create the local
    directory) are reported as a failed result rather than raised. *)
let sync_images ~dry_run ~fs ~proc_mgr config =
  Log.info (fun m -> m "Syncing images from remote...");
  let local_dir = config.Bushel_config.local_source_dir in
  let args = ["rsync"; "-avz";
              Bushel_config.rsync_source config ^ "/";
              local_dir ^ "/"] in
  let cmd = String.concat " " args in

  if dry_run then begin
    { step = Images; success = true;
      message = "Would run rsync";
      details = [cmd] }
  end else begin
    Log.debug (fun m -> m "Running: %s" cmd);
    try
      (* Ensure local directory exists (recursive) *)
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / local_dir);
      Eio.Process.run proc_mgr args;
      { step = Images; success = true;
        message = "Images synced from remote";
        details = [] }
    with ex ->
      reraise_if_cancelled ex;
      failed_result Images ~what:"Rsync" ex
  end

(** {1 Srcsetter} *)

(** [run_srcsetter ~dry_run ~fs ~proc_mgr config] runs [Srcsetter_cmd.run]
    from [local_source_dir] into [local_output_dir] (created if missing) and
    reports how many entries it returned. In dry-run mode only the
    equivalent command line is returned. *)
let run_srcsetter ~dry_run ~fs ~proc_mgr config =
  Log.info (fun m -> m "Running srcsetter...");
  let src_dir = config.Bushel_config.local_source_dir in
  let dst_dir = config.Bushel_config.local_output_dir in

  if dry_run then begin
    { step = Srcsetter; success = true;
      message = "Would run srcsetter";
      details = [Printf.sprintf "srcsetter %s %s" src_dir dst_dir] }
  end else begin
    let src_path = Eio.Path.(fs / src_dir) in
    let dst_path = Eio.Path.(fs / dst_dir) in
    try
      (* Ensure output directory exists (recursive) *)
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 dst_path;
      let entries = Srcsetter_cmd.run
        ~proc_mgr
        ~src_dir:src_path
        ~dst_dir:dst_path
        ~preserve:true
        ()
      in
      { step = Srcsetter; success = true;
        message = Printf.sprintf "Srcsetter completed: %d images processed"
          (List.length entries);
        details = [] }
    with ex ->
      reraise_if_cancelled ex;
      failed_result Srcsetter ~what:"Srcsetter" ex
  end

(** {1 Paper Thumbnails} *)

(** [generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config] renders a PNG
    thumbnail for every [*.pdf] in [paper_pdfs_dir] that does not yet have
    one in [local_source_dir/papers/]. A missing PDFs directory counts as
    success with nothing to do. The step fails if any single thumbnail fails;
    the slugs of the failed PDFs are returned in [details]. *)
let generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config =
  Log.info (fun m -> m "Generating paper thumbnails...");
  let pdfs_dir = config.Bushel_config.paper_pdfs_dir in
  (* Output to local_source_dir/papers/ so srcsetter processes them *)
  let output_dir =
    Filename.concat config.Bushel_config.local_source_dir "papers" in

  (* Also require a directory: Sys.readdir raises on a regular file. *)
  if not (Sys.file_exists pdfs_dir && Sys.is_directory pdfs_dir) then begin
    Log.warn (fun m -> m "PDFs directory does not exist: %s" pdfs_dir);
    { step = Thumbs; success = true;
      message = "No PDFs directory";
      details = [] }
  end else begin
    let pdfs = Sys.readdir pdfs_dir |> Array.to_list
               |> List.filter (fun f -> Filename.check_suffix f ".pdf") in
    (* Slug, source PDF path and PNG output path for one PDF file name.
       Output is PNG - srcsetter will convert to webp. *)
    let paths_of pdf_file =
      let slug = Filename.chop_extension pdf_file in
      (slug,
       Filename.concat pdfs_dir pdf_file,
       Filename.concat output_dir (slug ^ ".png"))
    in

    if dry_run then begin
      let would_run = List.filter_map (fun pdf_file ->
        let _slug, pdf_path, output_path = paths_of pdf_file in
        if Sys.file_exists output_path then None
        else
          Some (String.concat " "
                  (magick_thumbnail_args ~pdf_path ~output_path))
      ) pdfs in
      let skipped = List.length pdfs - List.length would_run in
      { step = Thumbs; success = true;
        message =
          Printf.sprintf "Would generate %d thumbnails (%d already exist)"
            (List.length would_run) skipped;
        details = would_run }
    end else begin
      try
        (* Ensure output directory exists (recursive) *)
        Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / output_dir);

        let results = List.map (fun pdf_file ->
          let slug, pdf_path, output_path = paths_of pdf_file in
          if Sys.file_exists output_path then begin
            Log.debug (fun m -> m "Skipping %s: thumbnail exists" slug);
            `Skipped slug
          end else begin
            Log.info (fun m -> m "Generating thumbnail for %s" slug);
            try
              Eio.Process.run proc_mgr
                (magick_thumbnail_args ~pdf_path ~output_path);
              `Ok slug
            with ex ->
              reraise_if_cancelled ex;
              Log.err (fun m -> m "Failed to generate thumbnail for %s: %s"
                slug (Printexc.to_string ex));
              `Error slug
          end
        ) pdfs in

        let ok_count, skipped_count, error_count =
          List.fold_left (fun (ok, skipped, errors) r ->
            match r with
            | `Ok _ -> (ok + 1, skipped, errors)
            | `Skipped _ -> (ok, skipped + 1, errors)
            | `Error _ -> (ok, skipped, errors + 1)
          ) (0, 0, 0) results
        in

        { step = Thumbs; success = (error_count = 0);
          message = Printf.sprintf "%d generated, %d skipped, %d errors"
            ok_count skipped_count error_count;
          details = List.filter_map (fun r ->
            match r with `Error s -> Some s | _ -> None) results }
      with ex ->
        reraise_if_cancelled ex;
        failed_result Thumbs ~what:"Thumbnail generation" ex
    end
  end

(** {1 Contact Faces} *)

(** [sync_faces ~dry_run ~fs config entries] copies the PNG thumbnail of each
    contact in [entries] from the Sortal store to
    [local_source_dir/faces/<handle>.png], skipping files that already exist.
    Contacts for which the store has no PNG thumbnail are only counted. The
    step fails if any copy fails; per-contact errors go to [details]. *)
let sync_faces ~dry_run ~fs config entries =
  Log.info (fun m -> m "Syncing contact faces from Sortal...");
  (* Output to local_source_dir/faces/ so srcsetter processes them *)
  let output_dir =
    Filename.concat config.Bushel_config.local_source_dir "faces" in
  let contacts = Bushel.Entry.contacts entries in

  (* Load sortal store to get thumbnail paths *)
  let sortal_store = Sortal.Store.create fs "sortal" in

  (* Find contacts with PNG thumbnails that need copying *)
  let contacts_with_thumbs = List.filter_map (fun c ->
    match Sortal.Store.png_thumbnail_path sortal_store c with
    | Some path -> Some (c, path)
    | None -> None
  ) contacts in
  let no_thumb = List.length contacts - List.length contacts_with_thumbs in

  if dry_run then begin
    let would_copy = List.filter (fun (c, _src_path) ->
      let handle = Sortal_schema.Contact.handle c in
      let output_path = Filename.concat output_dir (handle ^ ".png") in
      not (Sys.file_exists output_path)
    ) contacts_with_thumbs in
    let skipped = List.length contacts_with_thumbs - List.length would_copy in
    { step = Faces; success = true;
      message = Printf.sprintf "Would copy %d faces from Sortal (%d already exist, %d without thumbnails)"
        (List.length would_copy) skipped no_thumb;
      details = preview_details (fun (c, src_path) ->
        let handle = Sortal_schema.Contact.handle c in
        Printf.sprintf "cp %s %s/%s.png"
          (Eio.Path.native_exn src_path) output_dir handle
      ) would_copy }
  end else begin
    try
      (* Ensure output directory exists *)
      Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 Eio.Path.(fs / output_dir);

      let results = List.map (fun (c, src_path) ->
        let handle = Sortal_schema.Contact.handle c in
        let dst_path = Filename.concat output_dir (handle ^ ".png") in

        if Sys.file_exists dst_path then begin
          Log.debug (fun m -> m "Skipping %s: already exists" handle);
          (handle, `Skipped)
        end else begin
          Log.info (fun m -> m "Copying face for %s" handle);
          try
            let content = Eio.Path.load src_path in
            (* Write via a temp file so a failed copy cannot leave a
               truncated PNG that later runs would skip as "already exists". *)
            write_file_atomically dst_path content;
            (handle, `Ok)
          with ex ->
            reraise_if_cancelled ex;
            let reason = Printexc.to_string ex in
            Log.err (fun m ->
              m "Failed to copy face for %s: %s" handle reason);
            (handle, `Error reason)
        end
      ) contacts_with_thumbs in

      let ok_count, skipped_count, error_count =
        List.fold_left (fun (ok, skipped, errors) (_, r) ->
          match r with
          | `Ok -> (ok + 1, skipped, errors)
          | `Skipped -> (ok, skipped + 1, errors)
          | `Error _ -> (ok, skipped, errors + 1)
        ) (0, 0, 0) results
      in

      { step = Faces; success = (error_count = 0);
        message = Printf.sprintf "%d copied, %d skipped, %d errors, %d without thumbnails"
          ok_count skipped_count error_count no_thumb;
        details = List.filter_map (fun (h, r) ->
          match r with `Error e -> Some (h ^ ": " ^ e) | _ -> None
        ) results }
    with ex ->
      reraise_if_cancelled ex;
      failed_result Faces ~what:"Face sync" ex
  end

(** {1 Video Thumbnails} *)

(** [sync_video_thumbnails ~dry_run ~http config entries] fetches thumbnails
    for the videos in [entries] via [Bushel_peertube.fetch_thumbnails] and
    then saves the video index back to [data_dir/videos.yml]. In dry-run
    mode it only lists videos whose [<uuid>.jpg] is missing from the video
    thumbnails directory. The step fails if any fetch reported an error,
    consistent with the thumbnail and face steps. *)
let sync_video_thumbnails ~dry_run ~http config entries =
  Log.info (fun m -> m "Syncing video thumbnails from PeerTube...");
  let output_dir = Bushel_config.video_thumbs_dir config in
  let videos_yml = Filename.concat config.data_dir "videos.yml" in

  let index = Bushel_peertube.VideoIndex.load_file videos_yml in
  let videos = Bushel.Entry.videos entries in
  let count = List.length videos in

  if count = 0 then begin
    Log.info (fun m -> m "No videos found");
    { step = Videos; success = true;
      message = "No videos found";
      details = [] }
  end else if dry_run then begin
    let would_fetch = List.filter (fun video ->
      let uuid = Bushel.Video.uuid video in
      let output_path = Filename.concat output_dir (uuid ^ ".jpg") in
      not (Sys.file_exists output_path)
    ) videos in
    let skipped = count - List.length would_fetch in
    { step = Videos; success = true;
      message = Printf.sprintf "Would fetch %d video thumbnails from PeerTube (%d already exist)"
        (List.length would_fetch) skipped;
      details = preview_details (fun video ->
        let uuid = Bushel.Video.uuid video in
        let url = Bushel.Video.url video in
        if url <> "" then
          Printf.sprintf "%s (from URL: %s)" uuid url
        else
          Printf.sprintf "%s (will search servers)" uuid
      ) would_fetch }
  end else begin
    let results = Bushel_peertube.fetch_thumbnails
      ~http
      ~servers:config.peertube_servers
      ~output_dir
      ~videos
      ~index in

    (* Save updated index (may have discovered new server mappings) *)
    Bushel_peertube.VideoIndex.save_file videos_yml index;

    let count_where pred =
      List.length (List.filter (fun (_, r) -> pred r) results) in
    let ok_count =
      count_where (function Bushel_peertube.Ok _ -> true | _ -> false) in
    let skipped_count =
      count_where (function Bushel_peertube.Skipped _ -> true | _ -> false) in
    let error_count =
      count_where (function Bushel_peertube.Error _ -> true | _ -> false) in

    (* Report failure when any fetch failed, as the other steps do. *)
    { step = Videos; success = (error_count = 0);
      message = Printf.sprintf "%d fetched, %d skipped, %d errors"
        ok_count skipped_count error_count;
      details = List.filter_map (fun (uuid, r) ->
        match r with
        | Bushel_peertube.Error e -> Some (uuid ^ ": " ^ e)
        | _ -> None
      ) results }
  end

(** {1 Typesense Upload} *)

(** [upload_typesense ~dry_run ~sw ~env config entries] logs in to the
    Typesense server at [typesense_endpoint] and runs
    [Bushel_typesense.sync], passing [dry_run] through. The result carries a
    per-collection summary in [details] and fails when the API key is
    missing, when the sync reports errors, or when an exception is raised. *)
let upload_typesense ~dry_run ~sw ~env config entries =
  Log.info (fun m -> m "%s Typesense..."
    (if dry_run then "Checking" else "Syncing"));

  match Bushel_config.typesense_api_key config with
  | Error e ->
    { step = Typesense; success = false;
      message = "Missing Typesense API key";
      details = [e] }
  | Ok api_key ->
    (try
      (* Create Typesense client *)
      let client = Typesense_auth.Client.login ~sw ~env
        ~server_url:config.typesense_endpoint
        ~api_key
        () in

      (* Run incremental sync *)
      let result = Bushel_typesense.sync ~dry_run ~client ~entries in

      (* Format details from each collection *)
      let details = List.concat_map
        (fun (r : Bushel_typesense.collection_sync_result) ->
          let stats = r.stats in
          let summary = Printf.sprintf "%s: %d created, %d updated, %d deleted"
            r.collection stats.created stats.updated stats.deleted in
          summary :: r.details
        ) result.collections in

      { step = Typesense; success = (result.total_errors = 0);
        message = Printf.sprintf "%s: %d created, %d updated, %d deleted, %d errors"
          (if dry_run then "Would sync" else "Synced")
          result.total_created result.total_updated result.total_deleted
          result.total_errors;
        details }
    with ex ->
      reraise_if_cancelled ex;
      failed_result Typesense ~what:"Typesense sync" ex)

(** {1 Run Pipeline} *)

(** [run ~dry_run ~sw ~env ~config ~steps ~entries] executes [steps] in the
    given order and returns their results in the same order. A failed step
    does not stop later steps from running; the number of successful steps
    is logged at the end. *)
let run ~dry_run ~sw ~env ~config ~steps ~entries =
  let proc_mgr = Eio.Stdenv.process_mgr env in
  let fs = Eio.Stdenv.fs env in
  (* Create HTTP session for network requests *)
  let http = Bushel_http.create ~sw env in

  let results = List.map (fun step ->
    Log.info (fun m -> m "%s step: %s"
      (if dry_run then "Dry-run" else "Running")
      (string_of_step step));
    match step with
    | Images -> sync_images ~dry_run ~fs ~proc_mgr config
    | Srcsetter -> run_srcsetter ~dry_run ~fs ~proc_mgr config
    | Thumbs -> generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config
    | Faces -> sync_faces ~dry_run ~fs config entries
    | Videos -> sync_video_thumbnails ~dry_run ~http config entries
    | Typesense -> upload_typesense ~dry_run ~sw ~env config entries
  ) steps in

  (* Summary *)
  let success_count = List.length (List.filter (fun r -> r.success) results) in
  let total = List.length results in
  Log.info (fun m -> m "%s complete: %d/%d steps succeeded"
    (if dry_run then "Dry-run" else "Sync")
    success_count total);

  results