···620620 in
621621 (* collect record data for insert *)
622622 let since = Tid.now () in
623623+ let blob_refs : (string * Cid.t) list ref = ref [] in
623624 let record_data =
624624- List.filter_map
625625+ List.map
625626 (fun (path, cid) ->
626627 match Block_map.get cid all_blocks with
627628 | Some data ->
628628- Some (path, cid, data, since)
629629+ let record = Lex.of_cbor data in
630630+ let record_refs =
631631+ Util.find_blob_refs record
632632+ |> List.map (fun (br : Mist.Blob_ref.t) -> (path, br.ref))
633633+ in
634634+ blob_refs := record_refs @ !blob_refs ;
635635+ (path, cid, data, since)
629636 | None ->
630637 failwith ("missing record block: " ^ Cid.to_string cid) )
631638 leaves
···639646 let$! () =
640647 [%rapper execute {sql| DELETE FROM records |sql}] () conn
641648 in
642642- let$! () = User_store.Bulk.put_records record_data conn in
649649+ let$!* _ =
650650+ Lwt.all
651651+ [ User_store.Bulk.put_records record_data conn
652652+ ; User_store.Bulk.put_blob_refs !blob_refs conn ]
653653+ in
643654 Lwt.return_ok () ) )
644655 in
645656 (* clear cached block_map so it's rebuilt on next access *)
+35
pegasus/lib/user_store.ml
···635635 Lwt.return_error e )
636636 in
637637 process_chunks chunks
638638+639639+ let put_blob_refs (refs : (string * Cid.t) list) conn =
640640+ if List.is_empty refs then Lwt.return_ok ()
641641+ else
642642+ let module C = (val conn : Caqti_lwt.CONNECTION) in
643643+ let chunks = chunk_list 200 refs in
644644+ let rec process_chunks = function
645645+ | [] ->
646646+ Lwt.return_ok ()
647647+ | chunk :: rest -> (
648648+ let values =
649649+ List.map
650650+ (fun (path, cid) ->
651651+ Printf.sprintf "('%s', '%s')" (escape_sql_string path)
652652+ (escape_sql_string (Cid.to_string cid)) )
653653+ chunk
654654+ |> String.concat ", "
655655+ in
656656+ let sql =
657657+ Printf.sprintf
658658+ "INSERT INTO blobs_records (record_path, blob_cid) VALUES %s \
659659+ ON CONFLICT DO NOTHING"
660660+ values
661661+ in
662662+ let query =
663663+ Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
664664+ in
665665+ let%lwt result = C.exec query () in
666666+ match result with
667667+ | Ok () ->
668668+ process_chunks rest
669669+ | Error e ->
670670+ Lwt.return_error e )
671671+ in
672672+ process_chunks chunks
638673end
+20-1
pegasus/lib/util.ml
···3333 let ( let$! ) m f =
3434 match%lwt m with Ok x -> f x | Error e -> raise (Caqti_error.Exn e)
35353636+ (* let$! but for an array of results *)
3737+ let ( let$!* ) m f =
3838+ let%lwt results =
3939+ match%lwt m with
4040+ | xs ->
4141+ Lwt.return @@ List.rev
4242+ @@ List.fold_left
4343+ (fun acc x ->
4444+ match x with
4545+ | Ok x ->
4646+ x :: acc
4747+ | Error e ->
4848+ raise (Caqti_error.Exn e) )
4949+ [] xs
5050+ in
5151+ f results
5252+3653 (* unwraps an Lwt result, raising an exception if there's an error *)
3754 let ( >$! ) m f =
3855 match%lwt m with
···468485 let props_json = Template.props_to_json props |> Yojson.Basic.to_string in
469486 let page_data = Printf.sprintf "window.__PAGE__ = {props: %s};" props_json in
470487 let app = Template.make ~props () in
471471- let page = Frontend.Layout.make ?title ~favicon:Env.favicon_url ~children:app () in
488488+ let page =
489489+ Frontend.Layout.make ?title ~favicon:Env.favicon_url ~children:app ()
490490+ in
472491 Dream.stream ?status
473492 ~headers:[("Content-Type", "text/html")]
474493 (fun stream ->