objective categorical abstract machine language personal data server
(* varint encoding/decoding; ported from https://github.com/chrisdickinson/varint *)
module Varint = struct
  (* number of bytes written by the last successful [encode]; reset to 0 when
     [encode] rejects its argument *)
  let bytes = ref 0

  (* continuation flag: set on every byte of a varint except the last *)
  let msb = 0x80

  (* mask selecting the 7 payload bits of a byte *)
  let rest = 0x7F

  (* every bit above the low 7; a non-zero [land] with it means that more
     bytes have to follow *)
  let msball = lnot rest

  (* values at or above this are shrunk by division rather than by shifting
     in [encode] *)
  let int_threshold = 1 lsl 31

  (* [encode n] is the varint form of [n]: 7 bits per byte, least significant
     group first, [msb] set on every byte but the last.
     raises [Failure] when [n] is negative *)
  let encode (n0 : int) : bytes =
    if n0 < 0 then begin
      bytes := 0 ;
      failwith "negative numbers not supported"
    end ;
    let out = Buffer.create 10 in
    let emit v = Buffer.add_char out (Char.chr (v land 0xFF)) in
    let rec go n =
      if n >= int_threshold then begin
        emit (n land 0xFF lor msb) ;
        go (n / 128)
      end
      else if n land msball <> 0 then begin
        emit (n land 0xFF lor msb) ;
        go (n lsr 7)
      end
      else emit n
    in
    go n0 ;
    bytes := Buffer.length out ;
    Buffer.to_bytes out

  (* [decode buf] reads one varint from the start of [buf] and returns
     [(value, bytes_consumed)].
     raises [Failure] when [buf] ends before the final byte or when the
     varint runs past 8 bytes *)
  let decode buf =
    let total = Bytes.length buf in
    let rec go acc shift idx =
      if idx >= total || shift > 49 then failwith "could not decode varint"
      else
        let octet = Bytes.get_uint8 buf idx in
        let payload = octet land rest in
        let scaled =
          if shift < 28 then payload lsl shift else payload * (1 lsl shift)
        in
        let acc = acc + scaled in
        if octet >= msb then go acc (shift + 7) (idx + 1) else (acc, idx + 1)
    in
    go 0 0 0
end
56
(* a block: its cid paired with the raw block data *)
type block = Cid.t * bytes

(* an Lwt sequence of blocks *)
type block_stream = block Lwt_seq.t

(* an Lwt sequence of byte chunks *)
type stream = bytes Lwt_seq.t
62
(* converts a series of blocks into a car stream
   the stream opens with the varint-prefixed dag-cbor header (version 1, a
   single root); every block then contributes three chunks: the varint of
   (cid length + data length), the cid bytes, and the block data *)
let blocks_to_stream (root : Cid.t) (blocks : block_stream) : stream =
  let header_map =
    Dag_cbor.String_map.of_list
      [("version", `Integer 1L); ("roots", `Array [|`Link root|])]
  in
  let header = Dag_cbor.encode (`Map header_map) in
  let preamble =
    Lwt_seq.of_list [Varint.encode (Bytes.length header); header]
  in
  (* one length-prefixed section per block *)
  let section_of ((cid, data) : Cid.t * bytes) =
    let section_len = Bytes.length cid.bytes + Bytes.length data in
    Lwt_seq.of_list [Varint.encode section_len; cid.bytes; data]
  in
  Lwt_seq.append preamble (Lwt_seq.flat_map section_of blocks)
80
(* collects a stream into a car file
   every chunk is appended, in order, to a single buffer whose contents are
   returned once the stream is exhausted *)
let collect_stream (stream : stream) : bytes Lwt.t =
  let acc = Buffer.create 1024 in
  let append chunk = Buffer.add_bytes acc chunk in
  Lwt.map (fun () -> Buffer.to_bytes acc) (Lwt_seq.iter append stream)
86
(* converts a series of blocks into a car file
   builds the car stream for [root] and [blocks], then gathers it into one
   byte string *)
let blocks_to_car (root : Cid.t) (blocks : block_stream) : bytes Lwt.t =
  let car_stream = blocks_to_stream root blocks in
  collect_stream car_stream
90
(* reads a car stream into a series of blocks
   returns (roots, blocks)

   the input is pulled by a background producer and handed to the parser one
   chunk at a time; [blocks] is lazy, so sections are parsed only as it is
   consumed. an exception raised by [stream] is re-raised to whoever is
   reading (this promise or [blocks]) instead of escaping through
   [Lwt.async]. fails with [Failure] on a missing header, a malformed varint
   or a truncated section *)
let read_car_stream (stream : stream) : (Cid.t list * block_stream) Lwt.t =
  let open Lwt.Infix in
  (* single-slot hand-off from the producer: [Ok (Some chunk)] carries data,
     [Ok None] marks the end of the input, [Error exn] reports a producer
     failure *)
  let q : (bytes option, exn) result Lwt_mvar.t = Lwt_mvar.create_empty () in
  let () =
    Lwt.async (fun () ->
        Lwt.catch
          (fun () ->
            Lwt_seq.iter_s
              (fun chunk -> Lwt_mvar.put q (Ok (Some chunk)))
              stream
            >>= fun () -> Lwt_mvar.put q (Ok None) )
          (* forward the failure to the reader: a promise rejected inside
             [Lwt.async] is passed to [Lwt.async_exception_hook], which by
             default terminates the process *)
          (fun exn -> Lwt_mvar.put q (Error exn)) )
  in
  let buf = ref Bytes.empty in
  let pos = ref 0 in
  let len = ref 0 in
  let eof = ref false in
  (* sticky producer failure, so that later reads fail again instead of
     waiting on a producer that has already stopped *)
  let failure = ref None in
  (* makes sure the current chunk has unread bytes, unless the input is
     exhausted *)
  let rec refill () =
    match !failure with
    | Some exn ->
        Lwt.fail exn
    | None -> (
        if !pos < !len || !eof then Lwt.return_unit
        else
          Lwt_mvar.take q
          >>= function
          | Error exn ->
              failure := Some exn ;
              Lwt.fail exn
          | Ok None ->
              eof := true ;
              buf := Bytes.empty ;
              pos := 0 ;
              len := 0 ;
              Lwt.return_unit
          | Ok (Some chunk) ->
              buf := chunk ;
              pos := 0 ;
              len := Bytes.length chunk ;
              (* empty chunks carry no data: fetch the next one *)
              if !len = 0 then refill () else Lwt.return_unit )
  in
  (* next byte of the input, or [None] once it is exhausted *)
  let read_byte () =
    refill ()
    >>= fun () ->
    if !pos < !len then (
      let b = Bytes.get_uint8 !buf !pos in
      pos := !pos + 1 ;
      Lwt.return_some b )
    else Lwt.return_none
  in
  (* reads exactly [n] bytes, spanning chunk boundaries as needed *)
  let read_exact n =
    (* [n] is a length prefix taken from the input: cap the initial capacity
       so that a bogus size cannot force a huge allocation up front; the
       buffer still grows as data actually arrives *)
    let max_prealloc = 65536 in
    let out = Buffer.create (min n max_prealloc) in
    let rec loop remaining =
      if remaining <= 0 then Lwt.return (Buffer.to_bytes out)
      else
        refill ()
        >>= fun () ->
        (* after [refill], no unread bytes means the input is exhausted *)
        let avail = !len - !pos in
        if avail <= 0 then Lwt.fail_with "unexpected end of car stream"
        else
          let take = min avail remaining in
          Buffer.add_subbytes out !buf !pos take ;
          pos := !pos + take ;
          loop (remaining - take)
    in
    loop n
  in
  (* reads one varint; [None] only when the input ends cleanly before its
     first byte *)
  let read_varint_stream () =
    let rec aux res shift =
      if shift > 49 then Lwt.fail_with "could not decode varint"
      else
        read_byte ()
        >>= function
        | None ->
            if shift = 0 then Lwt.return_none
            else Lwt.fail_with "could not decode varint"
        | Some b ->
            let v =
              if shift < 28 then res + ((b land Varint.rest) lsl shift)
              else res + (b land Varint.rest * (1 lsl shift))
            in
            if b land Varint.msb <> 0 then aux v (shift + 7)
            else Lwt.return_some v
    in
    aux 0 0
  in
  let%lwt header_size_opt = read_varint_stream () in
  let header_size =
    match header_size_opt with
    | None ->
        failwith "could not parse car header"
    | Some n ->
        n
  in
  let%lwt header_bytes = read_exact header_size in
  let header = Dag_cbor.decode header_bytes in
  (* roots are the links found in the header's "roots" array; any other
     header shape yields no roots *)
  let roots =
    match header with
    | `Map m -> (
      match Dag_cbor.String_map.find "roots" m with
      | `Array arr ->
          Array.fold_right
            (fun v acc -> match v with `Link cid -> cid :: acc | _ -> acc)
            arr []
      | _ ->
          []
      | exception Not_found ->
          [] )
    | _ ->
        []
  in
  (* next block of the stream, or [None] at a clean end of input; empty
     sections are skipped *)
  let rec next () =
    read_varint_stream ()
    >>= function
    | None ->
        Lwt.return_none
    | Some block_size ->
        if block_size <= 0 then next ()
        else
          read_exact block_size
          >>= fun block_bytes ->
          let cid, remainder = Cid.decode_first block_bytes in
          Lwt.return_some (cid, remainder)
  in
  let blocks : (Cid.t * bytes) Lwt_seq.t =
    Lwt_seq.unfold_lwt
      (fun () -> next () >|= function None -> None | Some x -> Some (x, ()))
      ()
  in
  Lwt.return (roots, blocks)