this repo has no description
1open Types
2
(** Complete runtime state of one cluster node. Mutable protocol state lives
    in kcas locations so it can be read and updated transactionally. *)
type t = {
  (* Static configuration and identity of the local node. *)
  config : config;
  self : node_info;
  (* Known peers. *)
  members : Membership.t;
  (* This node's own incarnation, advertised in push/pull state. *)
  incarnation : incarnation Kcas.Loc.t;
  (* Counter handed out by [next_seq] for ping / ping-req sequence numbers. *)
  sequence : int Kcas.Loc.t;
  (* Outgoing gossip waiting to be piggybacked. *)
  broadcast_queue : Dissemination.t;
  (* Probes waiting for an [Ack], keyed by sequence number. *)
  pending_acks : Pending_acks.t;
  (* Cursor passed to [Protocol_pure.next_probe_target]; advanced per probe. *)
  probe_index : int Kcas.Loc.t;
  (* Reusable datagram buffers for sending and receiving. *)
  send_pool : Buffer_pool.t;
  recv_pool : Buffer_pool.t;
  (* Network endpoints: UDP for the probe protocol, TCP for push/pull. *)
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
  (* Membership events exposed to the application via [events]. *)
  event_stream : node_event Eio.Stream.t;
  (* Callbacks invoked with (origin, topic, payload) for user messages. *)
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
  (* Key used when encryption is enabled. *)
  cipher_key : Crypto.key;
  (* Counters reported by [stats]. *)
  stats : stats Kcas.Loc.t;
  (* Set by [shutdown]; polled by the background loops. *)
  shutdown : bool Kcas.Loc.t;
  (* Capabilities: wall clock, monotonic clock, CSPRNG, owning switch. *)
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
  sw : Eio.Switch.t;
}
26
(** [next_seq t] returns the current sequence number and bumps the counter
    in one kcas transaction, so concurrent callers get distinct values. *)
let next_seq t =
  let take_and_bump ~xt =
    let current = Kcas.Xt.get ~xt t.sequence in
    Kcas.Xt.set ~xt t.sequence (succ current);
    current
  in
  Kcas.Xt.commit { tx = take_and_bump }
36
(** Current incarnation number of the local node. *)
let get_incarnation t =
  let read ~xt = Kcas.Xt.get ~xt t.incarnation in
  Kcas.Xt.commit { tx = read }
39
(** Atomically advance the local incarnation with [incr_incarnation] and
    return the new value. *)
let incr_my_incarnation t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let bumped = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
          Kcas.Xt.set ~xt t.incarnation bumped;
          bumped);
    }
50
(** [true] once [shutdown] has been requested; the background loops test it
    before each iteration. *)
let is_shutdown t =
  let read ~xt = Kcas.Xt.get ~xt t.shutdown in
  Kcas.Xt.commit { tx = read }
53
(** Monotonic "now" as an [Mtime.span]: the monotonic timestamp's nanosecond
    count reinterpreted as a span. *)
let now_mtime t =
  let stamp = Eio.Time.Mono.now t.mono_clock in
  Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns stamp)
57
(** Atomically replace the stats record with [f] applied to it. [f] runs
    inside a kcas transaction and may be re-run on retry, so it should be
    pure (all callers in this file pass record-update functions). *)
let update_stats t f =
  let apply ~xt = Kcas.Xt.set ~xt t.stats (f (Kcas.Xt.get ~xt t.stats)) in
  Kcas.Xt.commit { tx = apply }
66
(** Publish a membership event on the node's event stream.
    NOTE(review): [Eio.Stream.add] blocks while the stream is at capacity,
    so if nothing drains [events] the caller (e.g. the receive loop) will
    eventually stall here — confirm this back-pressure is intended. *)
let emit_event t ev =
  let stream = t.event_stream in
  Eio.Stream.add stream ev
68
(** Encode [packet], encrypt it when encryption is enabled, and send it to
    [dst] over UDP, incrementing [msgs_sent] on success.

    If the packet does not fit in a send buffer and it carries piggybacked
    messages, it is re-encoded with the primary message alone so that the
    ping/ack itself still goes out. The piggybacked entries were already
    drained from the queue by the caller and are not re-queued. A packet
    that does not fit even without piggyback is skipped silently. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      let encode pkt =
        match Codec.encode_packet pkt ~buf with
        | Ok len -> Some len
        | Error `Buffer_too_small -> None
      in
      let encoded_len =
        match encode packet with
        | Some _ as fitted -> fitted
        | None -> (
            (* Dropping a ping or ack because the gossip attached to it made
               the datagram too large would make a healthy peer look
               unresponsive; retry with the primary message only. *)
            match packet.piggyback with
            | [] -> None
            | _ :: _ -> encode { packet with piggyback = [] })
      in
      match encoded_len with
      | None -> ()
      | Some encoded_len ->
          let encoded = Cstruct.sub buf 0 encoded_len in
          let to_send =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
            else encoded
          in
          Transport.send_udp t.udp_sock dst to_send;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))
82
(** Wrap [primary] and its [piggyback] messages in a packet stamped with this
    node's cluster name. *)
let make_packet t ~primary ~piggyback =
  let cluster = t.config.cluster_name in
  { piggyback; primary; cluster }
85
(** Take pending broadcasts from the dissemination queue for piggybacking.
    [max_bytes] is forwarded to [Dissemination.drain] together with
    [Codec.encoded_size] as the sizing function (presumably a byte budget —
    confirm against [Dissemination]). *)
let drain_piggyback t ~max_bytes =
  let encode_size = Codec.encoded_size in
  Dissemination.drain ~max_bytes ~encode_size t.broadcast_queue
89
(** Queue [msg] for gossip dissemination.

    The retransmit budget comes from [Protocol_pure.retransmit_limit] for the
    current member count. Queued entries that [Protocol_pure.invalidates]
    reports as superseded by [msg] are pruned {e before} [msg] is inserted.
    This way the pruning pass can never discard the message being enqueued,
    whether or not [Dissemination.invalidate] skips it. *)
let enqueue_broadcast t msg =
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg;
  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t)
98
(** Answer a [Ping] from [src] with an [Ack] echoing its sequence number,
    piggybacking pending broadcasts (100 bytes of the UDP buffer are left
    for the primary message). Any other message is ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  let reply seq =
    let budget = t.config.udp_buffer_size - 100 in
    let piggyback = drain_piggyback t ~max_bytes:budget in
    let primary = Ack { seq; responder = t.self; payload = None } in
    send_packet t ~dst:src (make_packet t ~primary ~piggyback)
  in
  match ping with Ping { seq; _ } -> reply seq | _ -> ()
109
(** Serve an indirect-probe request on behalf of the node at [src].

    On [Ping_req { seq; target; _ }] for a known [target], this node:
    - pings the target under a sequence number of its own;
    - waits (in a background fiber) up to [probe_timeout] for the target's
      ack;
    - if the ack arrives, sends an [Ack] carrying the requester's original
      [seq] back to [src], which completes the requester's indirect probe.

    Unknown targets and other messages are ignored. *)
let handle_ping_req t ~src (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq = requester_seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let target_node = Membership.Member.node member in
          (* [requester_seq] comes from the requester's own counter; reusing
             it locally could complete one of our unrelated in-flight probes,
             so the forwarded ping gets a fresh local sequence number. *)
          let local_seq = next_seq t in
          let waiter = Pending_acks.register t.pending_acks ~seq:local_seq in
          let ping = Ping { seq = local_seq; target; sender = t.self } in
          send_packet t ~dst:target_node.addr
            (make_packet t ~primary:ping ~piggyback:[]);
          (* Wait off the caller: the target's ack is delivered by the same
             receive loop that invoked this handler, so waiting inline would
             always run into the timeout. *)
          Eio.Fiber.fork ~sw:t.sw (fun () ->
              match
                Pending_acks.wait waiter ~timeout:t.config.probe_timeout
                  ~clock:t.clock
              with
              | Some _ ->
                  let ack =
                    Ack
                      {
                        seq = requester_seq;
                        responder = target_node;
                        payload = None;
                      }
                  in
                  send_packet t ~dst:src
                    (make_packet t ~primary:ack ~piggyback:[])
              | None -> Pending_acks.cancel t.pending_acks ~seq:local_seq))
  | _ -> ()
121
(** Hand an [Ack]'s sequence number and payload to [Pending_acks.complete].
    Its result (presumably whether a waiter was found) is discarded. Other
    messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; payload; responder = _ } ->
      let _completed = Pending_acks.complete t.pending_acks ~seq ~payload in
      ()
  | _ -> ()
127
(** Evaluate a pure state transition for [member_id] and apply its effects.

    [transition_fn] receives a fresh snapshot of the member and the current
    monotonic time. The member record is updated whenever the proposed state
    {e or} incarnation differs from the snapshot. An incarnation-only change
    (e.g. a newer [Alive] for an already-alive node) must still be stored;
    otherwise later messages are judged against a stale incarnation. The
    transition's broadcasts are then queued and its events emitted. Unknown
    members are ignored. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let proposed = transition.Protocol_pure.new_state in
      let changed =
        proposed.state <> snap.state
        || Types.compare_incarnation proposed.incarnation snap.incarnation <> 0
      in
      if changed then begin
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                let incarnation = proposed.incarnation in
                match proposed.state with
                | Alive -> Membership.Member.set_alive ~xt m ~incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m ~incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m ~incarnation ~now);
          }
        |> ignore
      end;
      List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts;
      List.iter (emit_event t) transition.events
155
(** Run an [Alive] gossip message through [Protocol_pure.handle_alive] for the
    node it names; other messages are ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node = { id; _ }; incarnation = _ } ->
      let judge snap ~now =
        Protocol_pure.handle_alive ~self:t.self.id snap msg ~now
      in
      apply_member_transition t id judge
  | _ -> ()
162
(** Run a [Suspect] gossip message through [Protocol_pure.handle_suspect] for
    the accused node; other messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  let judge snap ~now =
    Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now
  in
  match msg with
  | Suspect { node = accused; incarnation = _; suspector = _ } ->
      apply_member_transition t accused judge
  | _ -> ()
169
(** Run a [Dead] gossip message through [Protocol_pure.handle_dead] for the
    node it names; other messages are ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  let judge snap ~now = Protocol_pure.handle_dead snap msg ~now in
  match msg with
  | Dead { node = victim; incarnation = _; declarator = _ } ->
      apply_member_transition t victim judge
  | _ -> ()
176
(** Deliver a [User_msg] to every registered handler as
    [(origin node, topic, payload)]. Messages whose origin is not a known
    member are dropped; other message kinds are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } ->
      let handlers =
        Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) }
      in
      Membership.find t.members origin
      |> Option.iter (fun member ->
             let sender = Membership.Member.node member in
             List.iter (fun handler -> handler sender topic payload) handlers)
  | _ -> ()
189
(** Route one decoded protocol message from [src] to its handler. Only the
    ping-style handlers need the sender address. *)
let handle_message t ~src (msg : protocol_msg) =
  let handler =
    match msg with
    | Ping _ -> handle_ping t ~src
    | Ping_req _ -> handle_ping_req t ~src
    | Ack _ -> handle_ack t
    | Alive _ -> handle_alive_msg t
    | Suspect _ -> handle_suspect_msg t
    | Dead _ -> handle_dead_msg t
    | User_msg _ -> handle_user_msg t
  in
  handler msg
199
(** Process every message of a decoded packet — primary first, then each
    piggybacked one — and count the packet in [msgs_received]. Packets
    tagged with another cluster name are ignored entirely. *)
let handle_packet t ~src (packet : packet) =
  let same_cluster = String.equal packet.cluster t.config.cluster_name in
  if same_cluster then begin
    List.iter (handle_message t ~src) (packet.primary :: packet.piggyback);
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end
206
(** Decrypt (only when encryption is enabled) and decode one received
    datagram, then dispatch it. A failure in either step increments
    [msgs_dropped] instead. *)
let process_udp_packet t ~buf ~src =
  let count_drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let plaintext =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match plaintext with
  | Error _ -> count_drop ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Ok packet -> handle_packet t ~src packet
      | Error _ -> count_drop ())
220
(** UDP receive loop: until shutdown is requested, read one datagram into a
    pooled buffer and process the received bytes. *)
let run_udp_receiver t =
  let receive_one buf =
    let len, src = Transport.recv_udp t.udp_sock buf in
    process_udp_packet t ~buf:(Cstruct.sub buf 0 len) ~src
  in
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool receive_one
  done
228
(** Snapshot this node's view of the cluster for a push/pull exchange.

    The local node comes first, with state code [0] and the current local
    incarnation. It is followed by one entry per member of [t.members], with
    the member's snapshot incarnation and state. Unix-socket addresses are
    encoded as an empty address with port [0]. Returns the header (node
    count, no user state, [pp_join = is_join]) and the node list. *)
let build_local_state t ~is_join =
  let members = Membership.to_list t.members in
  let wire_addr = function
    | `Udp (ip, port) -> (Types.ip_to_bytes ip, port)
    | `Unix _ -> ("", 0)
  in
  let entry ~name ~addr ~meta ~incarnation ~state =
    let pns_addr, pns_port = wire_addr addr in
    {
      Types.Wire.pns_name = name;
      pns_addr;
      pns_port;
      pns_meta = meta;
      pns_incarnation = incarnation;
      pns_state = state;
      pns_vsn = Types.default_vsn;
    }
  in
  let self_entry =
    entry
      ~name:(Types.node_id_to_string t.self.id)
      ~addr:t.self.addr ~meta:t.self.meta
      ~incarnation:(Types.incarnation_to_int (get_incarnation t))
      ~state:0
  in
  let member_entry member =
    let node = Membership.Member.node member in
    let snap = Membership.Member.snapshot_now member in
    entry
      ~name:(Types.node_id_to_string node.id)
      ~addr:node.addr ~meta:node.meta
      ~incarnation:(Types.incarnation_to_int snap.incarnation)
      ~state:(Types.member_state_to_int snap.state)
  in
  let all_nodes = self_entry :: List.map member_entry members in
  let header =
    {
      Types.Wire.pp_nodes = List.length all_nodes;
      pp_user_state_len = 0;
      pp_join = is_join;
    }
  in
  (header, all_nodes)
280
(** Merge node states received in a push/pull exchange into [t.members].

    - Entries naming this node are skipped.
    - An unknown node is added, with a [Join] event, only when its state
      code is [0] or [1].
    - A known node is updated to the remote state only when the remote
      incarnation is strictly newer than the one we hold.

    When [is_join] is set, [msgs_received] is incremented once after the
    merge. *)
let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
  let merge_one (pns : Types.Wire.push_node_state) =
    let node_id = Types.node_id_of_string pns.pns_name in
    if Types.equal_node_id node_id t.self.id then ()
    else
      let node_info =
        Types.make_node_info ~id:node_id
          ~addr:(`Udp (Types.ip_of_bytes pns.pns_addr, pns.pns_port))
          ~meta:pns.pns_meta
      in
      match Membership.find t.members node_id with
      | None when pns.pns_state <= 1 ->
          let member =
            Membership.Member.create ~now:(now_mtime t) node_info
          in
          Membership.add t.members member;
          emit_event t (Types.Join node_info)
      | None -> ()
      | Some existing ->
          let snap = Membership.Member.snapshot_now existing in
          let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
          let newer =
            Types.compare_incarnation remote_inc snap.incarnation > 0
          in
          if newer then begin
            let now = now_mtime t in
            let remote_state = Types.member_state_of_int pns.pns_state in
            Membership.update_member t.members node_id
              {
                update =
                  (fun m ~xt ->
                    match remote_state with
                    | Types.Alive ->
                        Membership.Member.set_alive ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Suspect ->
                        Membership.Member.set_suspect ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Dead | Types.Left ->
                        Membership.Member.set_dead ~xt m
                          ~incarnation:remote_inc ~now);
              }
            |> ignore
          end
  in
  List.iter merge_one nodes;
  if is_join then
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
326
(** Fill the first [n] bytes of [buf] from [flow].

    Returns [Error `Connection_closed] on end of stream and
    [Error `Read_error] on any other exception raised by the read. *)
let read_exact flow buf n =
  let rec fill filled =
    if n - filled <= 0 then Ok ()
    else
      let window = Cstruct.sub buf filled (n - filled) in
      match Eio.Flow.single_read flow window with
      | exception End_of_file -> Error `Connection_closed
      | exception _ -> Error `Read_error
      | 0 -> Error `Connection_closed
      | got -> fill (filled + got)
  in
  fill 0
339
(** Perform one [single_read] into [buf] and return the byte count. End of
    stream and any read error both yield [0].
    NOTE(review): a single read may return only part of a payload that spans
    several TCP segments. *)
let read_available flow buf =
  try Eio.Flow.single_read flow buf with _ -> 0
345
(** Serve one inbound push/pull TCP connection.

    Framing read from [flow]:
    - an optional label frame: [Has_label_msg], a one-byte length, then that
      many label bytes (an empty label is rejected);
    - a type byte;
    - the body, read with a single [read_available] call.

    Body types are accepted only when they match the local encryption
    setting, the same policy as [process_udp_packet]:
    - [Encrypt_msg] only when encryption is enabled (the body is decrypted);
    - [Push_pull_msg] only when encryption is disabled.

    Accepting plaintext while encryption is on would let any peer inject
    membership state without the key. Anything else is ignored. A decoded
    message is merged via [merge_remote_state] and answered with our own
    state, encrypted when encryption is enabled. Eio I/O errors while
    writing the response are ignored.
    NOTE(review): the response carries no leading message-type byte; confirm
    this matches what the push/pull client expects. *)
let handle_tcp_connection t flow =
  let buf = Cstruct.create 65536 in
  let code = Types.Wire.message_type_to_int in
  let read_byte () =
    match read_exact flow buf 1 with
    | Ok () -> Some (Cstruct.get_uint8 buf 0)
    | Error _ -> None
  in
  (* The body is read at offset 1, just past the type byte. *)
  let read_body () =
    let n = read_available flow (Cstruct.shift buf 1) in
    if n > 0 then Some (Cstruct.sub buf 1 n) else None
  in
  let body_of_type type_byte =
    if type_byte = code Types.Wire.Encrypt_msg then
      if t.config.encryption_enabled then
        Option.bind (read_body ()) (fun encrypted ->
            Result.to_option (Crypto.decrypt ~key:t.cipher_key encrypted))
      else None
    else if type_byte = code Types.Wire.Push_pull_msg then
      if t.config.encryption_enabled then None else read_body ()
    else None
  in
  let payload =
    match read_byte () with
    | None -> None
    | Some type_byte when type_byte = code Types.Wire.Has_label_msg -> (
        match read_byte () with
        | None | Some 0 -> None
        | Some label_len -> (
            match read_exact flow buf label_len with
            | Error _ -> None
            | Ok () -> Option.bind (read_byte ()) body_of_type))
    | Some type_byte -> body_of_type type_byte
  in
  match payload with
  | None -> ()
  | Some payload -> (
      match Codec.decode_push_pull_msg (Cstruct.to_string payload) with
      | Error _ -> ()
      | Ok (header, nodes, _user_state) -> (
          merge_remote_state t nodes ~is_join:header.pp_join;
          let resp_header, resp_nodes = build_local_state t ~is_join:false in
          let plain =
            Cstruct.of_string
              (Codec.encode_push_pull_msg ~header:resp_header ~nodes:resp_nodes
                 ~user_state:"")
          in
          let resp_buf =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plain
            else plain
          in
          try Eio.Flow.write flow [ resp_buf ] with Eio.Io _ -> ()))
420
(** Accept push/pull connections until shutdown is requested.

    Each connection is handled in its own fiber via [Eio.Net.accept_fork].
    Reads have no timeout, so one slow or idle peer would otherwise block
    every later exchange. [accept_fork] closes the flow when the handler
    returns. Handler failures are ignored, as are accept errors (the loop
    simply retries). Cancellation is re-raised: once the fiber is cancelled,
    every further [accept] fails immediately, so swallowing it would spin. *)
let run_tcp_listener t =
  while not (is_shutdown t) do
    match
      Eio.Net.accept_fork ~sw:t.sw t.tcp_listener
        ~on_error:(fun _ -> ())
        (fun flow _addr -> handle_tcp_connection t flow)
    with
    | () -> ()
    | exception (Eio.Cancel.Cancelled _ as e) -> raise e
    | exception _ -> ()
  done
429
(** Directly probe [member].

    Sends a [Ping] with piggybacked broadcasts and waits up to
    [probe_timeout] for the matching ack. On success it records the ack on
    the member and returns [true]. On timeout it cancels the pending ack and
    returns [false]. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback =
    drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
  in
  let packet =
    make_packet t
      ~primary:(Ping { seq; target = target.id; sender = t.self })
      ~piggyback
  in
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr packet;
  let reply =
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  in
  if Option.is_none reply then begin
    Pending_acks.cancel t.pending_acks ~seq;
    false
  end
  else begin
    let now = now_mtime t in
    Membership.update_member t.members target.id
      { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
    |> ignore;
    true
  end
454
(** Indirectly probe [member] through up to [indirect_checks] helpers.

    Sends a [Ping_req] for the target to each helper chosen by
    [Protocol_pure.select_indirect_targets], then waits up to
    [probe_timeout] for an ack carrying the request's sequence number.
    - On success it records the ack on the member and returns [true].
    - On timeout it cancels the pending ack and returns [false].
    - With no helper available (e.g. a two-node cluster) it returns [false]
      at once, instead of sleeping through a timeout nothing could satisfy. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let helpers =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks
      ~members:(Membership.to_node_list t.members)
  in
  match helpers with
  | [] -> false
  | _ :: _ -> (
      let seq = next_seq t in
      let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
      let waiter = Pending_acks.register t.pending_acks ~seq in
      List.iter
        (fun node ->
          let packet = make_packet t ~primary:ping_req ~piggyback:[] in
          send_packet t ~dst:node.addr packet)
        helpers;
      match
        Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
      with
      | Some _ ->
          let now = now_mtime t in
          Membership.update_member t.members target.id
            { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
          |> ignore;
          true
      | None ->
          Pending_acks.cancel t.pending_acks ~seq;
          false)
485
(** Mark [member] suspect after failed direct and indirect probes, by
    feeding a locally built [Suspect] message through the normal transition.

    The message carries the {e member's} incarnation, taken from the same
    snapshot the transition is evaluated against. Using the local node's
    own incarnation (which is unrelated to the member's) would let the
    accusation be judged against the wrong number. [Protocol_pure.handle_suspect]
    presumably compares it with the member's incarnation. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  apply_member_transition t node.id (fun snap ~now ->
      let msg =
        Suspect
          {
            node = node.id;
            incarnation = snap.incarnation;
            suspector = t.self.id;
          }
      in
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
494
(** One protocol round.

    Picks the next probe target and advances the probe cursor. It then
    probes the target directly, falls back to an indirect probe, and marks
    the target suspect only if both fail. Does nothing when there is no
    target or it has left the membership in the meantime. *)
let probe_cycle t =
  let member_nodes =
    List.map Membership.Member.node (Membership.to_list t.members)
  in
  let cursor =
    Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.probe_index) }
  in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:cursor
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, next_cursor) ->
      Kcas.Xt.commit
        { tx = (fun ~xt -> Kcas.Xt.set ~xt t.probe_index next_cursor) };
      Membership.find t.members target_node.id
      |> Option.iter (fun member ->
             (* [||] short-circuits: the indirect probe runs only when the
                direct one failed. *)
             let reachable = probe_member t member || indirect_probe t member in
             if not reachable then suspect_member t member)
516
(** Protocol driver: until shutdown, run a [probe_cycle] and then sleep for
    [protocol_interval] seconds. *)
let run_protocol t =
  let rec loop () =
    if not (is_shutdown t) then begin
      probe_cycle t;
      Eio.Time.sleep t.clock t.config.protocol_interval;
      loop ()
    end
  in
  loop ()
522
(** Build the state record for a node.

    Returns [Error `Invalid_key] when [Crypto.init_key] rejects
    [config.secret_key]. Otherwise the state starts with:
    - empty membership, zero incarnation, sequence and probe cursor;
    - fresh dissemination and pending-ack queues, no user handlers;
    - a 100-slot event stream;
    - send/receive buffer pools of [udp_buffer_size]-byte buffers. *)
let create ~sw ~(config : config) ~self ~udp_sock ~tcp_listener ~clock
    ~mono_clock ~secure_random =
  let make_pool count =
    Buffer_pool.create ~size:config.udp_buffer_size ~count
  in
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      let send_pool = make_pool config.send_buffer_count in
      let recv_pool = make_pool config.recv_buffer_count in
      Ok
        {
          config;
          self;
          sw;
          clock;
          mono_clock;
          secure_random;
          udp_sock;
          tcp_listener;
          cipher_key;
          send_pool;
          recv_pool;
          members = Membership.create ();
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          event_stream = Eio.Stream.create 100;
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          probe_index = Kcas.Loc.make 0;
          user_handlers = Kcas.Loc.make [];
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
        }
556
(** Request shutdown. The background loops stop the next time they check
    [is_shutdown]; a loop blocked in I/O notices only after that call
    returns. *)
let shutdown t =
  let stop ~xt = Kcas.Xt.set ~xt t.shutdown true in
  Kcas.Xt.commit { tx = stop }
559
(** Register [node_info] as a new member, timestamped with the current
    monotonic time, and emit a [Join] event. *)
let add_member t node_info =
  let member = Membership.Member.create ~now:(now_mtime t) node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)
565
(** Remove [node_id] from the membership. Returns whether a member was
    actually removed; a [Leave] event is emitted only in that case. *)
let remove_member t node_id =
  Membership.find t.members node_id
  |> Option.fold ~none:false ~some:(fun member ->
         let node = Membership.Member.node member in
         let removed = Membership.remove t.members node_id in
         if removed then emit_event t (Leave node);
         removed)
574
(** Identity of the local node. *)
let local_node t = t.self

(** All entries currently in the membership. *)
let members t = t.members |> Membership.to_list

(** Number of entries in the membership. *)
let member_count t = t.members |> Membership.count

(** Stream of membership events published by this node. *)
let events t = t.event_stream
579
(** Current statistics.

    Starts from the stored counters and fills in:
    - per-state member counts ([Dead] and [Left] are both counted as dead);
    - the broadcast queue depth;
    - combined send/receive buffer-pool availability and totals. *)
let stats t =
  let base = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.stats) } in
  let alive = ref 0 and suspect = ref 0 and dead = ref 0 in
  Membership.snapshot_all t.members
  |> List.iter (fun snap ->
         match snap.state with
         | Alive -> alive := !alive + 1
         | Suspect -> suspect := !suspect + 1
         | Dead | Left -> dead := !dead + 1);
  let pools = [ t.send_pool; t.recv_pool ] in
  let sum f = List.fold_left (fun acc pool -> acc + f pool) 0 pools in
  {
    base with
    nodes_alive = !alive;
    nodes_suspect = !suspect;
    nodes_dead = !dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available = sum Buffer_pool.available;
    buffers_total = sum Buffer_pool.total;
  }
603
(** Queue a user message on [topic] for gossip, with this node as origin. *)
let broadcast t ~topic ~payload =
  enqueue_broadcast t (User_msg { origin = t.self.id; topic; payload })
607
(** Register [handler] for incoming user messages. Handlers are kept newest
    first, which is the order [handle_user_msg] calls them in. *)
let on_message t handler =
  let prepend ~xt =
    Kcas.Xt.set ~xt t.user_handlers (handler :: Kcas.Xt.get ~xt t.user_handlers)
  in
  Kcas.Xt.commit { tx = prepend }
615 }