tangled
alpha
login
or
join now
futur.blue
/
pegasus
57
fork
atom
objective categorical abstract machine language personal data server
57
fork
atom
overview
issues
2
pulls
pipelines
MST diff logic
futur.blue
7 months ago
46d545d3
e1cce0f6
verified
This commit was signed with the committer's
known signature
.
futur.blue
SSH Key Fingerprint:
SHA256:QHGqHWNpqYyw9bt8KmPuJIyeZX9SZewBZ0PR1COtKQ0=
+173
1 changed file
expand all
collapse all
unified
split
mist
lib
mst.ml
+173
mist/lib/mst.ml
···
570
570
(List.fold_left
571
571
(fun acc proof -> Block_map.merge acc proof)
572
572
Block_map.empty proofs )
573
573
+
574
574
+
(*** diffs ***)
575
575
+
type diff_add = {key: string; cid: Cid.t}
576
576
+
577
577
+
type diff_update = {key: string; prev: Cid.t; cid: Cid.t}
578
578
+
579
579
+
type diff_delete = {key: string; cid: Cid.t}
580
580
+
581
581
+
type data_diff =
582
582
+
{ adds: diff_add list
583
583
+
; updates: diff_update list
584
584
+
; deletes: diff_delete list
585
585
+
; new_mst_blocks: (Cid.t * bytes) list
586
586
+
; new_leaf_cids: Cid.Set.t
587
587
+
; removed_cids: Cid.Set.t }
588
588
+
589
589
+
(* collects all node blocks (cid, bytes) and all leaf cids reachable from root
590
590
+
only traverses nodes; doesn't fetch leaf blocks
591
591
+
returns (nodes, visited, leaves) *)
592
592
+
let collect_nodes_and_leaves (t : t) :
593
593
+
((Cid.t * bytes) list * Cid.Set.t * Cid.Set.t) Lwt.t =
594
594
+
let rec bfs (queue : Cid.t list) (visited : Cid.Set.t)
595
595
+
(nodes : (Cid.t * bytes) list) (leaves : Cid.Set.t) =
596
596
+
match queue with
597
597
+
| [] ->
598
598
+
Lwt.return (nodes, visited, leaves)
599
599
+
| cid :: rest -> (
600
600
+
if Cid.Set.mem cid visited then bfs rest visited nodes leaves
601
601
+
else
602
602
+
let%lwt bytes_opt = Store.get_bytes t.blockstore cid in
603
603
+
match bytes_opt with
604
604
+
| None ->
605
605
+
failwith ("missing mst node block: " ^ Cid.to_string cid)
606
606
+
| Some bytes ->
607
607
+
let raw = decode_block_raw bytes in
608
608
+
(* queue subtrees *)
609
609
+
let next_cids =
610
610
+
let acc = match raw.l with Some l -> [l] | None -> [] in
611
611
+
List.fold_left
612
612
+
(fun acc e ->
613
613
+
match e.t with Some r -> r :: acc | None -> acc )
614
614
+
acc raw.e
615
615
+
in
616
616
+
(* accumulate leaf cids *)
617
617
+
let leaves' =
618
618
+
List.fold_left (fun s e -> Cid.Set.add e.v s) leaves raw.e
619
619
+
in
620
620
+
let visited' = Cid.Set.add cid visited in
621
621
+
bfs
622
622
+
(List.rev_append next_cids rest)
623
623
+
visited' ((cid, bytes) :: nodes) leaves' )
624
624
+
in
625
625
+
bfs [t.root] Cid.Set.empty [] Cid.Set.empty
626
626
+
627
627
+
(* list of all leaves belonging to a node, ordered by key *)
628
628
+
let rec leaves_of_node (n : node) : (string * Cid.t) list Lwt.t =
629
629
+
let%lwt left_leaves =
630
630
+
n.left >>? function Some l -> leaves_of_node l | None -> Lwt.return []
631
631
+
in
632
632
+
let sorted_entries =
633
633
+
List.sort
634
634
+
(fun (a : entry) (b : entry) -> String.compare a.key b.key)
635
635
+
n.entries
636
636
+
in
637
637
+
let%lwt leaves =
638
638
+
Lwt_list.fold_left_s
639
639
+
(fun acc e ->
640
640
+
let%lwt right_leaves =
641
641
+
e.right
642
642
+
>>? function Some r -> leaves_of_node r | None -> Lwt.return []
643
643
+
in
644
644
+
Lwt.return (acc @ [(e.key, e.value)] @ right_leaves) )
645
645
+
left_leaves sorted_entries
646
646
+
in
647
647
+
Lwt.return leaves
648
648
+
649
649
+
(* little helper *)
650
650
+
let leaves_of_root (t : t) : (string * Cid.t) list Lwt.t =
651
651
+
match%lwt retrieve_node t t.root with
652
652
+
| None ->
653
653
+
failwith "root cid not found in repo store"
654
654
+
| Some root ->
655
655
+
leaves_of_node root
656
656
+
657
657
+
(* produces a diff from an empty mst to the current one *)
658
658
+
let null_diff (curr : t) : data_diff Lwt.t =
659
659
+
let%lwt curr_nodes, _, curr_leaf_set = collect_nodes_and_leaves curr in
660
660
+
let%lwt curr_leaves = leaves_of_root curr in
661
661
+
let adds = List.map (fun (key, cid) : diff_add -> {key; cid}) curr_leaves in
662
662
+
Lwt.return
663
663
+
{ adds
664
664
+
; updates= []
665
665
+
; deletes= []
666
666
+
; new_mst_blocks= curr_nodes
667
667
+
; new_leaf_cids= curr_leaf_set
668
668
+
; removed_cids= Cid.Set.empty }
669
669
+
670
670
+
(* produces a diff between two msts *)
671
671
+
let mst_diff (t_curr : t) (t_prev_opt : t option) : data_diff Lwt.t =
672
672
+
match t_prev_opt with
673
673
+
| None ->
674
674
+
null_diff t_curr
675
675
+
| Some t_prev ->
676
676
+
let%lwt curr_nodes, curr_node_set, curr_leaf_set =
677
677
+
collect_nodes_and_leaves t_curr
678
678
+
in
679
679
+
let%lwt _, prev_node_set, prev_leaf_set =
680
680
+
collect_nodes_and_leaves t_prev
681
681
+
in
682
682
+
(* just convenient to have these functions *)
683
683
+
let in_prev_nodes cid = Cid.Set.mem cid prev_node_set in
684
684
+
let in_curr_nodes cid = Cid.Set.mem cid curr_node_set in
685
685
+
let in_prev_leaves cid = Cid.Set.mem cid prev_leaf_set in
686
686
+
let in_curr_leaves cid = Cid.Set.mem cid curr_leaf_set in
687
687
+
(* new mst blocks are curr nodes that are not in prev *)
688
688
+
let new_mst_blocks =
689
689
+
List.filter (fun (cid, _) -> not (in_prev_nodes cid)) curr_nodes
690
690
+
in
691
691
+
(* removed cids are prev nodes not in curr plus prev leaves not in curr *)
692
692
+
let removed_node_cids =
693
693
+
Cid.Set.fold
694
694
+
(fun cid acc ->
695
695
+
if not (in_curr_nodes cid) then Cid.Set.add cid acc else acc )
696
696
+
prev_node_set Cid.Set.empty
697
697
+
in
698
698
+
let removed_leaf_cids =
699
699
+
Cid.Set.fold
700
700
+
(fun cid acc ->
701
701
+
if not (in_curr_leaves cid) then Cid.Set.add cid acc else acc )
702
702
+
prev_leaf_set Cid.Set.empty
703
703
+
in
704
704
+
let removed_cids = Cid.Set.union removed_node_cids removed_leaf_cids in
705
705
+
(* new leaf cids are curr leaves not in prev *)
706
706
+
let new_leaf_cids =
707
707
+
Cid.Set.fold
708
708
+
(fun cid acc ->
709
709
+
if not (in_prev_leaves cid) then Cid.Set.add cid acc else acc )
710
710
+
curr_leaf_set Cid.Set.empty
711
711
+
in
712
712
+
(* compute adds/updates/deletes by merging sorted leaves *)
713
713
+
let%lwt curr_leaves = leaves_of_root t_curr in
714
714
+
let%lwt prev_leaves = leaves_of_root t_prev in
715
715
+
let rec merge (pl : (string * Cid.t) list) (cl : (string * Cid.t) list)
716
716
+
(adds : diff_add list) (updates : diff_update list)
717
717
+
(deletes : diff_delete list) =
718
718
+
match (pl, cl) with
719
719
+
| [], [] ->
720
720
+
(* we prepend for speed, then reverse at the end *)
721
721
+
(List.rev adds, List.rev updates, List.rev deletes)
722
722
+
| [], (k, c) :: cr ->
723
723
+
(* more curr than prev, goes in adds *)
724
724
+
merge [] cr ({key= k; cid= c} :: adds) updates deletes
725
725
+
| (k, c) :: pr, [] ->
726
726
+
(* more prev than curr, goes in deletes *)
727
727
+
merge pr [] adds updates ({key= k; cid= c} :: deletes)
728
728
+
| (k1, c1) :: pr, (k2, c2) :: cr ->
729
729
+
if k1 = k2 then (* if key & value are the same, keep going *)
730
730
+
if Cid.equal c1 c2 then merge pr cr adds updates deletes
731
731
+
else (* same key, different value; update *)
732
732
+
merge pr cr adds
733
733
+
({key= k1; prev= c1; cid= c2} :: updates)
734
734
+
deletes
735
735
+
else if k1 < k2 then
736
736
+
merge pr ((k2, c2) :: cr) adds updates
737
737
+
({key= k1; cid= c1} :: deletes)
738
738
+
else
739
739
+
merge ((k1, c1) :: pr) cr
740
740
+
({key= k2; cid= c2} :: adds)
741
741
+
updates deletes
742
742
+
in
743
743
+
let adds, updates, deletes = merge prev_leaves curr_leaves [] [] [] in
744
744
+
Lwt.return
745
745
+
{adds; updates; deletes; new_mst_blocks; new_leaf_cids; removed_cids}
573
746
end