this repo has no description
1open Types
2
(** Runtime state of the local cluster node, as assembled by [create]. *)
type t = {
  config : config;  (** Settings passed to [create]. *)
  self : node_info;  (** Identity of the local node. *)
  members : Membership.t;  (** Known members, keyed by node id. *)
  incarnation : incarnation Kcas.Loc.t;
      (** Local incarnation, read by [get_incarnation] and advanced by
          [incr_my_incarnation]. *)
  sequence : int Kcas.Loc.t;
      (** Counter handed out by [next_seq]; starts at [0]. *)
  broadcast_queue : Dissemination.t;
      (** Messages waiting to be piggybacked on outgoing packets. *)
  pending_acks : Pending_acks.t;
      (** Waiters registered per sequence number by the probe functions. *)
  probe_index : int Kcas.Loc.t;
      (** Cursor passed to [Protocol_pure.next_probe_target]. *)
  send_pool : Buffer_pool.t;  (** Buffers used to encode outgoing packets. *)
  recv_pool : Buffer_pool.t;  (** Buffers used to receive datagrams. *)
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
      (** Socket used for both sending and receiving. *)
  event_stream : node_event Eio.Stream.t;
      (** Events published by [emit_event]; created with capacity 100. *)
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
      (** Callbacks invoked as [handler node topic payload]. *)
  cipher_key : Crypto.key;
      (** Key derived from [config.secret_key]; used when
          [config.encryption_enabled] is set. *)
  stats : stats Kcas.Loc.t;  (** Message counters. *)
  shutdown : bool Kcas.Loc.t;  (** Set to [true] by [shutdown]. *)
  clock : float Eio.Time.clock_ty Eio.Resource.t;
      (** Clock used for sleeps and ack timeouts. *)
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
      (** Monotonic clock behind [now_mtime]. *)
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
      (** Randomness source handed to [Crypto.encrypt]. *)
}
24
(** [next_seq t] returns the current sequence number and atomically advances
    the counter by one, so every caller observes a distinct value. *)
let next_seq t = Kcas.Loc.fetch_and_add t.sequence 1
34
(** [get_incarnation t] reads the local node's current incarnation. *)
let get_incarnation t = Kcas.Loc.get t.incarnation
37
(** [incr_my_incarnation t] atomically replaces the local incarnation with
    [incr_incarnation] of its current value and returns the new value. *)
let incr_my_incarnation t =
  let bump ~xt =
    let next = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
    Kcas.Xt.set ~xt t.incarnation next;
    next
  in
  Kcas.Xt.commit { tx = bump }
48
(** [is_shutdown t] is [true] once the shutdown flag has been set. *)
let is_shutdown t = Kcas.Loc.get t.shutdown
51
(** [now_mtime t] reads the monotonic clock and re-expresses the instant as an
    [Mtime.Span.t] holding the same nanosecond count. *)
let now_mtime t =
  let instant = Eio.Time.Mono.now t.mono_clock in
  Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns instant)
55
(** [update_stats t f] atomically replaces the stored statistics [s] with
    [f s]. [f] may run more than once if the transaction is retried, so it
    should be free of side effects. *)
let update_stats t f =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.modify ~xt t.stats f) }
64
(** [emit_event t ev] appends [ev] to the node's event stream.

    NOTE(review): per the Eio documentation, [Eio.Stream.add] waits while a
    bounded stream is full; confirm that a consumer always drains [events],
    otherwise callers of this function can stall. *)
let emit_event { event_stream; _ } event = Eio.Stream.add event_stream event
66
(** [send_packet t ~dst packet] encodes [packet] into a buffer borrowed from
    the send pool, encrypts it when [config.encryption_enabled] is set, and
    sends it to [dst] over the node's UDP socket.

    On success [msgs_sent] is incremented. If the packet does not fit in the
    buffer nothing is sent and [msgs_dropped] is incremented instead, so the
    loss is visible in the statistics rather than silently ignored. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Error `Buffer_too_small ->
          (* The packet never left this node: record the loss. *)
          update_stats t (fun s ->
              { s with msgs_dropped = s.msgs_dropped + 1 })
      | Ok encoded_len ->
          let encoded = Cstruct.sub buf 0 encoded_len in
          let to_send =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
            else encoded
          in
          Transport.send_udp t.udp_sock dst to_send;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))
80
(** [make_packet t ~primary ~piggyback] builds a packet tagged with the
    configured cluster name. *)
let make_packet t ~primary ~piggyback =
  let cluster = t.config.cluster_name in
  { cluster; primary; piggyback }
83
(** [drain_piggyback t ~max_bytes] takes messages from the broadcast queue,
    passing [max_bytes] as the budget and [Codec.encoded_size] as the size
    function. *)
let drain_piggyback t ~max_bytes =
  let encode_size = Codec.encoded_size in
  Dissemination.drain t.broadcast_queue ~max_bytes ~encode_size
87
(** [enqueue_broadcast t msg] queues [msg] for dissemination. The transmit
    budget comes from [Protocol_pure.retransmit_limit] and the current member
    count. After enqueueing, the queue is asked to invalidate entries
    according to [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let node_count = Membership.count t.members in
  let transmits = Protocol_pure.retransmit_limit t.config ~node_count in
  let created = now_mtime t in
  let queue = t.broadcast_queue in
  Dissemination.enqueue queue msg ~transmits ~created;
  Dissemination.invalidate queue ~invalidates:Protocol_pure.invalidates msg
96
(** [handle_ping t ~src ping] answers a [Ping] with an [Ack] carrying the same
    sequence number, sent back to [src] together with whatever piggyback
    messages fit. Any other message is ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; _ } ->
      (* 100 bytes of the datagram are held back from the piggyback budget. *)
      let max_bytes = t.config.udp_buffer_size - 100 in
      let piggyback = drain_piggyback t ~max_bytes in
      let primary = Ack { seq; responder = t.self; payload = None } in
      send_packet t ~dst:src (make_packet t ~primary ~piggyback)
  | Ping_req _ | Ack _ | Alive _ | Suspect _ | Dead _ | User_msg _ -> ()
107
(** [handle_ping_req t ~src req] serves an indirect probe request: when the
    requested target is a known member, a [Ping] with the request's sequence
    number is sent to it, with no piggyback. Unknown targets and other
    messages are ignored.

    NOTE(review): the forwarded [Ping] names this node as sender and reuses
    the requester's [seq], and nothing is registered in [pending_acks] here.
    Confirm how the target's [Ack] reaches the original requester, and that
    the reused [seq] cannot collide with one of this node's own probes. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } ->
      Membership.find t.members target
      |> Option.iter (fun member ->
             let relay_to = (Membership.Member.node member).addr in
             let primary = Ping { seq; target; sender = t.self } in
             send_packet t ~dst:relay_to
               (make_packet t ~primary ~piggyback:[]))
  | Ping _ | Ack _ | Alive _ | Suspect _ | Dead _ | User_msg _ -> ()
119
(** [handle_ack t ack] completes the pending acknowledgement registered under
    the [Ack]'s sequence number, passing along its payload. The result of the
    completion is discarded. Other messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; payload; responder = _ } ->
      let _completed = Pending_acks.complete t.pending_acks ~seq ~payload in
      ()
  | Ping _ | Ping_req _ | Alive _ | Suspect _ | Dead _ | User_msg _ -> ()
125
(** [apply_member_transition t member_id transition_fn] runs a pure protocol
    transition against the current snapshot of [member_id] and applies the
    outcome.

    - If the member is unknown, nothing happens.
    - Otherwise [transition_fn snap ~now] is evaluated; when the resulting
      state {e or incarnation} differs from the snapshot, the member is
      updated accordingly.
    - The transition's broadcasts are then enqueued and its events emitted,
      whether or not the member was updated.

    The incarnation is compared as well as the state because a transition may
    keep the state while raising the incarnation; skipping the update in that
    case would leave a stale incarnation stored for the member. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let new_state = transition.Protocol_pure.new_state in
      (* NOTE(review): structural comparison is used for the incarnation;
         switch to a dedicated equality if [Types] provides one. *)
      let changed =
        new_state.state <> snap.state
        || new_state.incarnation <> snap.incarnation
      in
      if changed then begin
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                match new_state.state with
                | Alive ->
                    Membership.Member.set_alive ~xt m
                      ~incarnation:new_state.incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m
                      ~incarnation:new_state.incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m
                      ~incarnation:new_state.incarnation ~now);
          }
        |> ignore
      end;
      List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts;
      List.iter (emit_event t) transition.events
153
(** [handle_alive_msg t msg] feeds an [Alive] message about [node] through
    [Protocol_pure.handle_alive] and applies the resulting transition to that
    member. Other messages are ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      let self = t.self.id in
      let decide snap ~now = Protocol_pure.handle_alive ~self snap msg ~now in
      apply_member_transition t node.id decide
  | Ping _ | Ping_req _ | Ack _ | Suspect _ | Dead _ | User_msg _ -> ()
160
(** [handle_suspect_msg t msg] feeds a [Suspect] message through
    [Protocol_pure.handle_suspect] and applies the resulting transition to the
    suspected member. Other messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      let self = t.self.id in
      let decide snap ~now =
        Protocol_pure.handle_suspect ~self snap msg ~now
      in
      apply_member_transition t node decide
  | Ping _ | Ping_req _ | Ack _ | Alive _ | Dead _ | User_msg _ -> ()
167
(** [handle_dead_msg t msg] feeds a [Dead] message through
    [Protocol_pure.handle_dead] and applies the resulting transition to the
    named member. Other messages are ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      let decide snap ~now = Protocol_pure.handle_dead snap msg ~now in
      apply_member_transition t node decide
  | Ping _ | Ping_req _ | Ack _ | Alive _ | Suspect _ | User_msg _ -> ()
174
(** [handle_user_msg t msg] delivers a [User_msg] to every registered handler
    as [handler node topic payload], where [node] is the member entry of the
    message's origin. Messages whose origin is not a known member are dropped.
    Other messages are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } ->
      (* The handler list is read before the membership lookup. *)
      let handlers = Kcas.Loc.get t.user_handlers in
      Membership.find t.members origin
      |> Option.iter (fun member ->
             let node = Membership.Member.node member in
             List.iter (fun handler -> handler node topic payload) handlers)
  | Ping _ | Ping_req _ | Ack _ | Alive _ | Suspect _ | Dead _ -> ()
187
(** [handle_message t ~src msg] dispatches [msg] to the handler for its
    constructor. Only the ping handlers receive [src]. *)
let handle_message t ~src (msg : protocol_msg) =
  let handler =
    match msg with
    | Ping _ -> handle_ping t ~src
    | Ping_req _ -> handle_ping_req t ~src
    | Ack _ -> handle_ack t
    | Alive _ -> handle_alive_msg t
    | Suspect _ -> handle_suspect_msg t
    | Dead _ -> handle_dead_msg t
    | User_msg _ -> handle_user_msg t
  in
  handler msg
197
(** [handle_packet t ~src packet] processes a decoded packet. Packets whose
    cluster name differs from the configured one are ignored. Otherwise the
    primary message is handled first, then each piggybacked message in order,
    and [msgs_received] is incremented once for the packet. *)
let handle_packet t ~src (packet : packet) =
  let same_cluster = String.equal packet.cluster t.config.cluster_name in
  if same_cluster then begin
    List.iter (handle_message t ~src) (packet.primary :: packet.piggyback);
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end
204
(** [process_udp_packet t ~buf ~src] turns a raw datagram into a handled
    packet: it decrypts [buf] when encryption is enabled, decodes it, and
    passes the packet to [handle_packet]. A decryption or decoding failure
    increments [msgs_dropped] and ends processing. *)
let process_udp_packet t ~buf ~src =
  let count_drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let plaintext =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match plaintext with
  | Error _ -> count_drop ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Error _ -> count_drop ()
      | Ok packet -> handle_packet t ~src packet)
218
(** [run_udp_receiver t] receives and processes datagrams until the shutdown
    flag is observed. Each iteration borrows one buffer from the receive pool.
    The flag is only checked between datagrams. *)
let run_udp_receiver t =
  let receive_one buf =
    let len, src = Transport.recv_udp t.udp_sock buf in
    (* Only the first [len] bytes of the pooled buffer hold the datagram. *)
    process_udp_packet t ~buf:(Cstruct.sub buf 0 len) ~src
  in
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool receive_one
  done
226
(** [probe_member t member] pings [member] directly and waits up to
    [config.probe_timeout] for the matching acknowledgement.

    Returns [true] when an ack arrived, in which case the ack is recorded on
    the member. Returns [false] on timeout, after cancelling the pending
    entry. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  (* 100 bytes of the datagram are held back from the piggyback budget. *)
  let max_bytes = t.config.udp_buffer_size - 100 in
  let piggyback = drain_piggyback t ~max_bytes in
  let primary = Ping { seq; target = target.id; sender = t.self } in
  let packet = make_packet t ~primary ~piggyback in
  (* Register before sending so the reply always finds its waiter. *)
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr packet;
  let reply =
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  in
  let acked = Option.is_some reply in
  if acked then begin
    let now = now_mtime t in
    ignore
      (Membership.update_member t.members target.id
         { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) })
  end
  else Pending_acks.cancel t.pending_acks ~seq;
  acked
251
(** [indirect_probe t member] asks up to [config.indirect_checks] other
    members, chosen by [Protocol_pure.select_indirect_targets], to ping
    [member], then waits up to [config.probe_timeout] for an acknowledgement
    carrying this probe's sequence number.

    Returns [true] when an ack arrived, in which case the ack is recorded on
    the member. Returns [false] on timeout, after cancelling the pending
    entry. Also returns [false] immediately when there is no member to relay
    through: no request would be sent, so waiting out the timeout could never
    succeed. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let all_members = Membership.to_node_list t.members in
  let indirect_targets =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks ~members:all_members
  in
  match indirect_targets with
  | [] -> false
  | _ :: _ -> (
      let seq = next_seq t in
      let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
      (* The same request goes to every relay, so build the packet once. *)
      let packet = make_packet t ~primary:ping_req ~piggyback:[] in
      (* Register before sending so the reply always finds its waiter. *)
      let waiter = Pending_acks.register t.pending_acks ~seq in
      List.iter
        (fun node -> send_packet t ~dst:node.addr packet)
        indirect_targets;
      match
        Pending_acks.wait waiter ~timeout:t.config.probe_timeout
          ~clock:t.clock
      with
      | Some _ ->
          let now = now_mtime t in
          Membership.update_member t.members target.id
            {
              update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now);
            }
          |> ignore;
          true
      | None ->
          Pending_acks.cancel t.pending_acks ~seq;
          false)
282
(** [suspect_member t member] raises a local suspicion against [member]: it
    builds a [Suspect] message with this node as suspector and runs it through
    [Protocol_pure.handle_suspect], applying the resulting transition.

    The message carries the incarnation currently recorded for [member], not
    the local node's own incarnation. A suspicion refers to a specific
    incarnation of the suspected node, which that node refutes by announcing a
    higher one. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  let known = Membership.Member.snapshot_now member in
  let msg =
    Suspect
      {
        node = node.id;
        incarnation = known.incarnation;
        suspector = t.self.id;
      }
  in
  apply_member_transition t node.id (fun snap ~now ->
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
291
(** [probe_cycle t] runs one failure-detection round. It asks
    [Protocol_pure.next_probe_target] for the next member, stores the new
    probe index, and probes that member directly. If the direct probe fails it
    probes indirectly, and if that also fails it marks the member as
    suspected. Nothing happens when no target is available or the chosen
    target is no longer a member. *)
let probe_cycle t =
  let member_nodes =
    Membership.to_list t.members |> List.map Membership.Member.node
  in
  let probe_index = Kcas.Loc.get t.probe_index in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, new_idx) ->
      Kcas.Loc.set t.probe_index new_idx;
      Membership.find t.members target_node.id
      |> Option.iter (fun member ->
             (* [||] short-circuits: the indirect probe only runs when the
                direct one failed. *)
             let reachable =
               probe_member t member || indirect_probe t member
             in
             if not reachable then suspect_member t member)
313
(** [run_protocol t] repeats [probe_cycle] followed by a sleep of
    [config.protocol_interval] until the shutdown flag is observed. *)
let run_protocol t =
  let rec loop () =
    if not (is_shutdown t) then begin
      probe_cycle t;
      Eio.Time.sleep t.clock t.config.protocol_interval;
      loop ()
    end
  in
  loop ()
319
(** [create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random] builds a
    fresh node state: empty membership, empty broadcast queue, zeroed
    counters, and send/receive buffer pools sized from [config].

    Returns [Error `Invalid_key] when [Crypto.init_key] rejects
    [config.secret_key]. The key is initialised regardless of
    [config.encryption_enabled]. *)
let create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      let pool count =
        Buffer_pool.create ~size:config.udp_buffer_size ~count
      in
      let send_pool = pool config.send_buffer_count in
      let recv_pool = pool config.recv_buffer_count in
      let members = Membership.create () in
      let broadcast_queue = Dissemination.create () in
      let pending_acks = Pending_acks.create () in
      Ok
        {
          config;
          self;
          members;
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue;
          pending_acks;
          probe_index = Kcas.Loc.make 0;
          send_pool;
          recv_pool;
          udp_sock;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
        }
350
(** [shutdown t] sets the shutdown flag. The receiver and protocol loops stop
    the next time they check it. *)
let shutdown t = Kcas.Loc.set t.shutdown true
353
(** [add_member t node_info] creates a member entry for [node_info] stamped
    with the current monotonic time, adds it to the membership table, and
    emits a [Join] event. *)
let add_member t node_info =
  let member = Membership.Member.create ~now:(now_mtime t) node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)
359
(** [remove_member t node_id] removes [node_id] from the membership table.
    Returns [true] and emits a [Leave] event when an entry was removed.
    Returns [false] when the node was unknown or the removal reported that
    nothing was removed. *)
let remove_member t node_id =
  let remove_known member =
    (* Capture the node info before the entry disappears. *)
    let departed = Membership.Member.node member in
    match Membership.remove t.members node_id with
    | true ->
        emit_event t (Leave departed);
        true
    | false -> false
  in
  Option.fold ~none:false ~some:remove_known
    (Membership.find t.members node_id)
368
(** [local_node t] is the identity of the local node. *)
let local_node { self; _ } = self

(** [members t] lists the current membership entries. *)
let members { members = known; _ } = Membership.to_list known

(** [member_count t] is the number of membership entries. *)
let member_count { members = known; _ } = Membership.count known

(** [events t] is the stream on which node events are published. *)
let events { event_stream; _ } = event_stream
373
(** [stats t] returns the stored message counters, completed with values
    computed at call time: per-state member counts ([Dead] and [Left] are
    counted together), the broadcast queue depth, and buffer availability and
    totals summed over the send and receive pools. *)
let stats t =
  let base = Kcas.Loc.get t.stats in
  let rec tally alive suspect dead = function
    | [] -> (alive, suspect, dead)
    | snap :: rest -> (
        match snap.state with
        | Alive -> tally (alive + 1) suspect dead rest
        | Suspect -> tally alive (suspect + 1) dead rest
        | Dead | Left -> tally alive suspect (dead + 1) rest)
  in
  let nodes_alive, nodes_suspect, nodes_dead =
    tally 0 0 0 (Membership.snapshot_all t.members)
  in
  let queue_depth = Dissemination.depth t.broadcast_queue in
  let buffers_available =
    Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool
  in
  let buffers_total =
    Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool
  in
  {
    base with
    nodes_alive;
    nodes_suspect;
    nodes_dead;
    queue_depth;
    buffers_available;
    buffers_total;
  }
397
(** [broadcast t ~topic ~payload] queues a [User_msg] originating from the
    local node for dissemination. *)
let broadcast t ~topic ~payload =
  enqueue_broadcast t (User_msg { topic; payload; origin = t.self.id })
401
(** [on_message t handler] registers [handler] for user messages. Handlers are
    prepended, so the most recently registered one comes first in the list. *)
let on_message t handler =
  Kcas.Loc.modify t.user_handlers (List.cons handler)