forked from
futur.blue/pegasus
objective categorical abstract machine language personal data server
1type 'bs data =
2 { mutable reads: Cid.Set.t
3 ; mutable cache: Block_map.t
4 ; mutable pending_writes: Block_map.t
5 ; bs: 'bs }
6
7module Make
8 (Bs : Blockstore.Writable) : sig
9 include Blockstore.Writable
10
11 val create : Bs.t -> t
12
13 val get_reads : t -> Cid.Set.t
14
15 val get_cache : t -> Block_map.t
16
17 val get_pending_writes : t -> Block_map.t
18
19 val flush_writes : t -> (unit, exn) Lwt_result.t
20end
21with type t = Bs.t data = struct
22 type t = Bs.t data
23
24 let create bs =
25 { reads= Cid.Set.empty
26 ; cache= Block_map.empty
27 ; pending_writes= Block_map.empty
28 ; bs }
29
30 let get_reads t = t.reads
31
32 let get_cache t = t.cache
33
34 let get_pending_writes t = t.pending_writes
35
36 let flush_writes t =
37 if Block_map.is_empty t.pending_writes then Lwt_result.return ()
38 else
39 match%lwt Bs.put_many t.bs t.pending_writes with
40 | Ok _ ->
41 t.pending_writes <- Block_map.empty ;
42 Lwt_result.return ()
43 | Error e ->
44 Lwt_result.fail e
45
46 let get_bytes t cid =
47 match Block_map.get cid t.cache with
48 | Some _ as cached ->
49 t.reads <- Cid.Set.add cid t.reads ;
50 Lwt.return cached
51 | None -> (
52 match%lwt Bs.get_bytes t.bs cid with
53 | Some data as res ->
54 t.cache <- Block_map.set cid data t.cache ;
55 t.reads <- Cid.Set.add cid t.reads ;
56 Lwt.return res
57 | None ->
58 Lwt.return_none )
59
60 let has t cid =
61 if Block_map.has cid t.cache then Lwt.return_true else Bs.has t.bs cid
62
63 let get_blocks t cids =
64 let {Block_map.blocks= cached; missing} = Block_map.get_many cids t.cache in
65 (* mark cached as read *)
66 Block_map.iter (fun cid _ -> t.reads <- Cid.Set.add cid t.reads) cached ;
67 (* fetch missing from underlying store *)
68 let%lwt fetched = Bs.get_blocks t.bs missing in
69 (* cache and mark as read *)
70 Block_map.iter
71 (fun cid data ->
72 t.cache <- Block_map.set cid data t.cache ;
73 t.reads <- Cid.Set.add cid t.reads )
74 fetched.blocks ;
75 (* combine results *)
76 let blocks =
77 List.fold_left
78 (fun acc (cid, data) -> Block_map.set cid data acc)
79 fetched.blocks (Block_map.entries cached)
80 in
81 Lwt.return {Block_map.blocks; missing= fetched.missing}
82
83 let put_block t cid bytes =
84 t.cache <- Block_map.set cid bytes t.cache ;
85 t.pending_writes <- Block_map.set cid bytes t.pending_writes ;
86 (* defer actual write to flush_writes *)
87 Lwt_result.return true
88
89 let put_many t blocks =
90 Block_map.iter
91 (fun cid data ->
92 t.cache <- Block_map.set cid data t.cache ;
93 t.pending_writes <- Block_map.set cid data t.pending_writes )
94 blocks ;
95 (* defer actual write to flush_writes *)
96 Lwt_result.return (Block_map.length blocks)
97
98 let delete_block t cid =
99 t.cache <- Block_map.remove cid t.cache ;
100 t.pending_writes <- Block_map.remove cid t.pending_writes ;
101 Bs.delete_block t.bs cid
102
103 let delete_many t cids =
104 List.iter
105 (fun cid ->
106 t.cache <- Block_map.remove cid t.cache ;
107 t.pending_writes <- Block_map.remove cid t.pending_writes )
108 cids ;
109 Bs.delete_many t.bs cids
110end