type node_id = Node_id of string [@@unboxed] let node_id_to_string (Node_id s) = s let node_id_of_string s = Node_id s let equal_node_id (Node_id a) (Node_id b) = String.equal a b let compare_node_id (Node_id a) (Node_id b) = String.compare a b type incarnation = Incarnation of int [@@unboxed] let incarnation_to_int (Incarnation i) = i let incarnation_of_int i = Incarnation i let zero_incarnation = Incarnation 0 let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b let incr_incarnation (Incarnation i) = Incarnation (i + 1) type addr = Eio.Net.Sockaddr.datagram type node_info = { id : node_id; addr : addr; meta : string } let make_node_info ~id ~addr ~meta = { id; addr; meta } type member_state = Alive | Suspect | Dead let member_state_to_string = function | Alive -> "alive" | Suspect -> "suspect" | Dead -> "dead" type member_snapshot = { node : node_info; state : member_state; incarnation : incarnation; state_change : Mtime.span; } type protocol_msg = | Ping of { seq : int; sender : node_info } | Ping_req of { seq : int; target : node_id; sender : node_info } | Ack of { seq : int; responder : node_info; payload : string option } | Alive of { node : node_info; incarnation : incarnation } | Suspect of { node : node_id; incarnation : incarnation; suspector : node_id; } | Dead of { node : node_id; incarnation : incarnation; declarator : node_id } | User_msg of { topic : string; payload : string; origin : node_id } type packet = { cluster : string; primary : protocol_msg; piggyback : protocol_msg list; } type decode_error = | Invalid_magic | Unsupported_version of int | Truncated_message | Invalid_tag of int | Decryption_failed let decode_error_to_string = function | Invalid_magic -> "invalid magic bytes" | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v | Truncated_message -> "truncated message" | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t | Decryption_failed -> "decryption failed" type send_error = Node_unreachable | Timeout | Connection_reset let send_error_to_string = function | Node_unreachable -> "node unreachable" | Timeout -> "timeout" | Connection_reset -> "connection reset" type node_event = | Join of node_info | Leave of node_info | Update of node_info | Suspect_event of node_info | Alive_event of node_info type config = { bind_addr : string; bind_port : int; node_name : string option; protocol_interval : float; probe_timeout : float; indirect_checks : int; suspicion_mult : int; suspicion_max_timeout : float; retransmit_mult : int; udp_buffer_size : int; tcp_timeout : float; send_buffer_count : int; recv_buffer_count : int; secret_key : string; cluster_name : string; } let default_config = { bind_addr = "0.0.0.0"; bind_port = 7946; node_name = None; protocol_interval = 1.0; probe_timeout = 0.5; indirect_checks = 3; suspicion_mult = 4; suspicion_max_timeout = 60.0; retransmit_mult = 4; udp_buffer_size = 1400; tcp_timeout = 10.0; send_buffer_count = 16; recv_buffer_count = 16; secret_key = String.make 32 '\x00'; cluster_name = "default"; } type 'a env = { stdenv : 'a; sw : Eio.Switch.t; } constraint 'a = < net : _ Eio.Net.t ; clock : _ Eio.Time.clock ; mono_clock : _ Eio.Time.Mono.t ; secure_random : _ Eio.Flow.source ; .. > type stats = { nodes_alive : int; nodes_suspect : int; nodes_dead : int; msgs_sent : int; msgs_received : int; msgs_dropped : int; queue_depth : int; buffers_available : int; buffers_total : int; } let empty_stats = { nodes_alive = 0; nodes_suspect = 0; nodes_dead = 0; msgs_sent = 0; msgs_received = 0; msgs_dropped = 0; queue_depth = 0; buffers_available = 0; buffers_total = 0; }