open Types type 'a transition = { new_state : 'a; broadcasts : protocol_msg list; events : node_event list; } let no_change state = { new_state = state; broadcasts = []; events = [] } let node_id_of_msg = function | Ping { sender; _ } -> sender.id | Ping_req { sender; _ } -> sender.id | Ack { responder; _ } -> responder.id | Alive { node; _ } -> node.id | Suspect { node; _ } -> node | Dead { node; _ } -> node | User_msg { origin; _ } -> origin let incarnation_of_msg = function | Alive { incarnation; _ } -> Some incarnation | Suspect { incarnation; _ } -> Some incarnation | Dead { incarnation; _ } -> Some incarnation | Ping _ | Ping_req _ | Ack _ | User_msg _ -> None let handle_alive ~(self : node_id) (member : member_snapshot) (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = let _ = self in match msg with | Alive { node; incarnation = msg_inc } -> if not (equal_node_id node.id member.node.id) then no_change member else let cmp = compare_incarnation msg_inc member.incarnation in if cmp > 0 then let new_member = { node; state = Alive; incarnation = msg_inc; state_change = now } in let events = match member.state with | Dead | Left -> [ Join node ] | Suspect -> [ Alive_event node ] | Alive -> [ Update node ] in { new_state = new_member; broadcasts = [ msg ]; events } else if cmp = 0 && member.state = Suspect then let new_member = { member with state = Alive; state_change = now } in { new_state = new_member; broadcasts = [ msg ]; events = [ Alive_event node ]; } else no_change member | _ -> no_change member let handle_suspect ~(self : node_id) (member : member_snapshot) (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = match msg with | Suspect { node = msg_node; incarnation = msg_inc; suspector = _ } -> if not (equal_node_id msg_node member.node.id) then no_change member else if member.state = Dead then no_change member else if equal_node_id msg_node self then let new_inc = incr_incarnation member.incarnation in let refute = Alive { node = member.node; incarnation = new_inc } in { new_state = { member with incarnation = new_inc }; broadcasts = [ refute ]; events = []; } else let dominated = compare_incarnation msg_inc member.incarnation > 0 || compare_incarnation msg_inc member.incarnation = 0 && member.state = Alive in if dominated then let new_member = { member with state = Suspect; incarnation = msg_inc; state_change = now; } in { new_state = new_member; broadcasts = [ msg ]; events = [ Suspect_event member.node ]; } else no_change member | _ -> no_change member let handle_dead (member : member_snapshot) (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = match msg with | Dead { node = msg_node; incarnation = msg_inc; declarator = _ } -> if not (equal_node_id msg_node member.node.id) then no_change member else if member.state = Dead then no_change member else if compare_incarnation msg_inc member.incarnation >= 0 then let new_member = { member with state = Dead; incarnation = msg_inc; state_change = now; } in { new_state = new_member; broadcasts = [ msg ]; events = [ Leave member.node ]; } else no_change member | _ -> no_change member let suspicion_timeout (config : config) ~node_count = let base = float_of_int config.suspicion_mult *. log (float_of_int (max 1 node_count) +. 1.) *. config.protocol_interval in Float.min base config.suspicion_max_timeout let retransmit_limit (config : config) ~node_count = let log_n = log (float_of_int (max 1 node_count) +. 1.) in int_of_float (ceil (float_of_int config.retransmit_mult *. log_n)) let next_probe_target ~(self : node_id) ~probe_index ~(members : node_info list) = match members with | [] -> None | _ -> let len = List.length members in let rec find idx attempts = if attempts >= len then None else let candidate = List.nth members (idx mod len) in if equal_node_id candidate.id self then find (idx + 1) (attempts + 1) else Some (candidate, (idx + 1) mod len) in find probe_index 0 let invalidates ~(newer : protocol_msg) ~(older : protocol_msg) : bool = let newer_id = node_id_of_msg newer in let older_id = node_id_of_msg older in if not (equal_node_id newer_id older_id) then false else match (newer, older) with | Dead _, (Alive _ | Suspect _ | Dead _) -> true | Alive { incarnation = new_inc; _ }, Suspect { incarnation = old_inc; _ } -> compare_incarnation new_inc old_inc >= 0 | Alive { incarnation = new_inc; _ }, Alive { incarnation = old_inc; _ } -> compare_incarnation new_inc old_inc > 0 | Suspect { incarnation = new_inc; _ }, Suspect { incarnation = old_inc; _ } -> compare_incarnation new_inc old_inc > 0 | _ -> false let merge_member_state ~(local : member_snapshot) ~(remote : member_snapshot) : member_snapshot = if not (equal_node_id local.node.id remote.node.id) then local else match (local.state, remote.state) with | Dead, _ | Left, _ -> local | _, Dead | _, Left -> if compare_incarnation remote.incarnation local.incarnation >= 0 then remote else local | Alive, Alive | Suspect, Suspect -> if compare_incarnation remote.incarnation local.incarnation > 0 then remote else local | Alive, Suspect -> if compare_incarnation remote.incarnation local.incarnation > 0 then remote else local | Suspect, Alive -> if compare_incarnation remote.incarnation local.incarnation >= 0 then remote else local let select_indirect_targets ~(self : node_id) ~exclude ~count ~(members : node_info list) : node_info list = members |> List.filter (fun m -> (not (equal_node_id m.id self)) && not (equal_node_id m.id exclude)) |> fun candidates -> let len = List.length candidates in if len <= count then candidates else List.filteri (fun i _ -> i < count) candidates