this repo has no description
1open Types
2
(** Result returned by the message handlers in this file: the state to keep
    plus the messages and events the handler produced. ['a] is the type of
    the state being carried. *)
type 'a transition = {
  new_state : 'a;  (** State value after the message has been applied. *)
  broadcasts : protocol_msg list;
      (** Protocol messages attached to the transition; empty when nothing
          changed. *)
  events : node_event list;
      (** Node events attached to the transition; empty when nothing
          changed. *)
}
8
(** [no_change state] is the transition that keeps [state] untouched and
    carries neither broadcasts nor events. *)
let no_change state = { events = []; broadcasts = []; new_state = state }
10
(** [node_id_of_msg msg] is the node id a message is attributed to: the
    sender of a [Ping] or [Ping_req], the responder of an [Ack], the subject
    of an [Alive], [Suspect] or [Dead] notice, and the origin of a
    [User_msg]. *)
let node_id_of_msg (msg : protocol_msg) =
  match msg with
  (* These constructors carry a full node record; project its [id]. *)
  | Ping { sender = { id; _ }; _ }
  | Ping_req { sender = { id; _ }; _ }
  | Ack { responder = { id; _ }; _ }
  | Alive { node = { id; _ }; _ } ->
      id
  (* These constructors already carry a bare node id. *)
  | Suspect { node = id; _ }
  | Dead { node = id; _ }
  | User_msg { origin = id; _ } ->
      id
19
(** [incarnation_of_msg msg] is [Some inc] for the membership notices that
    carry an incarnation ([Alive], [Suspect], [Dead]) and [None] for every
    other message kind. *)
let incarnation_of_msg (msg : protocol_msg) =
  match msg with
  | Ping _ | Ping_req _ | Ack _ | User_msg _ -> None
  | Alive { incarnation; _ }
  | Suspect { incarnation; _ }
  | Dead { incarnation; _ } ->
      Some incarnation
25
(** [handle_alive ~self member msg ~now] applies an [Alive] message to
    [member].

    Any other message kind, or an [Alive] about a node other than [member],
    yields [no_change member]. Otherwise:
    - a strictly higher incarnation replaces the snapshot with an [Alive] one
      built from the message's node info and stamped with [now], re-broadcasts
      [msg], and emits [Join] (member was [Dead] or [Left]), [Alive_event]
      (member was [Suspect]) or [Update] (member was [Alive]);
    - an equal incarnation only has an effect on a [Suspect] member: it is
      flipped back to [Alive] and stamped with [now], [msg] is re-broadcast
      and [Alive_event] is emitted;
    - everything else yields [no_change member].

    [self] is accepted but not used by this handler.

    NOTE(review): clearing a suspicion with an [Alive] at the {e same}
    incarnation differs from the SWIM paper, where [Alive] overrides
    [Suspect] only at a strictly higher incarnation. Confirm this tie-break
    is intended. *)
let handle_alive ~(self : node_id) (member : member_snapshot)
    (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition =
  ignore (self : node_id);
  match msg with
  | Alive { node; incarnation = msg_inc }
    when equal_node_id node.id member.node.id ->
      let order = compare_incarnation msg_inc member.incarnation in
      if order > 0 then
        let events =
          match member.state with
          | Alive -> [ Update node ]
          | Suspect -> [ Alive_event node ]
          | Dead | Left -> [ Join node ]
        in
        let refreshed =
          { node; state = Alive; incarnation = msg_inc; state_change = now }
        in
        { new_state = refreshed; broadcasts = [ msg ]; events }
      else if order < 0 then no_change member
      else (
        (* Same incarnation: only a pending suspicion is affected. *)
        match member.state with
        | Suspect ->
            {
              new_state = { member with state = Alive; state_change = now };
              broadcasts = [ msg ];
              events = [ Alive_event node ];
            }
        | Alive | Dead | Left -> no_change member)
  | _ -> no_change member
54
(** [handle_suspect ~self member msg ~now] applies a [Suspect] message to
    [member].

    - Any other message kind, or a [Suspect] about a node other than
      [member], yields [no_change member].
    - A member that is already [Dead] or [Left] is left untouched, matching
      how [merge_member_state] treats both states as terminal.
    - When the suspected node is [self], the suspicion is refuted: the
      incarnation is bumped past both the local incarnation and the one
      quoted in the suspicion, the bumped value is stored, and an [Alive]
      message carrying it is broadcast. No event is emitted.
    - Otherwise the member becomes [Suspect] (stamped with [now], adopting
      the message's incarnation) when the message's incarnation is strictly
      higher, or equal while the member is [Alive]. [msg] is re-broadcast and
      [Suspect_event] is emitted. Older suspicions yield [no_change member]. *)
let handle_suspect ~(self : node_id) (member : member_snapshot)
    (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition =
  match msg with
  | Suspect { node = msg_node; incarnation = msg_inc; suspector = _ }
    when equal_node_id msg_node member.node.id -> (
      match member.state with
      | Dead | Left -> no_change member
      | Alive | Suspect ->
          if equal_node_id msg_node self then
            (* Refute from whichever incarnation is larger. Bumping only the
               local value would not override a suspicion that was raised at
               a higher incarnation than the one we currently hold. *)
            let base =
              if compare_incarnation msg_inc member.incarnation > 0 then
                msg_inc
              else member.incarnation
            in
            let new_inc = incr_incarnation base in
            let refute : protocol_msg =
              Alive { node = member.node; incarnation = new_inc }
            in
            {
              new_state = { member with incarnation = new_inc };
              broadcasts = [ refute ];
              events = [];
            }
          else
            let order = compare_incarnation msg_inc member.incarnation in
            let currently_alive =
              match member.state with
              | Alive -> true
              | Suspect | Dead | Left -> false
            in
            if order > 0 || (order = 0 && currently_alive) then
              {
                new_state =
                  {
                    member with
                    state = Suspect;
                    incarnation = msg_inc;
                    state_change = now;
                  };
                broadcasts = [ msg ];
                events = [ Suspect_event member.node ];
              }
            else no_change member)
  | _ -> no_change member
91
(** [handle_dead member msg ~now] applies a [Dead] message to [member].

    - Any other message kind, or a [Dead] about a node other than [member],
      yields [no_change member].
    - A member that is already [Dead] or [Left] is left untouched, so a
      departed node is not reported as leaving a second time. This matches
      how [merge_member_state] treats both states as terminal.
    - Otherwise, when the message's incarnation is at least the member's, the
      member becomes [Dead] (stamped with [now], adopting the message's
      incarnation), [msg] is re-broadcast and [Leave] is emitted.
    - A [Dead] with an older incarnation yields [no_change member]. *)
let handle_dead (member : member_snapshot) (msg : protocol_msg)
    ~(now : Mtime.span) : member_snapshot transition =
  match msg with
  | Dead { node = msg_node; incarnation = msg_inc; declarator = _ }
    when equal_node_id msg_node member.node.id -> (
      match member.state with
      | Dead | Left -> no_change member
      | Alive | Suspect ->
          if compare_incarnation msg_inc member.incarnation >= 0 then
            {
              new_state =
                {
                  member with
                  state = Dead;
                  incarnation = msg_inc;
                  state_change = now;
                };
              broadcasts = [ msg ];
              events = [ Leave member.node ];
            }
          else no_change member)
  | _ -> no_change member
114
(** [suspicion_timeout config ~node_count] is
    [suspicion_mult * ln (max 1 node_count + 1) * protocol_interval], capped
    at [config.suspicion_max_timeout]. The result is in whatever unit
    [protocol_interval] and [suspicion_max_timeout] use. *)
let suspicion_timeout (config : config) ~node_count =
  let cluster_size = Float.of_int (max 1 node_count) in
  let uncapped =
    Float.of_int config.suspicion_mult
    *. Float.log (cluster_size +. 1.)
    *. config.protocol_interval
  in
  Float.min uncapped config.suspicion_max_timeout
122
(** [retransmit_limit config ~node_count] is
    [ceil (retransmit_mult * ln (max 1 node_count + 1))] as an integer. *)
let retransmit_limit (config : config) ~node_count =
  let cluster_size = Float.of_int (max 1 node_count) in
  Float.of_int config.retransmit_mult *. Float.log (cluster_size +. 1.)
  |> Float.ceil |> Float.to_int
126
(** [next_probe_target ~self ~probe_index ~members] picks the next member to
    probe, treating [members] as a ring.

    Scanning starts at position [probe_index] (wrapped into the ring) and
    moves forward, skipping entries whose id equals [self], for at most one
    full lap.

    @return
      [Some (target, next_index)], where [next_index] is the ring position
      right after [target], or [None] when [members] is empty or contains
      only [self].

    Any integer [probe_index] is accepted: negative values wrap around the
    ring instead of raising. *)
let next_probe_target ~(self : node_id) ~probe_index ~(members : node_info list)
    =
  (* Copy into an array once so each slot lookup is O(1); [List.nth] inside
     the scan made the search quadratic in the number of members. *)
  match Array.of_list members with
  | [||] -> None
  | ring ->
      let len = Array.length ring in
      (* OCaml's [mod] keeps the sign of the dividend, so normalise into
         [0, len) before indexing. *)
      let start = ((probe_index mod len) + len) mod len in
      let rec scan offset =
        if offset >= len then None
        else
          let slot = (start + offset) mod len in
          let candidate = ring.(slot) in
          if equal_node_id candidate.id self then scan (offset + 1)
          else Some (candidate, (slot + 1) mod len)
      in
      scan 0
141
(** [invalidates ~newer ~older] tells whether [newer] makes [older] obsolete.

    Messages attributed to different nodes (per [node_id_of_msg]) never
    invalidate each other. For the same node:
    - [Dead] invalidates any [Alive], [Suspect] or [Dead];
    - [Alive] invalidates a [Suspect] of equal or lower incarnation;
    - [Alive] invalidates an [Alive], and [Suspect] a [Suspect], only at a
      strictly higher incarnation;
    - every other pairing is [false]. *)
let invalidates ~(newer : protocol_msg) ~(older : protocol_msg) : bool =
  let newer_id = node_id_of_msg newer in
  let older_id = node_id_of_msg older in
  equal_node_id newer_id older_id
  &&
  match (newer, older) with
  | Dead _, (Alive _ | Suspect _ | Dead _) -> true
  | Alive { incarnation = fresh; _ }, Suspect { incarnation = stale; _ } ->
      compare_incarnation fresh stale >= 0
  | Alive { incarnation = fresh; _ }, Alive { incarnation = stale; _ }
  | Suspect { incarnation = fresh; _ }, Suspect { incarnation = stale; _ } ->
      compare_incarnation fresh stale > 0
  | _ -> false
158
(** [merge_member_state ~local ~remote] chooses which of two snapshots to
    keep; the result is always one of the two arguments, unmodified.

    [local] wins when the snapshots are about different nodes, or when
    [local] is [Dead] or [Left]. Otherwise [remote] wins when:
    - [remote] is [Dead] or [Left] and its incarnation is at least [local]'s;
    - [local] is [Suspect], [remote] is [Alive], and [remote]'s incarnation
      is at least [local]'s;
    - in the remaining combinations, [remote]'s incarnation is strictly
      higher.

    NOTE(review): on equal incarnations an [Alive] snapshot beats a
    [Suspect] one in both directions here, whereas the SWIM paper lets
    [Suspect] win that tie. Confirm this is intended. *)
let merge_member_state ~(local : member_snapshot) ~(remote : member_snapshot) :
    member_snapshot =
  (* Evaluated lazily so no comparison happens when [local] is terminal. *)
  let remote_vs_local () =
    compare_incarnation remote.incarnation local.incarnation
  in
  let remote_wins =
    equal_node_id local.node.id remote.node.id
    &&
    match (local.state, remote.state) with
    | (Dead | Left), _ -> false
    | (Alive | Suspect), (Dead | Left) | Suspect, Alive ->
        remote_vs_local () >= 0
    | Alive, Alive | Suspect, Suspect | Alive, Suspect ->
        remote_vs_local () > 0
  in
  if remote_wins then remote else local
181
(** [select_indirect_targets ~self ~exclude ~count ~members] is the first
    [count] entries of [members], in their original order, whose id is
    neither [self] nor [exclude]. Fewer are returned when not enough
    candidates exist; a non-positive [count] gives the empty list. *)
let select_indirect_targets ~(self : node_id) ~exclude ~count
    ~(members : node_info list) : node_info list =
  let eligible (m : node_info) =
    not (equal_node_id m.id self || equal_node_id m.id exclude)
  in
  let rec take remaining picked = function
    | candidate :: rest when remaining > 0 ->
        take (remaining - 1) (candidate :: picked) rest
    | _ -> List.rev picked
  in
  members |> List.filter eligible |> take count []
190 else List.filteri (fun i _ -> i < count) candidates