···11module String_map = Map.Make (String)
2233+(* sort map keys by length first, then lexicographically *)
44+let dag_cbor_key_compare a b =
55+ let la = String.length a in
66+ let lb = String.length b in
77+ if la = lb then String.compare a b else compare la lb
88+39let ordered_map_keys (m : 'a String_map.t) : string list =
410 let keys = String_map.bindings m |> List.map fst in
55- List.sort
66- (fun a b ->
77- let la = String.length a in
88- let lb = String.length b in
99- if la = lb then String.compare a b else compare la lb )
1010- keys
1111+ List.sort dag_cbor_key_compare keys
1212+1313+(* returns bindings sorted in dag-cbor canonical order *)
1414+let ordered_map_bindings (m : 'a String_map.t) : (string * 'a) list =
1515+ String_map.bindings m |> List.sort (fun (a, _) (b, _) -> dag_cbor_key_compare a b)
11161217let type_info_length len =
1318 if len < 24 then 1
···195200 | `Map m ->
196201 let len = String_map.cardinal m in
197202 write_type_and_argument t 5 (Int64.of_int len) ;
198198- ordered_map_keys m
199199- |> List.iter (fun k ->
203203+ ordered_map_bindings m
204204+ |> List.iter (fun (k, v) ->
200205 write_string t k ;
201201- write_value t (String_map.find k m) )
206206+ write_value t v )
202207 | `Link cid ->
203208 write_cid t cid
204209
+400-155
mist/lib/mst.ml
···208208209209 val proof_for_key : t -> Cid.t -> string -> Block_map.t Lwt.t
210210211211+ val proof_for_keys : t -> Cid.t -> string list -> Block_map.t Lwt.t
212212+211213 val leaf_count : t -> int Lwt.t
212214213215 val layer : t -> int Lwt.t
···246248247249 let create blockstore root = {blockstore; root}
248250251251+ let entries_are_sorted (entries : entry list) : bool =
252252+ let rec aux prev_key = function
253253+ | [] ->
254254+ true
255255+ | e :: tl ->
256256+ if String.compare prev_key e.key <= 0 then aux e.key tl else false
257257+ in
258258+ match entries with [] -> true | e :: tl -> aux e.key tl
259259+260260+ (* we try to batch reads from the blockstore when possible
261261+ 200 seems like a sane upper limit for that *)
262262+ let batch_size = 200
263263+264264+ let take_n n lst =
265265+ if n <= 0 then ([], lst)
266266+ else
267267+ let rec loop acc remaining xs =
268268+ match (remaining, xs) with
269269+ | 0, _ ->
270270+ (List.rev acc, xs)
271271+ | _, [] ->
272272+ (List.rev acc, [])
273273+ | _, x :: xs' ->
274274+ loop (x :: acc) (remaining - 1) xs'
275275+ in
276276+ loop [] n lst
277277+278278+ let get_blocks_exn (t : t) (cids : Cid.t list) : Block_map.t Lwt.t =
279279+ if List.is_empty cids then Lwt.return Block_map.empty
280280+ else
281281+ let%lwt bm = Store.get_blocks t.blockstore cids in
282282+ match bm.missing with
283283+ | [] ->
284284+ Lwt.return bm.blocks
285285+ | missing :: _ ->
286286+ failwith ("missing mst node block: " ^ Cid.to_string missing)
287287+249288 (* retrieves a raw node by cid *)
250289 let retrieve_node_raw t cid : node_raw option Lwt.t =
251290 match%lwt Store.get_bytes t.blockstore cid with
···288327 | None ->
289328 lazy Lwt.return_none
290329 in
330330+ let last_key = ref "" in
291331 let entries =
292292- List.fold_left
293293- (fun (entries : entry list) entry ->
332332+ List.map
333333+ (fun entry ->
294334 let prefix =
295295- match entries with
296296- | [] ->
297297- ""
298298- | prev :: _ ->
299299- String.sub prev.key 0 entry.p
335335+ if entry.p = 0 then ""
336336+ else if !last_key = "" then ""
337337+ else String.sub !last_key 0 entry.p
300338 in
301339 let path = String.concat "" [prefix; Bytes.to_string entry.k] in
302340 Util.ensure_valid_key path ;
341341+ last_key := path ;
303342 let right =
304343 match entry.t with
305344 | Some r ->
···307346 | None ->
308347 lazy Lwt.return_none
309348 in
310310- ({layer; key= path; value= entry.v; right} : entry) :: entries )
311311- [] node_raw.e
349349+ ({layer; key= path; value= entry.v; right} : entry) )
350350+ node_raw.e
312351 in
313352 Lwt.return {layer; left; entries}
314353···349388 (* returns a map of key -> cid *)
350389 let build_map t : Cid.t String_map.t Lwt.t =
351390 let map = ref String_map.empty in
352352- let%lwt () =
353353- traverse t (fun path cid -> map := String_map.add path cid !map)
391391+ let rec loop queue visited =
392392+ match queue with
393393+ | [] ->
394394+ Lwt.return !map
395395+ | _ ->
396396+ let batch, rest = take_n batch_size queue in
397397+ let to_fetch =
398398+ List.filter (fun (cid, _) -> not (Cid.Set.mem cid visited)) batch
399399+ in
400400+ let%lwt blocks = get_blocks_exn t (List.map fst to_fetch) in
401401+ let visited', next_queue =
402402+ List.fold_left
403403+ (fun (visited, queue) (cid, prefix) ->
404404+ if Cid.Set.mem cid visited then (visited, queue)
405405+ else
406406+ let bytes = Block_map.get cid blocks |> Option.get in
407407+ let raw = decode_block_raw bytes in
408408+ let last_key = ref prefix in
409409+ let next_pairs =
410410+ List.fold_left
411411+ (fun acc (e : entry_raw) ->
412412+ let key_prefix =
413413+ if e.p = 0 then ""
414414+ else if e.p <= String.length !last_key then
415415+ String.sub !last_key 0 e.p
416416+ else !last_key
417417+ in
418418+ let full_key = key_prefix ^ Bytes.to_string e.k in
419419+ Util.ensure_valid_key full_key ;
420420+ last_key := full_key ;
421421+ map := String_map.add full_key e.v !map ;
422422+ match e.t with
423423+ | Some r ->
424424+ (r, full_key) :: acc
425425+ | None ->
426426+ acc )
427427+ ( match raw.l with
428428+ | Some l ->
429429+ [(l, prefix)]
430430+ | None ->
431431+ [] )
432432+ raw.e
433433+ in
434434+ (Cid.Set.add cid visited, List.rev_append next_pairs queue) )
435435+ (visited, rest) batch
436436+ in
437437+ loop next_queue visited'
354438 in
355355- Lwt.return !map
439439+ loop [(t.root, "")] Cid.Set.empty
356440357441 (* returns all non-leaf mst node blocks in order for a car stream
358442 leaf cids can be obtained via collect_nodes_and_leaves or leaves_of_root *)
···404488 for each node: node block, left subtree, then for each entry: record, right subtree *)
405489 let to_ordered_stream t : ordered_item Lwt_seq.t =
406490 (* queue items: `Node cid to visit, `Leaf cid to yield *)
407407- let rec step queue =
491491+ let prefetch queue cache missing =
492492+ let rec collect acc seen remaining = function
493493+ | [] ->
494494+ (List.rev acc, seen)
495495+ | _ when remaining = 0 ->
496496+ (List.rev acc, seen)
497497+ | `Node cid :: rest ->
498498+ if
499499+ Cid.Set.mem cid missing
500500+ || Block_map.has cid cache
501501+ || Cid.Set.mem cid seen
502502+ then collect acc seen remaining rest
503503+ else
504504+ collect (cid :: acc) (Cid.Set.add cid seen) (remaining - 1) rest
505505+ | _ :: rest ->
506506+ collect acc seen remaining rest
507507+ in
508508+ let cids, _seen = collect [] Cid.Set.empty batch_size queue in
509509+ if List.is_empty cids then Lwt.return (cache, missing)
510510+ else
511511+ let%lwt bm = Store.get_blocks t.blockstore cids in
512512+ let cache' =
513513+ List.fold_left
514514+ (fun acc (cid, bytes) -> Block_map.set cid bytes acc)
515515+ cache (Block_map.entries bm.blocks)
516516+ in
517517+ let missing' =
518518+ List.fold_left
519519+ (fun acc cid -> Cid.Set.add cid acc)
520520+ missing bm.missing
521521+ in
522522+ Lwt.return (cache', missing')
523523+ in
524524+ let rec step (queue, cache, missing) =
408525 match queue with
409526 | [] ->
410527 Lwt.return_none
411411- | `Node cid :: rest -> (
412412- let%lwt bytes_opt = Store.get_bytes t.blockstore cid in
413413- match bytes_opt with
414414- | None ->
415415- step rest
416416- | Some bytes ->
417417- let node = decode_block_raw bytes in
418418- (* queue items: left subtree, then for each entry: record then right subtree *)
419419- let left_queue =
420420- match node.l with Some l -> [`Node l] | None -> []
421421- in
422422- let entries_queue =
423423- List.concat_map
424424- (fun (e : entry_raw) ->
425425- let right_queue =
426426- match e.t with Some r -> [`Node r] | None -> []
427427- in
428428- `Leaf e.v :: right_queue )
429429- node.e
430430- in
431431- let new_queue = left_queue @ entries_queue @ rest in
432432- Lwt.return_some ((Node (cid, bytes) : ordered_item), new_queue) )
433528 | `Leaf cid :: rest ->
434434- Lwt.return_some ((Leaf cid : ordered_item), rest)
529529+ Lwt.return_some ((Leaf cid : ordered_item), (rest, cache, missing))
530530+ | `Node cid :: rest ->
531531+ if Cid.Set.mem cid missing then step (rest, cache, missing)
532532+ else
533533+ ( match Block_map.get cid cache with
534534+ | None ->
535535+ let%lwt cache', missing' = prefetch queue cache missing in
536536+ if cache' == cache && Cid.Set.mem cid missing' then
537537+ step (rest, cache', missing')
538538+ else step (queue, cache', missing')
539539+ | Some bytes ->
540540+ let node = decode_block_raw bytes in
541541+ (* queue items: left subtree, then for each entry: record then right subtree *)
542542+ let left_queue =
543543+ match node.l with Some l -> [`Node l] | None -> []
544544+ in
545545+ let entries_queue =
546546+ List.concat_map
547547+ (fun (e : entry_raw) ->
548548+ let right_queue =
549549+ match e.t with Some r -> [`Node r] | None -> []
550550+ in
551551+ `Leaf e.v :: right_queue )
552552+ node.e
553553+ in
554554+ let new_queue = left_queue @ entries_queue @ rest in
555555+ let cache' = Block_map.remove cid cache in
556556+ Lwt.return_some
557557+ ((Node (cid, bytes) : ordered_item), (new_queue, cache', missing))
558558+ )
435559 in
436436- Lwt_seq.unfold_lwt step [`Node t.root]
560560+ Lwt_seq.unfold_lwt step ([`Node t.root], Block_map.empty, Cid.Set.empty)
437561438562 (* produces a cid and cbor-encoded bytes for a given tree *)
439563 let serialize t node : (Cid.t * bytes, exn) Lwt_result.t =
440440- let sorted_entries =
441441- List.sort (fun (a : entry) b -> String.compare a.key b.key) node.entries
442442- in
443564 let rec aux node : (Cid.t * bytes) Lwt.t =
565565+ let entries =
566566+ if entries_are_sorted node.entries then node.entries
567567+ else
568568+ List.sort (fun (a : entry) b -> String.compare a.key b.key)
569569+ node.entries
570570+ in
444571 let%lwt left =
445572 node.left
446573 >>? function
···473600 ; p= prefix_len
474601 ; v= entry.value
475602 ; t= right } )
476476- node.entries
603603+ entries
477604 in
478605 let encoded =
479606 Dag_cbor.encode (encode_node_raw {l= left; e= mst_entries})
···485612 | Error e ->
486613 raise e
487614 in
488488- try%lwt Lwt.map Result.ok (aux {node with entries= sorted_entries})
615615+ try%lwt Lwt.map Result.ok (aux node)
489616 with e -> Lwt.return_error e
490617491618 (* raw-node helpers for covering proofs: operate on stored bytes, not re-serialization *)
···545672 let seq = interleave_raw raw keys in
546673 let index = find_gte_leaf_index key seq in
547674 let%lwt blocks =
548548- match Util.at_index index seq with
675675+ match List.nth_opt seq index with
549676 | Some (Leaf (k, _, _)) when k = key ->
550677 Lwt.return Block_map.empty
551678 | Some (Leaf (_k, v_right, _)) -> (
552679 let prev =
553553- if index - 1 >= 0 then Util.at_index (index - 1) seq else None
680680+ if index - 1 >= 0 then List.nth_opt seq (index - 1) else None
554681 in
555682 match prev with
556683 | Some (Tree c) ->
···587714 proof_for_key t c key
588715 | None -> (
589716 let prev =
590590- if index - 1 >= 0 then Util.at_index (index - 1) seq else None
717717+ if index - 1 >= 0 then List.nth_opt seq (index - 1) else None
591718 in
592719 match prev with
593720 | Some (Tree c) ->
···602729 None
603730 in
604731 let right_leaf =
605605- match Util.at_index index seq with
732732+ match List.nth_opt seq index with
606733 | Some (Leaf (_, v_right, _)) ->
607734 Some v_right
608735 | _ ->
···634761 in
635762 Lwt.return (Block_map.set cid bytes blocks)
636763764764+ let proof_for_keys t cid keys : Block_map.t Lwt.t =
765765+ if List.is_empty keys then Lwt.return Block_map.empty
766766+ else
767767+ let keys = List.sort_uniq String.compare keys in
768768+ let cache = ref Block_map.empty in
769769+ let missing = ref Cid.Set.empty in
770770+ let acc = ref Block_map.empty in
771771+ let add_block cid bytes =
772772+ if not (Block_map.has cid !acc) then
773773+ acc := Block_map.set cid bytes !acc
774774+ in
775775+ let get_bytes_cached cid =
776776+ match Block_map.get cid !cache with
777777+ | Some bytes ->
778778+ Lwt.return_some bytes
779779+ | None ->
780780+ if Cid.Set.mem cid !missing then Lwt.return_none
781781+ else
782782+ let%lwt bytes_opt = Store.get_bytes t.blockstore cid in
783783+ ( match bytes_opt with
784784+ | Some bytes ->
785785+ cache := Block_map.set cid bytes !cache
786786+ | None ->
787787+ missing := Cid.Set.add cid !missing ) ;
788788+ Lwt.return bytes_opt
789789+ in
790790+ let add_leaf cid_opt =
791791+ match cid_opt with
792792+ | None ->
793793+ Lwt.return_unit
794794+ | Some leaf_cid -> (
795795+ match%lwt get_bytes_cached leaf_cid with
796796+ | Some bytes ->
797797+ add_block leaf_cid bytes ;
798798+ Lwt.return_unit
799799+ | None ->
800800+ Lwt.return_unit )
801801+ in
802802+ let rec proof_for_key_cached cid key =
803803+ match%lwt get_bytes_cached cid with
804804+ | None ->
805805+ Lwt.return_unit
806806+ | Some bytes ->
807807+ add_block cid bytes ;
808808+ let raw = decode_block_raw bytes in
809809+ let keys = node_entry_keys raw in
810810+ let seq = interleave_raw raw keys in
811811+ let index = find_gte_leaf_index key seq in
812812+ ( match List.nth_opt seq index with
813813+ | Some (Leaf (k, _, _)) when k = key ->
814814+ Lwt.return_unit
815815+ | Some (Leaf (_k, v_right, _)) -> (
816816+ let prev =
817817+ if index - 1 >= 0 then List.nth_opt seq (index - 1) else None
818818+ in
819819+ match prev with
820820+ | Some (Tree c) ->
821821+ proof_for_key_cached c key
822822+ | _ ->
823823+ let left_leaf =
824824+ match prev with
825825+ | Some (Leaf (_, v_left, _)) ->
826826+ Some v_left
827827+ | _ ->
828828+ None
829829+ in
830830+ let%lwt () = add_leaf left_leaf in
831831+ add_leaf (Some v_right) )
832832+ | Some (Tree c) ->
833833+ proof_for_key_cached c key
834834+ | None -> (
835835+ let prev =
836836+ if index - 1 >= 0 then List.nth_opt seq (index - 1) else None
837837+ in
838838+ match prev with
839839+ | Some (Tree c) ->
840840+ proof_for_key_cached c key
841841+ | _ ->
842842+ let left_leaf =
843843+ match prev with
844844+ | Some (Leaf (_, v_left, _)) ->
845845+ Some v_left
846846+ | _ ->
847847+ None
848848+ in
849849+ let right_leaf =
850850+ match List.nth_opt seq index with
851851+ | Some (Leaf (_, v_right, _)) ->
852852+ Some v_right
853853+ | _ ->
854854+ None
855855+ in
856856+ let%lwt () = add_leaf left_leaf in
857857+ add_leaf right_leaf ) )
858858+ in
859859+ let%lwt () = Lwt_list.iter_s (proof_for_key_cached cid) keys in
860860+ Lwt.return !acc
861861+637862 (* collects all node blocks (cid, bytes) and all leaf cids reachable from root
638863 only traverses nodes; doesn't fetch leaf blocks
639864 returns (nodes, visited, leaves) *)
640865 let collect_nodes_and_leaves t :
641866 ((Cid.t * bytes) list * Cid.Set.t * Cid.Set.t) Lwt.t =
642642- let rec bfs (queue : Cid.t list) (visited : Cid.Set.t)
643643- (nodes : (Cid.t * bytes) list) (leaves : Cid.Set.t) =
867867+ let rec loop queue visited nodes leaves =
644868 match queue with
645869 | [] ->
646870 Lwt.return (nodes, visited, leaves)
647647- | cid :: rest -> (
648648- if Cid.Set.mem cid visited then bfs rest visited nodes leaves
649649- else
650650- let%lwt bytes_opt = Store.get_bytes t.blockstore cid in
651651- match bytes_opt with
652652- | None ->
653653- failwith ("missing mst node block: " ^ Cid.to_string cid)
654654- | Some bytes ->
655655- let raw = decode_block_raw bytes in
656656- (* queue subtrees *)
657657- let next_cids =
658658- let acc = match raw.l with Some l -> [l] | None -> [] in
659659- List.fold_left
660660- (fun acc e ->
661661- match e.t with Some r -> r :: acc | None -> acc )
662662- acc raw.e
663663- in
664664- (* accumulate leaf cids *)
665665- let leaves' =
666666- List.fold_left (fun s e -> Cid.Set.add e.v s) leaves raw.e
667667- in
668668- let visited' = Cid.Set.add cid visited in
669669- bfs
670670- (List.rev_append next_cids rest)
671671- visited' ((cid, bytes) :: nodes) leaves' )
871871+ | _ ->
872872+ let batch, rest = take_n batch_size queue in
873873+ let to_fetch =
874874+ List.filter (fun cid -> not (Cid.Set.mem cid visited)) batch
875875+ in
876876+ let%lwt blocks = get_blocks_exn t to_fetch in
877877+ let visited', nodes', leaves', next_queue =
878878+ List.fold_left
879879+ (fun (visited, nodes, leaves, queue) cid ->
880880+ if Cid.Set.mem cid visited then (visited, nodes, leaves, queue)
881881+ else
882882+ let bytes = Block_map.get cid blocks |> Option.get in
883883+ let raw = decode_block_raw bytes in
884884+ let next_cids =
885885+ let acc = match raw.l with Some l -> [l] | None -> [] in
886886+ List.fold_left
887887+ (fun acc e ->
888888+ match e.t with Some r -> r :: acc | None -> acc )
889889+ acc raw.e
890890+ in
891891+ let leaves' =
892892+ List.fold_left (fun s e -> Cid.Set.add e.v s) leaves raw.e
893893+ in
894894+ let visited' = Cid.Set.add cid visited in
895895+ ( visited'
896896+ , (cid, bytes) :: nodes
897897+ , leaves'
898898+ , List.rev_append next_cids queue ) )
899899+ (visited, nodes, leaves, rest) batch
900900+ in
901901+ loop next_queue visited' nodes' leaves'
672902 in
673673- bfs [t.root] Cid.Set.empty [] Cid.Set.empty
903903+ loop [t.root] Cid.Set.empty [] Cid.Set.empty
674904675905 (* list of all leaves belonging to a node and its children, ordered by key *)
676906 let rec leaves_of_node n : (string * Cid.t) list Lwt.t =
677907 let%lwt left_leaves =
678908 n.left >>? function Some l -> leaves_of_node l | None -> Lwt.return []
679909 in
680680- let sorted_entries =
681681- List.sort
682682- (fun (a : entry) (b : entry) -> String.compare a.key b.key)
683683- n.entries
910910+ let entries =
911911+ if entries_are_sorted n.entries then n.entries
912912+ else
913913+ List.sort
914914+ (fun (a : entry) (b : entry) -> String.compare a.key b.key)
915915+ n.entries
684916 in
685685- let%lwt leaves =
686686- Lwt_list.fold_left_s
687687- (fun acc e ->
917917+ let%lwt entry_sublists =
918918+ Lwt_list.map_s
919919+ (fun e ->
688920 let%lwt right_leaves =
689921 e.right
690922 >>? function Some r -> leaves_of_node r | None -> Lwt.return []
691923 in
692692- Lwt.return (acc @ [(e.key, e.value)] @ right_leaves) )
693693- left_leaves sorted_entries
924924+ Lwt.return ((e.key, e.value) :: right_leaves) )
925925+ entries
694926 in
695695- Lwt.return leaves
927927+ Lwt.return (left_leaves @ List.concat entry_sublists)
696928697929 (* list of all leaves in the mst *)
698930 let leaves_of_root t : (string * Cid.t) list Lwt.t =
···704936705937 (* returns a count of all leaves in the mst *)
706938 let leaf_count t : int Lwt.t =
707707- match%lwt retrieve_node t t.root with
708708- | None ->
709709- failwith "root cid not found in repo store"
710710- | Some root ->
711711- let rec count (n : node) : int Lwt.t =
712712- let%lwt left_count =
713713- n.left >>? function Some l -> count l | None -> Lwt.return 0
939939+ let rec loop queue visited acc =
940940+ match queue with
941941+ | [] ->
942942+ Lwt.return acc
943943+ | _ ->
944944+ let batch, rest = take_n batch_size queue in
945945+ let to_fetch =
946946+ List.filter (fun cid -> not (Cid.Set.mem cid visited)) batch
714947 in
715715- let%lwt right_counts =
716716- Lwt_list.map_s
717717- (fun (e : entry) ->
718718- e.right >>? function Some r -> count r | None -> Lwt.return 0 )
719719- n.entries
948948+ let%lwt blocks = get_blocks_exn t to_fetch in
949949+ let visited', acc', next_queue =
950950+ List.fold_left
951951+ (fun (visited, acc, queue) cid ->
952952+ if Cid.Set.mem cid visited then (visited, acc, queue)
953953+ else
954954+ let bytes = Block_map.get cid blocks |> Option.get in
955955+ let raw = decode_block_raw bytes in
956956+ let next_cids =
957957+ let acc = match raw.l with Some l -> [l] | None -> [] in
958958+ List.fold_left
959959+ (fun acc e ->
960960+ match e.t with Some r -> r :: acc | None -> acc )
961961+ acc raw.e
962962+ in
963963+ let visited' = Cid.Set.add cid visited in
964964+ ( visited'
965965+ , acc + List.length raw.e
966966+ , List.rev_append next_cids queue ) )
967967+ (visited, acc, rest) batch
720968 in
721721- let sum_right = List.fold_left ( + ) 0 right_counts in
722722- Lwt.return (left_count + List.length n.entries + sum_right)
723723- in
724724- count root
969969+ loop next_queue visited' acc'
970970+ in
971971+ loop [t.root] Cid.Set.empty 0
725972726973 (* returns height of mst root *)
727974 let layer t : int Lwt.t =
···733980734981 (* returns all nodes sorted by cid *)
735982 let all_nodes t : (Cid.t * bytes) list Lwt.t =
736736- let rec bfs (queue : Cid.t list) (visited : Cid.Set.t)
737737- (nodes : (Cid.t * bytes) list) =
983983+ let rec loop queue visited nodes =
738984 match queue with
739985 | [] ->
740986 Lwt.return nodes
741741- | cid :: rest -> (
742742- if Cid.Set.mem cid visited then bfs rest visited nodes
743743- else
744744- match%lwt Store.get_bytes t.blockstore cid with
745745- | None ->
746746- failwith ("missing mst node block: " ^ Cid.to_string cid)
747747- | Some bytes ->
748748- let raw = decode_block_raw bytes in
749749- let next_cids =
750750- let acc = match raw.l with Some l -> [l] | None -> [] in
751751- List.fold_left
752752- (fun acc e ->
753753- match e.t with Some r -> r :: acc | None -> acc )
754754- acc raw.e
755755- in
756756- let visited' = Cid.Set.add cid visited in
757757- bfs
758758- (List.rev_append next_cids rest)
759759- visited' ((cid, bytes) :: nodes) )
987987+ | _ ->
988988+ let batch, rest = take_n batch_size queue in
989989+ let to_fetch =
990990+ List.filter (fun cid -> not (Cid.Set.mem cid visited)) batch
991991+ in
992992+ let%lwt blocks = get_blocks_exn t to_fetch in
993993+ let visited', nodes', next_queue =
994994+ List.fold_left
995995+ (fun (visited, nodes, queue) cid ->
996996+ if Cid.Set.mem cid visited then (visited, nodes, queue)
997997+ else
998998+ let bytes = Block_map.get cid blocks |> Option.get in
999999+ let raw = decode_block_raw bytes in
10001000+ let next_cids =
10011001+ let acc = match raw.l with Some l -> [l] | None -> [] in
10021002+ List.fold_left
10031003+ (fun acc e ->
10041004+ match e.t with Some r -> r :: acc | None -> acc )
10051005+ acc raw.e
10061006+ in
10071007+ let visited' = Cid.Set.add cid visited in
10081008+ ( visited'
10091009+ , (cid, bytes) :: nodes
10101010+ , List.rev_append next_cids queue ) )
10111011+ (visited, nodes, rest) batch
10121012+ in
10131013+ loop next_queue visited' nodes'
7601014 in
761761- let%lwt nodes = bfs [t.root] Cid.Set.empty [] in
10151015+ let%lwt nodes = loop [t.root] Cid.Set.empty [] in
7621016 let sorted =
7631017 List.sort
7641018 (fun (a, _) (b, _) -> String.compare (Cid.to_string a) (Cid.to_string b))
···8031057 let root_layer =
8041058 List.fold_left (fun acc (_, _, lz) -> max acc lz) 0 with_layers
8051059 in
806806- let on_layer =
807807- List.filter (fun (_, _, lz) -> lz = root_layer) with_layers
808808- |> List.map (fun (k, v, _) -> (k, v))
809809- in
810810- (* left group is keys below first on-layer key *)
811811- let left_group =
812812- match on_layer with
813813- | (k0, _) :: _ ->
814814- List.filter
815815- (fun (k, _, lz) -> lz < root_layer && k < k0)
816816- with_layers
817817- |> List.map (fun (k, v, _) -> (k, v))
818818- | [] ->
819819- []
10601060+ let left_group, on_layer, right_groups =
10611061+ let left_group = ref [] in
10621062+ let current_group = ref [] in
10631063+ let on_layer_rev = ref [] in
10641064+ let groups_rev = ref [] in
10651065+ let seen_on = ref false in
10661066+ List.iter
10671067+ (fun (k, v, lz) ->
10681068+ if lz = root_layer then (
10691069+ if not !seen_on then left_group := List.rev !current_group
10701070+ else groups_rev := List.rev !current_group :: !groups_rev ;
10711071+ current_group := [] ;
10721072+ on_layer_rev := (k, v) :: !on_layer_rev ;
10731073+ seen_on := true )
10741074+ else current_group := (k, v) :: !current_group )
10751075+ with_layers ;
10761076+ let on_layer = List.rev !on_layer_rev in
10771077+ let right_groups =
10781078+ if not !seen_on then []
10791079+ else List.rev (List.rev !current_group :: !groups_rev)
10801080+ in
10811081+ (!left_group, on_layer, right_groups)
8201082 in
8211083 let%lwt l_cid =
8221084 match left_group with
···8381100 let%lwt c = wrap cid child_layer in
8391101 Lwt.return_some c
8401102 in
841841- (* compute right groups aligned to on-layer entries *)
842842- let rec right_groups acc rest =
843843- match rest with
844844- | [] ->
845845- List.rev acc
846846- | (k, _) :: tl ->
847847- let upper =
848848- match tl with (k2, _) :: _ -> Some k2 | [] -> None
849849- in
850850- let grp =
851851- List.filter
852852- (fun (k', _, lz) ->
853853- lz < root_layer && k' > k
854854- && match upper with Some ku -> k' < ku | None -> true )
855855- with_layers
856856- |> List.map (fun (k', v', _) -> (k', v'))
857857- in
858858- right_groups ((k, grp) :: acc) tl
11031103+ let rights =
11041104+ List.map2 (fun (k, _) grp -> (k, grp)) on_layer right_groups
8591105 in
860860- let rights = right_groups [] on_layer in
8611106 let%lwt t_links =
8621107 Lwt_list.map_s
8631108 (fun (_k, grp) ->
-11
mist/lib/util.ml
···48484949let rec last (lst : 'a list) : 'a option =
5050 match lst with [] -> None | [x] -> Some x | _ :: xs -> last xs
5151-5252-let at_index i (lst : 'a list) : 'a option =
5353- let rec aux j = function
5454- | [] ->
5555- None
5656- | [x] ->
5757- Some x
5858- | x :: xs ->
5959- if j = 0 then Some x else aux (j - 1) xs
6060- in
6161- aux i lst
+1-1
pegasus/bench/bench_repository.ml
···406406let bench_db_io_patterns () =
407407 print_header "database i/o" ;
408408 let%lwt db, path = setup_test_db () in
409409- let size = 100000 in
409409+ let size = 20000 in
410410 let blocks = generate_blocks size in
411411 let%lwt () =
412412 Util.use_pool db.db (fun conn -> User_store.Bulk.put_blocks blocks conn) >|= fun _ -> ()
+29-37
pegasus/lib/repository.ml
···221221 ref (Cached_mst.create cached_store prev_commit.data)
222222 in
223223 (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *)
224224- let commit_ops : commit_evt_op list ref = ref [] in
224224+ let commit_ops_rev : commit_evt_op list ref = ref [] in
225225 let added_leaves = ref Block_map.empty in
226226 let%lwt results =
227227 Lwt_list.map_s
···250250 User_store.put_record t.db (`LexMap record_with_type) path
251251 in
252252 added_leaves := Block_map.set cid block !added_leaves ;
253253- commit_ops :=
254254- !commit_ops @ [{action= `Create; path; cid= Some cid; prev= None}] ;
253253+ commit_ops_rev :=
254254+ {action= `Create; path; cid= Some cid; prev= None}
255255+ :: !commit_ops_rev ;
255256 let%lwt new_mst = Cached_mst.add !mst path cid in
256257 mst := new_mst ;
257258 let refs =
···272273 | Update {collection; rkey; value; swap_record; _} ->
273274 let path = Format.sprintf "%s/%s" collection rkey in
274275 let uri = Format.sprintf "at://%s/%s" t.did path in
275275- let%lwt old_cid = User_store.get_record_cid t.db path in
276276+ let%lwt existing_record = User_store.get_record t.db path in
277277+ let old_cid = Option.map (fun (r : record) -> r.cid) existing_record in
276278 ( if
277279 (swap_record <> None && swap_record <> old_cid)
278280 || (swap_record = None && old_cid = None)
···288290 (Format.sprintf "attempted to update record %s with cid %s"
289291 path cid_str ) ) ;
290292 let%lwt () =
291291- match old_cid with
292292- | Some _ -> (
293293- match%lwt User_store.get_record t.db path with
294294- | Some record ->
295295- let refs =
296296- Util.find_blob_refs record.value
297297- |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
293293+ match existing_record with
294294+ | Some record ->
295295+ let refs =
296296+ Util.find_blob_refs record.value
297297+ |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
298298+ in
299299+ if not (List.is_empty refs) then
300300+ let%lwt _ =
301301+ User_store.delete_orphaned_blobs_by_record_path t.db path
298302 in
299299- if not (List.is_empty refs) then
300300- let%lwt _ =
301301- User_store.delete_orphaned_blobs_by_record_path t.db
302302- path
303303- in
304304- Lwt.return_unit
305305- else Lwt.return_unit
306306- | None ->
307307- Lwt.return_unit )
303303+ Lwt.return_unit
304304+ else Lwt.return_unit
308305 | None ->
309306 Lwt.return_unit
310307 in
···316313 User_store.put_record t.db (`LexMap record_with_type) path
317314 in
318315 added_leaves := Block_map.set new_cid new_block !added_leaves ;
319319- commit_ops :=
320320- !commit_ops
321321- @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ;
316316+ commit_ops_rev :=
317317+ {action= `Update; path; cid= Some new_cid; prev= old_cid}
318318+ :: !commit_ops_rev ;
322319 let%lwt new_mst = Cached_mst.add !mst path new_cid in
323320 mst := new_mst ;
324321 let refs =
···339336 ; cid= new_cid } )
340337 | Delete {collection; rkey; swap_record; _} ->
341338 let path = Format.sprintf "%s/%s" collection rkey in
342342- let%lwt cid = User_store.get_record_cid t.db path in
339339+ let%lwt existing_record = User_store.get_record t.db path in
340340+ let cid = Option.map (fun (r : record) -> r.cid) existing_record in
343341 ( if cid = None || (swap_record <> None && swap_record <> cid) then
344342 let cid_str =
345343 match cid with
···352350 (Format.sprintf "attempted to delete record %s with cid %s"
353351 path cid_str ) ) ;
354352 let%lwt () =
355355- match%lwt User_store.get_record t.db path with
353353+ match existing_record with
356354 | Some record ->
357355 let refs =
358356 Util.find_blob_refs record.value
···368366 Lwt.return_unit
369367 in
370368 let%lwt () = User_store.delete_record t.db path in
371371- commit_ops :=
372372- !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ;
369369+ commit_ops_rev :=
370370+ {action= `Delete; path; cid= None; prev= cid} :: !commit_ops_rev ;
373371 let%lwt new_mst = Cached_mst.delete !mst path in
374372 mst := new_mst ;
375373 Lwt.return
376374 (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) )
377375 writes
378376 in
377377+ let commit_ops = List.rev !commit_ops_rev in
379378 let new_mst = !mst in
380379 (* flush all writes, ensuring all blocks are written or none are *)
381380 let%lwt () =
···390389 let commit_block =
391390 new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson
392391 in
393393- let%lwt proof_blocks =
394394- Lwt_list.fold_left_s
395395- (fun acc ({path; _} : commit_evt_op) ->
396396- let%lwt key_proof =
397397- Cached_mst.proof_for_key new_mst new_mst.root path
398398- in
399399- Lwt.return (Block_map.merge acc key_proof) )
400400- Block_map.empty !commit_ops
401401- in
392392+ let proof_keys = List.map (fun ({path; _} : commit_evt_op) -> path) commit_ops in
393393+ let%lwt proof_blocks = Cached_mst.proof_for_keys new_mst new_mst.root proof_keys in
402394 let proof_blocks = Block_map.merge proof_blocks !added_leaves in
403395 let block_stream =
404396 proof_blocks |> Block_map.entries |> Lwt_seq.of_list
···410402 let%lwt ds = Data_store.connect () in
411403 let%lwt _ =
412404 Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid
413413- ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops ~since:prev_commit.rev
405405+ ~rev:new_commit_signed.rev ~blocks ~ops:commit_ops ~since:prev_commit.rev
414406 ~prev_data:prev_commit.data ()
415407 in
416408 Lwt.return {commit= new_commit; results}
+156-149
pegasus/lib/user_store.ml
···314314 ~path ~cids
315315end
316316317317+module Bulk = struct
318318+ open struct
319319+ let escape_sql_string s = Str.global_replace (Str.regexp "'") "''" s
320320+321321+ let bytes_to_hex data =
322322+ let buf = Buffer.create (Bytes.length data * 2) in
323323+ Bytes.iter
324324+ (fun c -> Buffer.add_string buf (Printf.sprintf "%02x" (Char.code c)))
325325+ data ;
326326+ Buffer.contents buf
327327+328328+ let chunk_list n lst =
329329+ if n <= 0 then invalid_arg "negative n passed to chunk_list" ;
330330+ let rec take_n acc remaining xs =
331331+ match (remaining, xs) with
332332+ | _, [] ->
333333+ (List.rev acc, [])
334334+ | 0, rest ->
335335+ (List.rev acc, rest)
336336+ | _, x :: xs' ->
337337+ take_n (x :: acc) (remaining - 1) xs'
338338+ in
339339+ let rec go xs =
340340+ match xs with
341341+ | [] ->
342342+ []
343343+ | _ ->
344344+ let chunk, rest = take_n [] n xs in
345345+ chunk :: go rest
346346+ in
347347+ go lst
348348+ end
349349+350350+ let put_blocks (blocks : (Cid.t * bytes) list) conn =
351351+ if List.is_empty blocks then Lwt.return_ok ()
352352+ else
353353+ let module C = (val conn : Caqti_lwt.CONNECTION) in
354354+ let chunks = chunk_list 200 blocks in
355355+ let rec process_chunks = function
356356+ | [] ->
357357+ Lwt.return_ok ()
358358+ | chunk :: rest -> (
359359+ let values =
360360+ List.map
361361+ (fun (cid, data) ->
362362+ let cid_str = escape_sql_string (Cid.to_string cid) in
363363+ let hex_data = bytes_to_hex data in
364364+ Printf.sprintf "('%s', CAST(X'%s' AS TEXT))" cid_str hex_data )
365365+ chunk
366366+ |> String.concat ", "
367367+ in
368368+ let sql =
369369+ Printf.sprintf
370370+ "INSERT INTO mst (cid, data) VALUES %s ON CONFLICT DO NOTHING"
371371+ values
372372+ in
373373+ let query =
374374+ Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
375375+ in
376376+ let%lwt result = C.exec query () in
377377+ match result with
378378+ | Ok () ->
379379+ process_chunks rest
380380+ | Error e ->
381381+ Lwt.return_error e )
382382+ in
383383+ process_chunks chunks
384384+385385+ let put_records (records : (string * Cid.t * bytes * string) list) conn =
386386+ if List.is_empty records then Lwt.return_ok ()
387387+ else
388388+ let module C = (val conn : Caqti_lwt.CONNECTION) in
389389+ let chunks = chunk_list 100 records in
390390+ let rec process_chunks = function
391391+ | [] ->
392392+ Lwt.return_ok ()
393393+ | chunk :: rest -> (
394394+ let values =
395395+ List.map
396396+ (fun (path, cid, data, since) ->
397397+ let hex_data = bytes_to_hex data in
398398+ Printf.sprintf "('%s', '%s', CAST(X'%s' AS TEXT), '%s')"
399399+ (escape_sql_string path)
400400+ (escape_sql_string (Cid.to_string cid))
401401+ hex_data (escape_sql_string since) )
402402+ chunk
403403+ |> String.concat ", "
404404+ in
405405+ let sql =
406406+ Printf.sprintf
407407+ "INSERT INTO records (path, cid, data, since) VALUES %s ON \
408408+ CONFLICT (path) DO UPDATE SET cid = excluded.cid, data = \
409409+ excluded.data, since = excluded.since"
410410+ values
411411+ in
412412+ let query =
413413+ Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
414414+ in
415415+ let%lwt result = C.exec query () in
416416+ match result with
417417+ | Ok () ->
418418+ process_chunks rest
419419+ | Error e ->
420420+ Lwt.return_error e )
421421+ in
422422+ process_chunks chunks
423423+424424+ let put_blob_refs (refs : (string * Cid.t) list) conn =
425425+ if List.is_empty refs then Lwt.return_ok ()
426426+ else
427427+ let module C = (val conn : Caqti_lwt.CONNECTION) in
428428+ let chunks = chunk_list 200 refs in
429429+ let rec process_chunks = function
430430+ | [] ->
431431+ Lwt.return_ok ()
432432+ | chunk :: rest -> (
433433+ let values =
434434+ List.map
435435+ (fun (path, cid) ->
436436+ Printf.sprintf "('%s', '%s')" (escape_sql_string path)
437437+ (escape_sql_string (Cid.to_string cid)) )
438438+ chunk
439439+ |> String.concat ", "
440440+ in
441441+ let sql =
442442+ Printf.sprintf
443443+ "INSERT INTO blobs_records (record_path, blob_cid) VALUES %s \
444444+ ON CONFLICT DO NOTHING"
445445+ values
446446+ in
447447+ let query =
448448+ Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
449449+ in
450450+ let%lwt result = C.exec query () in
451451+ match result with
452452+ | Ok () ->
453453+ process_chunks rest
454454+ | Error e ->
455455+ Lwt.return_error e )
456456+ in
457457+ process_chunks chunks
458458+end
459459+317460type t = {did: string; db: Util.caqti_pool}
318461319462let pool_cache : (string, t) Hashtbl.t = Hashtbl.create 64
···351494 Lwt.return ({blocks= Block_map.empty; missing= []} : Block_map.with_missing)
352495 else
353496 let%lwt blocks = Util.use_pool t.db @@ Queries.get_blocks cids in
497497+ let found_map =
498498+ List.fold_left
499499+ (fun acc ({cid; data} : block) -> Block_map.set cid data acc)
500500+ Block_map.empty blocks
501501+ in
354502 Lwt.return
355503 (List.fold_left
356504 (fun (acc : Block_map.with_missing) cid ->
357357- match List.find_opt (fun (b : block) -> b.cid = cid) blocks with
358358- | Some {data; _} ->
505505+ match Block_map.get cid found_map with
506506+ | Some data ->
359507 {acc with blocks= Block_map.set cid data acc.blocks}
360508 | None ->
361509 {acc with missing= cid :: acc.missing} )
···376524 Lwt.return false
377525378526let put_many t bm : (int, exn) Lwt_result.t =
379379- Util.multi_query t.db
380380- (List.map
381381- (fun (cid, block) -> Queries.put_block cid block)
382382- (Block_map.entries bm) )
527527+ let entries = Block_map.entries bm in
528528+ if List.is_empty entries then Lwt.return_ok 0
529529+ else
530530+ Lwt_result.catch (fun () ->
531531+ let%lwt () = Util.use_pool t.db (fun conn -> Bulk.put_blocks entries conn) in
532532+ Lwt.return (List.length entries) )
383533384534let delete_block t cid : (bool, exn) Lwt_result.t =
385535 Lwt_result.catch
···569719 let storage_str = Blob_store.storage_to_string storage in
570720 Util.use_pool t.db
571721 @@ Queries.list_blobs_by_storage ~storage:storage_str ~limit ~cursor
572572-573573-module Bulk = struct
574574- open struct
575575- let escape_sql_string s = Str.global_replace (Str.regexp "'") "''" s
576576-577577- let bytes_to_hex data =
578578- let buf = Buffer.create (Bytes.length data * 2) in
579579- Bytes.iter
580580- (fun c -> Buffer.add_string buf (Printf.sprintf "%02x" (Char.code c)))
581581- data ;
582582- Buffer.contents buf
583583-584584- let chunk_list n lst =
585585- if n <= 0 then invalid_arg "negative n passed to chunk_list" ;
586586- let rec take_n acc remaining xs =
587587- match (remaining, xs) with
588588- | _, [] ->
589589- (List.rev acc, [])
590590- | 0, rest ->
591591- (List.rev acc, rest)
592592- | _, x :: xs' ->
593593- take_n (x :: acc) (remaining - 1) xs'
594594- in
595595- let rec go xs =
596596- match xs with
597597- | [] ->
598598- []
599599- | _ ->
600600- let chunk, rest = take_n [] n xs in
601601- chunk :: go rest
602602- in
603603- go lst
604604- end
605605-606606- let put_blocks (blocks : (Cid.t * bytes) list) conn =
607607- if List.is_empty blocks then Lwt.return_ok ()
608608- else
609609- let module C = (val conn : Caqti_lwt.CONNECTION) in
610610- let chunks = chunk_list 200 blocks in
611611- let rec process_chunks = function
612612- | [] ->
613613- Lwt.return_ok ()
614614- | chunk :: rest -> (
615615- let values =
616616- List.map
617617- (fun (cid, data) ->
618618- let cid_str = escape_sql_string (Cid.to_string cid) in
619619- let hex_data = bytes_to_hex data in
620620- Printf.sprintf "('%s', CAST(X'%s' AS TEXT))" cid_str hex_data )
621621- chunk
622622- |> String.concat ", "
623623- in
624624- let sql =
625625- Printf.sprintf
626626- "INSERT INTO mst (cid, data) VALUES %s ON CONFLICT DO NOTHING"
627627- values
628628- in
629629- let query =
630630- Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
631631- in
632632- let%lwt result = C.exec query () in
633633- match result with
634634- | Ok () ->
635635- process_chunks rest
636636- | Error e ->
637637- Lwt.return_error e )
638638- in
639639- process_chunks chunks
640640-641641- let put_records (records : (string * Cid.t * bytes * string) list) conn =
642642- if List.is_empty records then Lwt.return_ok ()
643643- else
644644- let module C = (val conn : Caqti_lwt.CONNECTION) in
645645- let chunks = chunk_list 100 records in
646646- let rec process_chunks = function
647647- | [] ->
648648- Lwt.return_ok ()
649649- | chunk :: rest -> (
650650- let values =
651651- List.map
652652- (fun (path, cid, data, since) ->
653653- let hex_data = bytes_to_hex data in
654654- Printf.sprintf "('%s', '%s', CAST(X'%s' AS TEXT), '%s')"
655655- (escape_sql_string path)
656656- (escape_sql_string (Cid.to_string cid))
657657- hex_data (escape_sql_string since) )
658658- chunk
659659- |> String.concat ", "
660660- in
661661- let sql =
662662- Printf.sprintf
663663- "INSERT INTO records (path, cid, data, since) VALUES %s ON \
664664- CONFLICT (path) DO UPDATE SET cid = excluded.cid, data = \
665665- excluded.data, since = excluded.since"
666666- values
667667- in
668668- let query =
669669- Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
670670- in
671671- let%lwt result = C.exec query () in
672672- match result with
673673- | Ok () ->
674674- process_chunks rest
675675- | Error e ->
676676- Lwt.return_error e )
677677- in
678678- process_chunks chunks
679679-680680- let put_blob_refs (refs : (string * Cid.t) list) conn =
681681- if List.is_empty refs then Lwt.return_ok ()
682682- else
683683- let module C = (val conn : Caqti_lwt.CONNECTION) in
684684- let chunks = chunk_list 200 refs in
685685- let rec process_chunks = function
686686- | [] ->
687687- Lwt.return_ok ()
688688- | chunk :: rest -> (
689689- let values =
690690- List.map
691691- (fun (path, cid) ->
692692- Printf.sprintf "('%s', '%s')" (escape_sql_string path)
693693- (escape_sql_string (Cid.to_string cid)) )
694694- chunk
695695- |> String.concat ", "
696696- in
697697- let sql =
698698- Printf.sprintf
699699- "INSERT INTO blobs_records (record_path, blob_cid) VALUES %s \
700700- ON CONFLICT DO NOTHING"
701701- values
702702- in
703703- let query =
704704- Caqti_request.Infix.( ->. ) Caqti_type.unit Caqti_type.unit sql
705705- in
706706- let%lwt result = C.exec query () in
707707- match result with
708708- | Ok () ->
709709- process_chunks rest
710710- | Error e ->
711711- Lwt.return_error e )
712712- in
713713- process_chunks chunks
714714-end