objective categorical abstract machine language personal data server
at main 216 lines 6.4 kB view raw
(* varint encoding/decoding; ripped from https://github.com/chrisdickinson/varint *)
module Varint = struct
  (** Number of bytes written by the most recent successful {!encode} call.
      Reset to [0] when [encode] rejects its argument. *)
  let bytes = ref 0

  (** Continuation flag: set on every byte of a varint except the last. *)
  let msb = 0x80

  (** Mask selecting the 7 payload bits of a varint byte. *)
  let rest = 0x7F

  (** Complement of {!rest}; [n land msball <> 0] means [n] does not fit in
      the 7 payload bits of a single byte. *)
  let msball = lnot rest

  (** No longer used by {!encode}; kept so that existing references to
      [Varint.int_threshold] keep compiling. *)
  let int_threshold = 1 lsl 31

  (** Largest bit offset at which the decoders accept a payload byte
      (i.e. at most 8 bytes / 56 payload bits per varint). *)
  let max_shift = 49

  (** [encode n] returns the little-endian base-128 encoding of [n] and
      stores the encoded length in {!bytes}.

      @raise Failure if [n] is negative. *)
  let encode (n0 : int) : bytes =
    if n0 < 0 then (
      bytes := 0 ;
      failwith "negative numbers not supported" ) ;
    let buf = Buffer.create 10 in
    (* A single loop is enough for every non-negative [int]. The previous
       separate loop guarded by [int_threshold] was redundant, and it never
       terminated on targets where [1 lsl 31] is not a positive number. *)
    let rec emit num =
      if num land msball <> 0 then (
        Buffer.add_char buf (Char.chr ((num land rest) lor msb)) ;
        emit (num lsr 7) )
      else Buffer.add_char buf (Char.chr num)
    in
    emit n0 ;
    bytes := Buffer.length buf ;
    Buffer.to_bytes buf

  (** [decode buf] reads one varint from the start of [buf] and returns
      [(value, bytes_consumed)].

      @raise Failure
        if [buf] ends before the varint does, or if the varint continues
        past {!max_shift}. *)
  let decode buf =
    let l = Bytes.length buf in
    let rec aux res shift counter =
      if counter >= l || shift > max_shift then
        failwith "could not decode varint"
      else
        let b = Bytes.get_uint8 buf counter in
        let res = res + ((b land rest) lsl shift) in
        let counter = counter + 1 in
        if b >= msb then aux res (shift + 7) counter else (res, counter)
    in
    aux 0 0 0
end

type block = Cid.t * bytes

type block_stream = block Lwt_seq.t

type stream = bytes Lwt_seq.t

(* converts a series of blocks into a car stream *)
(** The stream starts with the varint-prefixed DAG-CBOR header
    ([version = 1], [roots = [root]]). Each block then contributes three
    chunks: the varint of [length cid + length data], the cid bytes, and the
    block data. *)
let blocks_to_stream (root : Cid.t) (blocks : block_stream) : stream =
  let header =
    Dag_cbor.encode
      (`Map
         (Dag_cbor.String_map.of_list
            [("version", `Integer 1L); ("roots", `Array [|`Link root|])] ) )
  in
  let seq = Lwt_seq.of_list [Varint.encode (Bytes.length header); header] in
  Lwt_seq.append seq
    (Lwt_seq.flat_map
       (fun ((cid, block) : Cid.t * bytes) ->
         Lwt_seq.of_list
           [ Varint.encode (Bytes.length cid.bytes + Bytes.length block)
           ; cid.bytes
           ; block ] )
       blocks )

(* collects a stream into a car file *)
(** Concatenates every chunk of [stream], in order, into one [bytes]. *)
let collect_stream (stream : stream) : bytes Lwt.t =
  let buf = Buffer.create 1024 in
  let%lwt () = Lwt_seq.iter (Buffer.add_bytes buf) stream in
  Lwt.return (Buffer.to_bytes buf)

(* converts a series of blocks into a car file *)
let blocks_to_car (root : Cid.t) (blocks : block_stream) : bytes Lwt.t =
  blocks_to_stream root blocks |> collect_stream

(* reads a car stream into a series of blocks
   returns (roots, blocks) *)
(** The header is read before the returned promise resolves. The blocks are
    decoded lazily as the returned sequence is consumed. Chunk boundaries in
    [stream] carry no meaning.

    The returned sequence reads from shared mutable state, so it can only be
    traversed once.

    The promise (or a later step of the block sequence) is rejected with
    [Failure] on a missing header, a malformed varint or a truncated section,
    and with whatever exception [stream], [Dag_cbor.decode] or
    [Cid.decode_first] raises. *)
let read_car_stream (stream : stream) : (Cid.t list * block_stream) Lwt.t =
  let open Lwt.Infix in
  (* Unread tail of the input. Chunks are pulled on demand instead of being
     pumped through a background thread, so a failure of [stream] rejects the
     promise the caller is waiting on rather than escaping to
     [Lwt.async_exception_hook] while the reader sees a clean end of input. *)
  let pending = ref stream in
  (* Current chunk and read cursor within it. *)
  let buf = ref Bytes.empty in
  let pos = ref 0 in
  let len = ref 0 in
  let eof = ref false in
  (* Postcondition: either [!pos < !len] (data available) or [!eof]. *)
  let rec refill () =
    if !pos < !len || !eof then Lwt.return_unit
    else
      (* [Lwt.apply] turns a synchronous raise into a rejected promise. *)
      Lwt.apply !pending ()
      >>= function
      | Lwt_seq.Nil ->
          eof := true ;
          buf := Bytes.empty ;
          pos := 0 ;
          len := 0 ;
          Lwt.return_unit
      | Lwt_seq.Cons (chunk, tl) ->
          pending := tl ;
          buf := chunk ;
          pos := 0 ;
          len := Bytes.length chunk ;
          (* skip empty chunks *)
          if !len = 0 then refill () else Lwt.return_unit
  in
  (* Next byte of the input, or [None] at end of input. *)
  let read_byte () =
    refill ()
    >>= fun () ->
    if !pos < !len then (
      let b = Bytes.get_uint8 !buf !pos in
      incr pos ;
      Lwt.return_some b )
    else Lwt.return_none
  in
  (* Exactly [n] bytes, gathered across chunk boundaries. *)
  let read_exact n =
    (* [n] is read from the stream itself, so do not preallocate an
       arbitrarily large buffer on its say-so; the buffer grows as data
       actually arrives. *)
    let out = Buffer.create (min n 65536) in
    let rec loop remaining =
      if remaining = 0 then Lwt.return (Buffer.to_bytes out)
      else
        refill ()
        >>= fun () ->
        (* After [refill], an empty window can only mean end of input. *)
        if !pos >= !len then Lwt.fail_with "unexpected end of car stream"
        else
          let take = min (!len - !pos) remaining in
          Buffer.add_subbytes out !buf !pos take ;
          pos := !pos + take ;
          loop (remaining - take)
    in
    loop n
  in
  (* One varint from the stream. [None] only when the input ends exactly on
     a varint boundary; ending in the middle of a varint is an error. *)
  let read_varint_stream () =
    let rec aux res shift =
      if shift > Varint.max_shift then Lwt.fail_with "could not decode varint"
      else
        read_byte ()
        >>= function
        | None ->
            if shift = 0 then Lwt.return_none
            else Lwt.fail_with "could not decode varint"
        | Some b ->
            let v = res + ((b land Varint.rest) lsl shift) in
            if b land Varint.msb <> 0 then aux v (shift + 7)
            else Lwt.return_some v
    in
    aux 0 0
  in
  let%lwt header_size_opt = read_varint_stream () in
  let header_size =
    match header_size_opt with
    | None ->
        failwith "could not parse car header"
    | Some n ->
        n
  in
  let%lwt header_bytes = read_exact header_size in
  let header = Dag_cbor.decode header_bytes in
  (* Entries of "roots" that are not links are dropped; a header that is not
     a map, or has no "roots" array, yields no roots. *)
  let roots =
    match header with
    | `Map m -> (
      match Dag_cbor.String_map.find "roots" m with
      | exception Not_found ->
          []
      | `Array arr ->
          Array.fold_right
            (fun v acc -> match v with `Link cid -> cid :: acc | _ -> acc)
            arr []
      | _ ->
          [] )
    | _ ->
        []
  in
  (* Next block, or [None] at end of input. Zero-length sections are
     skipped. *)
  let rec next () =
    read_varint_stream ()
    >>= function
    | None ->
        Lwt.return_none
    | Some block_size ->
        if block_size <= 0 then next ()
        else
          read_exact block_size
          >>= fun block_bytes ->
          let cid, remainder = Cid.decode_first block_bytes in
          Lwt.return_some (cid, remainder)
  in
  let blocks : (Cid.t * bytes) Lwt_seq.t =
    Lwt_seq.unfold_lwt
      (fun () -> next () >|= Option.map (fun blk -> (blk, ())))
      ()
  in
  Lwt.return (roots, blocks)