this repo has no description
at main 6.7 kB view raw
1open Types 2 3type 'a transition = { 4 new_state : 'a; 5 broadcasts : protocol_msg list; 6 events : node_event list; 7} 8 9let no_change state = { new_state = state; broadcasts = []; events = [] } 10 11let node_id_of_msg = function 12 | Ping { sender; _ } -> sender.id 13 | Ping_req { sender; _ } -> sender.id 14 | Ack { responder; _ } -> responder.id 15 | Alive { node; _ } -> node.id 16 | Suspect { node; _ } -> node 17 | Dead { node; _ } -> node 18 | User_msg { origin; _ } -> origin 19 20let incarnation_of_msg = function 21 | Alive { incarnation; _ } -> Some incarnation 22 | Suspect { incarnation; _ } -> Some incarnation 23 | Dead { incarnation; _ } -> Some incarnation 24 | Ping _ | Ping_req _ | Ack _ | User_msg _ -> None 25 26let handle_alive ~(self : node_id) (member : member_snapshot) 27 (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = 28 let _ = self in 29 match msg with 30 | Alive { node; incarnation = msg_inc } -> 31 if not (equal_node_id node.id member.node.id) then no_change member 32 else 33 let cmp = compare_incarnation msg_inc member.incarnation in 34 if cmp > 0 then 35 let new_member = 36 { node; state = Alive; incarnation = msg_inc; state_change = now } 37 in 38 let events = 39 match member.state with 40 | Dead | Left -> [ Join node ] 41 | Suspect -> [ Alive_event node ] 42 | Alive -> [ Update node ] 43 in 44 { new_state = new_member; broadcasts = [ msg ]; events } 45 else if cmp = 0 && member.state = Suspect then 46 let new_member = { member with state = Alive; state_change = now } in 47 { 48 new_state = new_member; 49 broadcasts = [ msg ]; 50 events = [ Alive_event node ]; 51 } 52 else no_change member 53 | _ -> no_change member 54 55let handle_suspect ~(self : node_id) (member : member_snapshot) 56 (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = 57 match msg with 58 | Suspect { node = msg_node; incarnation = msg_inc; suspector = _ } -> 59 if not (equal_node_id msg_node member.node.id) then no_change member 60 else if member.state = Dead then no_change member 61 else if equal_node_id msg_node self then 62 let new_inc = incr_incarnation member.incarnation in 63 let refute = Alive { node = member.node; incarnation = new_inc } in 64 { 65 new_state = { member with incarnation = new_inc }; 66 broadcasts = [ refute ]; 67 events = []; 68 } 69 else 70 let dominated = 71 compare_incarnation msg_inc member.incarnation > 0 72 || compare_incarnation msg_inc member.incarnation = 0 73 && member.state = Alive 74 in 75 if dominated then 76 let new_member = 77 { 78 member with 79 state = Suspect; 80 incarnation = msg_inc; 81 state_change = now; 82 } 83 in 84 { 85 new_state = new_member; 86 broadcasts = [ msg ]; 87 events = [ Suspect_event member.node ]; 88 } 89 else no_change member 90 | _ -> no_change member 91 92let handle_dead (member : member_snapshot) (msg : protocol_msg) 93 ~(now : Mtime.span) : member_snapshot transition = 94 match msg with 95 | Dead { node = msg_node; incarnation = msg_inc; declarator = _ } -> 96 if not (equal_node_id msg_node member.node.id) then no_change member 97 else if member.state = Dead then no_change member 98 else if compare_incarnation msg_inc member.incarnation >= 0 then 99 let new_member = 100 { 101 member with 102 state = Dead; 103 incarnation = msg_inc; 104 state_change = now; 105 } 106 in 107 { 108 new_state = new_member; 109 broadcasts = [ msg ]; 110 events = [ Leave member.node ]; 111 } 112 else no_change member 113 | _ -> no_change member 114 115let suspicion_timeout (config : config) ~node_count = 116 let base = 117 float_of_int config.suspicion_mult 118 *. log (float_of_int (max 1 node_count) +. 1.) 119 *. config.protocol_interval 120 in 121 Float.min base config.suspicion_max_timeout 122 123let retransmit_limit (config : config) ~node_count = 124 let log_n = log (float_of_int (max 1 node_count) +. 1.) in 125 int_of_float (ceil (float_of_int config.retransmit_mult *. log_n)) 126 127let next_probe_target ~(self : node_id) ~probe_index ~(members : node_info list) 128 = 129 match members with 130 | [] -> None 131 | _ -> 132 let len = List.length members in 133 let rec find idx attempts = 134 if attempts >= len then None 135 else 136 let candidate = List.nth members (idx mod len) in 137 if equal_node_id candidate.id self then find (idx + 1) (attempts + 1) 138 else Some (candidate, (idx + 1) mod len) 139 in 140 find probe_index 0 141 142let invalidates ~(newer : protocol_msg) ~(older : protocol_msg) : bool = 143 let newer_id = node_id_of_msg newer in 144 let older_id = node_id_of_msg older in 145 if not (equal_node_id newer_id older_id) then false 146 else 147 match (newer, older) with 148 | Dead _, (Alive _ | Suspect _ | Dead _) -> true 149 | Alive { incarnation = new_inc; _ }, Suspect { incarnation = old_inc; _ } 150 -> 151 compare_incarnation new_inc old_inc >= 0 152 | Alive { incarnation = new_inc; _ }, Alive { incarnation = old_inc; _ } -> 153 compare_incarnation new_inc old_inc > 0 154 | Suspect { incarnation = new_inc; _ }, Suspect { incarnation = old_inc; _ } 155 -> 156 compare_incarnation new_inc old_inc > 0 157 | _ -> false 158 159let merge_member_state ~(local : member_snapshot) ~(remote : member_snapshot) : 160 member_snapshot = 161 if not (equal_node_id local.node.id remote.node.id) then local 162 else 163 match (local.state, remote.state) with 164 | Dead, _ | Left, _ -> local 165 | _, Dead | _, Left -> 166 if compare_incarnation remote.incarnation local.incarnation >= 0 then 167 remote 168 else local 169 | Alive, Alive | Suspect, Suspect -> 170 if compare_incarnation remote.incarnation local.incarnation > 0 then 171 remote 172 else local 173 | Alive, Suspect -> 174 if compare_incarnation remote.incarnation local.incarnation > 0 then 175 remote 176 else local 177 | Suspect, Alive -> 178 if compare_incarnation remote.incarnation local.incarnation >= 0 then 179 remote 180 else local 181 182let select_indirect_targets ~(self : node_id) ~exclude ~count 183 ~(members : node_info list) : node_info list = 184 members 185 |> List.filter (fun m -> 186 (not (equal_node_id m.id self)) && not (equal_node_id m.id exclude)) 187 |> fun candidates -> 188 let len = List.length candidates in 189 if len <= count then candidates 190 else List.filteri (fun i _ -> i < count) candidates