objective categorical abstract machine language personal data server
at main 110 lines 3.2 kB view raw
(** State of a caching, write-deferring wrapper around a blockstore of type
    ['bs]. *)
type 'bs data =
  { mutable reads: Cid.Set.t
        (** CIDs of blocks that were returned by a read through the wrapper. *)
  ; mutable cache: Block_map.t
        (** Blocks fetched from the wrapped store or written via the wrapper. *)
  ; mutable pending_writes: Block_map.t
        (** Blocks accepted by [put_block] / [put_many] but not yet flushed. *)
  ; bs: 'bs  (** The wrapped blockstore. *) }

(** [Make (Bs)] wraps a writable blockstore [Bs] with an in-memory cache, a
    record of which CIDs were read, and a buffer of deferred writes. Writes
    only reach [Bs] when [flush_writes] is called; deletes are forwarded to
    [Bs] immediately. *)
module Make
    (Bs : Blockstore.Writable) : sig
  include Blockstore.Writable

  (** [create bs] wraps [bs] with an empty read set, cache and write buffer. *)
  val create : Bs.t -> t

  (** CIDs recorded as read so far. *)
  val get_reads : t -> Cid.Set.t

  (** Current contents of the in-memory cache. *)
  val get_cache : t -> Block_map.t

  (** Blocks buffered for writing and not yet flushed. *)
  val get_pending_writes : t -> Block_map.t

  (** Writes the buffered blocks to the wrapped store with [Bs.put_many]. On
      success the flushed blocks are removed from the buffer; on error the
      buffer is left unchanged and the error is returned. *)
  val flush_writes : t -> (unit, exn) Lwt_result.t
end
with type t = Bs.t data = struct
  type t = Bs.t data

  let create bs =
    { reads= Cid.Set.empty
    ; cache= Block_map.empty
    ; pending_writes= Block_map.empty
    ; bs }

  let get_reads t = t.reads

  let get_cache t = t.cache

  let get_pending_writes t = t.pending_writes

  let flush_writes t =
    (* Snapshot the batch before waiting: [put_block] / [put_many] may run on
       another Lwt thread while the write below is in flight, and whatever
       they add must not be discarded afterwards. *)
    let batch = t.pending_writes in
    if Block_map.is_empty batch then Lwt_result.return ()
    else
      match%lwt Bs.put_many t.bs batch with
      | Ok _ ->
          t.pending_writes <-
            ( if t.pending_writes == batch then
                (* nothing was queued during the write *)
                Block_map.empty
              else
                (* drop only what was flushed; newer entries stay pending *)
                List.fold_left
                  (fun pending (cid, _) -> Block_map.remove cid pending)
                  t.pending_writes (Block_map.entries batch) ) ;
          Lwt_result.return ()
      | Error e ->
          Lwt_result.fail e

  (* Looks [cid] up in the cache first and falls back to the wrapped store.
     A block that is found is recorded in [reads] (and cached if it came from
     the store); a miss is not recorded. *)
  let get_bytes t cid =
    match Block_map.get cid t.cache with
    | Some _ as cached ->
        t.reads <- Cid.Set.add cid t.reads ;
        Lwt.return cached
    | None -> (
        match%lwt Bs.get_bytes t.bs cid with
        | Some data as res ->
            t.cache <- Block_map.set cid data t.cache ;
            t.reads <- Cid.Set.add cid t.reads ;
            Lwt.return res
        | None ->
            Lwt.return_none )

  (* Answers from the cache when possible, otherwise asks the wrapped store.
     Does not touch [reads]. *)
  let has t cid =
    if Block_map.has cid t.cache then Lwt.return_true else Bs.has t.bs cid

  (* Serves as many of [cids] as possible from the cache, fetches the rest
     from the wrapped store, and returns the union together with whatever the
     store reported as missing. Every block returned is recorded in [reads]. *)
  let get_blocks t cids =
    let {Block_map.blocks= cached; missing} = Block_map.get_many cids t.cache in
    (* mark cached as read *)
    Block_map.iter (fun cid _ -> t.reads <- Cid.Set.add cid t.reads) cached ;
    (* fetch missing from underlying store *)
    let%lwt fetched = Bs.get_blocks t.bs missing in
    (* cache and mark as read *)
    Block_map.iter
      (fun cid data ->
        t.cache <- Block_map.set cid data t.cache ;
        t.reads <- Cid.Set.add cid t.reads )
      fetched.blocks ;
    (* combine results *)
    let blocks =
      List.fold_left
        (fun acc (cid, data) -> Block_map.set cid data acc)
        fetched.blocks (Block_map.entries cached)
    in
    Lwt.return {Block_map.blocks; missing= fetched.missing}

  (* Stores the block in the cache and in the write buffer; always succeeds
     with [true]. *)
  let put_block t cid bytes =
    t.cache <- Block_map.set cid bytes t.cache ;
    t.pending_writes <- Block_map.set cid bytes t.pending_writes ;
    (* defer actual write to flush_writes *)
    Lwt_result.return true

  (* Stores every block of [blocks] in the cache and in the write buffer;
     succeeds with [Block_map.length blocks]. *)
  let put_many t blocks =
    Block_map.iter
      (fun cid data ->
        t.cache <- Block_map.set cid data t.cache ;
        t.pending_writes <- Block_map.set cid data t.pending_writes )
      blocks ;
    (* defer actual write to flush_writes *)
    Lwt_result.return (Block_map.length blocks)

  (* Forgets [cid] locally (cache and write buffer), then deletes it from the
     wrapped store right away; unlike writes, deletes are not deferred. *)
  let delete_block t cid =
    t.cache <- Block_map.remove cid t.cache ;
    t.pending_writes <- Block_map.remove cid t.pending_writes ;
    Bs.delete_block t.bs cid

  (* Same as [delete_block] for a list of CIDs, using [Bs.delete_many]. *)
  let delete_many t cids =
    List.iter
      (fun cid ->
        t.cache <- Block_map.remove cid t.cache ;
        t.pending_writes <- Block_map.remove cid t.pending_writes )
      cids ;
    Bs.delete_many t.bs cids
end