(** Identifier of a cluster node, wrapping its string name. *)
type node_id = Node_id of string [@@unboxed]

(** [node_id_to_string id] returns the string wrapped by [id]. *)
let node_id_to_string (Node_id s) = s

(** [node_id_of_string s] wraps [s] as a node identifier. *)
let node_id_of_string s = Node_id s

(** Equality on node identifiers, by underlying string. *)
let equal_node_id (Node_id a) (Node_id b) = String.equal a b

(** Total order on node identifiers, by underlying string. *)
let compare_node_id (Node_id a) (Node_id b) = String.compare a b

(** Incarnation counter attached to membership claims about a node. *)
type incarnation = Incarnation of int [@@unboxed]

(** [incarnation_to_int i] returns the integer wrapped by [i]. *)
let incarnation_to_int (Incarnation i) = i

(** [incarnation_of_int i] wraps [i] as an incarnation. *)
let incarnation_of_int i = Incarnation i

(** The incarnation wrapping [0]. *)
let zero_incarnation = Incarnation 0

(** Total order on incarnations, by underlying integer. *)
let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b

(** [incr_incarnation i] is the incarnation following [i]. *)
let incr_incarnation (Incarnation i) = Incarnation (i + 1)

(** Network address of a node. *)
type addr = Eio.Net.Sockaddr.datagram

(** Identity, address and opaque metadata string of a node. *)
type node_info = { id : node_id; addr : addr; meta : string }

(** [make_node_info ~id ~addr ~meta] builds a {!node_info} record. *)
let make_node_info ~id ~addr ~meta = { id; addr; meta }

(** Membership state of a node. *)
type member_state = Alive | Suspect | Dead | Left

(** Lower-case name of a membership state. *)
let member_state_to_string = function
  | Alive -> "alive"
  | Suspect -> "suspect"
  | Dead -> "dead"
  | Left -> "left"

(** Integer code of a membership state (0 to 3). *)
let member_state_to_int = function
  | Alive -> 0
  | Suspect -> 1
  | Dead -> 2
  | Left -> 3

(** Inverse of {!member_state_to_int}. Any integer other than 0, 1 or 2
    (not only 3) decodes to [Left]. *)
let member_state_of_int = function
  | 0 -> Alive
  | 1 -> Suspect
  | 2 -> Dead
  | _ -> Left

(** A node together with its state, incarnation and a state-change time
    span. *)
type member_snapshot = {
  node : node_info;
  state : member_state;
  incarnation : incarnation;
  state_change : Mtime.span;
}

(** High-level protocol messages. The [Alive], [Suspect] and [Dead]
    constructors declared here shadow those of {!member_state} for the rest
    of the file. *)
type protocol_msg =
  | Ping of { seq : int; target : node_id; sender : node_info }
  | Ping_req of { seq : int; target : node_id; sender : node_info }
  | Ack of { seq : int; responder : node_info; payload : string option }
  | Alive of { node : node_info; incarnation : incarnation }
  | Suspect of {
      node : node_id;
      incarnation : incarnation;
      suspector : node_id;
    }
  | Dead of { node : node_id; incarnation : incarnation; declarator : node_id }
  | User_msg of { topic : string; payload : string; origin : node_id }

(** A primary message plus piggybacked messages, tagged with a cluster
    name. *)
type packet = {
  cluster : string;
  primary : protocol_msg;
  piggyback : protocol_msg list;
}

(** Reasons decoding an incoming message can fail. *)
type decode_error =
  | Invalid_magic
  | Unsupported_version of int
  | Truncated_message
  | Invalid_tag of int
  | Decryption_failed
  | Msgpack_error of string
  | Invalid_crc

(** Human-readable description of a {!decode_error}. *)
let decode_error_to_string = function
  | Invalid_magic -> "invalid magic bytes"
  | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v
  | Truncated_message -> "truncated message"
  | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t
  | Decryption_failed -> "decryption failed"
  | Msgpack_error s -> Printf.sprintf "msgpack error: %s" s
  | Invalid_crc -> "invalid CRC checksum"

(** Reasons sending a message can fail. *)
type send_error = Node_unreachable | Timeout | Connection_reset

(** Human-readable description of a {!send_error}. *)
let send_error_to_string = function
  | Node_unreachable -> "node unreachable"
  | Timeout -> "timeout"
  | Connection_reset -> "connection reset"

(** Membership events about a node. *)
type node_event =
  | Join of node_info
  | Leave of node_info
  | Update of node_info
  | Suspect_event of node_info
  | Alive_event of node_info

(** Configuration record. See {!default_config} for the default value of
    each field. *)
type config = {
  bind_addr : string;
  bind_port : int;
  node_name : string option;
  protocol_interval : float;
  probe_timeout : float;
  indirect_checks : int;
  suspicion_mult : int;
  suspicion_max_timeout : float;
  retransmit_mult : int;
  udp_buffer_size : int;
  tcp_timeout : float;
  send_buffer_count : int;
  recv_buffer_count : int;
  secret_key : string;
  cluster_name : string;
  label : string;
  encryption_enabled : bool;
  gossip_verify_incoming : bool;
  gossip_verify_outgoing : bool;
  max_gossip_queue_depth : int;
}

(** Default configuration. Encryption is disabled and [secret_key] is 16
    zero bytes. *)
let default_config =
  {
    bind_addr = "0.0.0.0";
    bind_port = 7946;
    node_name = None;
    protocol_interval = 1.0;
    probe_timeout = 0.5;
    indirect_checks = 3;
    suspicion_mult = 4;
    suspicion_max_timeout = 60.0;
    retransmit_mult = 4;
    udp_buffer_size = 1400;
    tcp_timeout = 10.0;
    send_buffer_count = 16;
    recv_buffer_count = 16;
    secret_key = String.make 16 '\x00';
    cluster_name = "default";
    label = "";
    encryption_enabled = false;
    gossip_verify_incoming = true;
    gossip_verify_outgoing = true;
    max_gossip_queue_depth = 5000;
  }

(** Eio environment object together with a switch. The object must provide
    at least [clock], [mono_clock], [net] and [secure_random]. *)
type 'a env = { stdenv : 'a; sw : Eio.Switch.t }
  constraint
    'a =
    < clock : _ Eio.Time.clock
    ; mono_clock : _ Eio.Time.Mono.t
    ; net : _ Eio.Net.t
    ; secure_random : _ Eio.Flow.source
    ; .. >

(** Counters describing membership, traffic, queue and buffer usage. *)
type stats = {
  nodes_alive : int;
  nodes_suspect : int;
  nodes_dead : int;
  msgs_sent : int;
  msgs_received : int;
  msgs_dropped : int;
  queue_depth : int;
  buffers_available : int;
  buffers_total : int;
}

(** A {!stats} record with every counter at zero. *)
let empty_stats =
  {
    nodes_alive = 0;
    nodes_suspect = 0;
    nodes_dead = 0;
    msgs_sent = 0;
    msgs_received = 0;
    msgs_dropped = 0;
    queue_depth = 0;
    buffers_available = 0;
    buffers_total = 0;
  }

(** Wire-level message representations.

    NOTE(review): the tag numbering and field layout look like HashiCorp
    memberlist's protocol; confirm against that specification. *)
module Wire = struct
  (** Message type tags. *)
  type message_type =
    | Ping_msg
    | Indirect_ping_msg
    | Ack_resp_msg
    | Suspect_msg
    | Alive_msg
    | Dead_msg
    | Push_pull_msg
    | Compound_msg
    | User_msg
    | Compress_msg
    | Encrypt_msg
    | Nack_resp_msg
    | Has_crc_msg
    | Err_msg
    | Has_label_msg

  (** Integer code of a message type: 0 to 13 in declaration order, except
      [Has_label_msg] which is 244. *)
  let message_type_to_int = function
    | Ping_msg -> 0
    | Indirect_ping_msg -> 1
    | Ack_resp_msg -> 2
    | Suspect_msg -> 3
    | Alive_msg -> 4
    | Dead_msg -> 5
    | Push_pull_msg -> 6
    | Compound_msg -> 7
    | User_msg -> 8
    | Compress_msg -> 9
    | Encrypt_msg -> 10
    | Nack_resp_msg -> 11
    | Has_crc_msg -> 12
    | Err_msg -> 13
    | Has_label_msg -> 244

  (** Inverse of {!message_type_to_int}. Returns [Error n] for an unknown
      code [n]. *)
  let message_type_of_int = function
    | 0 -> Ok Ping_msg
    | 1 -> Ok Indirect_ping_msg
    | 2 -> Ok Ack_resp_msg
    | 3 -> Ok Suspect_msg
    | 4 -> Ok Alive_msg
    | 5 -> Ok Dead_msg
    | 6 -> Ok Push_pull_msg
    | 7 -> Ok Compound_msg
    | 8 -> Ok User_msg
    | 9 -> Ok Compress_msg
    | 10 -> Ok Encrypt_msg
    | 11 -> Ok Nack_resp_msg
    | 12 -> Ok Has_crc_msg
    | 13 -> Ok Err_msg
    | 244 -> Ok Has_label_msg
    | n -> Error n

  type ping = {
    seq_no : int;
    node : string;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type indirect_ping_req = {
    seq_no : int;
    target : string;
    port : int;
    node : string;
    nack : bool;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type ack_resp = { seq_no : int; payload : string }
  type nack_resp = { seq_no : int }
  type suspect = { incarnation : int; node : string; from : string }

  type alive = {
    incarnation : int;
    node : string;
    addr : string;
    port : int;
    meta : string;
    vsn : int list;
  }

  type dead = { incarnation : int; node : string; from : string }
  type compress = { algo : int; buf : string }

  type push_pull_header = {
    pp_nodes : int;
    pp_user_state_len : int;
    pp_join : bool;
  }

  type push_node_state = {
    pns_name : string;
    pns_addr : string;
    pns_port : int;
    pns_meta : string;
    pns_incarnation : int;
    pns_state : int;
    pns_vsn : int list;
  }

  (** Wire-level messages. *)
  type protocol_msg =
    | Ping of ping
    | Indirect_ping of indirect_ping_req
    | Ack of ack_resp
    | Nack of nack_resp
    | Suspect of suspect
    | Alive of alive
    | Dead of dead
    | User_data of string
    | Compound of string list
    | Compressed of compress
    | Err of string
end

(** [ip_to_bytes ip] returns the raw bytes of [ip]: 4 bytes for IPv4 and
    16 bytes for IPv6. *)
let ip_to_bytes (ip : _ Eio.Net.Ipaddr.t) : string =
  (* Eio exposes an address as a private string of raw bytes, so a coercion
     is enough. Printing the address and re-parsing the text mishandles a
     compressed zero run in the middle of an IPv6 address and fails on
     textual forms that embed a dotted IPv4 tail. *)
  (ip : _ Eio.Net.Ipaddr.t :> string)

(** [ip_of_bytes s] converts 4 or 16 raw bytes into an IP address.
    @raise Failure if [s] has any other length. *)
let ip_of_bytes s =
  match String.length s with
  | 4 | 16 -> Eio.Net.Ipaddr.of_raw s
  | _ -> failwith "invalid IP address length"

(** Version list placed in every outgoing [Wire.Alive] message.
    NOTE(review): the meaning of the individual entries is not visible
    here. *)
let default_vsn = [ 1; 5; 5; 0; 0; 0 ]

(** [node_info_to_wire info ~source_node] returns
    [(raw_ip, port, meta, source_node)] for a UDP address.
    @raise Failure if [info.addr] is a Unix socket address. *)
let node_info_to_wire (info : node_info) ~source_node :
    string * int * string * string =
  match info.addr with
  | `Udp (ip, port) -> (ip_to_bytes ip, port, info.meta, source_node)
  | `Unix _ -> failwith "Unix sockets not supported"

(** [node_info_of_wire ~name ~addr ~port ~meta] rebuilds a {!node_info}
    from wire fields, where [addr] holds raw IP bytes.
    @raise Failure if [addr] is not 4 or 16 bytes long. *)
let node_info_of_wire ~name ~addr ~port ~meta : node_info =
  let ip = ip_of_bytes addr in
  { id = node_id_of_string name; addr = `Udp (ip, port); meta }

(** [msg_to_wire ~self_name ~self_port msg] converts a high-level message
    to its wire form.

    [User_msg] is encoded as the topic length, a colon, the topic, the
    origin length, a colon, the origin and then the payload, with lengths
    written in decimal.

    @raise Failure if a [Ping], [Ping_req] or [Alive] message carries a
    Unix socket address. *)
let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg
    =
  match msg with
  | Ping { seq; target; sender } ->
      let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
      Wire.Ping
        {
          seq_no = seq;
          node = node_id_to_string target;
          source_addr = addr;
          source_port = port;
          source_node = self_name;
        }
  | Ping_req { seq; target; sender } ->
      let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
      (* NOTE(review): the wire [target] and [port] fields carry the
         sender's address and [self_port], because only the target's node id
         is available here. Confirm this is what peers expect. *)
      Wire.Indirect_ping
        {
          seq_no = seq;
          target = addr;
          port = self_port;
          node = node_id_to_string target;
          nack = true;
          source_addr = addr;
          source_port = port;
          source_node = self_name;
        }
  | Ack { seq; responder = _; payload } ->
      Wire.Ack { seq_no = seq; payload = Option.value payload ~default:"" }
  | Alive { node; incarnation } ->
      let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
      Wire.Alive
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node.id;
          addr;
          port;
          meta;
          vsn = default_vsn;
        }
  | Suspect { node; incarnation; suspector } ->
      Wire.Suspect
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string suspector;
        }
  | Dead { node; incarnation; declarator } ->
      Wire.Dead
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string declarator;
        }
  | User_msg { topic; payload; origin } ->
      let origin_str = node_id_to_string origin in
      Wire.User_data
        (Printf.sprintf "%d:%s%d:%s%s" (String.length topic) topic
           (String.length origin_str) origin_str payload)

(** [msg_of_wire ~default_port wmsg] converts a wire message to its
    high-level form. It returns [None] for [Nack], [Compound], [Compressed]
    and [Err], which have no high-level counterpart.

    - A source port that is not positive is replaced by [default_port].
    - An empty address decodes to the all-zero IPv4 address.
    - [Ack] carries no responder on the wire, so a placeholder responder
      with an empty id is used. An empty payload becomes [None].
    - [User_data] that does not start with a valid topic header is returned
      whole as the payload, with empty topic and origin. If the topic
      decodes but the origin header is missing or invalid, the result has
      that topic with empty payload and origin.

    @raise Failure if a non-empty address field is not 4 or 16 bytes
    long. *)
let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option
    =
  let unspecified_ip = Eio.Net.Ipaddr.of_raw "\000\000\000\000" in
  let ip_of_wire raw =
    if String.length raw > 0 then ip_of_bytes raw else unspecified_ip
  in
  let port_or_default p = if p > 0 then p else default_port in
  match wmsg with
  | Wire.Ping { seq_no; node; source_addr; source_port; source_node } ->
      let port = port_or_default source_port in
      let ip = ip_of_wire source_addr in
      (* Use the target name as sender id when no source node was sent. *)
      let sender_name = if source_node <> "" then source_node else node in
      let sender =
        { id = node_id_of_string sender_name; addr = `Udp (ip, port); meta = "" }
      in
      Some (Ping { seq = seq_no; target = node_id_of_string node; sender })
  | Wire.Indirect_ping
      { seq_no; node; source_addr; source_port; source_node; _ } ->
      (* The wire [target], [port] and [nack] fields are not used. *)
      let port = port_or_default source_port in
      let ip = ip_of_wire source_addr in
      let sender =
        { id = node_id_of_string source_node; addr = `Udp (ip, port); meta = "" }
      in
      Some (Ping_req { seq = seq_no; target = node_id_of_string node; sender })
  | Wire.Ack { seq_no; payload } ->
      let responder =
        { id = node_id_of_string ""; addr = `Udp (unspecified_ip, 0); meta = "" }
      in
      let payload = if payload = "" then None else Some payload in
      Some (Ack { seq = seq_no; responder; payload })
  | Wire.Alive { incarnation; node; addr; port; meta; _ } ->
      let node_info =
        { id = node_id_of_string node; addr = `Udp (ip_of_wire addr, port); meta }
      in
      Some
        (Alive
           { node = node_info; incarnation = incarnation_of_int incarnation })
  | Wire.Suspect { incarnation; node; from } ->
      Some
        (Suspect
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             suspector = node_id_of_string from;
           })
  | Wire.Dead { incarnation; node; from } ->
      Some
        (Dead
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             declarator = node_id_of_string from;
           })
  | Wire.User_data encoded -> (
      let total = String.length encoded in
      let no_origin = node_id_of_string "" in
      (* Reads a length header (an integer followed by a colon) starting at
         [start]. Returns the length and the offset just past the colon, or
         [None] when the header is missing, is not an integer, is negative,
         or announces more bytes than remain. Comparing against the
         remaining byte count avoids integer overflow on huge lengths. *)
      let parse_length start =
        match String.index_from_opt encoded start ':' with
        | None -> None
        | Some colon -> (
            let body_start = colon + 1 in
            match
              int_of_string_opt (String.sub encoded start (colon - start))
            with
            | Some len when len >= 0 && len <= total - body_start ->
                Some (len, body_start)
            | Some _ | None -> None)
      in
      match parse_length 0 with
      | None ->
          Some (User_msg { topic = ""; payload = encoded; origin = no_origin })
      | Some (topic_len, topic_start) -> (
          let topic = String.sub encoded topic_start topic_len in
          match parse_length (topic_start + topic_len) with
          | None -> Some (User_msg { topic; payload = ""; origin = no_origin })
          | Some (origin_len, origin_start) ->
              let origin = String.sub encoded origin_start origin_len in
              let data_start = origin_start + origin_len in
              let payload =
                String.sub encoded data_start (total - data_start)
              in
              Some
                (User_msg { topic; payload; origin = node_id_of_string origin })
          ))
  | Wire.Nack _ -> None
  | Wire.Compound _ -> None
  | Wire.Compressed _ -> None
  | Wire.Err _ -> None