this repo has no description
1open Types
2
(** Runtime state of one cluster node: configuration, local identity,
    membership view, protocol counters, sockets and Eio capabilities. *)
type t = {
  config : config;  (** Static configuration (cluster name, timeouts, buffer sizes, encryption flag, ...). *)
  self : node_info;  (** This node's own identity, advertised in pings, acks and push/pull state. *)
  members : Membership.t;  (** Membership table of known peers. *)
  incarnation : incarnation Kcas.Loc.t;  (** This node's own incarnation number. *)
  sequence : int Kcas.Loc.t;  (** Counter handing out probe sequence numbers (see [next_seq]). *)
  broadcast_queue : Dissemination.t;  (** Gossip messages waiting to be piggybacked on outgoing packets. *)
  pending_acks : Pending_acks.t;  (** Outstanding probes waiting for an [Ack], keyed by sequence number. *)
  probe_index : int Kcas.Loc.t;  (** Cursor passed to [Protocol_pure.next_probe_target] by [probe_cycle]. *)
  send_pool : Buffer_pool.t;  (** Reusable buffers for encoding outgoing datagrams. *)
  recv_pool : Buffer_pool.t;  (** Reusable buffers for receiving datagrams. *)
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;  (** Socket for probes, acks and gossip. *)
  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;  (** Listener for push/pull state exchange. *)
  event_stream : node_event Eio.Stream.t;  (** Membership events exposed through [events]. *)
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;  (** Callbacks registered with [on_message], newest first. *)
  cipher_key : Crypto.key;  (** Key for encrypting/decrypting payloads when encryption is enabled. *)
  stats : stats Kcas.Loc.t;  (** Message counters; membership gauges are computed on demand by [stats]. *)
  shutdown : bool Kcas.Loc.t;  (** Set by [shutdown]; the run loops check it between iterations. *)
  clock : float Eio.Time.clock_ty Eio.Resource.t;  (** Wall clock used for sleeps and ack timeouts. *)
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;  (** Monotonic clock used for membership timestamps. *)
  secure_random : Eio.Flow.source_ty Eio.Resource.t;  (** Randomness source handed to [Crypto.encrypt]. *)
  sw : Eio.Switch.t;  (** Switch that owns accepted connections and background fibers. *)
}
26
(** Allocate a probe sequence number: atomically returns the current counter
    value and advances the counter by one. *)
let next_seq t =
  let claim ~xt =
    let current = Kcas.Xt.get ~xt t.sequence in
    Kcas.Xt.set ~xt t.sequence (current + 1);
    current
  in
  Kcas.Xt.commit { tx = claim }
36
(** Current incarnation number of the local node. *)
let get_incarnation t = Kcas.Loc.get t.incarnation
39
(** Atomically advance the local incarnation with [incr_incarnation] and
    return the new value. *)
let incr_my_incarnation t =
  let bump ~xt =
    let next = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
    Kcas.Xt.set ~xt t.incarnation next;
    next
  in
  Kcas.Xt.commit { tx = bump }
50
(** Whether [shutdown] has been requested for this node. *)
let is_shutdown t = Kcas.Loc.get t.shutdown
53
(** Current monotonic time, re-expressed as an [Mtime.Span.t] measured from
    the monotonic clock's origin. This is the [~now] value passed to the
    membership and dissemination code. *)
let now_mtime t =
  let stamp = Eio.Time.Mono.now t.mono_clock in
  Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns stamp)
57
(** Atomically replace the stats record with [f] applied to it.
    [f] runs inside a Kcas transaction, so it may run more than once if the
    transaction retries; it should be a pure record update. *)
let update_stats t f =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.modify ~xt t.stats f) }
66
(** Publish a membership event to consumers of [events].
    NOTE(review): [Eio.Stream.add] blocks while the stream is full. If nobody
    drains [events], the emitting fiber (e.g. the UDP receive loop) stalls.
    Confirm this back-pressure is intended. *)
let emit_event t = Eio.Stream.add t.event_stream
68
(** Encode [packet] into a pooled send buffer, encrypt it when encryption is
    enabled, send it to [dst] over UDP, and count it in [msgs_sent].
    A packet too large for the buffer is dropped without being counted. *)
let send_packet t ~dst (packet : packet) =
  let seal plaintext =
    if not t.config.encryption_enabled then plaintext
    else Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plaintext
  in
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Ok len ->
          Transport.send_udp t.udp_sock dst (seal (Cstruct.sub buf 0 len));
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 })
      | Error `Buffer_too_small -> ())
82
(** Build a packet carrying [primary] plus [piggyback] gossip, stamped with
    this node's cluster name. *)
let make_packet t ~primary ~piggyback =
  let cluster = t.config.cluster_name in
  { cluster; primary; piggyback }
85
(** Take queued gossip messages for piggybacking, bounded by [max_bytes] as
    measured with [Codec.encoded_size]. *)
let drain_piggyback t ~max_bytes =
  let encode_size = Codec.encoded_size in
  Dissemination.drain ~max_bytes ~encode_size t.broadcast_queue
89
(** Queue [msg] for gossip dissemination.

    Queued messages that [msg] supersedes (per [Protocol_pure.invalidates])
    are dropped first. Only then is [msg] enqueued, with a retransmit budget
    derived from the current member count. Invalidating before enqueueing
    guarantees [msg] can never be removed by its own invalidation pass, even
    when [invalidates] treats two messages about the same node as superseding
    each other. *)
let enqueue_broadcast t msg =
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg;
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t)
98
(** Answer a direct [Ping] with an [Ack] sent to the datagram's source
    address, piggybacking queued gossip.

    A ping whose [target] names a different node is ignored. Answering it
    would acknowledge on behalf of another node, e.g. when an address has
    been reused by a different member. An empty target name is treated as
    "unaddressed" and answered, matching the memberlist convention.
    Non-[Ping] messages are ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; target; sender = _ } ->
      let addressed_to_us =
        Types.equal_node_id target t.self.id
        || String.equal (Types.node_id_to_string target) ""
      in
      if addressed_to_us then begin
        (* Leave headroom below the buffer size for packet framing. *)
        let piggyback =
          drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
        in
        let ack = Ack { seq; responder = t.self; payload = None } in
        send_packet t ~dst:src (make_packet t ~primary:ack ~piggyback)
      end
  | _ -> ()
109
(** Serve an indirect-probe request by pinging [target] on the requester's
    behalf and relaying the outcome.

    The relay allocates its own sequence number for the ping it sends. The
    requester's [seq] belongs to the requester's sequence space and could
    collide with one of this node's own outstanding probes.

    A forked fiber on [t.sw] waits up to [probe_timeout] for the target's
    [Ack]. On success it sends an [Ack] carrying the requester's original
    [seq] back to [src], which is what the requester's [indirect_probe] waits
    for. Waiting happens off this fiber because handlers run on the UDP
    receive loop, and that loop must stay free to receive the very ack being
    waited for.

    Relaying is best-effort: failures inside the relay fiber are dropped
    (cancellation is re-raised) so that they cannot fail the node's switch.
    Unknown targets and non-[Ping_req] messages are ignored. *)
let handle_ping_req t ~src (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq = requester_seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let target_node = Membership.Member.node member in
          let relay_seq = next_seq t in
          let waiter = Pending_acks.register t.pending_acks ~seq:relay_seq in
          let ping = Ping { seq = relay_seq; target; sender = t.self } in
          send_packet t ~dst:target_node.addr
            (make_packet t ~primary:ping ~piggyback:[]);
          let relay () =
            match
              Pending_acks.wait waiter ~timeout:t.config.probe_timeout
                ~clock:t.clock
            with
            | Some _ ->
                let ack =
                  Ack
                    { seq = requester_seq; responder = target_node; payload = None }
                in
                send_packet t ~dst:src (make_packet t ~primary:ack ~piggyback:[])
            | None -> Pending_acks.cancel t.pending_acks ~seq:relay_seq
          in
          Eio.Fiber.fork ~sw:t.sw (fun () ->
              try relay () with
              | Eio.Cancel.Cancelled _ as cancelled -> raise cancelled
              | _ -> ()))
  | _ -> ()
121
(** Hand an [Ack]'s payload to the pending probe registered under its
    sequence number. The result of [Pending_acks.complete] is discarded.
    Non-[Ack] messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; payload; responder = _ } ->
      Pending_acks.complete t.pending_acks ~seq ~payload |> ignore
  | _ -> ()
127
(** Apply a pure protocol transition to one member and publish its effects.

    [transition_fn snap ~now] computes the transition from the member's
    current snapshot. The stored member is rewritten whenever the membership
    state OR the incarnation differs from the snapshot. An incarnation-only
    change (for example an [Alive] refutation that keeps the member alive at
    a higher incarnation) must be recorded too; otherwise later messages are
    judged against a stale incarnation. The transition's broadcasts are then
    queued and its events emitted. Ids not present in [members] are ignored. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let target = transition.Protocol_pure.new_state in
      let changed =
        target.state <> snap.state
        || Types.compare_incarnation target.incarnation snap.incarnation <> 0
      in
      if changed then begin
        let incarnation = target.incarnation in
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                match target.state with
                | Alive -> Membership.Member.set_alive ~xt m ~incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m ~incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m ~incarnation ~now);
          }
        |> ignore
      end;
      List.iter (enqueue_broadcast t) transition.broadcasts;
      List.iter (emit_event t) transition.events
155
(** Apply a gossiped [Alive] claim to the member it names, using
    [Protocol_pure.handle_alive]. Non-[Alive] messages are ignored.
    NOTE(review): claims about ids not yet in [members] are dropped
    ([apply_member_transition] returns early for unknown ids). Confirm that
    new nodes are only expected to arrive via push/pull or [add_member]. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; _ } ->
      let transition snap ~now =
        Protocol_pure.handle_alive ~self:t.self.id snap msg ~now
      in
      apply_member_transition t node.id transition
  | _ -> ()
162
(* Refute a suspicion about this node by moving our own incarnation past the
   accused one and gossiping an [Alive] claim for ourselves. An accusation
   older than our current incarnation is ignored, because our current
   incarnation already supersedes it. *)
let refute_self_suspicion t ~accused =
  let bumped =
    Kcas.Xt.commit
      {
        tx =
          (fun ~xt ->
            let current = Kcas.Xt.get ~xt t.incarnation in
            if Types.compare_incarnation accused current < 0 then None
            else begin
              (* accused >= current, so this is strictly above both. *)
              let next = incr_incarnation accused in
              Kcas.Xt.set ~xt t.incarnation next;
              Some next
            end);
      }
  in
  Option.iter
    (fun incarnation ->
      enqueue_broadcast t (Alive { node = t.self; incarnation } : protocol_msg))
    bumped

(** Apply a gossiped [Suspect] accusation.

    An accusation against this node is refuted: the local incarnation is
    advanced and an [Alive] claim is broadcast. Without this the node could
    never clear a false suspicion, since it is not looked up in [members].
    Accusations against other members go through
    [Protocol_pure.handle_suspect]. Non-[Suspect] messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation; suspector = _ } ->
      if Types.equal_node_id node t.self.id then
        refute_self_suspicion t ~accused:incarnation
      else
        apply_member_transition t node (fun snap ~now ->
            Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
  | _ -> ()
169
(** Apply a gossiped [Dead] declaration using [Protocol_pure.handle_dead].
    Non-[Dead] messages are ignored.
    NOTE(review): a declaration naming this node itself is looked up in
    [members] like any other id and is not refuted here. Confirm whether a
    refutation is expected. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; _ } ->
      let transition snap ~now = Protocol_pure.handle_dead snap msg ~now in
      apply_member_transition t node transition
  | _ -> ()
176
(** Deliver a [User_msg] to every handler registered with [on_message],
    passing the origin's node info, the topic and the payload.

    Messages from origins not in [members] are dropped. Each handler is
    isolated: an exception raised by one handler is discarded so it can
    neither skip the remaining handlers nor escape into (and stop) the UDP
    receive loop that dispatches messages. Eio cancellation is still
    re-raised. Non-[User_msg] messages are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } -> (
      let handlers =
        Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) }
      in
      match Membership.find t.members origin with
      | None -> ()
      | Some member ->
          let node = Membership.Member.node member in
          List.iter
            (fun handler ->
              try handler node topic payload with
              | Eio.Cancel.Cancelled _ as cancelled -> raise cancelled
              | _ -> ())
            handlers)
  | _ -> ()
189
(** Route one protocol message to its handler. [src] is the UDP source
    address, which only the ping handlers use. *)
let handle_message t ~src (msg : protocol_msg) =
  let handler =
    match msg with
    | Ping _ -> handle_ping t ~src
    | Ping_req _ -> handle_ping_req t ~src
    | Ack _ -> handle_ack t
    | Alive _ -> handle_alive_msg t
    | Suspect _ -> handle_suspect_msg t
    | Dead _ -> handle_dead_msg t
    | User_msg _ -> handle_user_msg t
  in
  handler msg
199
(** Dispatch every message in a decoded packet (the primary first, then the
    piggybacked messages in order) and count the packet as received.
    Packets stamped with a different cluster name are ignored and not
    counted. *)
let handle_packet t ~src (packet : packet) =
  let same_cluster = String.equal packet.cluster t.config.cluster_name in
  if same_cluster then begin
    packet.primary :: packet.piggyback |> List.iter (handle_message t ~src);
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end
206
(** Decrypt (when encryption is enabled) and decode one received datagram,
    then handle it. A datagram that fails to decrypt or decode is counted in
    [msgs_dropped]. *)
let process_udp_packet t ~buf ~src =
  let count_drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let plaintext =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match plaintext with
  | Error _ -> count_drop ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Ok packet -> handle_packet t ~src packet
      | Error _ -> count_drop ())
220
(** UDP receive loop. Each iteration borrows a buffer from [recv_pool],
    receives one datagram into it and processes the received bytes.
    The shutdown flag is only checked between datagrams, so a blocked
    receive is not interrupted by [shutdown]. *)
let run_udp_receiver t =
  let receive_one buf =
    let len, src = Transport.recv_udp t.udp_sock buf in
    process_udp_packet t ~buf:(Cstruct.sub buf 0 len) ~src
  in
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool receive_one
  done
228
(* Split a node address into the (raw IP bytes, port) pair used on the wire.
   Unix-domain addresses have no wire form and are encoded as ("", 0). *)
let wire_addr_of_node_addr = function
  | `Udp (ip, port) -> (Types.ip_to_bytes ip, port)
  | `Unix _ -> ("", 0)

(* Encode one node's identity, incarnation and membership state as a
   push/pull wire record. *)
let wire_node_state (node : node_info) ~incarnation ~state =
  let addr_bytes, port = wire_addr_of_node_addr node.addr in
  Types.Wire.
    {
      pns_name = Types.node_id_to_string node.id;
      pns_addr = addr_bytes;
      pns_port = port;
      pns_meta = node.meta;
      pns_incarnation = Types.incarnation_to_int incarnation;
      pns_state = Types.member_state_to_int state;
      pns_vsn = Types.default_vsn;
    }

(** Snapshot the local view for a push/pull exchange. Returns the header and
    the node list: this node first (always advertised as alive at its current
    incarnation), followed by every known member with its snapshot state.
    The self entry's state goes through [member_state_to_int] like every
    other entry instead of a hard-coded [0], so the two encodings cannot
    drift apart. *)
let build_local_state t ~is_join =
  let members = Membership.to_list t.members in
  let self_node =
    wire_node_state t.self ~incarnation:(get_incarnation t) ~state:Types.Alive
  in
  let member_nodes =
    List.map
      (fun member ->
        let snap = Membership.Member.snapshot_now member in
        wire_node_state
          (Membership.Member.node member)
          ~incarnation:snap.incarnation ~state:snap.state)
      members
  in
  let all_nodes = self_node :: member_nodes in
  let header =
    Types.Wire.
      {
        pp_nodes = List.length all_nodes;
        pp_user_state_len = 0;
        pp_join = is_join;
      }
  in
  (header, all_nodes)
280
(** Merge node states received in a push/pull exchange into [members].

    For each remote entry other than this node:
    - unknown ids whose wire state is [<= 1] are added as new members, and a
      [Join] event is emitted;
    - unknown ids with any other state are skipped;
    - known ids are overwritten with the remote state only when the remote
      incarnation is strictly higher than the local one.

    When [is_join] is set, [msgs_received] is incremented once at the end.
    NOTE(review): new members are created with [Member.create], which does
    not take the remote [pns_incarnation]. Confirm that dropping it is
    intended. *)
let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
  let merge_one (pns : Types.Wire.push_node_state) =
    let node_id = Types.node_id_of_string pns.pns_name in
    if Types.equal_node_id node_id t.self.id then ()
    else begin
      let node_info =
        Types.make_node_info ~id:node_id
          ~addr:(`Udp (Types.ip_of_bytes pns.pns_addr, pns.pns_port))
          ~meta:pns.pns_meta
      in
      match Membership.find t.members node_id with
      | None when pns.pns_state <= 1 ->
          let now = now_mtime t in
          Membership.add t.members (Membership.Member.create ~now node_info);
          emit_event t (Types.Join node_info)
      | None -> ()
      | Some existing ->
          let snap = Membership.Member.snapshot_now existing in
          let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
          let newer = Types.compare_incarnation remote_inc snap.incarnation > 0 in
          if newer then begin
            let now = now_mtime t in
            let new_state = Types.member_state_of_int pns.pns_state in
            Membership.update_member t.members node_id
              {
                update =
                  (fun m ~xt ->
                    match new_state with
                    | Types.Alive ->
                        Membership.Member.set_alive ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Suspect ->
                        Membership.Member.set_suspect ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Dead | Types.Left ->
                        Membership.Member.set_dead ~xt m
                          ~incarnation:remote_inc ~now);
              }
            |> ignore
          end
    end
  in
  List.iter merge_one nodes;
  if is_join then
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
326
(** Read exactly [n] bytes from [flow] into the start of [buf].

    Returns [Error `Connection_closed] if the peer closes the stream before
    [n] bytes arrive, and [Error `Read_error] on any other read failure.
    Eio cancellation ([Eio.Cancel.Cancelled]) is re-raised rather than being
    reported as a read error, so a cancelled fiber actually stops.
    [n] must not exceed [Cstruct.length buf]; otherwise [Cstruct.sub] raises
    [Invalid_argument]. *)
let read_exact flow buf n =
  let rec loop offset remaining =
    if remaining <= 0 then Ok ()
    else
      let chunk = Cstruct.sub buf offset remaining in
      match Eio.Flow.single_read flow chunk with
      | 0 -> Error `Connection_closed
      | read -> loop (offset + read) (remaining - read)
      | exception End_of_file -> Error `Connection_closed
      | exception (Eio.Cancel.Cancelled _ as cancelled) -> raise cancelled
      | exception _ -> Error `Read_error
  in
  loop 0 n
339
(** Perform a single read from [flow] into [buf] and return the number of
    bytes read. This may be fewer than the peer sent. Returns [0] at end of
    stream or on a read failure. Eio cancellation is re-raised instead of
    being turned into [0]. *)
let read_available flow buf =
  match Eio.Flow.single_read flow buf with
  | n -> n
  | exception End_of_file -> 0
  | exception (Eio.Cancel.Cancelled _ as cancelled) -> raise cancelled
  | exception _ -> 0
345
(** Decode a compressed TCP frame: a msgpack map with an ["Algo"] integer and
    a ["Buf"] byte string. Only algorithm [0] (LZW, LSB-first, 8-bit) is
    supported.

    Returns the decompressed bytes, or [None] when the frame is not a map,
    uses another algorithm, lacks a buffer, or fails to decompress. Malformed
    msgpack is also reported as [None] instead of escaping as an exception.
    The input comes from the network, and callers already treat [None] as
    "reject". Catching everything is safe here because the decoder is pure
    and performs no I/O, so it cannot observe Eio cancellation. *)
let decompress_payload data =
  let decoded =
    match Msgpck.String.read data with
    | _consumed, value -> Some value
    | exception _ -> None
  in
  match decoded with
  | Some (Msgpck.Map fields) -> (
      let field name = List.assoc_opt (Msgpck.String name) fields in
      let algo =
        match field "Algo" with
        | Some (Msgpck.Int i) -> i
        | Some (Msgpck.Int32 i) -> Int32.to_int i
        | _ -> -1
      in
      let compressed_buf =
        match field "Buf" with
        | Some (Msgpck.Bytes s) | Some (Msgpck.String s) -> Some s
        | _ -> None
      in
      match (algo, compressed_buf) with
      | 0, Some buf -> Result.to_option (Lzw.decompress_lsb8 buf)
      | _ -> None)
  | Some _ | None -> None
371
(* Wire tag of a TCP message type, as found in the first byte of a stream or
   of a decompressed frame. *)
let tcp_msg_tag kind = Types.Wire.message_type_to_int kind

(* Strip the leading [Push_pull_msg] tag from a decompressed frame. Returns
   [None] for an empty frame or one tagged with any other message type. *)
let push_pull_of_decompressed decompressed =
  let len = String.length decompressed in
  if len > 0
     && Char.code decompressed.[0] = tcp_msg_tag Types.Wire.Push_pull_msg
  then Some (Cstruct.of_string (String.sub decompressed 1 (len - 1)))
  else None

(* Read the push/pull payload that follows the first byte ([msg_type_byte])
   of a TCP request, unwrapping the supported framings:
   - [Encrypt_msg]: the rest of the read is decrypted with [t.cipher_key];
   - [Compress_msg]: the rest is decompressed and must start with a
     [Push_pull_msg] tag, which is stripped;
   - [Has_label_msg]: a 1-byte length and that many label bytes are skipped,
     then the next byte must be [Push_pull_msg] (a zero-length label is
     rejected);
   - [Push_pull_msg]: the rest is returned unchanged.
   Any other tag, or any read/decrypt/decode failure, yields [None].
   [buf] is scratch space; the payload lands in [buf] starting at offset 1.
   NOTE(review): "the rest" is a single [read_available] call, so a request
   larger than one read is truncated and then fails to decode. *)
let read_push_pull_request t flow buf msg_type_byte =
  let read_rest () =
    let n = read_available flow (Cstruct.shift buf 1) in
    if n > 0 then Some (Cstruct.sub buf 1 n) else None
  in
  let read_labeled () =
    let ( let* ) r f = match r with Ok v -> f v | Error _ -> None in
    let* () = read_exact flow buf 1 in
    let label_len = Cstruct.get_uint8 buf 0 in
    if label_len = 0 then None
    else
      let* () = read_exact flow buf label_len in
      let* () = read_exact flow buf 1 in
      if Cstruct.get_uint8 buf 0 = tcp_msg_tag Types.Wire.Push_pull_msg then
        read_rest ()
      else None
  in
  if msg_type_byte = tcp_msg_tag Types.Wire.Encrypt_msg then
    Option.bind (read_rest ()) (fun encrypted ->
        Result.to_option (Crypto.decrypt ~key:t.cipher_key encrypted))
  else if msg_type_byte = tcp_msg_tag Types.Wire.Compress_msg then
    Option.bind (read_rest ()) (fun compressed ->
        Option.bind
          (decompress_payload (Cstruct.to_string compressed))
          push_pull_of_decompressed)
  else if msg_type_byte = tcp_msg_tag Types.Wire.Has_label_msg then
    read_labeled ()
  else if msg_type_byte = tcp_msg_tag Types.Wire.Push_pull_msg then
    read_rest ()
  else None

(* Decode a push/pull request, merge the remote node states into the local
   view, and write back our own state (encrypted when encryption is enabled).
   Undecodable requests get no reply. A failed write is ignored, but Eio
   cancellation is re-raised so a cancelled fiber stops. *)
let respond_push_pull t flow payload =
  match Codec.decode_push_pull_msg (Cstruct.to_string payload) with
  | Error _ -> ()
  | Ok (header, nodes, _user_state) -> (
      merge_remote_state t nodes ~is_join:header.pp_join;
      let resp_header, resp_nodes = build_local_state t ~is_join:false in
      let plain =
        Codec.encode_push_pull_msg ~header:resp_header ~nodes:resp_nodes
          ~user_state:""
        |> Cstruct.of_string
      in
      let resp_buf =
        if t.config.encryption_enabled then
          Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plain
        else plain
      in
      match Eio.Flow.write flow [ resp_buf ] with
      | () -> ()
      | exception (Eio.Cancel.Cancelled _ as cancelled) -> raise cancelled
      | exception _ -> ())

(** Serve one inbound TCP push/pull connection. It reads the 1-byte message
    type, unwraps the request payload according to that type, merges the
    remote state and replies with the local state. Malformed or unsupported
    requests are dropped without a reply. The caller is responsible for
    closing [flow].
    NOTE(review): the response is not prefixed with a message-type byte,
    unlike the requests this function accepts. Confirm that
    [Codec.encode_push_pull_msg] adds it or that peers do not expect it. *)
let handle_tcp_connection t flow =
  let buf = Cstruct.create 65536 in
  match read_exact flow buf 1 with
  | Error _ -> ()
  | Ok () -> (
      match read_push_pull_request t flow buf (Cstruct.get_uint8 buf 0) with
      | None -> ()
      | Some payload -> respond_push_pull t flow payload)
468
(** TCP accept loop for push/pull exchanges. Connections are served one at a
    time and always closed afterwards.

    Failures while serving a connection are ignored. Eio cancellation,
    however, is re-raised, both from the handler and from [accept].
    Previously a cancelled switch made [accept] fail immediately on every
    iteration while [exception _ -> ()] looped again, which spun forever
    without ever suspending and starved every other fiber. Other accept
    errors (e.g. descriptor exhaustion) now yield before retrying, for the
    same reason.
    NOTE(review): serving connections sequentially with no read timeout lets
    one stalled peer block all other push/pull requests. *)
let run_tcp_listener t =
  while not (is_shutdown t) do
    match Eio.Net.accept ~sw:t.sw t.tcp_listener with
    | flow, _addr ->
        Fun.protect
          ~finally:(fun () -> Eio.Flow.close flow)
          (fun () ->
            match handle_tcp_connection t flow with
            | () -> ()
            | exception (Eio.Cancel.Cancelled _ as cancelled) -> raise cancelled
            | exception _ -> ())
    | exception (Eio.Cancel.Cancelled _ as cancelled) -> raise cancelled
    | exception _ -> Eio.Fiber.yield ()
  done
477
(** Directly probe [member]. It sends a [Ping] (with piggybacked gossip) and
    waits up to [probe_timeout] for the matching [Ack].
    On an ack, the member's ack is recorded and [true] is returned. On
    timeout, the pending entry is cancelled and [false] is returned. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback =
    drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
  in
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr
    (make_packet t
       ~primary:(Ping { seq; target = target.id; sender = t.self })
       ~piggyback);
  let acked =
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
    |> Option.is_some
  in
  if acked then begin
    let now = now_mtime t in
    Membership.update_member t.members target.id
      { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
    |> ignore
  end
  else Pending_acks.cancel t.pending_acks ~seq;
  acked
502
(** Indirectly probe [member]. A [Ping_req] is sent to up to
    [indirect_checks] other members, and the node waits up to [probe_timeout]
    for an [Ack] relayed under the request's sequence number.

    Returns [true] (and records the ack) when one arrives, otherwise [false].
    When no relay candidates exist, it returns [false] immediately instead of
    waiting out a full [probe_timeout] for an ack nobody was asked to send. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let relays =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks
      ~members:(Membership.to_node_list t.members)
  in
  match relays with
  | [] -> false
  | _ :: _ -> (
      let seq = next_seq t in
      let packet =
        make_packet t
          ~primary:(Ping_req { seq; target = target.id; sender = t.self })
          ~piggyback:[]
      in
      let waiter = Pending_acks.register t.pending_acks ~seq in
      List.iter (fun node -> send_packet t ~dst:node.addr packet) relays;
      match
        Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
      with
      | Some _ ->
          let now = now_mtime t in
          Membership.update_member t.members target.id
            { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
          |> ignore;
          true
      | None ->
          Pending_acks.cancel t.pending_acks ~seq;
          false)
533
(** Mark [member] as suspect after both direct and indirect probes failed.
    The [Suspect] message carries the suspected member's own current
    incarnation, taken from the same snapshot the transition is computed
    from. Stamping it with this node's incarnation (as before) could make the
    accusation look stale, so the member never became suspect, or push it to
    an incarnation the member never had. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  apply_member_transition t node.id (fun snap ~now ->
      let msg =
        Suspect
          {
            node = node.id;
            incarnation = snap.incarnation;
            suspector = t.self.id;
          }
      in
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
542
(** Run one failure-detection round. The next target is picked with
    [Protocol_pure.next_probe_target] and the stored cursor is advanced.
    The target is probed directly, then indirectly if that fails, and is
    suspected if both fail. *)
let probe_cycle t =
  let member_nodes =
    List.map Membership.Member.node (Membership.to_list t.members)
  in
  let probe_index = Kcas.Loc.get t.probe_index in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, next_index) -> (
      Kcas.Loc.set t.probe_index next_index;
      match Membership.find t.members target_node.id with
      | None -> ()
      | Some member ->
          let reachable = probe_member t member || indirect_probe t member in
          if not reachable then suspect_member t member)
564
(** Protocol loop: one probe round, then sleep for [protocol_interval], until
    shutdown is requested (checked before each round). *)
let run_protocol t =
  let rec tick () =
    if not (is_shutdown t) then begin
      probe_cycle t;
      Eio.Time.sleep t.clock t.config.protocol_interval;
      tick ()
    end
  in
  tick ()
570
(** Build a node from its configuration and I/O capabilities.
    Returns [Error `Invalid_key] when [config.secret_key] is rejected by
    [Crypto.init_key]. The key is checked whether or not encryption is
    enabled. *)
let create ~sw ~(config : config) ~self ~udp_sock ~tcp_listener ~clock
    ~mono_clock ~secure_random =
  let make_pool count =
    Buffer_pool.create ~size:config.udp_buffer_size ~count
  in
  let assemble cipher_key =
    {
      config;
      self;
      members = Membership.create ();
      incarnation = Kcas.Loc.make zero_incarnation;
      sequence = Kcas.Loc.make 0;
      broadcast_queue = Dissemination.create ();
      pending_acks = Pending_acks.create ();
      probe_index = Kcas.Loc.make 0;
      send_pool = make_pool config.send_buffer_count;
      recv_pool = make_pool config.recv_buffer_count;
      udp_sock;
      tcp_listener;
      event_stream = Eio.Stream.create 100;
      user_handlers = Kcas.Loc.make [];
      cipher_key;
      stats = Kcas.Loc.make empty_stats;
      shutdown = Kcas.Loc.make false;
      clock;
      mono_clock;
      secure_random;
      sw;
    }
  in
  Crypto.init_key config.secret_key
  |> Result.map assemble
  |> Result.map_error (fun _ -> `Invalid_key)
604
(** Request shutdown. The run loops observe the flag at their next check;
    calls that are already blocked are not interrupted. *)
let shutdown t = Kcas.Loc.set t.shutdown true
607
(** Add [node_info] as a member, timestamped now, and emit a [Join] event. *)
let add_member t node_info =
  let member = Membership.Member.create ~now:(now_mtime t) node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)
613
(** Remove the member with [node_id]. Returns [true] and emits a [Leave]
    event when a member was removed, and [false] when the id is unknown or
    the removal did not happen. *)
let remove_member t node_id =
  Membership.find t.members node_id
  |> Option.fold ~none:false ~some:(fun member ->
         let node = Membership.Member.node member in
         let removed = Membership.remove t.members node_id in
         if removed then emit_event t (Leave node);
         removed)
622
(** This node's own identity. *)
let local_node (t : t) = t.self

(** Every known member, as listed by [Membership.to_list]. *)
let members (t : t) = Membership.to_list t.members

(** Number of known members. *)
let member_count (t : t) = Membership.count t.members

(** Stream of membership events. *)
let events (t : t) = t.event_stream
627
(** Stats snapshot. The stored message counters are combined with gauges
    computed now: members per state ([Left] is counted as dead), broadcast
    queue depth, and free/total buffers across both pools. *)
let stats t =
  let counters = Kcas.Loc.get t.stats in
  let nodes_alive, nodes_suspect, nodes_dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (alive, suspect, dead) snap ->
           match snap.state with
           | Alive -> (alive + 1, suspect, dead)
           | Suspect -> (alive, suspect + 1, dead)
           | Dead | Left -> (alive, suspect, dead + 1))
         (0, 0, 0)
  in
  let pools = [ t.send_pool; t.recv_pool ] in
  let sum f = List.fold_left (fun acc pool -> acc + f pool) 0 pools in
  {
    counters with
    nodes_alive;
    nodes_suspect;
    nodes_dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available = sum Buffer_pool.available;
    buffers_total = sum Buffer_pool.total;
  }
651
(** Gossip a user message on [topic] with [payload], originating from this
    node. *)
let broadcast t ~topic ~payload =
  enqueue_broadcast t (User_msg { topic; payload; origin = t.self.id })
655
(** Register [handler] for incoming user messages. It is prepended, so the
    most recently registered handler runs first. *)
let on_message t handler =
  Kcas.Xt.commit
    {
      tx = (fun ~xt -> Kcas.Xt.modify ~xt t.user_handlers (List.cons handler));
    }