(* this repo has no description *)
(* Core types for the membership layer, plus the conversions between the
   in-memory protocol messages and their wire-level counterparts. *)

(** Node identifier: an unboxed wrapper around its string form. *)
type node_id = Node_id of string [@@unboxed]

let node_id_to_string (Node_id s) = s
let node_id_of_string s = Node_id s
let equal_node_id (Node_id a) (Node_id b) = String.equal a b
let compare_node_id (Node_id a) (Node_id b) = String.compare a b

(** Incarnation number: an unboxed wrapper around [int]. *)
type incarnation = Incarnation of int [@@unboxed]

let incarnation_to_int (Incarnation i) = i
let incarnation_of_int i = Incarnation i
let zero_incarnation = Incarnation 0
let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b

(** [incr_incarnation i] is the incarnation following [i] (its value plus
    one). *)
let incr_incarnation (Incarnation i) = Incarnation (i + 1)

type addr = Eio.Net.Sockaddr.datagram

(** Identity, datagram address and opaque metadata of a node. *)
type node_info = { id : node_id; addr : addr; meta : string }

let make_node_info ~id ~addr ~meta = { id; addr; meta }

type member_state = Alive | Suspect | Dead | Left

let member_state_to_string = function
  | Alive -> "alive"
  | Suspect -> "suspect"
  | Dead -> "dead"
  | Left -> "left"

let member_state_to_int = function
  | Alive -> 0
  | Suspect -> 1
  | Dead -> 2
  | Left -> 3

(** Inverse of [member_state_to_int] on [0..3]. Every integer other than [0],
    [1] and [2] (including out-of-range values) maps to [Left]. *)
let member_state_of_int = function
  | 0 -> Alive
  | 1 -> Suspect
  | 2 -> Dead
  | _ -> Left

type member_snapshot = {
  node : node_info;
  state : member_state;
  incarnation : incarnation;
  state_change : Mtime.span;
}

(** In-memory protocol messages. The [Alive], [Suspect] and [Dead]
    constructors shadow the [member_state] constructors of the same name from
    this point on. *)
type protocol_msg =
  | Ping of { seq : int; target : node_id; sender : node_info }
  | Ping_req of { seq : int; target : node_id; sender : node_info }
  | Ack of { seq : int; responder : node_info; payload : string option }
  | Alive of { node : node_info; incarnation : incarnation }
  | Suspect of {
      node : node_id;
      incarnation : incarnation;
      suspector : node_id;
    }
  | Dead of { node : node_id; incarnation : incarnation; declarator : node_id }
  | User_msg of { topic : string; payload : string; origin : node_id }

type packet = {
  cluster : string;
  primary : protocol_msg;
  piggyback : protocol_msg list;
}

type decode_error =
  | Invalid_magic
  | Unsupported_version of int
  | Truncated_message
  | Invalid_tag of int
  | Decryption_failed
  | Msgpack_error of string
  | Invalid_crc

let decode_error_to_string = function
  | Invalid_magic -> "invalid magic bytes"
  | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v
  | Truncated_message -> "truncated message"
  | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t
  | Decryption_failed -> "decryption failed"
  | Msgpack_error s -> Printf.sprintf "msgpack error: %s" s
  | Invalid_crc -> "invalid CRC checksum"

type send_error = Node_unreachable | Timeout | Connection_reset

let send_error_to_string = function
  | Node_unreachable -> "node unreachable"
  | Timeout -> "timeout"
  | Connection_reset -> "connection reset"

type node_event =
  | Join of node_info
  | Leave of node_info
  | Update of node_info
  | Suspect_event of node_info
  | Alive_event of node_info

(* NOTE(review): the float fields look like durations in seconds; the unit is
   not established anywhere in this file -- confirm against the callers. *)
type config = {
  bind_addr : string;
  bind_port : int;
  node_name : string option;
  protocol_interval : float;
  probe_timeout : float;
  indirect_checks : int;
  suspicion_mult : int;
  suspicion_max_timeout : float;
  retransmit_mult : int;
  udp_buffer_size : int;
  tcp_timeout : float;
  send_buffer_count : int;
  recv_buffer_count : int;
  secret_key : string;
  cluster_name : string;
  label : string;
  encryption_enabled : bool;
  gossip_verify_incoming : bool;
  gossip_verify_outgoing : bool;
}

(** Baseline configuration. Encryption is disabled and [secret_key] is sixteen
    zero bytes. *)
let default_config =
  {
    bind_addr = "0.0.0.0";
    bind_port = 7946;
    node_name = None;
    protocol_interval = 1.0;
    probe_timeout = 0.5;
    indirect_checks = 3;
    suspicion_mult = 4;
    suspicion_max_timeout = 60.0;
    retransmit_mult = 4;
    udp_buffer_size = 1400;
    tcp_timeout = 10.0;
    send_buffer_count = 16;
    recv_buffer_count = 16;
    secret_key = String.make 16 '\x00';
    cluster_name = "default";
    label = "";
    encryption_enabled = false;
    gossip_verify_incoming = true;
    gossip_verify_outgoing = true;
  }

(** Execution environment: a switch plus a standard environment object that
    must provide at least [clock], [mono_clock], [net] and [secure_random]. *)
type 'a env = {
  stdenv : 'a;
  sw : Eio.Switch.t;
}
  constraint
    'a =
    < clock : _ Eio.Time.clock
    ; mono_clock : _ Eio.Time.Mono.t
    ; net : _ Eio.Net.t
    ; secure_random : _ Eio.Flow.source
    ; .. >

type stats = {
  nodes_alive : int;
  nodes_suspect : int;
  nodes_dead : int;
  msgs_sent : int;
  msgs_received : int;
  msgs_dropped : int;
  queue_depth : int;
  buffers_available : int;
  buffers_total : int;
}

(** Statistics record with every counter set to zero. *)
let empty_stats =
  {
    nodes_alive = 0;
    nodes_suspect = 0;
    nodes_dead = 0;
    msgs_sent = 0;
    msgs_received = 0;
    msgs_dropped = 0;
    queue_depth = 0;
    buffers_available = 0;
    buffers_total = 0;
  }

(** Wire-level message tags and record shapes. Addresses are carried as raw
    byte strings and node names / incarnations as plain [string] / [int]. *)
module Wire = struct
  type message_type =
    | Ping_msg
    | Indirect_ping_msg
    | Ack_resp_msg
    | Suspect_msg
    | Alive_msg
    | Dead_msg
    | Push_pull_msg
    | Compound_msg
    | User_msg
    | Compress_msg
    | Encrypt_msg
    | Nack_resp_msg
    | Has_crc_msg
    | Err_msg
    | Has_label_msg

  (** Numeric tag of a message type. Tags are contiguous from [0] to [13],
      except [Has_label_msg], which is [244]. *)
  let message_type_to_int = function
    | Ping_msg -> 0
    | Indirect_ping_msg -> 1
    | Ack_resp_msg -> 2
    | Suspect_msg -> 3
    | Alive_msg -> 4
    | Dead_msg -> 5
    | Push_pull_msg -> 6
    | Compound_msg -> 7
    | User_msg -> 8
    | Compress_msg -> 9
    | Encrypt_msg -> 10
    | Nack_resp_msg -> 11
    | Has_crc_msg -> 12
    | Err_msg -> 13
    | Has_label_msg -> 244

  (** Inverse of [message_type_to_int]; [Error n] carries an unknown tag
      [n] back to the caller. *)
  let message_type_of_int = function
    | 0 -> Ok Ping_msg
    | 1 -> Ok Indirect_ping_msg
    | 2 -> Ok Ack_resp_msg
    | 3 -> Ok Suspect_msg
    | 4 -> Ok Alive_msg
    | 5 -> Ok Dead_msg
    | 6 -> Ok Push_pull_msg
    | 7 -> Ok Compound_msg
    | 8 -> Ok User_msg
    | 9 -> Ok Compress_msg
    | 10 -> Ok Encrypt_msg
    | 11 -> Ok Nack_resp_msg
    | 12 -> Ok Has_crc_msg
    | 13 -> Ok Err_msg
    | 244 -> Ok Has_label_msg
    | n -> Error n

  type ping = {
    seq_no : int;
    node : string;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type indirect_ping_req = {
    seq_no : int;
    target : string;
    port : int;
    node : string;
    nack : bool;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type ack_resp = { seq_no : int; payload : string }
  type nack_resp = { seq_no : int }
  type suspect = { incarnation : int; node : string; from : string }

  type alive = {
    incarnation : int;
    node : string;
    addr : string;
    port : int;
    meta : string;
    vsn : int list;
  }

  type dead = { incarnation : int; node : string; from : string }
  type compress = { algo : int; buf : string }

  type push_pull_header = {
    pp_nodes : int;
    pp_user_state_len : int;
    pp_join : bool;
  }

  type push_node_state = {
    pns_name : string;
    pns_addr : string;
    pns_port : int;
    pns_meta : string;
    pns_incarnation : int;
    pns_state : int;
    pns_vsn : int list;
  }

  type protocol_msg =
    | Ping of ping
    | Indirect_ping of indirect_ping_req
    | Ack of ack_resp
    | Nack of nack_resp
    | Suspect of suspect
    | Alive of alive
    | Dead of dead
    | User_data of string
    | Compound of string list
    | Compressed of compress
    | Err of string
end

(** [ip_to_bytes ip] is the raw byte form of [ip]: 4 bytes for an IPv4
    address, 16 for IPv6.

    [Eio.Net.Ipaddr.t] is a private alias of exactly that byte string, so a
    coercion suffices. The previous implementation pretty-printed the address
    and parsed the text back, which mis-encoded IPv6 addresses whose ["::"]
    run sits in the middle (e.g. [fe80::1]). *)
let ip_to_bytes ip = (ip : _ Eio.Net.Ipaddr.t :> string)

(** [ip_of_bytes_opt s] is the address whose raw bytes are [s], or [None]
    when [s] is neither 4 nor 16 bytes long. *)
let ip_of_bytes_opt s =
  match String.length s with
  | 4 | 16 -> Some (Eio.Net.Ipaddr.of_raw s)
  | _ -> None

(** Raising variant of [ip_of_bytes_opt].
    @raise Failure if [s] is neither 4 nor 16 bytes long. *)
let ip_of_bytes s =
  match ip_of_bytes_opt s with
  | Some ip -> ip
  | None -> failwith "invalid IP address length"

(** Version list placed in the [vsn] field of outgoing [Wire.Alive]
    messages. *)
let default_vsn = [ 1; 5; 5; 0; 0; 0 ]

(** [node_info_to_wire info ~source_node] is
    [(raw_ip, port, meta, source_node)] for a UDP address.
    @raise Failure if [info.addr] is a Unix-domain address. *)
let node_info_to_wire (info : node_info) ~source_node :
    string * int * string * string =
  match info.addr with
  | `Udp (ip, port) -> (ip_to_bytes ip, port, info.meta, source_node)
  | `Unix _ -> failwith "Unix sockets not supported"

(** Builds a [node_info] from wire fields.
    @raise Failure if [addr] is neither 4 nor 16 bytes long. *)
let node_info_of_wire ~name ~addr ~port ~meta : node_info =
  let ip = ip_of_bytes addr in
  { id = node_id_of_string name; addr = `Udp (ip, port); meta }

(** [msg_to_wire ~self_name ~self_port msg] converts an in-memory message to
    its wire form. [self_name] is written as the [source_node] of pings;
    [self_port] as the [port] of indirect pings.

    [User_msg] is framed as [<topic_len>:<topic><origin_len>:<origin><payload>]
    with decimal lengths.
    @raise Failure if a [node_info] in [msg] carries a Unix-domain address. *)
let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg =
  match msg with
  | Ping { seq; target; sender } ->
      let source_addr, source_port, _, _ =
        node_info_to_wire sender ~source_node:""
      in
      Wire.Ping
        {
          seq_no = seq;
          node = node_id_to_string target;
          source_addr;
          source_port;
          source_node = self_name;
        }
  | Ping_req { seq; target; sender } ->
      let source_addr, source_port, _, _ =
        node_info_to_wire sender ~source_node:""
      in
      (* NOTE(review): only the target's id is available here, so the wire
         [target] address is filled with the sender's own address -- confirm
         this is what receivers expect. *)
      Wire.Indirect_ping
        {
          seq_no = seq;
          target = source_addr;
          port = self_port;
          node = node_id_to_string target;
          nack = true;
          source_addr;
          source_port;
          source_node = self_name;
        }
  | Ack { seq; responder = _; payload } ->
      (* An absent payload is sent as the empty string. *)
      Wire.Ack { seq_no = seq; payload = Option.value payload ~default:"" }
  | Alive { node; incarnation } ->
      let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
      Wire.Alive
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node.id;
          addr;
          port;
          meta;
          vsn = default_vsn;
        }
  | Suspect { node; incarnation; suspector } ->
      Wire.Suspect
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string suspector;
        }
  | Dead { node; incarnation; declarator } ->
      Wire.Dead
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string declarator;
        }
  | User_msg { topic; payload; origin } ->
      let origin = node_id_to_string origin in
      Wire.User_data
        (Printf.sprintf "%d:%s%d:%s%s" (String.length topic) topic
           (String.length origin) origin payload)

(* Address field of an incoming message: the empty string stands for the
   unspecified address 0.0.0.0; anything else must be 4 or 16 raw bytes. *)
let ip_of_wire_opt raw =
  if String.equal raw "" then Some (Eio.Net.Ipaddr.of_raw "\000\000\000\000")
  else ip_of_bytes_opt raw

(* Decode the [<topic_len>:<topic><origin_len>:<origin><payload>] framing
   produced by [msg_to_wire]. Input that does not carry both well-formed
   frames is returned whole as the payload, with empty topic and origin.
   Never raises: length prefixes must be plain decimal digits and are
   bounds-checked by subtraction, so negative or overflowing lengths cannot
   reach [String.sub]. *)
let decode_user_data encoded : protocol_msg =
  let total = String.length encoded in
  let is_digit = function '0' .. '9' -> true | _ -> false in
  (* [read_frame pos] reads "<len>:<bytes>" at [pos] and returns the bytes
     together with the position just past them. *)
  let read_frame pos =
    match String.index_from_opt encoded pos ':' with
    | None -> None
    | Some colon -> (
        let digits = String.sub encoded pos (colon - pos) in
        let start = colon + 1 in
        match int_of_string_opt digits with
        | None -> None
        | Some len ->
            if String.for_all is_digit digits && len <= total - start then
              Some (String.sub encoded start len, start + len)
            else None)
  in
  let framed =
    match read_frame 0 with
    | None -> None
    | Some (topic, origin_pos) -> (
        match read_frame origin_pos with
        | None -> None
        | Some (origin, data_pos) ->
            let payload = String.sub encoded data_pos (total - data_pos) in
            Some (topic, origin, payload))
  in
  match framed with
  | Some (topic, origin, payload) ->
      User_msg { topic; payload; origin = node_id_of_string origin }
  | None ->
      User_msg { topic = ""; payload = encoded; origin = node_id_of_string "" }

(** [msg_of_wire ~default_port wmsg] converts a wire message to its in-memory
    form. [default_port] replaces a non-positive source port in pings.

    Returns [None] for [Nack], [Compound], [Compressed] and [Err], and for a
    [Ping], [Indirect_ping] or [Alive] whose address field is non-empty but
    neither 4 nor 16 bytes long. An empty address field decodes as 0.0.0.0.
    The function does not raise on malformed input. *)
let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option =
  let port_or_default p = if p > 0 then p else default_port in
  match wmsg with
  | Wire.Ping { seq_no; node; source_addr; source_port; source_node } ->
      ip_of_wire_opt source_addr
      |> Option.map (fun ip ->
             (* Without a source node name the sender is identified by the
                target name carried in [node]. *)
             let sender_name = if source_node <> "" then source_node else node in
             let sender =
               make_node_info
                 ~id:(node_id_of_string sender_name)
                 ~addr:(`Udp (ip, port_or_default source_port))
                 ~meta:""
             in
             Ping { seq = seq_no; target = node_id_of_string node; sender })
  | Wire.Indirect_ping
      { seq_no; node; source_addr; source_port; source_node; _ } ->
      ip_of_wire_opt source_addr
      |> Option.map (fun ip ->
             let sender =
               make_node_info
                 ~id:(node_id_of_string source_node)
                 ~addr:(`Udp (ip, port_or_default source_port))
                 ~meta:""
             in
             Ping_req { seq = seq_no; target = node_id_of_string node; sender })
  | Wire.Ack { seq_no; payload } ->
      (* The wire form carries no responder identity; a placeholder with an
         empty id and address 0.0.0.0:0 is used. *)
      let responder =
        make_node_info ~id:(node_id_of_string "")
          ~addr:(`Udp (Eio.Net.Ipaddr.of_raw "\000\000\000\000", 0))
          ~meta:""
      in
      let payload = if payload = "" then None else Some payload in
      Some (Ack { seq = seq_no; responder; payload })
  | Wire.Alive { incarnation; node; addr; port; meta; _ } ->
      ip_of_wire_opt addr
      |> Option.map (fun ip ->
             let node =
               make_node_info ~id:(node_id_of_string node)
                 ~addr:(`Udp (ip, port))
                 ~meta
             in
             Alive { node; incarnation = incarnation_of_int incarnation })
  | Wire.Suspect { incarnation; node; from } ->
      Some
        (Suspect
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             suspector = node_id_of_string from;
           })
  | Wire.Dead { incarnation; node; from } ->
      Some
        (Dead
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             declarator = node_id_of_string from;
           })
  | Wire.User_data encoded -> Some (decode_user_data encoded)
  | Wire.Nack _ -> None
  | Wire.Compound _ -> None
  | Wire.Compressed _ -> None
  | Wire.Err _ -> None