objective categorical abstract machine language personal data server

Remove uses of Lwt_main.run, make MST lazy

futur.blue 57bea8a7 b7107d6a

verified
+134 -90
+134 -90
mist/lib/mst.ml
··· 30 [ ("l", match node.l with Some l -> `Link l | None -> `Null) 31 ; ("e", `Array (Array.of_list (List.map encode_entry_raw node.e))) ] ) 32 33 - type node_hydrated = 34 { layer: int 35 - ; mutable left: node_hydrated option 36 - ; mutable entries: entry_hydrated list } 37 38 - and entry_hydrated = 39 - {layer: int; key: string; value: Cid.t; right: node_hydrated option} 40 41 (* figures out where to put an entry in or below a hydrated node, returns new node *) 42 - let rec insert_entry node entry : node_hydrated Lwt.t = 43 let entry_layer = Util.leading_zeros_on_hash entry.key in 44 (* as long as node layer <= entry layer, create a new node above node 45 until we have a node at the correct height for the entry to be inserted *) ··· 47 if layer >= entry_layer then node 48 else 49 build_insert_node 50 - {layer= layer + 1; left= Some node; entries= []} 51 (layer + 1) 52 in 53 let insert_node = build_insert_node node node.layer in 54 (* if entry is below node, recursively insert into node's left subtree *) 55 if entry_layer < insert_node.layer then 56 - match (insert_node.entries, insert_node.left) with 57 | [], None -> 58 failwith "found totally empty mst node" 59 | [], Some left -> 60 - node.left <- Some (Lwt_main.run (insert_entry left entry)) ; 61 Lwt.return insert_node 62 | _ -> 63 Lwt.return insert_node ··· 86 in 87 aux node.entries 88 89 - (* hydrates a list of entries with their keys; layer and right value are placeholders *) 90 - let hydrate_entries_keys_only node = 91 - node.e 92 |> List.fold_left 93 - (fun (prev_path, entries) entry -> 94 - let prefix = String.sub prev_path 0 entry.p in 95 let path = String.concat "" [prefix; Bytes.to_string entry.k] in 96 - Util.ensure_valid_key path ; 97 - (path, entries @ [{layer= 0; key= path; value= entry.v; right= None}]) ) 98 - ("", []) 99 - |> snd 100 101 module Make (Store : Storage.Writable_blockstore) = struct 102 type bs = Store.t ··· 106 let create blockstore root = {blockstore; root} 107 108 (* decodes a node retrieved from the blockstore *) 109 - let decode_block b : node_raw = 110 match Dag_cbor.decode b with 111 | `Map node -> 112 if not (StringMap.mem "e" node) then ··· 157 | _ -> 158 raise (Invalid_argument "invalid block") 159 160 - (* retrieves & decodes a node by cid *) 161 - let retrieve_node t cid : node_raw option Lwt.t = 162 match%lwt Store.get_bytes t.blockstore cid with 163 | Some bytes -> 164 - Lwt.return_some (decode_block bytes) 165 | None -> 166 Lwt.return_none 167 168 (* returns the layer of a node *) 169 - let rec get_node_height t node : int Lwt.t = 170 match (node.l, node.e) with 171 | None, [] -> 172 Lwt.return 0 173 | Some left, [] -> ( 174 - match%lwt retrieve_node t left with 175 | Some node -> 176 let%lwt height = get_node_height t node in 177 Lwt.return (height + 1) ··· 188 let traverse t fn : unit Lwt.t = 189 let rec traverse node = 190 let%lwt () = 191 - match node.l with 192 - | Some cid -> ( 193 - match%lwt retrieve_node t cid with 194 - | Some node -> 195 - traverse node 196 - | None -> 197 - Lwt.return_unit ) 198 - | None -> 199 - Lwt.return_unit 200 in 201 - ignore 202 - (List.fold_left 203 - (fun prev_path entry -> 204 - let prefix = String.sub prev_path 0 entry.p in 205 - let path = String.concat "" [prefix; Bytes.to_string entry.k] in 206 - fn path entry.v ; path ) 207 - "" node.e ) ; 208 Lwt.return_unit 209 in 210 match%lwt retrieve_node t t.root with ··· 221 in 222 Lwt.return map 223 224 - (* produces a hydrated mst from a map of key -> cid *) 225 - let hydrate_from_map t map : Cid.t Lwt.t = 226 let keys = 227 map |> StringMap.bindings |> List.map fst |> List.sort String.compare 228 in 229 let entry_for_key key = 230 let value = StringMap.find key map in 231 let height = Util.leading_zeros_on_hash key in 232 - {layer= height; key; value; right= None} 233 in 234 let root = 235 { layer= keys |> List.hd |> Util.leading_zeros_on_hash 236 ; entries= [] 237 - ; left= None } 238 in 239 List.iter 240 (fun key -> ignore (insert_entry root (entry_for_key key))) 241 (List.tl keys) ; 242 - let rec finalize node : Cid.t Lwt.t = 243 - let left = 244 - match node.left with 245 | Some l -> 246 - Some (Lwt_main.run (finalize l)) 247 | None -> 248 - None 249 in 250 let last_key = ref "" in 251 - let mst_entries = 252 - List.map 253 (fun entry -> 254 - let right = 255 - match entry.right with 256 | Some r -> 257 - Some (Lwt_main.run (finalize r)) 258 | None -> 259 - None 260 in 261 let prefix_len = Util.shared_prefix_length !last_key entry.key in 262 last_key := entry.key ; 263 - { k= 264 - Bytes.of_string 265 - (String.sub entry.key prefix_len 266 - (String.length entry.key - prefix_len) ) 267 - ; p= prefix_len 268 - ; v= entry.value 269 - ; t= right } ) 270 node.entries 271 in 272 - let mst_node = {l= left; e= mst_entries} in 273 - let encoded = Dag_cbor.encode (encode_node_raw mst_node) in 274 let cid = Cid.create Dcbor encoded in 275 let%lwt () = Store.put_block t.blockstore cid encoded in 276 - Lwt.return cid 277 in 278 finalize root 279 280 (* returns cids and blocks that form the path from a given node to a given entry *) 281 let rec path_to_entry t node key : (Cid.t * bytes) list Lwt.t = 282 - let%lwt root_bytes = Store.get_bytes t node in 283 let%lwt root = 284 match root_bytes with 285 | None -> 286 Lwt.return_none 287 | Some bytes -> 288 - Lwt.return_some (decode_block bytes) 289 in 290 let path_tail = [(node, Option.get root_bytes)] in 291 (* if there is a left child, try to find a path through the left subtree *) ··· 310 Lwt.return path 311 | None -> ( 312 (* if a left subtree path couldn't be found, find the entry whose right subtree this key would belong to *) 313 - let root' = Option.get root in 314 - let entries_keys = hydrate_entries_keys_only root' in 315 - let entries_len = List.length root'.e in 316 let entry_index = 317 - match List.find_index (fun e -> e.key >= key) entries_keys with 318 | Some index -> 319 index 320 | None -> ··· 325 | _ 326 (* because entries[entry_index] might turn out to be the entry we're looking for *) 327 when entry_index < entries_len 328 - && (List.nth entries_keys entry_index).key = key -> 329 Lwt.return path_tail 330 | _ -> ( 331 (* otherwise, we continue down the right subtree of the entry before entry_index *) 332 - match Util.last root'.e with 333 | Some last when last.t != None -> 334 let%lwt path_through_right = 335 path_to_entry t (Option.get last.t) key ··· 339 Lwt.return path_tail ) ) 340 341 (* returns all mst entries in order for a car stream *) 342 - let to_car_stream t : (Cid.t * bytes) Seq.t = 343 let module M = struct 344 type stage = 345 | Nodes of 346 - (* currently walking nodes *) 347 - 348 { next: Cid.t list (* next cids to fetch *) 349 ; fetched: (Cid.t * bytes) list (* fetched cids and their bytes *) 350 ; leaves: Cid.Set.t (* seen leaf cids *) } 351 - | Leaves of 352 - (* done walking nodes, streaming accumulated leaves *) 353 - (Cid.t * bytes) list 354 | Done 355 end in 356 let open M in ··· 359 in 360 let rec step = function 361 | Done -> 362 - None 363 (* node has been fetched, can now be yielded *) 364 | Nodes ({fetched= (cid, bytes) :: rest; _} as s) -> 365 - Some ((cid, bytes), Nodes {s with fetched= rest}) 366 (* need to fetch next nodes *) 367 | Nodes {next; fetched= []; leaves} -> 368 if List.is_empty next then ( 369 (* finished traversing nodes, time to switch to leaves *) 370 let leaves_list = Cid.Set.to_list leaves in 371 - let leaves_bm = 372 - Lwt_main.run (Store.get_blocks t.blockstore leaves_list) 373 - in 374 if leaves_bm.missing <> [] then failwith "missing mst leaf blocks" ; 375 let leaves_nodes = Storage.Block_map.entries leaves_bm.blocks in 376 match leaves_nodes with 377 | [] -> 378 (* with Done, we don't care about the first pair element *) 379 - Some (Obj.magic (), Done) 380 | _ -> 381 (* it's leafin time *) 382 step (Leaves leaves_nodes) ) 383 else 384 (* go ahead and fetch the next nodes *) 385 - let bm = Lwt_main.run (Store.get_blocks t.blockstore next) in 386 if bm.missing <> [] then failwith "missing mst nodes" ; 387 let fetched, next', leaves' = 388 List.fold_left ··· 391 (* we should be safe to do this since we just got the cids from the blockmap *) 392 Storage.Block_map.get cid bm.blocks |> Option.get 393 in 394 - let node = decode_block bytes in 395 let nxt' = 396 List.fold_left 397 (* node.entries.map(e => e.right) *) 398 (fun n e -> match e.t with Some c -> c :: n | None -> n ) 399 (* start with [node.left, ...nxt] if node has a left subtree *) 400 ( match node.l with 401 | Some l -> 402 l :: nxt ··· 420 (* if we're onto yielding leaves, do that *) 421 | Leaves ((cid, bytes) :: rest) -> 422 let next = if rest = [] then Done else Leaves rest in 423 - Some ((cid, bytes), next) 424 (* once we're out of leaves, we're done *) 425 | Leaves [] -> 426 - Some (Obj.magic (), Done) 427 in 428 - Seq.unfold step init_state 429 end
··· 30 [ ("l", match node.l with Some l -> `Link l | None -> `Null) 31 ; ("e", `Array (Array.of_list (List.map encode_entry_raw node.e))) ] ) 32 33 + type node = 34 { layer: int 35 + ; mutable left: node option Lwt.t Lazy.t 36 + ; mutable entries: entry list } 37 38 + and entry = 39 + {layer: int; key: string; value: Cid.t; right: node option Lwt.t Lazy.t} 40 + 41 + let ( let*? ) lazy_opt_lwt f = 42 + let%lwt result = Lazy.force lazy_opt_lwt in 43 + f result 44 + 45 + let ( >>? ) lazy_opt_lwt f = 46 + let%lwt result = Lazy.force lazy_opt_lwt in 47 + f result 48 49 (* figures out where to put an entry in or below a hydrated node, returns new node *) 50 + let rec insert_entry node entry : node Lwt.t = 51 let entry_layer = Util.leading_zeros_on_hash entry.key in 52 (* as long as node layer <= entry layer, create a new node above node 53 until we have a node at the correct height for the entry to be inserted *) ··· 55 if layer >= entry_layer then node 56 else 57 build_insert_node 58 + {layer= layer + 1; left= lazy (Lwt.return_some node); entries= []} 59 (layer + 1) 60 in 61 let insert_node = build_insert_node node node.layer in 62 (* if entry is below node, recursively insert into node's left subtree *) 63 if entry_layer < insert_node.layer then 64 + let*? left = insert_node.left in 65 + match (insert_node.entries, left) with 66 | [], None -> 67 failwith "found totally empty mst node" 68 | [], Some left -> 69 + let%lwt left_inserted = insert_entry left entry in 70 + node.left <- lazy (Lwt.return_some left_inserted) ; 71 Lwt.return insert_node 72 | _ -> 73 Lwt.return insert_node ··· 96 in 97 aux node.entries 98 99 + (* from a list of raw entries, produces a list of their keys *) 100 + let entries_to_keys entries = 101 + entries 102 |> List.fold_left 103 + (fun keys entry -> 104 + let prefix = 105 + match keys with [] -> "" | prev :: _ -> String.sub prev 0 entry.p 106 + in 107 let path = String.concat "" [prefix; Bytes.to_string entry.k] in 108 + Util.ensure_valid_key path ; path :: keys ) 109 + [] 110 + |> List.rev 111 112 module Make (Store : Storage.Writable_blockstore) = struct 113 type bs = Store.t ··· 117 let create blockstore root = {blockstore; root} 118 119 (* decodes a node retrieved from the blockstore *) 120 + let decode_block_raw b : node_raw = 121 match Dag_cbor.decode b with 122 | `Map node -> 123 if not (StringMap.mem "e" node) then ··· 168 | _ -> 169 raise (Invalid_argument "invalid block") 170 171 + let retrieve_node_raw t cid : node_raw option Lwt.t = 172 match%lwt Store.get_bytes t.blockstore cid with 173 | Some bytes -> 174 + bytes |> decode_block_raw |> Lwt.return_some 175 | None -> 176 Lwt.return_none 177 178 + (* retrieves & decodes a node by cid *) 179 + let rec retrieve_node t cid : node option Lwt.t = 180 + match%lwt retrieve_node_raw t cid with 181 + | Some raw -> 182 + hydrate_node t raw |> Lwt.map Option.some 183 + | None -> 184 + Lwt.return_none 185 + 186 + and retrieve_node_lazy t cid = lazy (retrieve_node t cid) 187 + 188 + (* hydrates a raw node *) 189 + and hydrate_node t node_raw : node Lwt.t = 190 + let left = 191 + match node_raw.l with 192 + | Some l -> 193 + retrieve_node_lazy t l 194 + | None -> 195 + lazy Lwt.return_none 196 + in 197 + let%lwt layer = get_node_height t node_raw in 198 + let entries = 199 + List.fold_left 200 + (fun entries entry -> 201 + let prefix = 202 + match entries with 203 + | [] -> 204 + "" 205 + | prev :: _ -> 206 + String.sub prev.key 0 entry.p 207 + in 208 + let path = String.concat "" [prefix; Bytes.to_string entry.k] in 209 + Util.ensure_valid_key path ; 210 + let right = 211 + match entry.t with 212 + | Some r -> 213 + retrieve_node_lazy t r 214 + | None -> 215 + lazy Lwt.return_none 216 + in 217 + {layer; key= path; value= entry.v; right} :: entries ) 218 + [] node_raw.e 219 + in 220 + Lwt.return {layer; left; entries} 221 + 222 (* returns the layer of a node *) 223 + and get_node_height t node : int Lwt.t = 224 match (node.l, node.e) with 225 | None, [] -> 226 Lwt.return 0 227 | Some left, [] -> ( 228 + match%lwt retrieve_node_raw t left with 229 | Some node -> 230 let%lwt height = get_node_height t node in 231 Lwt.return (height + 1) ··· 242 let traverse t fn : unit Lwt.t = 243 let rec traverse node = 244 let%lwt () = 245 + let*? left = node.left in 246 + match left with Some l -> traverse l | None -> Lwt.return_unit 247 in 248 + List.iter (fun entry -> fn entry.key entry.value) node.entries ; 249 Lwt.return_unit 250 in 251 match%lwt retrieve_node t t.root with ··· 262 in 263 Lwt.return map 264 265 + (* produces a cid and cbor-encoded bytes for this mst *) 266 + let serialize t map : (Cid.t * bytes) Lwt.t = 267 let keys = 268 map |> StringMap.bindings |> List.map fst |> List.sort String.compare 269 in 270 let entry_for_key key = 271 let value = StringMap.find key map in 272 let height = Util.leading_zeros_on_hash key in 273 + {layer= height; key; value; right= lazy Lwt.return_none} 274 in 275 let root = 276 { layer= keys |> List.hd |> Util.leading_zeros_on_hash 277 ; entries= [] 278 + ; left= lazy Lwt.return_none } 279 in 280 List.iter 281 (fun key -> ignore (insert_entry root (entry_for_key key))) 282 (List.tl keys) ; 283 + let rec finalize node : (Cid.t * bytes) Lwt.t = 284 + let%lwt left = 285 + node.left 286 + >>? function 287 | Some l -> 288 + let%lwt cid, _ = finalize l in 289 + Lwt.return_some cid 290 | None -> 291 + Lwt.return_none 292 in 293 let last_key = ref "" in 294 + let%lwt mst_entries = 295 + Lwt_list.map_s 296 (fun entry -> 297 + let%lwt right = 298 + entry.right 299 + >>? function 300 | Some r -> 301 + let%lwt cid, _ = finalize r in 302 + Lwt.return (Some cid) 303 | None -> 304 + Lwt.return None 305 in 306 let prefix_len = Util.shared_prefix_length !last_key entry.key in 307 last_key := entry.key ; 308 + Lwt.return 309 + { k= 310 + Bytes.of_string 311 + (String.sub entry.key prefix_len 312 + (String.length entry.key - prefix_len) ) 313 + ; p= prefix_len 314 + ; v= entry.value 315 + ; t= right } ) 316 node.entries 317 in 318 + let encoded = 319 + Dag_cbor.encode (encode_node_raw {l= left; e= mst_entries}) 320 + in 321 let cid = Cid.create Dcbor encoded in 322 let%lwt () = Store.put_block t.blockstore cid encoded in 323 + Lwt.return (cid, encoded) 324 in 325 finalize root 326 327 (* returns cids and blocks that form the path from a given node to a given entry *) 328 let rec path_to_entry t node key : (Cid.t * bytes) list Lwt.t = 329 + let%lwt root_bytes = Store.get_bytes t.blockstore node in 330 let%lwt root = 331 match root_bytes with 332 | None -> 333 Lwt.return_none 334 | Some bytes -> 335 + Lwt.return_some (decode_block_raw bytes) 336 in 337 let path_tail = [(node, Option.get root_bytes)] in 338 (* if there is a left child, try to find a path through the left subtree *) ··· 357 Lwt.return path 358 | None -> ( 359 (* if a left subtree path couldn't be found, find the entry whose right subtree this key would belong to *) 360 + let entries = (Option.get root).e in 361 + let entries_keys = entries_to_keys entries in 362 + let entries_len = List.length entries in 363 let entry_index = 364 + match List.find_index (fun e -> e >= key) entries_keys with 365 | Some index -> 366 index 367 | None -> ··· 372 | _ 373 (* because entries[entry_index] might turn out to be the entry we're looking for *) 374 when entry_index < entries_len 375 + && List.nth entries_keys entry_index = key -> 376 Lwt.return path_tail 377 | _ -> ( 378 (* otherwise, we continue down the right subtree of the entry before entry_index *) 379 + match Util.last entries with 380 | Some last when last.t != None -> 381 let%lwt path_through_right = 382 path_to_entry t (Option.get last.t) key ··· 386 Lwt.return path_tail ) ) 387 388 (* returns all mst entries in order for a car stream *) 389 + let to_blocks_seq t : (Cid.t * bytes) Lwt_seq.t = 390 let module M = struct 391 type stage = 392 + (* currently walking nodes *) 393 | Nodes of 394 { next: Cid.t list (* next cids to fetch *) 395 ; fetched: (Cid.t * bytes) list (* fetched cids and their bytes *) 396 ; leaves: Cid.Set.t (* seen leaf cids *) } 397 + (* done walking nodes, streaming accumulated leaves *) 398 + | Leaves of (Cid.t * bytes) list 399 | Done 400 end in 401 let open M in ··· 404 in 405 let rec step = function 406 | Done -> 407 + Lwt.return_none 408 (* node has been fetched, can now be yielded *) 409 | Nodes ({fetched= (cid, bytes) :: rest; _} as s) -> 410 + Lwt.return_some ((cid, bytes), Nodes {s with fetched= rest}) 411 (* need to fetch next nodes *) 412 | Nodes {next; fetched= []; leaves} -> 413 if List.is_empty next then ( 414 (* finished traversing nodes, time to switch to leaves *) 415 let leaves_list = Cid.Set.to_list leaves in 416 + let%lwt leaves_bm = Store.get_blocks t.blockstore leaves_list in 417 if leaves_bm.missing <> [] then failwith "missing mst leaf blocks" ; 418 let leaves_nodes = Storage.Block_map.entries leaves_bm.blocks in 419 match leaves_nodes with 420 | [] -> 421 (* with Done, we don't care about the first pair element *) 422 + Lwt.return_some (Obj.magic (), Done) 423 | _ -> 424 (* it's leafin time *) 425 step (Leaves leaves_nodes) ) 426 else 427 (* go ahead and fetch the next nodes *) 428 + let%lwt bm = Store.get_blocks t.blockstore next in 429 if bm.missing <> [] then failwith "missing mst nodes" ; 430 let fetched, next', leaves' = 431 List.fold_left ··· 434 (* we should be safe to do this since we just got the cids from the blockmap *) 435 Storage.Block_map.get cid bm.blocks |> Option.get 436 in 437 + let node = decode_block_raw bytes in 438 let nxt' = 439 List.fold_left 440 (* node.entries.map(e => e.right) *) 441 (fun n e -> match e.t with Some c -> c :: n | None -> n ) 442 (* start with [node.left, ...nxt] if node has a left subtree *) 443 + (* next' looks like [..., n_2.r_2, n_2.l, n_1.r_n, ..., n_1.r_1, n_1.l]) *) 444 ( match node.l with 445 | Some l -> 446 l :: nxt ··· 464 (* if we're onto yielding leaves, do that *) 465 | Leaves ((cid, bytes) :: rest) -> 466 let next = if rest = [] then Done else Leaves rest in 467 + Lwt.return_some ((cid, bytes), next) 468 (* once we're out of leaves, we're done *) 469 | Leaves [] -> 470 + Lwt.return_some (Obj.magic (), Done) 471 in 472 + Lwt_seq.unfold_lwt step init_state 473 end