objective categorical abstract machine language personal data server

Ensure all blocks in a commit are written to mst

futur.blue 2a8ed6b2 7b776612

verified
+81 -21
+23 -11
mist/lib/mst.ml
··· 791 791 Lwt_result.bind (Store.put_block blockstore cid encoded) (fun _ -> 792 792 Lwt.return_ok {blockstore; root= cid} ) 793 793 794 + (* helper to propagate put_block errors *) 795 + let put_block_exn blockstore cid encoded = 796 + match%lwt Store.put_block blockstore cid encoded with 797 + | Ok _ -> 798 + Lwt.return_unit 799 + | Error e -> 800 + raise e 801 + 794 802 (* builds and persists a canonical mst from sorted leaves *) 795 803 let of_assoc blockstore assoc : t Lwt.t = 796 - let open Lwt.Infix in 797 804 let sorted = 798 805 List.sort (fun (k1, _) (k2, _) -> String.compare k1 k2) assoc 799 806 in ··· 805 812 | [] -> 806 813 let encoded = Dag_cbor.encode (encode_node_raw {l= None; e= []}) in 807 814 let cid = Cid.create Dcbor encoded in 808 - Store.put_block blockstore cid encoded >|= fun _ -> (cid, 0) 815 + let%lwt () = put_block_exn blockstore cid encoded in 816 + Lwt.return (cid, 0) 809 817 | _ -> 810 818 let with_layers = 811 819 List.map (fun (k, v) -> (k, v, Util.leading_zeros_on_hash k)) pairs ··· 842 850 Dag_cbor.encode (encode_node_raw {l= Some cid; e= []}) 843 851 in 844 852 let cid' = Cid.create Dcbor encoded in 845 - Store.put_block blockstore cid' encoded 846 - >>= fun _ -> wrap cid' (layer + 1) 853 + let%lwt () = put_block_exn blockstore cid' encoded in 854 + wrap cid' (layer + 1) 847 855 in 848 - wrap cid child_layer >|= fun c -> Some c 856 + let%lwt c = wrap cid child_layer in 857 + Lwt.return_some c 849 858 in 850 859 (* compute right groups aligned to on-layer entries *) 851 860 let rec right_groups acc rest = ··· 883 892 Dag_cbor.encode (encode_node_raw {l= Some cid; e= []}) 884 893 in 885 894 let cid' = Cid.create Dcbor encoded in 886 - Store.put_block blockstore cid' encoded 887 - >>= fun _ -> wrap cid' (layer + 1) 895 + let%lwt () = put_block_exn blockstore cid' encoded in 896 + wrap cid' (layer + 1) 888 897 in 889 - wrap cid child_layer >|= fun c -> Some c ) 898 + let%lwt c = wrap cid child_layer in 899 + Lwt.return_some c ) 890 900 rights 891 901 in 892 902 let entries_raw = ··· 905 915 let node_raw = {l= l_cid; e= entries_raw} in 906 916 let encoded = Dag_cbor.encode (encode_node_raw node_raw) in 907 917 let cid = Cid.create Dcbor encoded in 908 - Store.put_block blockstore cid encoded >|= fun _ -> (cid, root_layer) 918 + let%lwt () = put_block_exn blockstore cid encoded in 919 + Lwt.return (cid, root_layer) 909 920 in 910 - persist_from_sorted sorted >|= fun (root, _) -> {blockstore; root} 921 + let%lwt root, _ = persist_from_sorted sorted in 922 + Lwt.return {blockstore; root} 911 923 912 924 (* insert or replace an entry, constructing a new canonical mst from scratch *) 913 925 let add_rebuild t key cid : t Lwt.t = ··· 927 939 let persist_node_raw (blockstore : bs) (raw : node_raw) : Cid.t Lwt.t = 928 940 let encoded = Dag_cbor.encode (encode_node_raw raw) in 929 941 let cid = Cid.create Dcbor encoded in 930 - let%lwt _ = Store.put_block blockstore cid encoded in 942 + let%lwt () = put_block_exn blockstore cid encoded in 931 943 Lwt.return cid 932 944 933 945 (* decompress entry keys from a raw node *)
+4
mist/lib/storage/block_map.ml
··· 41 41 42 42 let size = Cid_map.cardinal 43 43 44 + let length = size 45 + 46 + let is_empty = Cid_map.is_empty 47 + 44 48 let byte_size m = Cid_map.fold (fun _ bytes acc -> acc + Bytes.length bytes) m 0 45 49 46 50 let equal = Cid_map.equal Bytes.equal
+38 -8
mist/lib/storage/cache_blockstore.ml
··· 1 - type 'bs data = {mutable reads: Cid.Set.t; mutable cache: Block_map.t; bs: 'bs} 1 + type 'bs data = 2 + { mutable reads: Cid.Set.t 3 + ; mutable cache: Block_map.t 4 + ; mutable pending_writes: Block_map.t 5 + ; bs: 'bs } 2 6 3 - module Make 4 - (Bs : Blockstore.Writable) : sig 7 + module Make (Bs : Blockstore.Writable) : sig 5 8 include Blockstore.Writable 6 9 7 10 val create : Bs.t -> t ··· 9 12 val get_reads : t -> Cid.Set.t 10 13 11 14 val get_cache : t -> Block_map.t 15 + 16 + val get_pending_writes : t -> Block_map.t 17 + 18 + val flush_writes : t -> (unit, exn) Lwt_result.t 12 19 end 13 20 with type t = Bs.t data = struct 14 21 type t = Bs.t data 15 22 16 - let create bs = {reads= Cid.Set.empty; cache= Block_map.empty; bs} 23 + let create bs = 24 + {reads= Cid.Set.empty; cache= Block_map.empty; pending_writes= Block_map.empty; bs} 17 25 18 26 let get_reads t = t.reads 19 27 20 28 let get_cache t = t.cache 21 29 30 + let get_pending_writes t = t.pending_writes 31 + 32 + let flush_writes t = 33 + if Block_map.is_empty t.pending_writes then Lwt_result.return () 34 + else 35 + match%lwt Bs.put_many t.bs t.pending_writes with 36 + | Ok _ -> 37 + t.pending_writes <- Block_map.empty ; 38 + Lwt_result.return () 39 + | Error e -> 40 + Lwt_result.fail e 41 + 22 42 let get_bytes t cid = 23 43 match Block_map.get cid t.cache with 24 44 | Some _ as cached -> ··· 58 78 59 79 let put_block t cid bytes = 60 80 t.cache <- Block_map.set cid bytes t.cache ; 61 - Bs.put_block t.bs cid bytes 81 + t.pending_writes <- Block_map.set cid bytes t.pending_writes ; 82 + (* defer actual write to flush_writes *) 83 + Lwt_result.return true 62 84 63 85 let put_many t blocks = 64 86 Block_map.iter 65 - (fun cid data -> t.cache <- Block_map.set cid data t.cache) 87 + (fun cid data -> 88 + t.cache <- Block_map.set cid data t.cache ; 89 + t.pending_writes <- Block_map.set cid data t.pending_writes ) 66 90 blocks ; 67 - Bs.put_many t.bs blocks 91 + (* defer actual write to flush_writes *) 92 + Lwt_result.return (Block_map.length blocks) 68 93 69 94 let delete_block t cid = 70 95 t.cache <- Block_map.remove cid t.cache ; 96 + t.pending_writes <- Block_map.remove cid t.pending_writes ; 71 97 Bs.delete_block t.bs cid 72 98 73 99 let delete_many t cids = 74 - List.iter (fun cid -> t.cache <- Block_map.remove cid t.cache) cids ; 100 + List.iter 101 + (fun cid -> 102 + t.cache <- Block_map.remove cid t.cache ; 103 + t.pending_writes <- Block_map.remove cid t.pending_writes ) 104 + cids ; 75 105 Bs.delete_many t.bs cids 76 106 end
+8
pegasus/lib/repository.ml
··· 419 419 writes 420 420 in 421 421 let new_mst = !mst in 422 + (* flush all writes, ensuring all blocks are written or none are *) 423 + let%lwt () = 424 + match%lwt Cached_store.flush_writes cached_store with 425 + | Ok () -> 426 + Lwt.return_unit 427 + | Error e -> 428 + raise e 429 + in 422 430 let%lwt new_commit = put_commit t new_mst.root ~previous:(Some prev_commit) in 423 431 let new_commit_cid, new_commit_signed = new_commit in 424 432 let commit_block =
+8 -2
pegasus/lib/util.ml
··· 279 279 | Error e -> 280 280 Lwt.return_error e 281 281 282 - (* runs a bunch of queries and catches duplicate insertion, returning how many succeeded *) 282 + (* runs a bunch of queries in a transaction, catches duplicate insertion, returning how many succeeded *) 283 283 let multi_query pool 284 284 (queries : (Caqti_lwt.connection -> ('a, Caqti_error.t) Lwt_result.t) list) 285 285 : (int, exn) Lwt_result.t = ··· 317 317 else Lwt.return_error e ) ) 318 318 in 319 319 let%lwt result = aux (Ok 0) queries in 320 - Lwt.return result ) ) 320 + match result with 321 + | Ok count -> 322 + let$! () = C.commit () in 323 + Lwt.return_ok count 324 + | Error e -> 325 + let%lwt _ = C.rollback () in 326 + Lwt.return_error e ) ) 321 327 322 328 let minute = 60 * 1000 323 329