this repo has no description
(** Core types of a gossip-based cluster membership protocol, plus the
    conversions between the internal message representation and the
    {!Wire} representation. *)

(** Identifier of a cluster node; a thin wrapper around its name. *)
type node_id = Node_id of string [@@unboxed]

let node_id_to_string (Node_id s) = s
let node_id_of_string s = Node_id s
let equal_node_id (Node_id a) (Node_id b) = String.equal a b
let compare_node_id (Node_id a) (Node_id b) = String.compare a b

(** Incarnation counter attached to membership claims about a node. *)
type incarnation = Incarnation of int [@@unboxed]

let incarnation_to_int (Incarnation i) = i
let incarnation_of_int i = Incarnation i
let zero_incarnation = Incarnation 0
let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b
let incr_incarnation (Incarnation i) = Incarnation (i + 1)

type addr = Eio.Net.Sockaddr.datagram

(** A node as known to the cluster: identifier, datagram address and an
    opaque metadata string. *)
type node_info = { id : node_id; addr : addr; meta : string }

let make_node_info ~id ~addr ~meta = { id; addr; meta }

type member_state = Alive | Suspect | Dead | Left

let member_state_to_string = function
  | Alive -> "alive"
  | Suspect -> "suspect"
  | Dead -> "dead"
  | Left -> "left"

let member_state_to_int = function
  | Alive -> 0
  | Suspect -> 1
  | Dead -> 2
  | Left -> 3

(** Inverse of {!member_state_to_int} on [0..3]. Every other integer is
    mapped to [Left] rather than rejected. *)
let member_state_of_int = function
  | 0 -> Alive
  | 1 -> Suspect
  | 2 -> Dead
  | _ -> Left

type member_snapshot = {
  node : node_info;
  state : member_state;
  incarnation : incarnation;
  state_change : Mtime.span;
}

(** Internal (decoded) protocol messages. Note that the constructors
    [Alive], [Suspect] and [Dead] shadow those of {!member_state} from this
    point on. *)
type protocol_msg =
  | Ping of { seq : int; target : node_id; sender : node_info }
  | Ping_req of { seq : int; target : node_id; sender : node_info }
  | Ack of { seq : int; responder : node_info; payload : string option }
  | Alive of { node : node_info; incarnation : incarnation }
  | Suspect of {
      node : node_id;
      incarnation : incarnation;
      suspector : node_id;
    }
  | Dead of { node : node_id; incarnation : incarnation; declarator : node_id }
  | User_msg of { topic : string; payload : string; origin : node_id }

type packet = {
  cluster : string;
  primary : protocol_msg;
  piggyback : protocol_msg list;
}

type decode_error =
  | Invalid_magic
  | Unsupported_version of int
  | Truncated_message
  | Invalid_tag of int
  | Decryption_failed
  | Msgpack_error of string
  | Invalid_crc

let decode_error_to_string = function
  | Invalid_magic -> "invalid magic bytes"
  | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v
  | Truncated_message -> "truncated message"
  | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t
  | Decryption_failed -> "decryption failed"
  | Msgpack_error s -> Printf.sprintf "msgpack error: %s" s
  | Invalid_crc -> "invalid CRC checksum"

type send_error = Node_unreachable | Timeout | Connection_reset

let send_error_to_string = function
  | Node_unreachable -> "node unreachable"
  | Timeout -> "timeout"
  | Connection_reset -> "connection reset"

type node_event =
  | Join of node_info
  | Leave of node_info
  | Update of node_info
  | Suspect_event of node_info
  | Alive_event of node_info

(** Tunable settings. The float fields are presumably durations in
    seconds; units are not established in this file (TODO confirm). *)
type config = {
  bind_addr : string;
  bind_port : int;
  node_name : string option;
  protocol_interval : float;
  probe_timeout : float;
  indirect_checks : int;
  suspicion_mult : int;
  suspicion_max_timeout : float;
  retransmit_mult : int;
  udp_buffer_size : int;
  tcp_timeout : float;
  send_buffer_count : int;
  recv_buffer_count : int;
  secret_key : string;
  cluster_name : string;
  label : string;
  encryption_enabled : bool;
  gossip_verify_incoming : bool;
  gossip_verify_outgoing : bool;
}

(** Default settings. [secret_key] is 16 zero bytes and
    [encryption_enabled] is [false]; NOTE(review): a real key must be
    supplied before turning encryption on. *)
let default_config =
  {
    bind_addr = "0.0.0.0";
    bind_port = 7946;
    node_name = None;
    protocol_interval = 1.0;
    probe_timeout = 0.5;
    indirect_checks = 3;
    suspicion_mult = 4;
    suspicion_max_timeout = 60.0;
    retransmit_mult = 4;
    udp_buffer_size = 1400;
    tcp_timeout = 10.0;
    send_buffer_count = 16;
    recv_buffer_count = 16;
    secret_key = String.make 16 '\x00';
    cluster_name = "default";
    label = "";
    encryption_enabled = false;
    gossip_verify_incoming = true;
    gossip_verify_outgoing = true;
  }

(** Runtime environment: an Eio standard environment object exposing at
    least [clock], [mono_clock], [net] and [secure_random], together with a
    switch. *)
type 'a env = { stdenv : 'a; sw : Eio.Switch.t }
  constraint
    'a =
    < clock : _ Eio.Time.clock
    ; mono_clock : _ Eio.Time.Mono.t
    ; net : _ Eio.Net.t
    ; secure_random : _ Eio.Flow.source
    ; .. >

type stats = {
  nodes_alive : int;
  nodes_suspect : int;
  nodes_dead : int;
  msgs_sent : int;
  msgs_received : int;
  msgs_dropped : int;
  queue_depth : int;
  buffers_available : int;
  buffers_total : int;
}

let empty_stats =
  {
    nodes_alive = 0;
    nodes_suspect = 0;
    nodes_dead = 0;
    msgs_sent = 0;
    msgs_received = 0;
    msgs_dropped = 0;
    queue_depth = 0;
    buffers_available = 0;
    buffers_total = 0;
  }

(** On-the-wire message shapes and their numeric type tags.
    NOTE(review): the tag numbering looks like the one used by HashiCorp
    memberlist; confirm against that protocol before changing any value. *)
module Wire = struct
  type message_type =
    | Ping_msg
    | Indirect_ping_msg
    | Ack_resp_msg
    | Suspect_msg
    | Alive_msg
    | Dead_msg
    | Push_pull_msg
    | Compound_msg
    | User_msg
    | Compress_msg
    | Encrypt_msg
    | Nack_resp_msg
    | Has_crc_msg
    | Err_msg
    | Has_label_msg

  let message_type_to_int = function
    | Ping_msg -> 0
    | Indirect_ping_msg -> 1
    | Ack_resp_msg -> 2
    | Suspect_msg -> 3
    | Alive_msg -> 4
    | Dead_msg -> 5
    | Push_pull_msg -> 6
    | Compound_msg -> 7
    | User_msg -> 8
    | Compress_msg -> 9
    | Encrypt_msg -> 10
    | Nack_resp_msg -> 11
    | Has_crc_msg -> 12
    | Err_msg -> 13
    | Has_label_msg -> 244

  (** Inverse of [message_type_to_int]; an unknown tag [n] yields
      [Error n]. *)
  let message_type_of_int = function
    | 0 -> Ok Ping_msg
    | 1 -> Ok Indirect_ping_msg
    | 2 -> Ok Ack_resp_msg
    | 3 -> Ok Suspect_msg
    | 4 -> Ok Alive_msg
    | 5 -> Ok Dead_msg
    | 6 -> Ok Push_pull_msg
    | 7 -> Ok Compound_msg
    | 8 -> Ok User_msg
    | 9 -> Ok Compress_msg
    | 10 -> Ok Encrypt_msg
    | 11 -> Ok Nack_resp_msg
    | 12 -> Ok Has_crc_msg
    | 13 -> Ok Err_msg
    | 244 -> Ok Has_label_msg
    | n -> Error n

  type ping = {
    seq_no : int;
    node : string;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type indirect_ping_req = {
    seq_no : int;
    target : string;
    port : int;
    node : string;
    nack : bool;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type ack_resp = { seq_no : int; payload : string }
  type nack_resp = { seq_no : int }
  type suspect = { incarnation : int; node : string; from : string }

  type alive = {
    incarnation : int;
    node : string;
    addr : string;
    port : int;
    meta : string;
    vsn : int list;
  }

  type dead = { incarnation : int; node : string; from : string }
  type compress = { algo : int; buf : string }

  type protocol_msg =
    | Ping of ping
    | Indirect_ping of indirect_ping_req
    | Ack of ack_resp
    | Nack of nack_resp
    | Suspect of suspect
    | Alive of alive
    | Dead of dead
    | User_data of string
    | Compound of string list
    | Compressed of compress
    | Err of string
end

(** [ip_to_bytes ip] is the raw network-order form of [ip]: 4 bytes for an
    IPv4 address, 16 bytes for an IPv6 address. The address is printed with
    [Eio.Net.Ipaddr.pp] and the text is parsed back.

    For IPv6 the [::] abbreviation is expanded wherever it occurs (start,
    middle or end of the address).

    @raise Invalid_argument if the IPv6 text holds more than 8 groups.
    @raise Failure if a group is not a hexadecimal number. *)
let ip_to_bytes ip =
  let text = Fmt.to_to_string Eio.Net.Ipaddr.pp ip in
  if String.contains text ':' then (
    let len = String.length text in
    let groups_of part =
      if part = "" then [] else String.split_on_char ':' part
    in
    (* Position of the "::" zero-run marker, if the address uses one. *)
    let double_colon =
      let rec find i =
        if i + 1 >= len then None
        else if text.[i] = ':' && text.[i + 1] = ':' then Some i
        else find (i + 1)
      in
      find 0
    in
    (* [head] groups are left-aligned, [tail] groups are right-aligned and
       everything in between stays zero. *)
    let head, tail =
      match double_colon with
      | None -> (groups_of text, [])
      | Some i ->
          ( groups_of (String.sub text 0 i),
            groups_of (String.sub text (i + 2) (len - i - 2)) )
    in
    let n_tail = List.length tail in
    if List.length head + n_tail > 8 then
      invalid_arg "ip_to_bytes: too many groups in IPv6 address";
    (* Zero-filled so that the elided run needs no explicit writes. *)
    let buf = Bytes.make 16 '\000' in
    let put idx group =
      let v = int_of_string ("0x" ^ group) in
      Bytes.set_uint8 buf (idx * 2) (v lsr 8);
      Bytes.set_uint8 buf ((idx * 2) + 1) (v land 0xff)
    in
    List.iteri put head;
    List.iteri (fun i group -> put (8 - n_tail + i) group) tail;
    Bytes.to_string buf)
  else
    Scanf.sscanf text "%d.%d.%d.%d" (fun a b c d ->
        let buf = Bytes.create 4 in
        Bytes.set_uint8 buf 0 a;
        Bytes.set_uint8 buf 1 b;
        Bytes.set_uint8 buf 2 c;
        Bytes.set_uint8 buf 3 d;
        Bytes.to_string buf)

(** [ip_of_bytes s] builds an address from its raw 4-byte or 16-byte form.
    @raise Failure if [s] has any other length. *)
let ip_of_bytes s =
  match String.length s with
  | 4 | 16 -> Eio.Net.Ipaddr.of_raw s
  | _ -> failwith "invalid IP address length"

(* Value placed in the [vsn] field of outgoing alive messages.
   NOTE(review): the meaning of the individual entries is not established
   in this file; confirm against the peer protocol. *)
let default_vsn = [ 1; 5; 5; 0; 0; 0 ]

(** [node_info_to_wire info ~source_node] is
    [(raw_ip, port, info.meta, source_node)].
    @raise Failure if [info.addr] is not a UDP address. *)
let node_info_to_wire (info : node_info) ~source_node :
    string * int * string * string =
  match info.addr with
  | `Udp (ip, port) -> (ip_to_bytes ip, port, info.meta, source_node)
  | `Unix _ -> failwith "Unix sockets not supported"

(** Rebuilds a {!node_info} from wire fields.
    @raise Failure if [addr] is not 4 or 16 bytes long. *)
let node_info_of_wire ~name ~addr ~port ~meta : node_info =
  let ip = ip_of_bytes addr in
  { id = node_id_of_string name; addr = `Udp (ip, port); meta }

(** [msg_to_wire ~self_name ~self_port msg] converts an internal message to
    its wire shape. [self_name] becomes the [source_node] of pings and
    indirect pings; [self_port] becomes the [port] of indirect pings.

    Information with no wire counterpart is dropped: the [responder] of an
    [Ack], and the [topic] and [origin] of a [User_msg].

    @raise Failure if a sender or node address is not a UDP address. *)
let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg =
  match msg with
  | Ping { seq; target; sender } ->
      let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
      Wire.Ping
        {
          seq_no = seq;
          node = node_id_to_string target;
          source_addr = addr;
          source_port = port;
          source_node = self_name;
        }
  | Ping_req { seq; target; sender } ->
      let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
      Wire.Indirect_ping
        {
          seq_no = seq;
          (* NOTE(review): [Ping_req] carries only the target id, so the
             wire [target] address is filled with the sender IP. *)
          target = addr;
          port = self_port;
          node = node_id_to_string target;
          nack = true;
          source_addr = addr;
          source_port = port;
          source_node = self_name;
        }
  | Ack { seq; responder = _; payload } ->
      Wire.Ack { seq_no = seq; payload = Option.value payload ~default:"" }
  | Alive { node; incarnation } ->
      let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
      Wire.Alive
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node.id;
          addr;
          port;
          meta;
          vsn = default_vsn;
        }
  | Suspect { node; incarnation; suspector } ->
      Wire.Suspect
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string suspector;
        }
  | Dead { node; incarnation; declarator } ->
      Wire.Dead
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string declarator;
        }
  | User_msg { topic = _; payload; origin = _ } -> Wire.User_data payload

(* Address carried in a wire field; an empty field maps to 0.0.0.0.
   Raises [Failure] when a non-empty field is not 4 or 16 bytes long. *)
let ip_of_wire_addr raw =
  if String.length raw > 0 then ip_of_bytes raw
  else Eio.Net.Ipaddr.of_raw "\000\000\000\000"

(** [msg_of_wire ~default_port wmsg] converts a wire message back to the
    internal representation. [default_port] replaces a non-positive source
    port in pings and indirect pings.

    Returns [None] for [Nack], [Compound], [Compressed] and [Err], which
    have no internal counterpart here.

    Fields absent from the wire are filled with placeholders: the
    [responder] of an [Ack] has an empty id and address 0.0.0.0:0, and a
    [User_msg] has an empty [topic] and [origin].

    @raise Failure if a non-empty address field is not 4 or 16 bytes. *)
let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option =
  match wmsg with
  | Wire.Ping { seq_no; node; source_addr; source_port; source_node } ->
      let port = if source_port > 0 then source_port else default_port in
      (* With no source name on the wire, the sender id falls back to the
         target name. *)
      let sender_name = if source_node <> "" then source_node else node in
      let sender =
        {
          id = node_id_of_string sender_name;
          addr = `Udp (ip_of_wire_addr source_addr, port);
          meta = "";
        }
      in
      Some (Ping { seq = seq_no; target = node_id_of_string node; sender })
  | Wire.Indirect_ping
      {
        seq_no;
        node;
        source_addr;
        source_port;
        source_node;
        (* The target address, port and nack flag are not represented in
           [Ping_req] and are dropped. *)
        target = _;
        port = _;
        nack = _;
      } ->
      let src_port = if source_port > 0 then source_port else default_port in
      let sender =
        {
          id = node_id_of_string source_node;
          addr = `Udp (ip_of_wire_addr source_addr, src_port);
          meta = "";
        }
      in
      Some (Ping_req { seq = seq_no; target = node_id_of_string node; sender })
  | Wire.Ack { seq_no; payload } ->
      let responder =
        {
          id = node_id_of_string "";
          addr = `Udp (Eio.Net.Ipaddr.of_raw "\000\000\000\000", 0);
          meta = "";
        }
      in
      let payload = if payload = "" then None else Some payload in
      Some (Ack { seq = seq_no; responder; payload })
  | Wire.Alive { incarnation; node; addr; port; meta; vsn = _ } ->
      let node_info =
        {
          id = node_id_of_string node;
          addr = `Udp (ip_of_wire_addr addr, port);
          meta;
        }
      in
      Some
        (Alive
           { node = node_info; incarnation = incarnation_of_int incarnation })
  | Wire.Suspect { incarnation; node; from } ->
      Some
        (Suspect
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             suspector = node_id_of_string from;
           })
  | Wire.Dead { incarnation; node; from } ->
      Some
        (Dead
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             declarator = node_id_of_string from;
           })
  | Wire.User_data payload ->
      Some (User_msg { topic = ""; payload; origin = node_id_of_string "" })
  | Wire.Nack _ -> None
  | Wire.Compound _ -> None
  | Wire.Compressed _ -> None
  | Wire.Err _ -> None