My personal data management layer

Replace curl with requests library and fix sync pipeline order

- Replace curl-based HTTP client with ocaml-requests library
- Report non-success HTTP status codes as errors (curl -s previously returned error-response bodies as if the request had succeeded)
- Connection pooling and TLS support via Eio
- Change ~proc_mgr parameter to ~http across all sync modules

- Fix PeerTube JSON parsing for nullable fields
- Add nullable_string and nullable_ptime Jsont combinators
- Handle null values in description, originallyPublishedAt, thumbnailPath

- Reorder sync pipeline: images -> thumbs -> faces -> videos -> srcsetter
- Srcsetter now runs last to process all fetched images
- Thumbnails now go to local_source_dir for srcsetter processing

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+409 -201
+134 -89
bin/main.ml
··· 332 332 in 333 333 334 334 Eio_main.run @@ fun env -> 335 + Eio.Switch.run @@ fun sw -> 335 336 let fs = Eio.Stdenv.fs env in 336 337 let entries = Bushel_eio.Bushel_loader.load fs data_dir in 337 338 ··· 341 342 ) steps; 342 343 Printf.printf "\n"; 343 344 344 - let results = Bushel_sync.run ~dry_run ~env ~config ~steps ~entries in 345 + let results = Bushel_sync.run ~dry_run ~sw ~env ~config ~steps ~entries in 345 346 346 347 Printf.printf "\nResults:\n"; 347 348 List.iter (fun r -> ··· 364 365 `S Manpage.s_description; 365 366 `P "The sync command runs a pipeline to synchronize images and thumbnails:"; 366 367 `P "1. $(b,images) - Rsync images from remote server"; 367 - `P "2. $(b,srcsetter) - Convert images to WebP srcset variants"; 368 - `P "3. $(b,thumbs) - Generate paper thumbnails from PDFs"; 369 - `P "4. $(b,faces) - Fetch contact face thumbnails from Immich"; 370 - `P "5. $(b,videos) - Fetch video thumbnails from PeerTube"; 368 + `P "2. $(b,thumbs) - Generate paper thumbnails from PDFs"; 369 + `P "3. $(b,faces) - Fetch contact face thumbnails from Sortal"; 370 + `P "4. $(b,videos) - Fetch video thumbnails from PeerTube"; 371 + `P "5. $(b,srcsetter) - Convert all images to WebP srcset variants"; 371 372 `P "6. 
$(b,typesense) - Upload to Typesense (with --remote)"; 372 373 `P "Use $(b,--dry-run) to see what commands would be run without executing them."; 373 374 ] in ··· 396 397 let data_dir = get_data_dir config data_dir in 397 398 398 399 Eio_main.run @@ fun env -> 400 + Eio.Switch.run @@ fun sw -> 399 401 let fs = Eio.Stdenv.fs env in 400 - let proc_mgr = Eio.Stdenv.process_mgr env in 402 + let http = Bushel_sync.Http.create ~sw env in 401 403 let entries = Bushel_eio.Bushel_loader.load fs data_dir in 402 404 403 405 (* Determine version *) 404 - let papers_dir = Filename.concat data_dir ("data/papers/" ^ slug) in 406 + let papers_dir = Filename.concat data_dir ("papers/" ^ slug) in 405 407 let version = match version with 406 408 | Some v -> v 407 409 | None -> ··· 426 428 Printf.printf "Resolving DOI: %s\n" doi; 427 429 Printf.printf "Slug: %s, Version: %s\n" slug version; 428 430 429 - match Bushel_sync.Zotero.resolve ~proc_mgr 431 + match Bushel_sync.Zotero.resolve ~http 430 432 ~server_url:config.zotero_translation_server 431 433 ~slug doi with 432 434 | Error e -> ··· 475 477 476 478 (** {1 Video Fetch Command} *) 477 479 478 - let video_fetch_cmd = 479 - let server = 480 - let doc = "PeerTube server name from config." in 481 - Arg.(required & opt (some string) None & info ["server"; "s"] ~docv:"NAME" ~doc) 482 - in 483 - let channel = 484 - let doc = "Channel name to fetch videos from." 
in 485 - Arg.(required & opt (some string) None & info ["channel"] ~docv:"CHANNEL" ~doc) 486 - in 487 - let run () config_file data_dir server channel = 488 - match load_config config_file with 489 - | Error e -> Printf.eprintf "Config error: %s\n" e; 1 490 - | Ok config -> 491 - let data_dir = get_data_dir config data_dir in 492 - 493 - (* Find server endpoint *) 494 - let endpoint = List.find_map (fun (s : Bushel_config.peertube_server) -> 495 - if s.name = server then Some s.endpoint else None 496 - ) config.peertube_servers in 497 - 498 - match endpoint with 499 - | None -> 500 - Printf.eprintf "Unknown server: %s\n" server; 501 - Printf.eprintf "Available servers:\n"; 502 - List.iter (fun (s : Bushel_config.peertube_server) -> 503 - Printf.eprintf " - %s (%s)\n" s.name s.endpoint 504 - ) config.peertube_servers; 505 - 1 506 - | Some endpoint -> 507 - Eio_main.run @@ fun env -> 508 - let proc_mgr = Eio.Stdenv.process_mgr env in 509 - 510 - Printf.printf "Fetching videos from %s channel %s...\n" server channel; 511 - 512 - let videos = Bushel_sync.Peertube.fetch_all_channel_videos 513 - ~proc_mgr ~endpoint ~channel () in 514 - 515 - Printf.printf "Found %d videos\n" (List.length videos); 516 - 517 - (* Load or create videos index *) 518 - let index_path = Filename.concat data_dir "data/videos.yml" in 519 - let index = Bushel_sync.Peertube.VideoIndex.load_file index_path in 520 - 521 - (* Create video files and update index *) 522 - let videos_dir = Filename.concat data_dir "data/videos" in 523 - if not (Sys.file_exists videos_dir) then 524 - Unix.mkdir videos_dir 0o755; 525 - 526 - let new_count = ref 0 in 527 - List.iter (fun (video : Bushel_sync.Peertube.video) -> 528 - let video_path = Filename.concat videos_dir (video.uuid ^ ".md") in 529 - 530 - if Sys.file_exists video_path then 531 - Printf.printf " Skipping %s (exists)\n" video.uuid 532 - else begin 533 - Printf.printf " Creating %s: %s\n" video.uuid video.name; 534 - 535 - (* Generate markdown file *) 
536 - let content = Printf.sprintf {|--- 480 + (** Helper to create a video markdown file *) 481 + let create_video_file ~videos_dir ~index ~server (video : Bushel_sync.Peertube.video) = 482 + let video_path = Filename.concat videos_dir (video.uuid ^ ".md") in 483 + if Sys.file_exists video_path then begin 484 + Printf.printf " Skipping %s (exists)\n" video.uuid; 485 + false 486 + end else begin 487 + Printf.printf " Creating %s: %s\n" video.uuid video.name; 488 + let content = Printf.sprintf {|--- 537 489 title: %s 538 490 published_date: %s 539 491 uuid: %s ··· 544 496 545 497 %s 546 498 |} 547 - video.name 548 - (Ptime.to_rfc3339 video.published_at) 549 - video.uuid 550 - video.url 551 - (Option.value ~default:"" video.description) 552 - in 499 + video.name 500 + (Ptime.to_rfc3339 video.published_at) 501 + video.uuid 502 + video.url 503 + (Option.value ~default:"" video.description) 504 + in 505 + let oc = open_out video_path in 506 + output_string oc content; 507 + close_out oc; 508 + Bushel_sync.Peertube.VideoIndex.add index ~uuid:video.uuid ~server; 509 + true 510 + end 553 511 554 - let oc = open_out video_path in 555 - output_string oc content; 556 - close_out oc; 512 + let video_fetch_cmd = 513 + let url_arg = 514 + let doc = "PeerTube video URL to fetch (e.g., https://example.com/w/UUID)." in 515 + Arg.(value & pos 0 (some string) None & info [] ~docv:"URL" ~doc) 516 + in 517 + let server = 518 + let doc = "PeerTube server name from config (for channel mode)." in 519 + Arg.(value & opt (some string) None & info ["server"; "s"] ~docv:"NAME" ~doc) 520 + in 521 + let channel = 522 + let doc = "Channel name to fetch videos from (for channel mode)." 
in 523 + Arg.(value & opt (some string) None & info ["channel"] ~docv:"CHANNEL" ~doc) 524 + in 525 + let run () config_file data_dir url_arg server channel = 526 + match load_config config_file with 527 + | Error e -> Printf.eprintf "Config error: %s\n" e; 1 528 + | Ok config -> 529 + let data_dir = get_data_dir config data_dir in 530 + let index_path = Filename.concat data_dir "videos.yml" in 531 + let index = Bushel_sync.Peertube.VideoIndex.load_file index_path in 532 + let videos_dir = Filename.concat data_dir "videos" in 533 + if not (Sys.file_exists videos_dir) then 534 + Unix.mkdir videos_dir 0o755; 557 535 558 - (* Update index *) 559 - Bushel_sync.Peertube.VideoIndex.add index ~uuid:video.uuid ~server; 560 - incr new_count 561 - end 562 - ) videos; 536 + match url_arg, server, channel with 537 + (* Single video mode: fetch by URL *) 538 + | Some url, _, _ -> 539 + (match Bushel_sync.Peertube.find_server_for_url config.peertube_servers url with 540 + | None -> 541 + Printf.eprintf "No configured server matches URL: %s\n" url; 542 + Printf.eprintf "Configured servers:\n"; 543 + List.iter (fun (s : Bushel_config.peertube_server) -> 544 + Printf.eprintf " - %s (%s)\n" s.name s.endpoint 545 + ) config.peertube_servers; 546 + 1 547 + | Some matched_server -> 548 + match Bushel_sync.Peertube.uuid_of_url url with 549 + | None -> 550 + Printf.eprintf "Could not extract video UUID from URL: %s\n" url; 551 + 1 552 + | Some uuid -> 553 + Printf.printf "Fetching video %s from %s...\n" uuid matched_server.name; 554 + Eio_main.run @@ fun env -> 555 + Eio.Switch.run @@ fun sw -> 556 + let http = Bushel_sync.Http.create ~sw env in 557 + match Bushel_sync.Peertube.fetch_video_details ~http 558 + ~endpoint:matched_server.endpoint uuid with 559 + | Error e -> 560 + Printf.eprintf "Error fetching video: %s\n" e; 561 + 1 562 + | Ok video -> 563 + let created = create_video_file ~videos_dir ~index 564 + ~server:matched_server.name video in 565 + 
Bushel_sync.Peertube.VideoIndex.save_file index_path index; 566 + if created then 567 + Printf.printf "\nCreated video entry: %s\n" video.name 568 + else 569 + Printf.printf "\nVideo already exists: %s\n" video.name; 570 + 0) 563 571 564 - (* Save updated index *) 565 - Bushel_sync.Peertube.VideoIndex.save_file index_path index; 572 + (* Channel mode: fetch all videos from channel *) 573 + | None, Some server, Some channel -> 574 + let endpoint = List.find_map (fun (s : Bushel_config.peertube_server) -> 575 + if s.name = server then Some s.endpoint else None 576 + ) config.peertube_servers in 577 + (match endpoint with 578 + | None -> 579 + Printf.eprintf "Unknown server: %s\n" server; 580 + Printf.eprintf "Available servers:\n"; 581 + List.iter (fun (s : Bushel_config.peertube_server) -> 582 + Printf.eprintf " - %s (%s)\n" s.name s.endpoint 583 + ) config.peertube_servers; 584 + 1 585 + | Some endpoint -> 586 + Eio_main.run @@ fun env -> 587 + Eio.Switch.run @@ fun sw -> 588 + let http = Bushel_sync.Http.create ~sw env in 589 + Printf.printf "Fetching videos from %s channel %s...\n" server channel; 590 + let videos = Bushel_sync.Peertube.fetch_all_channel_videos 591 + ~http ~endpoint ~channel () in 592 + Printf.printf "Found %d videos\n" (List.length videos); 593 + let new_count = List.fold_left (fun count video -> 594 + if create_video_file ~videos_dir ~index ~server video 595 + then count + 1 else count 596 + ) 0 videos in 597 + Bushel_sync.Peertube.VideoIndex.save_file index_path index; 598 + Printf.printf "\nCreated %d new video entries\n" new_count; 599 + Printf.printf "Updated index: %s\n" index_path; 600 + 0) 566 601 567 - Printf.printf "\nCreated %d new video entries\n" !new_count; 568 - Printf.printf "Updated index: %s\n" index_path; 569 - 0 602 + (* Missing arguments *) 603 + | None, None, _ | None, _, None -> 604 + Printf.eprintf "Usage: bushel video <URL>\n"; 605 + Printf.eprintf " or: bushel video --server NAME --channel CHANNEL\n"; 606 + 1 570 607 in 
571 - let doc = "Fetch videos from a PeerTube channel." in 572 - let info = Cmd.info "video" ~doc in 573 - Cmd.v info Term.(const run $ logging_t $ config_file $ data_dir $ server $ channel) 608 + let doc = "Fetch videos from PeerTube." in 609 + let man = [ 610 + `S Manpage.s_description; 611 + `P "Fetch video metadata from a PeerTube instance."; 612 + `P "Single video mode: $(b,bushel video <URL>)"; 613 + `P " Fetches a single video by URL. The server is auto-detected from config."; 614 + `P "Channel mode: $(b,bushel video --server NAME --channel CHANNEL)"; 615 + `P " Fetches all videos from a channel on the named server."; 616 + ] in 617 + let info = Cmd.info "video" ~doc ~man in 618 + Cmd.v info Term.(const run $ logging_t $ config_file $ data_dir $ url_arg $ server $ channel) 574 619 575 620 (** {1 Images Command} *) 576 621
+3 -3
lib_config/bushel_config.ml
··· 91 91 | None -> path 92 92 else path 93 93 94 - let paper_thumbs_dir t = Filename.concat t.local_output_dir t.paper_thumbs_subdir 95 - let contact_faces_dir t = Filename.concat t.local_output_dir t.contact_faces_subdir 96 - let video_thumbs_dir t = Filename.concat t.local_output_dir t.video_thumbs_subdir 94 + let paper_thumbs_dir t = Filename.concat t.local_source_dir t.paper_thumbs_subdir 95 + let contact_faces_dir t = Filename.concat t.local_source_dir t.contact_faces_subdir 96 + let video_thumbs_dir t = Filename.concat t.local_source_dir t.video_thumbs_subdir 97 97 98 98 (** {1 Tomlt Codecs} *) 99 99
+9 -9
lib_eio/bushel_loader.ml
··· 40 40 41 41 (** Load and map files from a directory *) 42 42 let map_category fs base subdir parse_fn = 43 - let dir = Filename.concat base ("data/" ^ subdir) in 43 + let dir = Filename.concat base subdir in 44 44 Log.debug (fun m -> m "Loading %s" subdir); 45 45 let files = list_md_files fs dir in 46 46 List.filter_map (fun path -> ··· 61 61 let store = Sortal.Store.create fs "sortal" in 62 62 Sortal.Store.list store 63 63 64 - (** Load projects from data/projects/ *) 64 + (** Load projects from projects/ *) 65 65 let load_projects fs base = 66 66 map_category fs base "projects" Bushel.Project.of_frontmatter 67 67 68 - (** Load notes from data/notes/ and data/news/ *) 68 + (** Load notes from notes/ and news/ *) 69 69 let load_notes fs base = 70 70 let notes_dir = map_category fs base "notes" Bushel.Note.of_frontmatter in 71 71 let news_dir = map_category fs base "news" Bushel.Note.of_frontmatter in 72 72 notes_dir @ news_dir 73 73 74 - (** Load ideas from data/ideas/ *) 74 + (** Load ideas from ideas/ *) 75 75 let load_ideas fs base = 76 76 map_category fs base "ideas" Bushel.Idea.of_frontmatter 77 77 78 - (** Load videos from data/videos/ *) 78 + (** Load videos from videos/ *) 79 79 let load_videos fs base = 80 80 map_category fs base "videos" Bushel.Video.of_frontmatter 81 81 82 - (** Load papers from data/papers/ (nested directory structure) *) 82 + (** Load papers from papers/ (nested directory structure) *) 83 83 let load_papers fs base = 84 - let papers_dir = Filename.concat base "data/papers" in 84 + let papers_dir = Filename.concat base "papers" in 85 85 Log.debug (fun m -> m "Loading papers from %s" papers_dir); 86 86 let path = Eio.Path.(fs / papers_dir) in 87 87 let slug_dirs = ··· 140 140 in 141 141 Log.info (fun m -> m "Loaded %d images" (List.length images)); 142 142 let doi_entries = 143 - let doi_path = Filename.concat base "data/doi.yml" in 143 + let doi_path = Filename.concat base "doi.yml" in 144 144 try 145 145 let content = Eio.Path.load 
Eio.Path.(fs / doi_path) in 146 146 let entries = Bushel.Doi_entry.of_yaml_string content in ··· 151 151 Log.info (fun m -> m "No DOI cache found at %s" doi_path); 152 152 [] 153 153 in 154 - let data_dir = Filename.concat base "data" in 154 + let data_dir = base in 155 155 let entries = Bushel.Entry.v ~papers ~notes ~projects ~ideas ~videos ~contacts ~images ~doi_entries ~data_dir () in 156 156 Log.info (fun m -> m "Building link graph"); 157 157 let graph = build_link_graph entries in
+50 -30
lib_sync/bushel_http.ml
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - (** Simple HTTP client using curl via Eio.Process *) 6 + (** HTTP client using the requests library *) 7 7 8 8 let src = Logs.Src.create "bushel.http" ~doc:"HTTP client" 9 9 module Log = (val Logs.src_log src : Logs.LOG) 10 10 11 - (** Run curl and capture stdout *) 12 - let get ~proc_mgr url = 11 + type t = Requests.t 12 + 13 + let create ~sw env = 14 + Requests.create ~sw ~follow_redirects:true env 15 + 16 + let get ~http url = 13 17 Log.debug (fun m -> m "GET %s" url); 14 - let stdout = Buffer.create 4096 in 15 18 try 16 - Eio.Process.run proc_mgr 17 - ~stdout:(Eio.Flow.buffer_sink stdout) 18 - ["curl"; "-s"; "-L"; url]; 19 - Ok (Buffer.contents stdout) 20 - with e -> 21 - Error (Printf.sprintf "curl failed: %s" (Printexc.to_string e)) 19 + let response = Requests.get http url in 20 + if Requests.Response.ok response then begin 21 + let body = Requests.Response.body response |> Eio.Flow.read_all in 22 + Ok body 23 + end else begin 24 + let status = Requests.Response.status_code response in 25 + Error (Printf.sprintf "HTTP %d" status) 26 + end 27 + with exn -> 28 + Error (Printf.sprintf "Request failed: %s" (Printexc.to_string exn)) 22 29 23 - let get_with_header ~proc_mgr ~header url = 30 + let get_with_header ~http ~header url = 24 31 Log.debug (fun m -> m "GET %s (with header)" url); 25 - let stdout = Buffer.create 4096 in 26 32 try 27 - Eio.Process.run proc_mgr 28 - ~stdout:(Eio.Flow.buffer_sink stdout) 29 - ["curl"; "-s"; "-L"; "-H"; header; url]; 30 - Ok (Buffer.contents stdout) 31 - with e -> 32 - Error (Printf.sprintf "curl failed: %s" (Printexc.to_string e)) 33 + (* Parse header "Name: Value" format *) 34 + let name, value = match String.index_opt header ':' with 35 + | Some i -> 36 + let name = String.sub header 0 i in 37 + let value = String.trim (String.sub header (i + 1) (String.length header - i - 1)) in 38 + (name, value) 
39 + | None -> (header, "") 40 + in 41 + let headers = Requests.Headers.empty |> Requests.Headers.add_string name value in 42 + let response = Requests.get http ~headers url in 43 + if Requests.Response.ok response then begin 44 + let body = Requests.Response.body response |> Eio.Flow.read_all in 45 + Ok body 46 + end else begin 47 + let status = Requests.Response.status_code response in 48 + Error (Printf.sprintf "HTTP %d" status) 49 + end 50 + with exn -> 51 + Error (Printf.sprintf "Request failed: %s" (Printexc.to_string exn)) 33 52 34 - let post ~proc_mgr ~content_type ~body url = 53 + let post ~http ~content_type ~body url = 35 54 Log.debug (fun m -> m "POST %s" url); 36 - let stdout = Buffer.create 4096 in 37 55 try 38 - Eio.Process.run proc_mgr 39 - ~stdout:(Eio.Flow.buffer_sink stdout) 40 - ["curl"; "-s"; "-L"; 41 - "-X"; "POST"; 42 - "-H"; "Content-Type: " ^ content_type; 43 - "-d"; body; 44 - url]; 45 - Ok (Buffer.contents stdout) 46 - with e -> 47 - Error (Printf.sprintf "curl failed: %s" (Printexc.to_string e)) 56 + let mime = Requests.Mime.of_string content_type in 57 + let body = Requests.Body.of_string mime body in 58 + let response = Requests.post http ~body url in 59 + if Requests.Response.ok response then begin 60 + let body = Requests.Response.body response |> Eio.Flow.read_all in 61 + Ok body 62 + end else begin 63 + let status = Requests.Response.status_code response in 64 + Error (Printf.sprintf "HTTP %d" status) 65 + end 66 + with exn -> 67 + Error (Printf.sprintf "Request failed: %s" (Printexc.to_string exn))
+9 -9
lib_sync/bushel_immich.ml
··· 42 42 43 43 (** {1 Immich API} *) 44 44 45 - let search_person ~proc_mgr ~endpoint ~api_key name = 45 + let search_person ~http ~endpoint ~api_key name = 46 46 let encoded_name = Uri.pct_encode name in 47 47 let url = Printf.sprintf "%s/api/search/person?name=%s" endpoint encoded_name in 48 48 let header = "X-Api-Key: " ^ api_key in 49 49 50 - match Bushel_http.get_with_header ~proc_mgr ~header url with 50 + match Bushel_http.get_with_header ~http ~header url with 51 51 | Result.Error e -> Result.Error e 52 52 | Result.Ok body -> decode_people body 53 53 54 - let download_thumbnail ~proc_mgr ~endpoint ~api_key person_id output_path = 54 + let download_thumbnail ~http ~endpoint ~api_key person_id output_path = 55 55 let url = Printf.sprintf "%s/api/people/%s/thumbnail" endpoint person_id in 56 56 let header = "X-Api-Key: " ^ api_key in 57 57 58 - match Bushel_http.get_with_header ~proc_mgr ~header url with 58 + match Bushel_http.get_with_header ~http ~header url with 59 59 | Result.Error e -> Result.Error e 60 60 | Result.Ok body -> 61 61 try ··· 72 72 73 73 (** {1 Contact Face Fetching} *) 74 74 75 - let fetch_face_for_contact ~proc_mgr ~endpoint ~api_key ~output_dir contact = 75 + let fetch_face_for_contact ~http ~endpoint ~api_key ~output_dir contact = 76 76 let names = Sortal_schema.Contact.names contact in 77 77 let handle = Sortal_schema.Contact.handle contact in 78 78 let output_path = Filename.concat output_dir (handle ^ ".jpg") in ··· 91 91 NotFound handle 92 92 | name :: rest -> 93 93 Log.debug (fun m -> m "Trying name: %s" name); 94 - match search_person ~proc_mgr ~endpoint ~api_key name with 94 + match search_person ~http ~endpoint ~api_key name with 95 95 | Result.Error e -> 96 96 Log.err (fun m -> m "Search error for %s: %s" name e); 97 97 Error e ··· 100 100 try_names rest 101 101 | Result.Ok (person :: _) -> 102 102 Log.info (fun m -> m "Found match for %s: %s" name person.name); 103 - match download_thumbnail ~proc_mgr ~endpoint ~api_key 
person.id output_path with 103 + match download_thumbnail ~http ~endpoint ~api_key person.id output_path with 104 104 | Result.Ok path -> Ok path 105 105 | Result.Error e -> Error e 106 106 in 107 107 try_names names 108 108 end 109 109 110 - let fetch_all_faces ~proc_mgr ~endpoint ~api_key ~output_dir contacts = 110 + let fetch_all_faces ~http ~endpoint ~api_key ~output_dir contacts = 111 111 (* Ensure output directory exists *) 112 112 if not (Sys.file_exists output_dir) then 113 113 Unix.mkdir output_dir 0o755; 114 114 115 115 let results = List.map (fun contact -> 116 116 let handle = Sortal_schema.Contact.handle contact in 117 - let result = fetch_face_for_contact ~proc_mgr ~endpoint ~api_key ~output_dir contact in 117 + let result = fetch_face_for_contact ~http ~endpoint ~api_key ~output_dir contact in 118 118 (handle, result) 119 119 ) contacts in 120 120
+167 -37
lib_sync/bushel_peertube.ml
··· 44 44 match Ptime.to_rfc3339 ~frac_s:0 t with 45 45 | s -> s) 46 46 47 + (** Nullable string - handles both absent and explicit null *) 48 + let nullable_string = 49 + let null = Jsont.null None in 50 + let some = Jsont.string |> Jsont.map ~dec:(fun s -> Some s) 51 + ~enc:(function Some s -> s | None -> "") in 52 + Jsont.any ~dec_null:null ~dec_string:some 53 + ~enc:(function None -> null | Some _ -> some) () 54 + 55 + (** Nullable ptime - handles both absent and explicit null *) 56 + let nullable_ptime = 57 + let null = Jsont.null None in 58 + let some = ptime_jsont |> Jsont.map ~dec:(fun t -> Some t) 59 + ~enc:(function Some t -> t | None -> Ptime.epoch) in 60 + Jsont.any ~dec_null:null ~dec_string:some 61 + ~enc:(function None -> null | Some _ -> some) () 62 + 47 63 let make_video ~id ~uuid ~name ~description ~url ~embed_path 48 64 ~published_at ~originally_published_at ~thumbnail_path ~tags = 49 65 { id; uuid; name; description; url; embed_path; ··· 59 75 |> mem "id" int ~enc:(fun v -> v.id) 60 76 |> mem "uuid" string ~enc:(fun v -> v.uuid) 61 77 |> mem "name" string ~enc:(fun v -> v.name) 62 - |> mem "description" (some string) ~dec_absent:None ~enc_omit:Option.is_none ~enc:(fun v -> v.description) 78 + |> mem "description" nullable_string ~dec_absent:None ~enc_omit:Option.is_none ~enc:(fun v -> v.description) 63 79 |> mem "url" string ~enc:(fun v -> v.url) 64 80 |> mem "embedPath" string ~enc:(fun v -> v.embed_path) 65 81 |> mem "publishedAt" ptime_jsont ~enc:(fun v -> v.published_at) 66 - |> mem "originallyPublishedAt" (some ptime_jsont) ~dec_absent:None ~enc_omit:Option.is_none ~enc:(fun v -> v.originally_published_at) 67 - |> mem "thumbnailPath" (some string) ~dec_absent:None ~enc_omit:Option.is_none ~enc:(fun v -> v.thumbnail_path) 82 + |> mem "originallyPublishedAt" nullable_ptime ~dec_absent:None ~enc_omit:Option.is_none ~enc:(fun v -> v.originally_published_at) 83 + |> mem "thumbnailPath" nullable_string ~dec_absent:None ~enc_omit:Option.is_none 
~enc:(fun v -> v.thumbnail_path) 68 84 |> mem "tags" (list string) ~dec_absent:[] ~enc:(fun v -> v.tags) 69 85 |> finish 70 86 ··· 93 109 | Ok r -> Result.Ok r 94 110 | Error e -> Result.Error e 95 111 112 + (** {1 URL Parsing} *) 113 + 114 + (** Extract UUID from a PeerTube video URL. 115 + Handles formats like: 116 + - https://example.com/w/UUID 117 + - https://example.com/videos/watch/UUID *) 118 + let uuid_of_url url = 119 + let uri = Uri.of_string url in 120 + let path = Uri.path uri in 121 + (* Split path and find UUID *) 122 + let segments = String.split_on_char '/' path |> List.filter (fun s -> s <> "") in 123 + match segments with 124 + | ["w"; uuid] -> Some uuid 125 + | ["videos"; "watch"; uuid] -> Some uuid 126 + | _ -> None 127 + 128 + (** Extract the origin (scheme + host) from a URL *) 129 + let origin_of_url url = 130 + let uri = Uri.of_string url in 131 + match Uri.scheme uri, Uri.host uri with 132 + | Some scheme, Some host -> 133 + let port = match Uri.port uri with 134 + | Some p -> Printf.sprintf ":%d" p 135 + | None -> "" 136 + in 137 + Some (Printf.sprintf "%s://%s%s" scheme host port) 138 + | _ -> None 139 + 140 + (** Find a configured server that matches the URL's origin *) 141 + let find_server_for_url servers url = 142 + match origin_of_url url with 143 + | None -> None 144 + | Some origin -> 145 + List.find_opt (fun (s : Bushel_config.peertube_server) -> 146 + (* Normalize endpoints for comparison *) 147 + let endpoint = String.lowercase_ascii s.endpoint in 148 + let origin = String.lowercase_ascii origin in 149 + (* Strip trailing slashes *) 150 + let strip_slash s = 151 + if String.length s > 0 && s.[String.length s - 1] = '/' 152 + then String.sub s 0 (String.length s - 1) 153 + else s 154 + in 155 + strip_slash endpoint = strip_slash origin 156 + ) servers 157 + 96 158 (** {1 PeerTube API} *) 97 159 98 - let fetch_video_details ~proc_mgr ~endpoint uuid = 160 + let fetch_video_details ~http ~endpoint uuid = 99 161 let url = 
Printf.sprintf "%s/api/v1/videos/%s" endpoint uuid in 100 - match Bushel_http.get ~proc_mgr url with 162 + match Bushel_http.get ~http url with 101 163 | Result.Error e -> Result.Error e 102 - | Result.Ok body -> decode_video body 164 + | Result.Ok body -> 165 + match decode_video body with 166 + | Result.Ok v -> Result.Ok v 167 + | Result.Error e -> 168 + Log.warn (fun m -> m "Failed to decode video %s: %s" uuid e); 169 + Log.debug (fun m -> m "Response body: %s" (String.sub body 0 (min 500 (String.length body)))); 170 + Result.Error e 103 171 104 - let fetch_channel_videos ~proc_mgr ~endpoint ~channel ?(count=20) ?(start=0) () = 172 + let fetch_channel_videos ~http ~endpoint ~channel ?(count=20) ?(start=0) () = 105 173 let url = Printf.sprintf "%s/api/v1/video-channels/%s/videos?count=%d&start=%d" 106 174 endpoint channel count start in 107 - match Bushel_http.get ~proc_mgr url with 175 + match Bushel_http.get ~http url with 108 176 | Result.Error _ -> (0, []) 109 177 | Result.Ok body -> 110 178 match decode_channel_response body with 111 179 | Result.Ok r -> (r.total, r.data) 112 180 | Result.Error _ -> (0, []) 113 181 114 - let fetch_all_channel_videos ~proc_mgr ~endpoint ~channel ?(page_size=20) () = 182 + let fetch_all_channel_videos ~http ~endpoint ~channel ?(page_size=20) () = 115 183 let rec fetch_pages start acc = 116 - let (total, videos) = fetch_channel_videos ~proc_mgr ~endpoint ~channel ~count:page_size ~start () in 184 + let (total, videos) = fetch_channel_videos ~http ~endpoint ~channel ~count:page_size ~start () in 117 185 let all = acc @ videos in 118 186 let fetched = start + List.length videos in 119 187 if fetched < total && List.length videos > 0 then ··· 130 198 | Some path -> Some (endpoint ^ path) 131 199 | None -> None 132 200 133 - let download_thumbnail ~proc_mgr ~endpoint video output_path = 201 + let download_thumbnail ~http ~endpoint video output_path = 134 202 match thumbnail_url endpoint video with 135 203 | None -> 136 204 Log.warn 
(fun m -> m "No thumbnail for video %s" video.uuid); 137 205 Error "No thumbnail available" 138 206 | Some url -> 139 - match Bushel_http.get ~proc_mgr url with 207 + match Bushel_http.get ~http url with 140 208 | Result.Error e -> Error e 141 209 | Result.Ok body -> 142 210 try ··· 198 266 Hashtbl.fold (fun k v acc -> (k, v) :: acc) index [] 199 267 end 200 268 201 - (** {1 Fetch Thumbnails from Index} *) 269 + (** {1 Fetch Thumbnails} *) 202 270 203 - let fetch_thumbnails_from_index ~proc_mgr ~servers ~output_dir index = 271 + (** Try to fetch a video from a specific server *) 272 + let try_fetch_from_server ~http ~endpoint ~output_path uuid = 273 + match fetch_video_details ~http ~endpoint uuid with 274 + | Result.Error _ -> None 275 + | Result.Ok video -> 276 + match download_thumbnail ~http ~endpoint video output_path with 277 + | Ok path -> Some (Ok path) 278 + | Skipped path -> Some (Skipped path) 279 + | Error _ -> None 280 + 281 + (** Try each server until one succeeds, return the server that worked *) 282 + let try_all_servers ~http ~servers ~output_path uuid = 283 + let rec try_next = function 284 + | [] -> None 285 + | (server : Bushel_config.peertube_server) :: rest -> 286 + Log.debug (fun m -> m "Trying server %s for video %s" server.name uuid); 287 + match try_fetch_from_server ~http ~endpoint:server.endpoint ~output_path uuid with 288 + | Some result -> Some (server, result) 289 + | None -> try_next rest 290 + in 291 + try_next servers 292 + 293 + (** Fetch thumbnails for videos, using URL field, index, or server discovery. 294 + Updates the index when servers are discovered. 
*) 295 + let fetch_thumbnails ~http ~servers ~output_dir ~videos ~index = 204 296 (* Ensure output dir exists *) 205 297 if not (Sys.file_exists output_dir) then 206 298 Unix.mkdir output_dir 0o755; 207 299 208 - let server_map = 209 - List.fold_left (fun acc (s : Bushel_config.peertube_server) -> 210 - (s.name, s.endpoint) :: acc 211 - ) [] servers 212 - in 213 - 214 - let results = List.filter_map (fun (uuid, server_name) -> 300 + let results = List.filter_map (fun (video : Bushel.Video.t) -> 301 + let uuid = Bushel.Video.uuid video in 302 + let url = Bushel.Video.url video in 215 303 let output_path = Filename.concat output_dir (uuid ^ ".jpg") in 216 304 217 - (* Skip if exists *) 305 + (* Skip if thumbnail exists *) 218 306 if Sys.file_exists output_path then begin 219 307 Log.debug (fun m -> m "Skipping %s: thumbnail exists" uuid); 220 308 Some (uuid, Skipped output_path) 221 309 end else begin 222 - match List.assoc_opt server_name server_map with 223 - | None -> 224 - Log.warn (fun m -> m "Unknown server %s for video %s" server_name uuid); 225 - Some (uuid, Error (Printf.sprintf "Unknown server: %s" server_name)) 226 - | Some endpoint -> 227 - Log.info (fun m -> m "Fetching thumbnail for %s from %s" uuid server_name); 228 - match fetch_video_details ~proc_mgr ~endpoint uuid with 229 - | Result.Error e -> 230 - Some (uuid, Error e) 231 - | Result.Ok video -> 232 - match download_thumbnail ~proc_mgr ~endpoint video output_path with 233 - | Ok path -> Some (uuid, Ok path) 234 - | Skipped path -> Some (uuid, Skipped path) 235 - | Error e -> Some (uuid, Error e) 310 + (* Strategy 1: Try to derive server from video URL *) 311 + let server_from_url = 312 + if url <> "" then find_server_for_url servers url 313 + else None 314 + in 315 + 316 + (* Strategy 2: Check the index *) 317 + let server_from_index = 318 + match VideoIndex.find index uuid with 319 + | Some server_name -> 320 + List.find_opt (fun (s : Bushel_config.peertube_server) -> 321 + s.name = server_name) 
servers 322 + | None -> None 323 + in 324 + 325 + (* Helper to try all servers and update index on success *) 326 + let search_all_servers () = 327 + Log.info (fun m -> m "Searching all servers for video %s" uuid); 328 + match try_all_servers ~http ~servers ~output_path uuid with 329 + | Some (server, result) -> 330 + Log.info (fun m -> m "Found video %s on server %s" uuid server.name); 331 + VideoIndex.add index ~uuid ~server:server.name; 332 + Some (uuid, result) 333 + | None -> 334 + Log.warn (fun m -> m "Video %s not found on any server" uuid); 335 + Some (uuid, Error "Not found on any configured server") 336 + in 337 + 338 + match server_from_url, server_from_index with 339 + | Some server, _ -> 340 + (* Have server from URL - try it first, fall back to searching all *) 341 + Log.info (fun m -> m "Fetching thumbnail for %s from %s (from URL)" uuid server.name); 342 + (match try_fetch_from_server ~http ~endpoint:server.endpoint ~output_path uuid with 343 + | Some result -> 344 + VideoIndex.add index ~uuid ~server:server.name; 345 + Some (uuid, result) 346 + | None -> 347 + Log.info (fun m -> m "URL-derived server failed, trying others..."); 348 + search_all_servers ()) 349 + 350 + | None, Some server -> 351 + (* Have server from index - try it first, fall back to searching all *) 352 + Log.info (fun m -> m "Fetching thumbnail for %s from %s (from index)" uuid server.name); 353 + (match try_fetch_from_server ~http ~endpoint:server.endpoint ~output_path uuid with 354 + | Some result -> Some (uuid, result) 355 + | None -> 356 + Log.info (fun m -> m "Indexed server failed, trying others..."); 357 + search_all_servers ()) 358 + 359 + | None, None -> 360 + (* No server known - search all *) 361 + search_all_servers () 236 362 end 237 - ) (VideoIndex.to_list index) in 363 + ) videos in 238 364 239 365 let ok_count = List.length (List.filter (fun (_, r) -> match r with Ok _ -> true | _ -> false) results) in 240 366 let skipped_count = List.length (List.filter (fun (_, 
r) -> match r with Skipped _ -> true | _ -> false) results) in ··· 244 370 ok_count skipped_count error_count); 245 371 246 372 results 373 + 374 + (** Legacy function for compatibility - calls fetch_thumbnails with empty video list *) 375 + let fetch_thumbnails_from_index ~http ~servers ~output_dir index = 376 + fetch_thumbnails ~http ~servers ~output_dir ~videos:[] ~index
+28 -16
lib_sync/bushel_sync.ml
··· 22 22 (** Video metadata and thumbnails from PeerTube *) 23 23 module Peertube = Bushel_peertube 24 24 25 - (** Simple HTTP client using curl via Eio.Process *) 25 + (** HTTP client using the requests library *) 26 26 module Http = Bushel_http 27 27 28 28 let src = Logs.Src.create "bushel.sync" ~doc:"Bushel sync pipeline" ··· 55 55 | "typesense" -> Some Typesense 56 56 | _ -> None 57 57 58 - let all_steps = [Images; Thumbs; Faces; Srcsetter; Videos] 58 + let all_steps = [Images; Thumbs; Faces; Videos; Srcsetter] 59 59 let all_steps_with_remote = all_steps @ [Typesense] 60 60 61 61 (** {1 Step Results} *) ··· 303 303 304 304 (** {1 Video Thumbnails} *) 305 305 306 - let sync_video_thumbnails ~dry_run ~proc_mgr config = 306 + let sync_video_thumbnails ~dry_run ~http config entries = 307 307 Log.info (fun m -> m "Syncing video thumbnails from PeerTube..."); 308 308 let output_dir = Bushel_config.video_thumbs_dir config in 309 309 let videos_yml = Filename.concat config.data_dir "videos.yml" in 310 310 311 311 let index = Bushel_peertube.VideoIndex.load_file videos_yml in 312 - let video_list = Bushel_peertube.VideoIndex.to_list index in 313 - let count = List.length video_list in 312 + let videos = Bushel.Entry.videos entries in 313 + let count = List.length videos in 314 314 315 315 if count = 0 then begin 316 - Log.info (fun m -> m "No videos in index"); 316 + Log.info (fun m -> m "No videos found"); 317 317 { step = Videos; success = true; 318 - message = "No videos in index"; 318 + message = "No videos found"; 319 319 details = [] } 320 320 end else if dry_run then begin 321 - let would_fetch = List.filter (fun (uuid, _server) -> 321 + let would_fetch = List.filter (fun video -> 322 + let uuid = Bushel.Video.uuid video in 322 323 let output_path = Filename.concat output_dir (uuid ^ ".jpg") in 323 324 not (Sys.file_exists output_path) 324 - ) video_list in 325 + ) videos in 325 326 let skipped = count - List.length would_fetch in 326 327 { step = Videos; 
success = true; 327 328 message = Printf.sprintf "Would fetch %d video thumbnails from PeerTube (%d already exist)" 328 329 (List.length would_fetch) skipped; 329 - details = List.map (fun (uuid, server) -> 330 - Printf.sprintf "curl <server:%s>/api/v1/videos/%s -> %s.jpg" server uuid uuid 330 + details = List.map (fun video -> 331 + let uuid = Bushel.Video.uuid video in 332 + let url = Bushel.Video.url video in 333 + if url <> "" then 334 + Printf.sprintf "%s (from URL: %s)" uuid url 335 + else 336 + Printf.sprintf "%s (will search servers)" uuid 331 337 ) (List.filteri (fun i _ -> i < 5) would_fetch) @ 332 338 (if List.length would_fetch > 5 then ["...and more"] else []) } 333 339 end else begin 334 - let results = Bushel_peertube.fetch_thumbnails_from_index 335 - ~proc_mgr 340 + let results = Bushel_peertube.fetch_thumbnails 341 + ~http 336 342 ~servers:config.peertube_servers 337 343 ~output_dir 338 - index in 344 + ~videos 345 + ~index in 346 + 347 + (* Save updated index (may have discovered new server mappings) *) 348 + Bushel_peertube.VideoIndex.save_file videos_yml index; 339 349 340 350 let ok_count = List.length (List.filter (fun (_, r) -> 341 351 match r with Bushel_peertube.Ok _ -> true | _ -> false) results) in ··· 375 385 376 386 (** {1 Run Pipeline} *) 377 387 378 - let run ~dry_run ~env ~config ~steps ~entries = 388 + let run ~dry_run ~sw ~env ~config ~steps ~entries = 379 389 let proc_mgr = Eio.Stdenv.process_mgr env in 380 390 let fs = Eio.Stdenv.fs env in 391 + (* Create HTTP session for network requests *) 392 + let http = Bushel_http.create ~sw env in 381 393 382 394 let results = List.map (fun step -> 383 395 Log.info (fun m -> m "%s step: %s" ··· 388 400 | Srcsetter -> run_srcsetter ~dry_run ~fs ~proc_mgr config 389 401 | Thumbs -> generate_paper_thumbnails ~dry_run ~fs ~proc_mgr config 390 402 | Faces -> sync_faces ~dry_run ~fs config entries 391 - | Videos -> sync_video_thumbnails ~dry_run ~proc_mgr config 403 + | Videos -> 
sync_video_thumbnails ~dry_run ~http config entries 392 404 | Typesense -> upload_typesense ~dry_run config entries 393 405 ) steps in 394 406
+7 -7
lib_sync/bushel_zotero.ml
··· 156 156 if String.ends_with ~suffix:"/" base_url then base_url ^ "export" 157 157 else base_url ^ "/export" 158 158 159 - let resolve_doi ~proc_mgr ~server_url doi = 159 + let resolve_doi ~http ~server_url doi = 160 160 Log.info (fun m -> m "Resolving DOI: %s" doi); 161 161 let url = web_endpoint server_url in 162 162 let body = "https://doi.org/" ^ doi in 163 - match Bushel_http.post ~proc_mgr ~content_type:"text/plain" ~body url with 163 + match Bushel_http.post ~http ~content_type:"text/plain" ~body url with 164 164 | Error e -> Error e 165 165 | Ok json_str -> 166 166 match Jsont_bytesrw.decode_string Jsont.json json_str with 167 167 | Ok json -> Ok json 168 168 | Error e -> Error (Printf.sprintf "JSON parse error: %s" e) 169 169 170 - let export_bibtex ~proc_mgr ~server_url json = 170 + let export_bibtex ~http ~server_url json = 171 171 let url = export_endpoint server_url ^ "?format=bibtex" in 172 172 match Jsont_bytesrw.encode_string Jsont.json json with 173 173 | Error e -> Error e 174 - | Ok body -> Bushel_http.post ~proc_mgr ~content_type:"application/json" ~body url 174 + | Ok body -> Bushel_http.post ~http ~content_type:"application/json" ~body url 175 175 176 176 (** {1 DOI Resolution} *) 177 177 178 - let resolve ~proc_mgr ~server_url ~slug doi = 179 - match resolve_doi ~proc_mgr ~server_url doi with 178 + let resolve ~http ~server_url ~slug doi = 179 + match resolve_doi ~http ~server_url doi with 180 180 | Error e -> Error e 181 181 | Ok json -> 182 182 (* Export to BibTeX *) 183 - match export_bibtex ~proc_mgr ~server_url json with 183 + match export_bibtex ~http ~server_url json with 184 184 | Error e -> Error (Printf.sprintf "BibTeX export failed: %s" e) 185 185 | Ok bib -> 186 186 Log.debug (fun m -> m "Got BibTeX: %s" bib);
+2 -1
lib_sync/dune
··· 16 16 fmt 17 17 sortal.schema 18 18 sortal 19 - srcsetter-cmd)) 19 + srcsetter-cmd 20 + requests))