this repo has no description
open Types

(** Runtime state of the local cluster node.

    Mutable pieces are held in [Kcas.Loc.t] cells so that they can be read and
    updated atomically. *)
type t = {
  config : config;  (** Static configuration supplied to {!create}. *)
  self : node_info;  (** Identity of the local node. *)
  members : Membership.t;  (** Known cluster members. *)
  incarnation : incarnation Kcas.Loc.t;
      (** Local incarnation, bumped by {!incr_my_incarnation}. *)
  sequence : int Kcas.Loc.t;
      (** Next probe sequence number, handed out by {!next_seq}. *)
  broadcast_queue : Dissemination.t;
      (** Messages waiting to be piggybacked on outgoing packets. *)
  pending_acks : Pending_acks.t;
      (** Waiters for acks, keyed by sequence number. *)
  probe_index : int Kcas.Loc.t;
      (** Cursor passed to [Protocol_pure.next_probe_target]. *)
  send_pool : Buffer_pool.t;  (** Buffers used to encode outgoing packets. *)
  recv_pool : Buffer_pool.t;  (** Buffers used to receive datagrams. *)
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
  event_stream : node_event Eio.Stream.t;
      (** Events published by {!emit_event}. *)
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
      (** Callbacks invoked as [h node topic payload] for user messages. *)
  cipher_key : Crypto.key;
  stats : stats Kcas.Loc.t;  (** Message counters, see {!update_stats}. *)
  shutdown : bool Kcas.Loc.t;  (** Set to [true] by {!shutdown}. *)
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
}

(** [next_seq t] atomically returns the current sequence number and advances
    the counter by one. *)
let next_seq t = Kcas.Loc.fetch_and_add t.sequence 1

(** [get_incarnation t] is the local node's current incarnation. *)
let get_incarnation t = Kcas.Loc.get t.incarnation

(** [incr_my_incarnation t] atomically replaces the local incarnation with
    [incr_incarnation] of its current value and returns the new value. *)
let incr_my_incarnation t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let bumped = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
          Kcas.Xt.set ~xt t.incarnation bumped;
          bumped);
    }

(** [is_shutdown t] is [true] once {!shutdown} has been called. *)
let is_shutdown t = Kcas.Loc.get t.shutdown

(** [now_mtime t] reads the monotonic clock and returns the reading as an
    [Mtime.Span.t] (nanoseconds carried over unchanged). *)
let now_mtime t =
  Eio.Time.Mono.now t.mono_clock
  |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns

(** [update_stats t f] atomically replaces the stats record [s] with [f s]. *)
let update_stats t f =
  Kcas.Xt.commit
    {
      tx = (fun ~xt -> Kcas.Xt.set ~xt t.stats (f (Kcas.Xt.get ~xt t.stats)));
    }

(* Bumps the [msgs_dropped] counter by one. *)
let count_dropped t =
  update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })

(** [emit_event t ev] appends [ev] to the node's event stream. *)
let emit_event t ev = Eio.Stream.add t.event_stream ev

(** [send_packet t ~dst packet] encodes [packet] into a buffer borrowed from
    the send pool, encrypts it when [t.config.encryption_enabled] is set, and
    sends it to [dst] over UDP, incrementing [msgs_sent].

    A packet that does not fit in the buffer is not sent; it is counted in
    [msgs_dropped] instead of vanishing without a trace. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Error `Buffer_too_small ->
          (* NOTE(review): [msgs_dropped] is also used for undecodable inbound
             packets; confirm sharing the counter is acceptable. *)
          count_dropped t
      | Ok encoded_len ->
          let encoded = Cstruct.sub buf 0 encoded_len in
          let to_send =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
            else encoded
          in
          Transport.send_udp t.udp_sock dst to_send;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))

(** [make_packet t ~primary ~piggyback] wraps the messages in a packet tagged
    with the configured cluster name. *)
let make_packet t ~primary ~piggyback =
  { cluster = t.config.cluster_name; primary; piggyback }

(** [drain_piggyback t ~max_bytes] takes queued broadcasts from the
    dissemination queue, bounded by [max_bytes] as measured by
    [Codec.encoded_size]. *)
let drain_piggyback t ~max_bytes =
  Dissemination.drain t.broadcast_queue ~max_bytes
    ~encode_size:Codec.encoded_size

(* Bytes of each datagram withheld from the piggyback budget. Presumably this
   leaves room for the primary message and packet framing -- TODO confirm. *)
let piggyback_headroom = 100

(* Drains piggyback messages using the standard per-datagram budget. *)
let drain_default_piggyback t =
  drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - piggyback_headroom)

(** [enqueue_broadcast t msg] queues [msg] for dissemination with a transmit
    limit derived from the current member count, then asks the queue to
    invalidate entries according to [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t);
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg

(** [handle_ping t ~src msg] answers a [Ping] with an [Ack] carrying the same
    sequence number, sent to [src] together with drained piggyback messages.
    Any other message is ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; _ } ->
      let piggyback = drain_default_piggyback t in
      let ack = Ack { seq; responder = t.self; payload = None } in
      send_packet t ~dst:src (make_packet t ~primary:ack ~piggyback)
  | _ -> ()

(** [handle_ping_req t ~src msg] handles a [Ping_req] by sending a [Ping] with
    the same sequence number to the requested target, if that target is a
    known member. Any other message is ignored.

    NOTE(review): the relayed ping names this node as [sender] and the
    requester's address is discarded, so the resulting [Ack] appears to end up
    in this node's own {!handle_ack} rather than being passed back to the
    requester. The reused [seq] could also coincide with one of this node's
    own pending probes. Confirm whether ack relaying is implemented
    elsewhere. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let target_addr = (Membership.Member.node member).addr in
          let ping = Ping { seq; target; sender = t.self } in
          send_packet t ~dst:target_addr
            (make_packet t ~primary:ping ~piggyback:[]))
  | _ -> ()

(** [handle_ack t msg] completes the pending ack registered under the [Ack]'s
    sequence number, passing along its payload. The result of the completion
    is discarded. Any other message is ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; responder = _; payload } ->
      ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
  | _ -> ()

(** [apply_member_transition t member_id transition_fn] looks up [member_id],
    snapshots it, and applies [transition_fn snap ~now].

    If the resulting state differs from the snapshot's state the member is
    updated accordingly. The transition's broadcasts are enqueued and its
    events emitted whether or not the state changed. Unknown members are
    ignored. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let next = transition.Protocol_pure.new_state in
      if next.state <> snap.state then
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                let incarnation = next.incarnation in
                match next.state with
                | Alive -> Membership.Member.set_alive ~xt m ~incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m ~incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m ~incarnation ~now);
          }
        |> ignore;
      List.iter (enqueue_broadcast t) transition.broadcasts;
      List.iter (emit_event t) transition.events

(** [handle_alive_msg t msg] applies [Protocol_pure.handle_alive] to the
    member named in an [Alive] message. Any other message is ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      apply_member_transition t node.id (fun snap ~now ->
          Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_suspect_msg t msg] applies [Protocol_pure.handle_suspect] to the
    member named in a [Suspect] message. Any other message is ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_dead_msg t msg] applies [Protocol_pure.handle_dead] to the member
    named in a [Dead] message. Any other message is ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_dead snap msg ~now)
  | _ -> ()

(** [handle_user_msg t msg] delivers a [User_msg] to every registered handler
    as [h node topic payload], where [node] is the origin's member record.
    Messages from unknown origins, and any other message, are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } -> (
      let handlers = Kcas.Loc.get t.user_handlers in
      match Membership.find t.members origin with
      | None -> ()
      | Some member ->
          let node = Membership.Member.node member in
          List.iter (fun h -> h node topic payload) handlers)
  | _ -> ()

(** [handle_message t ~src msg] dispatches [msg] to the handler for its
    constructor. *)
let handle_message t ~src (msg : protocol_msg) =
  match msg with
  | Ping _ -> handle_ping t ~src msg
  | Ping_req _ -> handle_ping_req t ~src msg
  | Ack _ -> handle_ack t msg
  | Alive _ -> handle_alive_msg t msg
  | Suspect _ -> handle_suspect_msg t msg
  | Dead _ -> handle_dead_msg t msg
  | User_msg _ -> handle_user_msg t msg

(** [handle_packet t ~src packet] processes the primary message and then each
    piggybacked message, and increments [msgs_received]. Packets whose cluster
    name differs from the configured one are ignored entirely. *)
let handle_packet t ~src (packet : packet) =
  if String.equal packet.cluster t.config.cluster_name then begin
    handle_message t ~src packet.primary;
    List.iter (handle_message t ~src) packet.piggyback;
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end

(** [process_udp_packet t ~buf ~src] decrypts [buf] when encryption is
    enabled, decodes it and hands the packet to {!handle_packet}. A decryption
    or decoding failure increments [msgs_dropped]. *)
let process_udp_packet t ~buf ~src =
  let decrypted_result =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match decrypted_result with
  | Error _ -> count_dropped t
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Error _ -> count_dropped t
      | Ok packet -> handle_packet t ~src packet)

(** [run_udp_receiver t] receives and processes datagrams until the shutdown
    flag is observed. The flag is only checked between datagrams. *)
let run_udp_receiver t =
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool (fun buf ->
        let n, src = Transport.recv_udp t.udp_sock buf in
        process_udp_packet t ~buf:(Cstruct.sub buf 0 n) ~src)
  done

(* [await_ack t ~seq ~target_id ~send] registers a waiter for [seq], runs
   [send ()], then waits up to [t.config.probe_timeout] for the ack.

   Returns [true] after recording the ack on member [target_id], or [false]
   after cancelling the registration on timeout. If sending or waiting raises,
   the registration is cancelled before the exception is re-raised, so an
   aborted probe does not leave an entry behind in [pending_acks]. *)
let await_ack t ~seq ~target_id ~send =
  let waiter = Pending_acks.register t.pending_acks ~seq in
  match
    send ();
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  with
  | Some _ ->
      let now = now_mtime t in
      Membership.update_member t.members target_id
        { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
      |> ignore;
      true
  | None ->
      Pending_acks.cancel t.pending_acks ~seq;
      false
  | exception exn ->
      (* Capture the backtrace before doing any other work. *)
      let bt = Printexc.get_raw_backtrace () in
      Pending_acks.cancel t.pending_acks ~seq;
      Printexc.raise_with_backtrace exn bt

(** [probe_member t member] sends a [Ping] (with piggybacked broadcasts)
    directly to [member] and returns whether an ack arrived within
    [t.config.probe_timeout]. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback = drain_default_piggyback t in
  let ping = Ping { seq; target = target.id; sender = t.self } in
  let packet = make_packet t ~primary:ping ~piggyback in
  await_ack t ~seq ~target_id:target.id ~send:(fun () ->
      send_packet t ~dst:target.addr packet)

(** [indirect_probe t member] sends a [Ping_req] for [member] to the nodes
    chosen by [Protocol_pure.select_indirect_targets] and returns whether an
    ack for the request's sequence number arrived within
    [t.config.probe_timeout]. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
  let indirect_targets =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks
      ~members:(Membership.to_node_list t.members)
  in
  await_ack t ~seq ~target_id:target.id ~send:(fun () ->
      List.iter
        (fun node ->
          send_packet t ~dst:node.addr
            (make_packet t ~primary:ping_req ~piggyback:[]))
        indirect_targets)

(** [suspect_member t member] applies a locally generated [Suspect] message,
    carrying the local node's incarnation and id, to [member]. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  let msg =
    Suspect
      {
        node = node.id;
        incarnation = get_incarnation t;
        suspector = t.self.id;
      }
  in
  apply_member_transition t node.id (fun snap ~now ->
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)

(** [probe_cycle t] runs one probe round: picks the next target, stores the
    advanced probe index, probes the target directly, falls back to an
    indirect probe, and marks the target suspect if both fail. *)
let probe_cycle t =
  let member_nodes =
    Membership.to_list t.members |> List.map Membership.Member.node
  in
  let probe_idx = Kcas.Loc.get t.probe_index in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, new_idx) -> (
      Kcas.Loc.set t.probe_index new_idx;
      match Membership.find t.members target_node.id with
      | None -> ()
      | Some member ->
          (* [&&] short-circuits: the indirect probe only runs after the
             direct probe has failed. *)
          if not (probe_member t member) && not (indirect_probe t member) then
            suspect_member t member)

(** [run_protocol t] runs {!probe_cycle} every [t.config.protocol_interval]
    until the shutdown flag is observed. *)
let run_protocol t =
  while not (is_shutdown t) do
    probe_cycle t;
    Eio.Time.sleep t.clock t.config.protocol_interval
  done

(** [create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random] builds
    a node with empty membership, zeroed counters and freshly allocated buffer
    pools.

    @return
      [Error `Invalid_key] when [Crypto.init_key config.secret_key] fails,
      otherwise [Ok t]. *)
let create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      Ok
        {
          config;
          self;
          members = Membership.create ();
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          probe_index = Kcas.Loc.make 0;
          send_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.send_buffer_count;
          recv_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.recv_buffer_count;
          udp_sock;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
        }

(** [shutdown t] sets the shutdown flag polled by {!run_udp_receiver} and
    {!run_protocol}. *)
let shutdown t = Kcas.Loc.set t.shutdown true

(** [add_member t node_info] adds a new member record for [node_info] and
    emits a [Join] event. *)
let add_member t node_info =
  let member = Membership.Member.create ~now:(now_mtime t) node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)

(** [remove_member t node_id] removes [node_id] from the membership, emitting
    a [Leave] event when the removal succeeds.

    @return [true] if a member was removed, [false] otherwise. *)
let remove_member t node_id =
  match Membership.find t.members node_id with
  | None -> false
  | Some member ->
      let node = Membership.Member.node member in
      let removed = Membership.remove t.members node_id in
      if removed then emit_event t (Leave node);
      removed

(** [local_node t] is the local node's identity. *)
let local_node t = t.self

(** [members t] lists the currently known members. *)
let members t = Membership.to_list t.members

(** [member_count t] is the number of known members. *)
let member_count t = Membership.count t.members

(** [events t] is the stream on which node events are published. *)
let events t = t.event_stream

(** [stats t] returns the stored counters extended with figures computed at
    call time: members per state ([Dead] and [Left] are both counted as dead),
    broadcast queue depth, and buffer availability summed over both pools. *)
let stats t =
  let base = Kcas.Loc.get t.stats in
  let alive, suspect, dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (a, s, d) snap ->
           match snap.state with
           | Alive -> (a + 1, s, d)
           | Suspect -> (a, s + 1, d)
           | Dead | Left -> (a, s, d + 1))
         (0, 0, 0)
  in
  {
    base with
    nodes_alive = alive;
    nodes_suspect = suspect;
    nodes_dead = dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available =
      Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
    buffers_total =
      Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
  }

(** [broadcast t ~topic ~payload] queues a [User_msg] originating from the
    local node for dissemination. *)
let broadcast t ~topic ~payload =
  enqueue_broadcast t (User_msg { topic; payload; origin = t.self.id })

(** [on_message t handler] registers [handler] for user messages. The newest
    handler is placed first and therefore runs first. *)
let on_message t handler =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let handlers = Kcas.Xt.get ~xt t.user_handlers in
          Kcas.Xt.set ~xt t.user_handlers (handler :: handlers));
    }