open Types

(** Runtime state of the local protocol node.

    Mutable pieces are held in [Kcas.Loc.t] cells and are only read or written
    through [Kcas.Xt.commit] transactions in this file. *)
type t = {
  config : config;
  self : node_info;
  members : Membership.t;
  incarnation : incarnation Kcas.Loc.t;
  sequence : int Kcas.Loc.t;
  broadcast_queue : Dissemination.t;
  pending_acks : Pending_acks.t;
  probe_index : int Kcas.Loc.t;
  send_pool : Buffer_pool.t;
  recv_pool : Buffer_pool.t;
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
  event_stream : node_event Eio.Stream.t;
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
  cipher_key : Crypto.key;
  stats : stats Kcas.Loc.t;
  shutdown : bool Kcas.Loc.t;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
}

(** [next_seq t] returns the current sequence number and advances the counter
    by one, both within a single transaction. *)
let next_seq t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let seq = Kcas.Xt.get ~xt t.sequence in
          Kcas.Xt.set ~xt t.sequence (seq + 1);
          seq);
    }

(** [get_incarnation t] reads the local node incarnation. *)
let get_incarnation t =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.incarnation) }

(** [incr_my_incarnation t] replaces the local incarnation with
    [incr_incarnation] of its current value and returns the new value. *)
let incr_my_incarnation t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let inc = Kcas.Xt.get ~xt t.incarnation in
          let new_inc = incr_incarnation inc in
          Kcas.Xt.set ~xt t.incarnation new_inc;
          new_inc);
    }

(** [is_shutdown t] is [true] once {!shutdown} has been called on [t]. *)
let is_shutdown t =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.shutdown) }

(** [now_mtime t] reads the monotonic clock and returns the reading as an
    [Mtime.Span.t] (the raw nanosecond count reinterpreted as a span). *)
let now_mtime t =
  Eio.Time.Mono.now t.mono_clock
  |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns

(** [update_stats t f] replaces the stored statistics [s] with [f s] inside a
    transaction. *)
let update_stats t f =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let s = Kcas.Xt.get ~xt t.stats in
          Kcas.Xt.set ~xt t.stats (f s));
    }

(** [emit_event t ev] appends [ev] to the event stream.

    NOTE(review): the stream is created with capacity 100 in {!create};
    [Eio.Stream.add] is documented to block while the stream is full, so
    callers may stall if nobody consumes events. Confirm this is intended. *)
let emit_event t ev = Eio.Stream.add t.event_stream ev

(** [send_packet t ~dst packet] encodes [packet] into a buffer borrowed from
    the send pool, encrypts the encoded bytes, sends them to [dst] over UDP
    and increments [msgs_sent].

    If the packet does not fit in the buffer it is dropped without any
    counter update. NOTE(review): consider accounting for this drop. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Error `Buffer_too_small -> ()
      | Ok encoded_len ->
          let encoded = Cstruct.sub buf 0 encoded_len in
          let encrypted =
            Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
          in
          Transport.send_udp t.udp_sock dst encrypted;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))

(** [make_packet t ~primary ~piggyback] builds a packet tagged with the
    configured cluster name. *)
let make_packet t ~primary ~piggyback =
  { cluster = t.config.cluster_name; primary; piggyback }

(** [drain_piggyback t ~max_bytes] drains queued broadcasts from the
    dissemination queue, passing [max_bytes] and [Codec.encoded_size] as the
    size function. *)
let drain_piggyback t ~max_bytes =
  Dissemination.drain t.broadcast_queue ~max_bytes
    ~encode_size:Codec.encoded_size

(* Bytes subtracted from the UDP buffer size when computing how much room is
   offered to piggybacked messages. NOTE(review): presumably covers the
   primary message and encryption overhead -- confirm. *)
let piggyback_headroom = 100

(* Byte budget handed to [drain_piggyback] for packets that carry gossip. *)
let piggyback_budget t = t.config.udp_buffer_size - piggyback_headroom

(** [enqueue_broadcast t msg] queues [msg] for dissemination with a transmit
    count derived from the current member count, then asks the queue to
    invalidate entries according to [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.enqueue t.broadcast_queue msg ~transmits
    ~created:(now_mtime t);
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg

(** [handle_ping t ~src ping] answers a [Ping] with an [Ack] carrying the same
    sequence number plus drained piggyback messages, sent to [src]. Any other
    message is ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; sender = _ } ->
      let piggyback = drain_piggyback t ~max_bytes:(piggyback_budget t) in
      let ack = Ack { seq; responder = t.self; payload = None } in
      send_packet t ~dst:src (make_packet t ~primary:ack ~piggyback)
  | _ -> ()

(** [handle_ping_req t ~src ping_req] forwards a [Ping] with the requested
    sequence number to the target, if the target is a known member. Unknown
    targets and other messages are ignored.

    NOTE(review): the forwarded ping names this node as sender and
    [handle_ping] replies to the datagram source, so the resulting ack comes
    back here; nothing in this file relays it to the original requester.
    Confirm whether relaying happens elsewhere. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let target_addr = (Membership.Member.node member).addr in
          let ping = Ping { seq; sender = t.self } in
          let packet = make_packet t ~primary:ping ~piggyback:[] in
          send_packet t ~dst:target_addr packet)
  | _ -> ()

(** [handle_ack t ack] completes the pending-ack entry matching the sequence
    number of an [Ack]; the completion result is discarded. Other messages are
    ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; responder = _; payload } ->
      ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
  | _ -> ()

(** [apply_member_transition t member_id transition_fn] runs [transition_fn]
    on a fresh snapshot of the member and applies the outcome.

    The stored member is only rewritten when the resulting state differs from
    the snapshot state. The broadcasts and events of the transition are always
    enqueued and emitted. Unknown members are ignored. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      if transition.Protocol_pure.new_state.state <> snap.state then begin
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                match transition.new_state.state with
                | Alive ->
                    Membership.Member.set_alive ~xt m
                      ~incarnation:transition.new_state.incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m
                      ~incarnation:transition.new_state.incarnation ~now
                | Dead ->
                    Membership.Member.set_dead ~xt m
                      ~incarnation:transition.new_state.incarnation ~now);
          }
        |> ignore
      end;
      List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts;
      List.iter (emit_event t) transition.events

(** [handle_alive_msg t msg] applies [Protocol_pure.handle_alive] to the
    member named in an [Alive] message. Other messages are ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      apply_member_transition t node.id (fun snap ~now ->
          Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_suspect_msg t msg] applies [Protocol_pure.handle_suspect] to the
    member named in a [Suspect] message. Other messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_dead_msg t msg] applies [Protocol_pure.handle_dead] to the member
    named in a [Dead] message. Other messages are ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_dead snap msg ~now)
  | _ -> ()

(** [handle_user_msg t msg] invokes every registered user handler with the
    origin node, topic and payload of a [User_msg]. Messages whose origin is
    not a known member, and other message kinds, are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } -> (
      let handlers =
        Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) }
      in
      match Membership.find t.members origin with
      | None -> ()
      | Some member ->
          let node = Membership.Member.node member in
          List.iter (fun h -> h node topic payload) handlers)
  | _ -> ()

(** [handle_message t ~src msg] dispatches [msg] to the handler for its
    constructor. *)
let handle_message t ~src (msg : protocol_msg) =
  match msg with
  | Ping _ -> handle_ping t ~src msg
  | Ping_req _ -> handle_ping_req t ~src msg
  | Ack _ -> handle_ack t msg
  | Alive _ -> handle_alive_msg t msg
  | Suspect _ -> handle_suspect_msg t msg
  | Dead _ -> handle_dead_msg t msg
  | User_msg _ -> handle_user_msg t msg

(** [handle_packet t ~src packet] processes the primary message and then each
    piggybacked message of [packet], and increments [msgs_received]. Packets
    whose cluster name differs from the configured one are ignored. *)
let handle_packet t ~src (packet : packet) =
  if String.equal packet.cluster t.config.cluster_name then begin
    handle_message t ~src packet.primary;
    List.iter (handle_message t ~src) packet.piggyback;
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end

(** [process_udp_packet t ~buf ~src] decrypts and decodes [buf] and hands the
    packet to {!handle_packet}. A decryption or decoding failure increments
    [msgs_dropped] instead. *)
let process_udp_packet t ~buf ~src =
  let count_drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  match Crypto.decrypt ~key:t.cipher_key buf with
  | Error _ -> count_drop ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Error _ -> count_drop ()
      | Ok packet -> handle_packet t ~src packet)

(** [run_udp_receiver t] loops until shutdown is flagged: each iteration
    borrows a receive buffer, receives one datagram and processes the bytes
    actually received. The shutdown flag is checked between iterations only. *)
let run_udp_receiver t =
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool (fun buf ->
        let n, src = Transport.recv_udp t.udp_sock buf in
        let received = Cstruct.sub buf 0 n in
        process_udp_packet t ~buf:received ~src)
  done

(* Waits up to [probe_timeout] on [waiter]. On an ack, records it on member
   [target_id] and returns [true]; on timeout, cancels the pending entry for
   [seq] and returns [false]. *)
let await_ack t ~seq ~target_id waiter =
  match
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  with
  | Some _ ->
      let now = now_mtime t in
      Membership.update_member t.members target_id
        { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
      |> ignore;
      true
  | None ->
      Pending_acks.cancel t.pending_acks ~seq;
      false

(** [probe_member t member] sends a direct [Ping] (with piggybacked gossip) to
    [member] and waits for the matching ack.

    @return [true] if an ack arrived within [probe_timeout], else [false]. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback = drain_piggyback t ~max_bytes:(piggyback_budget t) in
  let ping = Ping { seq; sender = t.self } in
  let packet = make_packet t ~primary:ping ~piggyback in
  (* Register before sending so a fast ack cannot be missed. *)
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr packet;
  await_ack t ~seq ~target_id:target.id waiter

(** [indirect_probe t member] asks up to [indirect_checks] other members to
    ping [member] on our behalf and waits for an ack.

    @return
      [true] if an ack arrived within [probe_timeout]; [false] on timeout, or
      immediately when no relay candidate is available. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
  let all_members = Membership.to_node_list t.members in
  let indirect_targets =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks ~members:all_members
  in
  match indirect_targets with
  | [] ->
      (* Nothing was sent, so no ack for [seq] can ever arrive: do not block
         for a whole probe timeout. *)
      false
  | _ :: _ ->
      let packet = make_packet t ~primary:ping_req ~piggyback:[] in
      let waiter = Pending_acks.register t.pending_acks ~seq in
      List.iter
        (fun node -> send_packet t ~dst:node.addr packet)
        indirect_targets;
      await_ack t ~seq ~target_id:target.id waiter

(** [suspect_member t member] raises a local suspicion about [member] by
    feeding a [Suspect] message through [Protocol_pure.handle_suspect].

    The message carries the incarnation recorded for the suspected member (not
    the local node incarnation): a suspicion refers to a specific incarnation
    of its target. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  apply_member_transition t node.id (fun snap ~now ->
      let msg =
        Suspect
          {
            node = node.id;
            incarnation = snap.incarnation;
            suspector = t.self.id;
          }
      in
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)

(** [probe_cycle t] runs one probe round: picks the next target via
    [Protocol_pure.next_probe_target], stores the new probe index, probes the
    target directly, falls back to an indirect probe, and marks the target as
    suspected when both fail. Does nothing when there is no target. *)
let probe_cycle t =
  let members = Membership.to_list t.members in
  let member_nodes = List.map Membership.Member.node members in
  let probe_idx =
    Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.probe_index) }
  in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, new_idx) -> (
      Kcas.Xt.commit
        { tx = (fun ~xt -> Kcas.Xt.set ~xt t.probe_index new_idx) };
      match Membership.find t.members target_node.id with
      | None -> ()
      | Some member ->
          (* [&&] short-circuits: the indirect probe only runs after the
             direct probe has failed. *)
          if (not (probe_member t member)) && not (indirect_probe t member)
          then suspect_member t member)

(** [run_protocol t] repeats {!probe_cycle} followed by a sleep of
    [protocol_interval] until shutdown is flagged. *)
let run_protocol t =
  while not (is_shutdown t) do
    probe_cycle t;
    Eio.Time.sleep t.clock t.config.protocol_interval
  done

(** [create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random] builds
    a node with empty membership, zeroed counters and freshly allocated
    send/receive buffer pools.

    @return
      [Error `Invalid_key] when [Crypto.init_key] rejects [config.secret_key],
      otherwise [Ok t]. *)
let create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      Ok
        {
          config;
          self;
          members = Membership.create ();
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          probe_index = Kcas.Loc.make 0;
          send_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.send_buffer_count;
          recv_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.recv_buffer_count;
          udp_sock;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
        }

(** [shutdown t] sets the shutdown flag; {!run_udp_receiver} and
    {!run_protocol} observe it at the start of their next iteration. *)
let shutdown t =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.shutdown true) }

(** [add_member t node_info] creates a member for [node_info], adds it to the
    membership table and emits a [Join] event. *)
let add_member t node_info =
  let now = now_mtime t in
  let member = Membership.Member.create ~now node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)

(** [remove_member t node_id] removes the member and emits a [Leave] event
    when the removal succeeded.

    @return
      the result of [Membership.remove], or [false] when [node_id] is not a
      known member. *)
let remove_member t node_id =
  match Membership.find t.members node_id with
  | None -> false
  | Some member ->
      let node = Membership.Member.node member in
      let removed = Membership.remove t.members node_id in
      if removed then emit_event t (Leave node);
      removed

(** [local_node t] is the node info of the local node. *)
let local_node t = t.self

(** [members t] lists the current members. *)
let members t = Membership.to_list t.members

(** [member_count t] is the number of entries in the membership table. *)
let member_count t = Membership.count t.members

(** [events t] is the stream on which node events are published. *)
let events t = t.event_stream

(** [stats t] returns the stored counters completed with values computed at
    call time: per-state member counts, broadcast queue depth, and the summed
    availability and size of the send and receive buffer pools. *)
let stats t =
  let base = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.stats) } in
  let alive, suspect, dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (a, s, d) snap ->
           match snap.state with
           | Alive -> (a + 1, s, d)
           | Suspect -> (a, s + 1, d)
           | Dead -> (a, s, d + 1))
         (0, 0, 0)
  in
  {
    base with
    nodes_alive = alive;
    nodes_suspect = suspect;
    nodes_dead = dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available =
      Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
    buffers_total =
      Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
  }

(** [broadcast t ~topic ~payload] queues a [User_msg] originating from the
    local node for dissemination. *)
let broadcast t ~topic ~payload =
  let msg = User_msg { topic; payload; origin = t.self.id } in
  enqueue_broadcast t msg

(** [on_message t handler] registers [handler] by prepending it to the list of
    user handlers, so the most recently registered handler runs first. *)
let on_message t handler =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let handlers = Kcas.Xt.get ~xt t.user_handlers in
          Kcas.Xt.set ~xt t.user_handlers (handler :: handlers));
    }