this repo has no description
1open Types.Wire 2 3let encode_ping (p : ping) : Msgpck.t = 4 Msgpck.Map 5 [ 6 (Msgpck.String "SeqNo", Msgpck.of_int p.seq_no); 7 (Msgpck.String "Node", Msgpck.String p.node); 8 (Msgpck.String "SourceAddr", Msgpck.String p.source_addr); 9 (Msgpck.String "SourcePort", Msgpck.of_int p.source_port); 10 (Msgpck.String "SourceNode", Msgpck.String p.source_node); 11 ] 12 13let decode_ping (m : Msgpck.t) : (ping, string) result = 14 match m with 15 | Msgpck.Map fields -> 16 let get_int key = 17 match List.assoc_opt (Msgpck.String key) fields with 18 | Some (Msgpck.Int i) -> Ok i 19 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 20 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 21 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 22 in 23 let get_string key = 24 match List.assoc_opt (Msgpck.String key) fields with 25 | Some (Msgpck.String s) -> Ok s 26 | Some (Msgpck.Bytes s) -> Ok s 27 | Some Msgpck.Nil -> Ok "" 28 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 29 in 30 let ( let* ) = Result.bind in 31 let* seq_no = get_int "SeqNo" in 32 let* node = get_string "Node" in 33 let* source_addr = get_string "SourceAddr" in 34 let* source_port = 35 match get_int "SourcePort" with Ok p -> Ok p | Error _ -> Ok 0 36 in 37 let* source_node = 38 match get_string "SourceNode" with Ok s -> Ok s | Error _ -> Ok "" 39 in 40 Ok { seq_no; node; source_addr; source_port; source_node } 41 | _ -> Error "expected map for ping" 42 43let encode_indirect_ping (p : indirect_ping_req) : Msgpck.t = 44 Msgpck.Map 45 [ 46 (Msgpck.String "SeqNo", Msgpck.of_int p.seq_no); 47 (Msgpck.String "Target", Msgpck.String p.target); 48 (Msgpck.String "Port", Msgpck.of_int p.port); 49 (Msgpck.String "Node", Msgpck.String p.node); 50 (Msgpck.String "Nack", Msgpck.Bool p.nack); 51 (Msgpck.String "SourceAddr", Msgpck.String p.source_addr); 52 (Msgpck.String "SourcePort", Msgpck.of_int p.source_port); 53 (Msgpck.String "SourceNode", Msgpck.String p.source_node); 54 ] 55 56let decode_indirect_ping (m : Msgpck.t) : (indirect_ping_req, string) result = 57 match m with 58 | Msgpck.Map fields -> 59 let get_int key = 60 match List.assoc_opt (Msgpck.String key) fields with 61 | Some (Msgpck.Int i) -> Ok i 62 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 63 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 64 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 65 in 66 let get_string key = 67 match List.assoc_opt (Msgpck.String key) fields with 68 | Some (Msgpck.String s) -> Ok s 69 | Some (Msgpck.Bytes s) -> Ok s 70 | Some Msgpck.Nil -> Ok "" 71 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 72 in 73 let get_bool key = 74 match List.assoc_opt (Msgpck.String key) fields with 75 | Some (Msgpck.Bool b) -> Ok b 76 | _ -> Ok false 77 in 78 let ( let* ) = Result.bind in 79 let* seq_no = get_int "SeqNo" in 80 let* target = get_string "Target" in 81 let* port = match get_int "Port" with Ok p -> Ok p | Error _ -> Ok 0 in 82 let* node = get_string "Node" in 83 let* nack = get_bool "Nack" in 84 let* source_addr = 85 match get_string "SourceAddr" with Ok s -> Ok s | Error _ -> Ok "" 86 in 87 let* source_port = 88 match get_int "SourcePort" with Ok p -> Ok p | Error _ -> Ok 0 89 in 90 let* source_node = 91 match get_string "SourceNode" with Ok s -> Ok s | Error _ -> Ok "" 92 in 93 Ok 94 { 95 seq_no; 96 target; 97 port; 98 node; 99 nack; 100 source_addr; 101 source_port; 102 source_node; 103 } 104 | _ -> Error "expected map for indirect_ping" 105 106let encode_ack (a : ack_resp) : Msgpck.t = 107 Msgpck.Map 108 [ 109 (Msgpck.String "SeqNo", Msgpck.of_int a.seq_no); 110 (Msgpck.String "Payload", Msgpck.String a.payload); 111 ] 112 113let decode_ack (m : Msgpck.t) : (ack_resp, string) result = 114 match m with 115 | Msgpck.Map fields -> 116 let get_int key = 117 match List.assoc_opt (Msgpck.String key) fields with 118 | Some (Msgpck.Int i) -> Ok i 119 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 120 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 121 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 122 in 123 let get_bytes key = 124 match List.assoc_opt (Msgpck.String key) fields with 125 | Some (Msgpck.Bytes s) -> Ok s 126 | Some (Msgpck.String s) -> Ok s 127 | Some Msgpck.Nil -> Ok "" 128 | _ -> Ok "" 129 in 130 let ( let* ) = Result.bind in 131 let* seq_no = get_int "SeqNo" in 132 let* payload = get_bytes "Payload" in 133 Ok { seq_no; payload } 134 | _ -> Error "expected map for ack" 135 136let encode_nack (n : nack_resp) : Msgpck.t = 137 Msgpck.Map [ (Msgpck.String "SeqNo", Msgpck.of_int n.seq_no) ] 138 139let decode_nack (m : Msgpck.t) : (nack_resp, string) result = 140 match m with 141 | Msgpck.Map fields -> 142 let get_int key = 143 match List.assoc_opt (Msgpck.String key) fields with 144 | Some (Msgpck.Int i) -> Ok i 145 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 146 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 147 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 148 in 149 let ( let* ) = Result.bind in 150 let* seq_no = get_int "SeqNo" in 151 Ok { seq_no } 152 | _ -> Error "expected map for nack" 153 154let encode_suspect (s : suspect) : Msgpck.t = 155 Msgpck.Map 156 [ 157 (Msgpck.String "Incarnation", Msgpck.of_int s.incarnation); 158 (Msgpck.String "Node", Msgpck.String s.node); 159 (Msgpck.String "From", Msgpck.String s.from); 160 ] 161 162let decode_suspect (m : Msgpck.t) : (suspect, string) result = 163 match m with 164 | Msgpck.Map fields -> 165 let get_int key = 166 match List.assoc_opt (Msgpck.String key) fields with 167 | Some (Msgpck.Int i) -> Ok i 168 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 169 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 170 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 171 in 172 let get_string key = 173 match List.assoc_opt (Msgpck.String key) fields with 174 | Some (Msgpck.String s) -> Ok s 175 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 176 in 177 let ( let* ) = Result.bind in 178 let* incarnation = get_int "Incarnation" in 179 let* node = get_string "Node" in 180 let* from = get_string "From" in 181 Ok ({ incarnation; node; from } : suspect) 182 | _ -> Error "expected map for suspect" 183 184let encode_alive (a : alive) : Msgpck.t = 185 Msgpck.Map 186 [ 187 (Msgpck.String "Incarnation", Msgpck.of_int a.incarnation); 188 (Msgpck.String "Node", Msgpck.String a.node); 189 (Msgpck.String "Addr", Msgpck.String a.addr); 190 (Msgpck.String "Port", Msgpck.of_int a.port); 191 (Msgpck.String "Meta", Msgpck.String a.meta); 192 (Msgpck.String "Vsn", Msgpck.List (List.map Msgpck.of_int a.vsn)); 193 ] 194 195let decode_alive (m : Msgpck.t) : (alive, string) result = 196 match m with 197 | Msgpck.Map fields -> 198 let get_int key = 199 match List.assoc_opt (Msgpck.String key) fields with 200 | Some (Msgpck.Int i) -> Ok i 201 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 202 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 203 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 204 in 205 let get_string key = 206 match List.assoc_opt (Msgpck.String key) fields with 207 | Some (Msgpck.String s) -> Ok s 208 | Some (Msgpck.Bytes s) -> Ok s 209 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 210 in 211 let get_vsn () = 212 match List.assoc_opt (Msgpck.String "Vsn") fields with 213 | Some (Msgpck.List vs) -> 214 Ok 215 (List.filter_map 216 (function 217 | Msgpck.Int i -> Some i 218 | Msgpck.Int32 i -> Some (Int32.to_int i) 219 | _ -> None) 220 vs) 221 | _ -> Ok [] 222 in 223 let ( let* ) = Result.bind in 224 let* incarnation = get_int "Incarnation" in 225 let* node = get_string "Node" in 226 let* addr = get_string "Addr" in 227 let* port = get_int "Port" in 228 let* meta = 229 match get_string "Meta" with Ok m -> Ok m | Error _ -> Ok "" 230 in 231 let* vsn = get_vsn () in 232 Ok { incarnation; node; addr; port; meta; vsn } 233 | _ -> Error "expected map for alive" 234 235let encode_dead (d : dead) : Msgpck.t = 236 Msgpck.Map 237 [ 238 (Msgpck.String "Incarnation", Msgpck.of_int d.incarnation); 239 (Msgpck.String "Node", Msgpck.String d.node); 240 (Msgpck.String "From", Msgpck.String d.from); 241 ] 242 243let decode_dead (m : Msgpck.t) : (dead, string) result = 244 match m with 245 | Msgpck.Map fields -> 246 let get_int key = 247 match List.assoc_opt (Msgpck.String key) fields with 248 | Some (Msgpck.Int i) -> Ok i 249 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 250 | Some (Msgpck.Uint32 i) -> Ok (Int32.to_int i) 251 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 252 in 253 let get_string key = 254 match List.assoc_opt (Msgpck.String key) fields with 255 | Some (Msgpck.String s) -> Ok s 256 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 257 in 258 let ( let* ) = Result.bind in 259 let* incarnation = get_int "Incarnation" in 260 let* node = get_string "Node" in 261 let* from = get_string "From" in 262 Ok ({ incarnation; node; from } : dead) 263 | _ -> Error "expected map for dead" 264 265let encode_compress (c : compress) : Msgpck.t = 266 Msgpck.Map 267 [ 268 (Msgpck.String "Algo", Msgpck.of_int c.algo); 269 (Msgpck.String "Buf", Msgpck.String c.buf); 270 ] 271 272let decode_compress (m : Msgpck.t) : (compress, string) result = 273 match m with 274 | Msgpck.Map fields -> 275 let get_int key = 276 match List.assoc_opt (Msgpck.String key) fields with 277 | Some (Msgpck.Int i) -> Ok i 278 | Some (Msgpck.Int32 i) -> Ok (Int32.to_int i) 279 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 280 in 281 let get_bytes key = 282 match List.assoc_opt (Msgpck.String key) fields with 283 | Some (Msgpck.Bytes s) -> Ok s 284 | Some (Msgpck.String s) -> Ok s 285 | _ -> Error (Printf.sprintf "missing or invalid %s" key) 286 in 287 let ( let* ) = Result.bind in 288 let* algo = get_int "Algo" in 289 let* buf = get_bytes "Buf" in 290 Ok { algo; buf } 291 | _ -> Error "expected map for compress" 292 293let encode_msg (msg : protocol_msg) : string = 294 let msg_type, payload = 295 match msg with 296 | Ping p -> (Ping_msg, encode_ping p) 297 | Indirect_ping p -> (Indirect_ping_msg, encode_indirect_ping p) 298 | Ack a -> (Ack_resp_msg, encode_ack a) 299 | Nack n -> (Nack_resp_msg, encode_nack n) 300 | Suspect s -> (Suspect_msg, encode_suspect s) 301 | Alive a -> (Alive_msg, encode_alive a) 302 | Dead d -> (Dead_msg, encode_dead d) 303 | User_data _ -> (User_msg, Msgpck.Nil) 304 | Compound _ -> (Compound_msg, Msgpck.Nil) 305 | Compressed c -> (Compress_msg, encode_compress c) 306 | Err e -> (Err_msg, Msgpck.Map [ (Msgpck.String "Error", Msgpck.String e) ]) 307 in 308 let buf = Buffer.create 256 in 309 Buffer.add_char buf (Char.chr (message_type_to_int msg_type)); 310 (match msg with 311 | User_data data -> Buffer.add_string buf data 312 | _ -> ignore (Msgpck.StringBuf.write buf payload)); 313 Buffer.contents buf 314 315let decode_msg (buf : string) : (protocol_msg, Types.decode_error) result = 316 if String.length buf < 1 then Error Types.Truncated_message 317 else 318 let msg_type_byte = Char.code buf.[0] in 319 match message_type_of_int msg_type_byte with 320 | Error n -> Error (Types.Invalid_tag n) 321 | Ok msg_type -> ( 322 let payload = String.sub buf 1 (String.length buf - 1) in 323 match msg_type with 324 | User_msg -> Ok (User_data payload) 325 | Compound_msg -> Ok (Compound []) 326 | _ -> ( 327 let _, msgpack = Msgpck.String.read payload in 328 match msg_type with 329 | Ping_msg -> ( 330 match decode_ping msgpack with 331 | Ok p -> Ok (Ping p) 332 | Error e -> Error (Types.Msgpack_error e)) 333 | Indirect_ping_msg -> ( 334 match decode_indirect_ping msgpack with 335 | Ok p -> Ok (Indirect_ping p) 336 | Error e -> Error (Types.Msgpack_error e)) 337 | Ack_resp_msg -> ( 338 match decode_ack msgpack with 339 | Ok a -> Ok (Ack a) 340 | Error e -> Error (Types.Msgpack_error e)) 341 | Nack_resp_msg -> ( 342 match decode_nack msgpack with 343 | Ok n -> Ok (Nack n) 344 | Error e -> Error (Types.Msgpack_error e)) 345 | Suspect_msg -> ( 346 match decode_suspect msgpack with 347 | Ok s -> Ok (Suspect s) 348 | Error e -> Error (Types.Msgpack_error e)) 349 | Alive_msg -> ( 350 match decode_alive msgpack with 351 | Ok a -> Ok (Alive a) 352 | Error e -> Error (Types.Msgpack_error e)) 353 | Dead_msg -> ( 354 match decode_dead msgpack with 355 | Ok d -> Ok (Dead d) 356 | Error e -> Error (Types.Msgpack_error e)) 357 | Compress_msg -> ( 358 match decode_compress msgpack with 359 | Ok c -> Ok (Compressed c) 360 | Error e -> Error (Types.Msgpack_error e)) 361 | Err_msg -> ( 362 match msgpack with 363 | Msgpck.Map fields -> ( 364 match List.assoc_opt (Msgpck.String "Error") fields with 365 | Some (Msgpck.String e) -> Ok (Err e) 366 | _ -> Ok (Err "unknown error")) 367 | _ -> Ok (Err "unknown error")) 368 | _ -> Error (Types.Invalid_tag msg_type_byte))) 369 370let make_compound_msg (msgs : string list) : string = 371 if List.length msgs > 255 then failwith "too many messages for compound" 372 else 373 let buf = Buffer.create 1024 in 374 Buffer.add_char buf (Char.chr (message_type_to_int Compound_msg)); 375 Buffer.add_char buf (Char.chr (List.length msgs)); 376 List.iter 377 (fun m -> 378 let len = String.length m in 379 Buffer.add_char buf (Char.chr ((len lsr 8) land 0xff)); 380 Buffer.add_char buf (Char.chr (len land 0xff))) 381 msgs; 382 List.iter (Buffer.add_string buf) msgs; 383 Buffer.contents buf 384 385let decode_compound_msg (buf : string) : 386 (string list * int, Types.decode_error) result = 387 if String.length buf < 1 then Error Types.Truncated_message 388 else 389 let num_parts = Char.code buf.[0] in 390 let header_size = 1 + (num_parts * 2) in 391 if String.length buf < header_size then Error Types.Truncated_message 392 else 393 let lengths = 394 List.init num_parts (fun i -> 395 let hi = Char.code buf.[1 + (i * 2)] in 396 let lo = Char.code buf.[2 + (i * 2)] in 397 (hi lsl 8) lor lo) 398 in 399 let rec extract_parts offset remaining_lens acc trunc = 400 match remaining_lens with 401 | [] -> Ok (List.rev acc, trunc) 402 | len :: rest -> 403 if offset + len > String.length buf then 404 Ok (List.rev acc, List.length remaining_lens) 405 else 406 let part = String.sub buf offset len in 407 extract_parts (offset + len) rest (part :: acc) trunc 408 in 409 extract_parts header_size lengths [] 0 410 411let crc32_table = 412 Array.init 256 (fun i -> 413 let crc = ref (Int32.of_int i) in 414 for _ = 0 to 7 do 415 if Int32.logand !crc 1l = 1l then 416 crc := Int32.logxor (Int32.shift_right_logical !crc 1) 0xEDB88320l 417 else crc := Int32.shift_right_logical !crc 1 418 done; 419 !crc) 420 421let crc32 (data : string) : int32 = 422 let crc = ref 0xFFFFFFFFl in 423 String.iter 424 (fun c -> 425 let byte = Char.code c in 426 let idx = 427 Int32.to_int 428 (Int32.logand (Int32.logxor !crc (Int32.of_int byte)) 0xFFl) 429 in 430 crc := Int32.logxor (Int32.shift_right_logical !crc 8) crc32_table.(idx)) 431 data; 432 Int32.logxor !crc 0xFFFFFFFFl 433 434let add_crc (buf : string) : string = 435 let crc = crc32 buf in 436 let header = Bytes.create 5 in 437 Bytes.set header 0 (Char.chr (message_type_to_int Has_crc_msg)); 438 Bytes.set header 1 439 (Char.chr (Int32.to_int (Int32.shift_right_logical crc 24) land 0xff)); 440 Bytes.set header 2 441 (Char.chr (Int32.to_int (Int32.shift_right_logical crc 16) land 0xff)); 442 Bytes.set header 3 443 (Char.chr (Int32.to_int (Int32.shift_right_logical crc 8) land 0xff)); 444 Bytes.set header 4 (Char.chr (Int32.to_int crc land 0xff)); 445 Bytes.to_string header ^ buf 446 447let verify_and_strip_crc (buf : string) : (string, Types.decode_error) result = 448 if String.length buf < 5 then Error Types.Truncated_message 449 else if Char.code buf.[0] <> message_type_to_int Has_crc_msg then Ok buf 450 else 451 let expected = 452 Int32.logor 453 (Int32.logor 454 (Int32.shift_left (Int32.of_int (Char.code buf.[1])) 24) 455 (Int32.shift_left (Int32.of_int (Char.code buf.[2])) 16)) 456 (Int32.logor 457 (Int32.shift_left (Int32.of_int (Char.code buf.[3])) 8) 458 (Int32.of_int (Char.code buf.[4]))) 459 in 460 let payload = String.sub buf 5 (String.length buf - 5) in 461 let actual = crc32 payload in 462 if expected = actual then Ok payload else Error Types.Invalid_crc 463 464let add_label (label : string) (buf : string) : string = 465 if label = "" then buf 466 else 467 let header = Bytes.create (2 + String.length label) in 468 Bytes.set header 0 (Char.chr (message_type_to_int Has_label_msg)); 469 Bytes.set header 1 (Char.chr (String.length label)); 470 Bytes.blit_string label 0 header 2 (String.length label); 471 Bytes.to_string header ^ buf 472 473let strip_label (buf : string) : (string * string, Types.decode_error) result = 474 if String.length buf < 1 then Error Types.Truncated_message 475 else if Char.code buf.[0] <> message_type_to_int Has_label_msg then 476 Ok (buf, "") 477 else if String.length buf < 2 then Error Types.Truncated_message 478 else 479 let label_len = Char.code buf.[1] in 480 if String.length buf < 2 + label_len then Error Types.Truncated_message 481 else 482 let label = String.sub buf 2 label_len in 483 let payload = 484 String.sub buf (2 + label_len) (String.length buf - 2 - label_len) 485 in 486 Ok (payload, label) 487 488let encode_internal_msg ~self_name ~self_port (msg : Types.protocol_msg) : 489 string = 490 let wire_msg = Types.msg_to_wire ~self_name ~self_port msg in 491 encode_msg wire_msg 492 493let decode_internal_msg ~default_port (buf : string) : 494 (Types.protocol_msg, Types.decode_error) result = 495 match decode_msg buf with 496 | Error e -> Error e 497 | Ok wire_msg -> ( 498 match Types.msg_of_wire ~default_port wire_msg with 499 | Some msg -> Ok msg 500 | None -> Error (Types.Invalid_tag 0)) 501 502let encode_packet (packet : Types.packet) ~(buf : Cstruct.t) : 503 (int, [ `Buffer_too_small ]) result = 504 let self_name = packet.cluster in 505 let self_port = 7946 in 506 let primary_encoded = 507 encode_internal_msg ~self_name ~self_port packet.primary 508 in 509 match packet.piggyback with 510 | [] -> 511 let total_len = String.length primary_encoded in 512 if total_len > Cstruct.length buf then Error `Buffer_too_small 513 else begin 514 Cstruct.blit_from_string primary_encoded 0 buf 0 total_len; 515 Ok total_len 516 end 517 | piggyback -> 518 let piggyback_encoded = 519 List.map (encode_internal_msg ~self_name ~self_port) piggyback 520 in 521 let compound = make_compound_msg (primary_encoded :: piggyback_encoded) in 522 let total_len = String.length compound in 523 if total_len > Cstruct.length buf then Error `Buffer_too_small 524 else begin 525 Cstruct.blit_from_string compound 0 buf 0 total_len; 526 Ok total_len 527 end 528 529let decode_packet (buf : Cstruct.t) : (Types.packet, Types.decode_error) result 530 = 531 let str = Cstruct.to_string buf in 532 if String.length str < 1 then Error Types.Truncated_message 533 else 534 let msg_type = Char.code str.[0] in 535 if msg_type = message_type_to_int Compound_msg then 536 let payload = String.sub str 1 (String.length str - 1) in 537 match decode_compound_msg payload with 538 | Error e -> Error e 539 | Ok (parts, _truncated) -> ( 540 match parts with 541 | [] -> Error Types.Truncated_message 542 | first :: rest -> ( 543 match decode_internal_msg ~default_port:7946 first with 544 | Error e -> Error e 545 | Ok primary -> 546 let piggyback = 547 List.filter_map 548 (fun p -> 549 match decode_internal_msg ~default_port:7946 p with 550 | Ok m -> Some m 551 | Error _ -> None) 552 rest 553 in 554 Ok { Types.cluster = ""; primary; piggyback })) 555 else 556 match decode_internal_msg ~default_port:7946 str with 557 | Error e -> Error e 558 | Ok primary -> Ok { Types.cluster = ""; primary; piggyback = [] } 559 560let encoded_size (msg : Types.protocol_msg) : int = 561 let self_name = "" in 562 let self_port = 7946 in 563 let encoded = encode_internal_msg ~self_name ~self_port msg in 564 String.length encoded + 3