this repo has no description
1open Types.Wire
2
(** [encode_ping p] renders ping [p] as a MessagePack map. Entries are emitted
    in the order [SeqNo], [Node], [SourceAddr], [SourcePort], [SourceNode]. *)
let encode_ping (p : ping) : Msgpck.t =
  let num key value = (Msgpck.String key, Msgpck.of_int value) in
  let str key value = (Msgpck.String key, Msgpck.String value) in
  Msgpck.Map
    [
      num "SeqNo" p.seq_no;
      str "Node" p.node;
      str "SourceAddr" p.source_addr;
      num "SourcePort" p.source_port;
      str "SourceNode" p.source_node;
    ]
12
(** [decode_ping m] reads a ping from the MessagePack map [m].

    [SeqNo], [Node] and [SourceAddr] are required; the first one that is
    missing or has an unexpected type yields [Error]. [SourcePort] and
    [SourceNode] are optional and fall back to [0] and [""]. A [Nil] string
    field decodes as [""]. Any non-map input is rejected. *)
let decode_ping (m : Msgpck.t) : (ping, string) result =
  match m with
  | Msgpck.Map entries ->
      let lookup name = List.assoc_opt (Msgpck.String name) entries in
      let invalid name = Error (Printf.sprintf "missing or invalid %s" name) in
      let int_field name =
        match lookup name with
        | Some (Msgpck.Int v) -> Ok v
        | Some (Msgpck.Int32 v | Msgpck.Uint32 v) -> Ok (Int32.to_int v)
        | _ -> invalid name
      in
      let string_field name =
        match lookup name with
        | Some (Msgpck.String v | Msgpck.Bytes v) -> Ok v
        | Some Msgpck.Nil -> Ok ""
        | _ -> invalid name
      in
      let ( let* ) = Result.bind in
      let* seq_no = int_field "SeqNo" in
      let* node = string_field "Node" in
      let* source_addr = string_field "SourceAddr" in
      (* Source port and node are best-effort: any failure becomes a default. *)
      let source_port = Result.value (int_field "SourcePort") ~default:0 in
      let source_node = Result.value (string_field "SourceNode") ~default:"" in
      Ok ({ seq_no; node; source_addr; source_port; source_node } : ping)
  | _ -> Error "expected map for ping"
42
(** [encode_indirect_ping p] renders the indirect ping request [p] as a
    MessagePack map with the entries [SeqNo], [Target], [Port], [Node],
    [Nack], [SourceAddr], [SourcePort], [SourceNode], in that order. *)
let encode_indirect_ping (p : indirect_ping_req) : Msgpck.t =
  let entry key value = (Msgpck.String key, value) in
  let num key value = entry key (Msgpck.of_int value) in
  let str key value = entry key (Msgpck.String value) in
  Msgpck.Map
    [
      num "SeqNo" p.seq_no;
      str "Target" p.target;
      num "Port" p.port;
      str "Node" p.node;
      entry "Nack" (Msgpck.Bool p.nack);
      str "SourceAddr" p.source_addr;
      num "SourcePort" p.source_port;
      str "SourceNode" p.source_node;
    ]
55
(** [decode_indirect_ping m] reads an indirect ping request from the
    MessagePack map [m].

    [SeqNo], [Target] and [Node] are required; the first one that is missing
    or has an unexpected type yields [Error]. [Port] and [SourcePort] default
    to [0], [SourceAddr] and [SourceNode] default to [""], and [Nack] is
    [false] unless a boolean is present. Any non-map input is rejected. *)
let decode_indirect_ping (m : Msgpck.t) : (indirect_ping_req, string) result =
  match m with
  | Msgpck.Map entries ->
      let lookup name = List.assoc_opt (Msgpck.String name) entries in
      let invalid name = Error (Printf.sprintf "missing or invalid %s" name) in
      let int_field name =
        match lookup name with
        | Some (Msgpck.Int v) -> Ok v
        | Some (Msgpck.Int32 v | Msgpck.Uint32 v) -> Ok (Int32.to_int v)
        | _ -> invalid name
      in
      let string_field name =
        match lookup name with
        | Some (Msgpck.String v | Msgpck.Bytes v) -> Ok v
        | Some Msgpck.Nil -> Ok ""
        | _ -> invalid name
      in
      (* Lenient accessors: any lookup failure collapses to a default. *)
      let int_or_zero name = Result.value (int_field name) ~default:0 in
      let string_or_empty name = Result.value (string_field name) ~default:"" in
      let nack =
        match lookup "Nack" with Some (Msgpck.Bool flag) -> flag | _ -> false
      in
      let ( let* ) = Result.bind in
      let* seq_no = int_field "SeqNo" in
      let* target = string_field "Target" in
      let* node = string_field "Node" in
      Ok
        ({
           seq_no;
           target;
           port = int_or_zero "Port";
           node;
           nack;
           source_addr = string_or_empty "SourceAddr";
           source_port = int_or_zero "SourcePort";
           source_node = string_or_empty "SourceNode";
         }
          : indirect_ping_req)
  | _ -> Error "expected map for indirect_ping"
105
(** [encode_ack a] renders the ack response [a] as a MessagePack map holding
    [SeqNo] followed by [Payload] (written as a MessagePack string). *)
let encode_ack (a : ack_resp) : Msgpck.t =
  let seq_no = (Msgpck.String "SeqNo", Msgpck.of_int a.seq_no) in
  let payload = (Msgpck.String "Payload", Msgpck.String a.payload) in
  Msgpck.Map [ seq_no; payload ]
112
(** [decode_ack m] reads an ack response from the MessagePack map [m].

    [SeqNo] is required. [Payload] is optional: a bytes or string value is
    used as is, anything else (including absence) decodes as [""]. Any
    non-map input is rejected. *)
let decode_ack (m : Msgpck.t) : (ack_resp, string) result =
  match m with
  | Msgpck.Map entries ->
      let lookup name = List.assoc_opt (Msgpck.String name) entries in
      let int_field name =
        match lookup name with
        | Some (Msgpck.Int v) -> Ok v
        | Some (Msgpck.Int32 v | Msgpck.Uint32 v) -> Ok (Int32.to_int v)
        | _ -> Error (Printf.sprintf "missing or invalid %s" name)
      in
      let payload =
        match lookup "Payload" with
        | Some (Msgpck.Bytes data | Msgpck.String data) -> data
        | _ -> ""
      in
      Result.map
        (fun seq_no -> ({ seq_no; payload } : ack_resp))
        (int_field "SeqNo")
  | _ -> Error "expected map for ack"
135
(** [encode_nack n] renders the nack response [n] as a one-entry MessagePack
    map keyed by [SeqNo]. *)
let encode_nack (n : nack_resp) : Msgpck.t =
  let seq_no = Msgpck.of_int n.seq_no in
  Msgpck.Map [ (Msgpck.String "SeqNo", seq_no) ]
138
(** [decode_nack m] reads a nack response from the MessagePack map [m]. The
    integer [SeqNo] entry is required; any non-map input is rejected. *)
let decode_nack (m : Msgpck.t) : (nack_resp, string) result =
  match m with
  | Msgpck.Map entries ->
      let int_field name =
        match List.assoc_opt (Msgpck.String name) entries with
        | Some (Msgpck.Int v) -> Ok v
        | Some (Msgpck.Int32 v | Msgpck.Uint32 v) -> Ok (Int32.to_int v)
        | _ -> Error (Printf.sprintf "missing or invalid %s" name)
      in
      Result.map (fun seq_no -> ({ seq_no } : nack_resp)) (int_field "SeqNo")
  | _ -> Error "expected map for nack"
153
(** [encode_suspect s] renders the suspect message [s] as a MessagePack map
    with the entries [Incarnation], [Node], [From], in that order. *)
let encode_suspect (s : suspect) : Msgpck.t =
  let str key value = (Msgpck.String key, Msgpck.String value) in
  Msgpck.Map
    [
      (Msgpck.String "Incarnation", Msgpck.of_int s.incarnation);
      str "Node" s.node;
      str "From" s.from;
    ]
161
(** [decode_suspect m] reads a suspect message from the MessagePack map [m].

    [Incarnation], [Node] and [From] are all required, checked in that order.
    String fields must be MessagePack strings; bytes and nil are not
    accepted here. Any non-map input is rejected. *)
let decode_suspect (m : Msgpck.t) : (suspect, string) result =
  match m with
  | Msgpck.Map entries ->
      let lookup name = List.assoc_opt (Msgpck.String name) entries in
      let invalid name = Error (Printf.sprintf "missing or invalid %s" name) in
      let int_field name =
        match lookup name with
        | Some (Msgpck.Int v) -> Ok v
        | Some (Msgpck.Int32 v | Msgpck.Uint32 v) -> Ok (Int32.to_int v)
        | _ -> invalid name
      in
      let string_field name =
        match lookup name with
        | Some (Msgpck.String v) -> Ok v
        | _ -> invalid name
      in
      Result.bind (int_field "Incarnation") (fun incarnation ->
          Result.bind (string_field "Node") (fun node ->
              Result.map
                (fun from -> ({ incarnation; node; from } : suspect))
                (string_field "From")))
  | _ -> Error "expected map for suspect"
183
(** [encode_alive a] renders the alive message [a] as a MessagePack map with
    the entries [Incarnation], [Node], [Addr], [Port], [Meta], [Vsn], in that
    order. [Vsn] is written as a list of integers. *)
let encode_alive (a : alive) : Msgpck.t =
  let entry key value = (Msgpck.String key, value) in
  let num key value = entry key (Msgpck.of_int value) in
  let str key value = entry key (Msgpck.String value) in
  let versions = List.map Msgpck.of_int a.vsn in
  Msgpck.Map
    [
      num "Incarnation" a.incarnation;
      str "Node" a.node;
      str "Addr" a.addr;
      num "Port" a.port;
      str "Meta" a.meta;
      entry "Vsn" (Msgpck.List versions);
    ]
194
(** [decode_alive m] reads an alive message from the MessagePack map [m].

    [Incarnation], [Node], [Addr] and [Port] are required, checked in that
    order. [Meta] defaults to [""] when missing or invalid. [Vsn] defaults
    to the empty list when it is missing or not a list. Any non-map input is
    rejected.

    Integers, both in scalar fields and inside [Vsn], are accepted as [Int],
    [Int32] or [Uint32]. Using one conversion for both places keeps a
    [Uint32] version entry from being dropped, which would shift the
    remaining entries of [vsn]. Non-integer [Vsn] elements are still
    skipped. *)
let decode_alive (m : Msgpck.t) : (alive, string) result =
  match m with
  | Msgpck.Map fields ->
      let lookup key = List.assoc_opt (Msgpck.String key) fields in
      let invalid key = Error (Printf.sprintf "missing or invalid %s" key) in
      (* Single source of truth for which MessagePack values count as ints. *)
      let int_of_msgpck = function
        | Msgpck.Int i -> Some i
        | Msgpck.Int32 i | Msgpck.Uint32 i -> Some (Int32.to_int i)
        | _ -> None
      in
      let get_int key =
        match Option.bind (lookup key) int_of_msgpck with
        | Some i -> Ok i
        | None -> invalid key
      in
      let get_string key =
        match lookup key with
        | Some (Msgpck.String s | Msgpck.Bytes s) -> Ok s
        | _ -> invalid key
      in
      let ( let* ) = Result.bind in
      let* incarnation = get_int "Incarnation" in
      let* node = get_string "Node" in
      let* addr = get_string "Addr" in
      let* port = get_int "Port" in
      let meta = Result.value (get_string "Meta") ~default:"" in
      let vsn =
        match lookup "Vsn" with
        | Some (Msgpck.List vs) -> List.filter_map int_of_msgpck vs
        | _ -> []
      in
      Ok ({ incarnation; node; addr; port; meta; vsn } : alive)
  | _ -> Error "expected map for alive"
234
(** [encode_dead d] renders the dead message [d] as a MessagePack map with
    the entries [Incarnation], [Node], [From], in that order. *)
let encode_dead (d : dead) : Msgpck.t =
  let incarnation = (Msgpck.String "Incarnation", Msgpck.of_int d.incarnation) in
  let node = (Msgpck.String "Node", Msgpck.String d.node) in
  let from = (Msgpck.String "From", Msgpck.String d.from) in
  Msgpck.Map [ incarnation; node; from ]
242
(** [decode_dead m] reads a dead message from the MessagePack map [m].

    [Incarnation], [Node] and [From] are all required, checked in that order.
    String fields must be MessagePack strings; bytes and nil are not
    accepted here. Any non-map input is rejected. *)
let decode_dead (m : Msgpck.t) : (dead, string) result =
  match m with
  | Msgpck.Map entries ->
      let lookup name = List.assoc_opt (Msgpck.String name) entries in
      let invalid name = Error (Printf.sprintf "missing or invalid %s" name) in
      let required_int name =
        match lookup name with
        | Some (Msgpck.Int v) -> Ok v
        | Some (Msgpck.Int32 v | Msgpck.Uint32 v) -> Ok (Int32.to_int v)
        | _ -> invalid name
      in
      let required_string name =
        match lookup name with
        | Some (Msgpck.String v) -> Ok v
        | _ -> invalid name
      in
      let ( let* ) = Result.bind in
      let* incarnation = required_int "Incarnation" in
      let* node = required_string "Node" in
      let* from = required_string "From" in
      Ok ({ incarnation; node; from } : dead)
  | _ -> Error "expected map for dead"
264
(** [encode_compress c] renders the compression wrapper [c] as a MessagePack
    map holding [Algo] followed by [Buf] (written as a MessagePack string). *)
let encode_compress (c : compress) : Msgpck.t =
  let algo = (Msgpck.String "Algo", Msgpck.of_int c.algo) in
  let data = (Msgpck.String "Buf", Msgpck.String c.buf) in
  Msgpck.Map [ algo; data ]
271
(** [decode_compress m] reads a compression wrapper from the MessagePack map
    [m].

    Both [Algo] (integer) and [Buf] (bytes or string) are required. [Algo] is
    accepted as [Int], [Int32] or [Uint32], matching the integer handling of
    the other decoders in this file; previously a [Uint32] value was rejected
    here only. Any non-map input is rejected. *)
let decode_compress (m : Msgpck.t) : (compress, string) result =
  match m with
  | Msgpck.Map fields ->
      let lookup key = List.assoc_opt (Msgpck.String key) fields in
      let invalid key = Error (Printf.sprintf "missing or invalid %s" key) in
      let get_int key =
        match lookup key with
        | Some (Msgpck.Int i) -> Ok i
        | Some (Msgpck.Int32 i | Msgpck.Uint32 i) -> Ok (Int32.to_int i)
        | _ -> invalid key
      in
      let get_bytes key =
        match lookup key with
        | Some (Msgpck.Bytes s | Msgpck.String s) -> Ok s
        | _ -> invalid key
      in
      let ( let* ) = Result.bind in
      let* algo = get_int "Algo" in
      let* buf = get_bytes "Buf" in
      Ok ({ algo; buf } : compress)
  | _ -> Error "expected map for compress"
292
(** [encode_msg msg] serialises [msg] as one message-type byte followed by
    its body.

    - [User_data] bodies are appended verbatim, without MessagePack framing.
    - [Compound] messages are written with a MessagePack nil body; the
      contained messages are not serialised by this function.
    - [Err] is written as a map with a single [Error] entry.
    - Every other message is written as the MessagePack value produced by
      its dedicated encoder. *)
let encode_msg (msg : protocol_msg) : string =
  let out = Buffer.create 256 in
  let tag msg_type =
    Buffer.add_char out (Char.chr (message_type_to_int msg_type))
  in
  let packed msg_type value =
    tag msg_type;
    ignore (Msgpck.StringBuf.write out value)
  in
  (match msg with
  | User_data data ->
      tag User_msg;
      Buffer.add_string out data
  | Compound _ -> packed Compound_msg Msgpck.Nil
  | Ping p -> packed Ping_msg (encode_ping p)
  | Indirect_ping p -> packed Indirect_ping_msg (encode_indirect_ping p)
  | Ack a -> packed Ack_resp_msg (encode_ack a)
  | Nack n -> packed Nack_resp_msg (encode_nack n)
  | Suspect s -> packed Suspect_msg (encode_suspect s)
  | Alive a -> packed Alive_msg (encode_alive a)
  | Dead d -> packed Dead_msg (encode_dead d)
  | Compressed c -> packed Compress_msg (encode_compress c)
  | Err e ->
      packed Err_msg (Msgpck.Map [ (Msgpck.String "Error", Msgpck.String e) ]));
  Buffer.contents out
314
(** [decode_msg buf] parses one wire message: a message-type byte followed by
    its body.

    - An empty [buf] yields [Error Truncated_message].
    - An unknown type byte yields [Error (Invalid_tag n)].
    - [User_msg] returns the raw remainder as [User_data].
    - [Compound_msg] returns [Compound []]; the parts are not unpacked here.
    - Types with a MessagePack body are parsed and passed to the matching
      decoder; decoder failures become [Error (Msgpack_error _)].
    - An [Err_msg] whose body lacks a string [Error] entry decodes as
      [Err "unknown error"].
    - Any other recognised type yields [Error (Invalid_tag byte)] without
      touching the body.

    The body comes from the network, so exceptions raised by the MessagePack
    reader on malformed or empty input are turned into
    [Error (Msgpack_error _)] instead of escaping to the caller.
    NOTE(review): this assumes [Msgpck.String.read] reports bad input through
    [Invalid_argument] or [Failure]; confirm against the msgpck version in
    use. *)
let decode_msg (buf : string) : (protocol_msg, Types.decode_error) result =
  if String.length buf < 1 then Error Types.Truncated_message
  else
    let msg_type_byte = Char.code buf.[0] in
    match message_type_of_int msg_type_byte with
    | Error n -> Error (Types.Invalid_tag n)
    | Ok msg_type -> (
        let payload = String.sub buf 1 (String.length buf - 1) in
        match msg_type with
        | User_msg -> Ok (User_data payload)
        | Compound_msg -> Ok (Compound [])
        | _ -> (
            (* Run a field decoder and lift its string error. *)
            let lift decode wrap value =
              match decode value with
              | Ok v -> Ok (wrap v)
              | Error e -> Error (Types.Msgpack_error e)
            in
            let decode_err = function
              | Msgpck.Map fields -> (
                  match List.assoc_opt (Msgpck.String "Error") fields with
                  | Some (Msgpck.String e) -> Ok (Err e)
                  | _ -> Ok (Err "unknown error"))
              | _ -> Ok (Err "unknown error")
            in
            (* Choose the decoder before parsing so unsupported tags are
               rejected without reading the body. *)
            let body_decoder =
              match msg_type with
              | Ping_msg -> Some (lift decode_ping (fun p -> Ping p))
              | Indirect_ping_msg ->
                  Some (lift decode_indirect_ping (fun p -> Indirect_ping p))
              | Ack_resp_msg -> Some (lift decode_ack (fun a -> Ack a))
              | Nack_resp_msg -> Some (lift decode_nack (fun n -> Nack n))
              | Suspect_msg -> Some (lift decode_suspect (fun s -> Suspect s))
              | Alive_msg -> Some (lift decode_alive (fun a -> Alive a))
              | Dead_msg -> Some (lift decode_dead (fun d -> Dead d))
              | Compress_msg ->
                  Some (lift decode_compress (fun c -> Compressed c))
              | Err_msg -> Some decode_err
              | _ -> None
            in
            match body_decoder with
            | None -> Error (Types.Invalid_tag msg_type_byte)
            | Some decode -> (
                match Msgpck.String.read payload with
                | exception (Invalid_argument reason | Failure reason) ->
                    Error (Types.Msgpack_error reason)
                | _, msgpack -> decode msgpack)))
369
(** [make_compound_msg msgs] packs already-encoded messages into one compound
    message. The layout is: the compound type byte, a one-byte part count,
    one big-endian 16-bit length per part, then the parts back to back.

    @raise Failure if [msgs] has more than 255 elements, or if any element is
    longer than 65535 bytes. Such a length does not fit the 16-bit header and
    was previously truncated silently, producing a corrupt frame. *)
let make_compound_msg (msgs : string list) : string =
  let count = List.length msgs in
  if count > 255 then failwith "too many messages for compound"
  else if List.exists (fun m -> String.length m > 0xffff) msgs then
    failwith "message too large for compound"
  else
    let buf = Buffer.create 1024 in
    Buffer.add_char buf (Char.chr (message_type_to_int Compound_msg));
    Buffer.add_char buf (Char.chr count);
    (* Length table first, then the payloads in the same order. *)
    List.iter (fun m -> Buffer.add_uint16_be buf (String.length m)) msgs;
    List.iter (Buffer.add_string buf) msgs;
    Buffer.contents buf
384
(** [decode_compound_msg buf] splits the body of a compound message. [buf]
    starts at the part-count byte, so the message-type byte must already be
    removed.

    Returns [Ok (parts, truncated)]. [parts] are the parts that fit
    completely in [buf], in order. [truncated] is the number of announced
    parts that could not be extracted because the data ran out ([0] when all
    parts are present). Returns [Error Truncated_message] when [buf] is empty
    or too short to hold the whole length table. *)
let decode_compound_msg (buf : string) :
    (string list * int, Types.decode_error) result =
  let total = String.length buf in
  if total < 1 then Error Types.Truncated_message
  else
    let count = Char.code buf.[0] in
    let body_start = 1 + (count * 2) in
    if total < body_start then Error Types.Truncated_message
    else
      (* Big-endian 16-bit length of part [i], read from the length table. *)
      let part_len i =
        (Char.code buf.[1 + (i * 2)] lsl 8) lor Char.code buf.[2 + (i * 2)]
      in
      let rec collect i pos parts =
        if i = count then Ok (List.rev parts, 0)
        else
          let len = part_len i in
          if pos + len > total then Ok (List.rev parts, count - i)
          else collect (i + 1) (pos + len) (String.sub buf pos len :: parts)
      in
      collect 0 body_start []
410
(** 256-entry lookup table for a byte-at-a-time CRC-32 using the reflected
    polynomial [0xEDB88320]. Entry [i] is [i] after eight shift/xor rounds. *)
let crc32_table =
  let polynomial = 0xEDB88320l in
  let round crc =
    let shifted = Int32.shift_right_logical crc 1 in
    if Int32.equal (Int32.logand crc 1l) 1l then
      Int32.logxor shifted polynomial
    else shifted
  in
  let rec rounds remaining crc =
    if remaining = 0 then crc else rounds (remaining - 1) (round crc)
  in
  Array.init 256 (fun i -> rounds 8 (Int32.of_int i))
420
(** [crc32 data] computes the table-driven CRC-32 of [data]: the register
    starts at [0xFFFFFFFF], each byte is folded in through [crc32_table], and
    the final register is xor-ed with [0xFFFFFFFF]. *)
let crc32 (data : string) : int32 =
  let len = String.length data in
  let rec fold pos acc =
    if pos >= len then acc
    else
      let byte = Int32.of_int (Char.code data.[pos]) in
      let slot = Int32.to_int (Int32.logand (Int32.logxor acc byte) 0xFFl) in
      fold (pos + 1)
        (Int32.logxor (Int32.shift_right_logical acc 8) crc32_table.(slot))
  in
  Int32.logxor (fold 0 0xFFFFFFFFl) 0xFFFFFFFFl
433
(** [add_crc buf] prefixes [buf] with a five-byte header: the [Has_crc_msg]
    type byte followed by the CRC-32 of [buf] in big-endian order. *)
let add_crc (buf : string) : string =
  let checksum = crc32 buf in
  let header = Bytes.create 5 in
  Bytes.set header 0 (Char.chr (message_type_to_int Has_crc_msg));
  Bytes.set_int32_be header 1 checksum;
  Bytes.to_string header ^ buf
446
(** [verify_and_strip_crc buf] checks and removes an optional CRC header.

    - An empty [buf] yields [Error Truncated_message].
    - If the first byte is not [Has_crc_msg], [buf] is returned unchanged
      whatever its length. The length check used to run first, so valid
      non-CRC messages shorter than five bytes were rejected as truncated.
    - If the first byte is [Has_crc_msg] but fewer than five bytes are
      present, the result is [Error Truncated_message].
    - Otherwise the big-endian checksum in bytes 1-4 is compared with the
      CRC-32 of the remainder: [Ok remainder] on a match,
      [Error Invalid_crc] on a mismatch. *)
let verify_and_strip_crc (buf : string) : (string, Types.decode_error) result =
  let len = String.length buf in
  if len < 1 then Error Types.Truncated_message
  else if Char.code buf.[0] <> message_type_to_int Has_crc_msg then Ok buf
  else if len < 5 then Error Types.Truncated_message
  else
    let byte_at pos shift =
      Int32.shift_left (Int32.of_int (Char.code buf.[pos])) shift
    in
    let expected =
      List.fold_left Int32.logor 0l
        [ byte_at 1 24; byte_at 2 16; byte_at 3 8; byte_at 4 0 ]
    in
    let payload = String.sub buf 5 (len - 5) in
    if Int32.equal expected (crc32 payload) then Ok payload
    else Error Types.Invalid_crc
463
(** [add_label label buf] prefixes [buf] with a label header: the
    [Has_label_msg] type byte, a one-byte label length, then the label bytes.
    An empty [label] returns [buf] unchanged.

    @raise Invalid_argument if [label] is longer than 255 bytes, because the
    length must fit in one byte. The same exception was raised before, but
    with the opaque message from [Char.chr]. *)
let add_label (label : string) (buf : string) : string =
  let label_len = String.length label in
  if label_len = 0 then buf
  else if label_len > 255 then
    invalid_arg "add_label: label longer than 255 bytes"
  else begin
    let header = Bytes.create (2 + label_len) in
    Bytes.set header 0 (Char.chr (message_type_to_int Has_label_msg));
    Bytes.set header 1 (Char.chr label_len);
    Bytes.blit_string label 0 header 2 label_len;
    Bytes.to_string header ^ buf
  end
472
(** [strip_label buf] removes an optional label header and returns
    [(payload, label)].

    When the first byte is not [Has_label_msg] the result is [(buf, "")].
    Returns [Error Truncated_message] when [buf] is empty, when the length
    byte is missing, or when fewer label bytes are present than the length
    byte announces. *)
let strip_label (buf : string) : (string * string, Types.decode_error) result =
  let total = String.length buf in
  if total < 1 then Error Types.Truncated_message
  else if Char.code buf.[0] <> message_type_to_int Has_label_msg then
    Ok (buf, "")
  else if total < 2 then Error Types.Truncated_message
  else
    let label_len = Char.code buf.[1] in
    let body_start = 2 + label_len in
    if total < body_start then Error Types.Truncated_message
    else
      let label = String.sub buf 2 label_len in
      let payload = String.sub buf body_start (total - body_start) in
      Ok (payload, label)
487
(** [encode_internal_msg ~self_name ~self_port msg] converts the internal
    message [msg] to its wire form with [Types.msg_to_wire] and serialises
    the result with [encode_msg]. *)
let encode_internal_msg ~self_name ~self_port (msg : Types.protocol_msg) :
    string =
  Types.msg_to_wire ~self_name ~self_port msg |> encode_msg
492
(** [decode_internal_msg ~default_port buf] parses [buf] with [decode_msg]
    and converts the wire message to the internal form with
    [Types.msg_of_wire]. Decode errors are passed through unchanged; a wire
    message with no internal counterpart yields [Error (Invalid_tag 0)]. *)
let decode_internal_msg ~default_port (buf : string) :
    (Types.protocol_msg, Types.decode_error) result =
  Result.bind (decode_msg buf) (fun wire_msg ->
      Types.msg_of_wire ~default_port wire_msg
      |> Option.to_result ~none:(Types.Invalid_tag 0))
501
(** [encode_packet packet ~buf] serialises [packet] into [buf], starting at
    offset 0, and returns the number of bytes written.

    With no piggyback messages the primary message is written on its own.
    Otherwise the primary and all piggyback messages are bundled, primary
    first, with [make_compound_msg]. Messages are encoded with
    [packet.cluster] as the self name and port 7946. Returns
    [Error `Buffer_too_small] when the encoded bytes do not fit in [buf]; in
    that case nothing is written. *)
let encode_packet (packet : Types.packet) ~(buf : Cstruct.t) :
    (int, [ `Buffer_too_small ]) result =
  let self_name = packet.cluster in
  let self_port = 7946 in
  let encode = encode_internal_msg ~self_name ~self_port in
  let primary = encode packet.primary in
  let wire =
    match packet.piggyback with
    | [] -> primary
    | extras -> make_compound_msg (primary :: List.map encode extras)
  in
  let size = String.length wire in
  if size > Cstruct.length buf then Error `Buffer_too_small
  else begin
    Cstruct.blit_from_string wire 0 buf 0 size;
    Ok size
  end
528
(** [decode_packet buf] parses a received datagram into a packet.

    A compound datagram is split into parts: the first part becomes the
    primary message and must decode, while later parts that fail to decode
    are dropped. The truncated-part count reported by [decode_compound_msg]
    is ignored. A compound datagram with no extractable part yields
    [Error Truncated_message]. Any other datagram is decoded as a single
    primary message with no piggyback. The [cluster] field of the result is
    always [""], and 7946 is used as the default port. *)
let decode_packet (buf : Cstruct.t) : (Types.packet, Types.decode_error) result
    =
  let raw = Cstruct.to_string buf in
  let decode = decode_internal_msg ~default_port:7946 in
  let packet_of primary piggyback = { Types.cluster = ""; primary; piggyback } in
  if String.length raw < 1 then Error Types.Truncated_message
  else if Char.code raw.[0] <> message_type_to_int Compound_msg then
    Result.map (fun primary -> packet_of primary []) (decode raw)
  else
    match decode_compound_msg (String.sub raw 1 (String.length raw - 1)) with
    | Error e -> Error e
    | Ok ([], _) -> Error Types.Truncated_message
    | Ok (first :: rest, _) -> (
        match decode first with
        | Error e -> Error e
        | Ok primary ->
            let piggyback =
              List.filter_map (fun part -> Result.to_option (decode part)) rest
            in
            Ok (packet_of primary piggyback))
559
(** [encoded_size msg] is the length of [msg] encoded with an empty self name
    and port 7946, plus 3.
    NOTE(review): the extra 3 bytes presumably cover per-message framing
    overhead inside a compound packet; confirm against the callers. *)
let encoded_size (msg : Types.protocol_msg) : int =
  3 + String.length (encode_internal_msg ~self_name:"" ~self_port:7946 msg)