this repo has no description
open Types

(** Runtime state of the local cluster node.

    Mutable pieces live in [Kcas.Loc.t] cells; everything else is fixed when
    the record is built by {!create}. *)
type t = {
  config : config;
  self : node_info;  (** Identity of the local node. *)
  members : Membership.t;
  incarnation : incarnation Kcas.Loc.t;  (** Local incarnation counter. *)
  sequence : int Kcas.Loc.t;  (** Source of probe sequence numbers. *)
  broadcast_queue : Dissemination.t;
  pending_acks : Pending_acks.t;
  probe_index : int Kcas.Loc.t;  (** Cursor handed to the probe selector. *)
  send_pool : Buffer_pool.t;
  recv_pool : Buffer_pool.t;
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
  event_stream : node_event Eio.Stream.t;
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
  cipher_key : Crypto.key;
  stats : stats Kcas.Loc.t;
  shutdown : bool Kcas.Loc.t;  (** Set by {!shutdown}; polled by the loops. *)
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
  sw : Eio.Switch.t;
}

(** [next_seq t] atomically increments the sequence counter and returns the
    value it held {e before} the increment. *)
let next_seq t = Kcas.Loc.fetch_and_add t.sequence 1

(** [get_incarnation t] reads the local incarnation. *)
let get_incarnation t = Kcas.Loc.get t.incarnation

(** [incr_my_incarnation t] atomically replaces the local incarnation with
    [incr_incarnation] of its current value and returns the new value. *)
let incr_my_incarnation t =
  let tx ~xt =
    let bumped = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
    Kcas.Xt.set ~xt t.incarnation bumped;
    bumped
  in
  Kcas.Xt.commit { tx }

(** [is_shutdown t] is [true] once {!shutdown} has been called. *)
let is_shutdown t = Kcas.Loc.get t.shutdown

(** [now_mtime t] reads the monotonic clock and re-expresses the timestamp as
    an [Mtime.Span.t] carrying the same nanosecond count. *)
let now_mtime t =
  Eio.Time.Mono.now t.mono_clock
  |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns

(** [update_stats t f] atomically replaces the stats record with [f] applied
    to it. [f] may be invoked more than once under contention, so it should
    be free of side effects. *)
let update_stats t f = Kcas.Loc.modify t.stats f

(** [emit_event t ev] appends [ev] to the event stream. The stream is bounded
    (see {!create}), so [Eio.Stream.add] waits while it is full. *)
let emit_event t ev = Eio.Stream.add t.event_stream ev

(** [send_packet t ~dst packet] encodes [packet] into a pooled buffer,
    encrypts it when [config.encryption_enabled] is set, sends it to [dst]
    over UDP and increments [msgs_sent].

    A packet that does not fit in the pooled buffer is dropped silently and
    is not counted. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Error `Buffer_too_small -> ()
      | Ok encoded_len ->
          let encoded = Cstruct.sub buf 0 encoded_len in
          let to_send =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
            else encoded
          in
          Transport.send_udp t.udp_sock dst to_send;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))

(** [make_packet t ~primary ~piggyback] wraps the messages in a packet tagged
    with the configured cluster name. *)
let make_packet t ~primary ~piggyback =
  { cluster = t.config.cluster_name; primary; piggyback }

(** [drain_piggyback t ~max_bytes] takes queued broadcasts from the
    dissemination queue, passing [max_bytes] as the size budget and
    [Codec.encoded_size] as the size measure. *)
let drain_piggyback t ~max_bytes =
  Dissemination.drain t.broadcast_queue ~max_bytes
    ~encode_size:Codec.encoded_size

(* Byte budget for gossip piggybacked on a ping or ack: the datagram size
   minus 100 bytes. NOTE(review): the 100 is presumably headroom for the
   primary message and framing -- confirm against Codec. *)
let piggyback_budget t = t.config.udp_buffer_size - 100

(** [enqueue_broadcast t msg] queues [msg] for gossip with a retransmit count
    from [Protocol_pure.retransmit_limit] (based on the current member
    count), then asks the queue to invalidate entries according to
    [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t);
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg

(** [handle_ping t ~src ping] answers a [Ping] with an [Ack] carrying the same
    sequence number plus piggybacked gossip. The reply goes to [src], the
    address the datagram came from. Other messages are ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; _ } ->
      let piggyback = drain_piggyback t ~max_bytes:(piggyback_budget t) in
      let ack = Ack { seq; responder = t.self; payload = None } in
      send_packet t ~dst:src (make_packet t ~primary:ack ~piggyback)
  | _ -> ()

(** [handle_ping_req t ~src ping_req] pings the requested target on behalf of
    another node, reusing the requester's sequence number and naming the
    local node as sender. Unknown targets and other messages are ignored.

    NOTE(review): nothing visible in this file forwards the target's [Ack]
    back to the original requester -- confirm the relay path exists. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let target_addr = (Membership.Member.node member).addr in
          let ping = Ping { seq; target; sender = t.self } in
          send_packet t ~dst:target_addr
            (make_packet t ~primary:ping ~piggyback:[]))
  | _ -> ()

(** [handle_ack t ack] completes the pending ack registered under the
    message's sequence number, if any. Other messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; responder = _; payload } ->
      ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
  | _ -> ()

(** [apply_member_transition t member_id transition_fn] runs [transition_fn]
    on a fresh snapshot of the member, stores the resulting state when it
    differs from the snapshot, then enqueues the transition's broadcasts and
    emits its events. Unknown members are ignored.

    The result is stored when either the state or the incarnation changed.
    Comparing the state alone would lose a same-state transition that only
    carries a different incarnation. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let next = transition.Protocol_pure.new_state in
      let changed =
        next.state <> snap.state
        || compare_incarnation next.incarnation snap.incarnation <> 0
      in
      if changed then begin
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                match next.state with
                | Alive ->
                    Membership.Member.set_alive ~xt m
                      ~incarnation:next.incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m
                      ~incarnation:next.incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m
                      ~incarnation:next.incarnation ~now);
          }
        |> ignore
      end;
      List.iter (enqueue_broadcast t) transition.broadcasts;
      List.iter (emit_event t) transition.events

(** [handle_alive_msg t msg] feeds an [Alive] message through
    [Protocol_pure.handle_alive] for the node it names.

    NOTE(review): an [Alive] about a node missing from the membership table
    is dropped by {!apply_member_transition}; confirm new members are meant
    to be learned only through the TCP state exchange. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      apply_member_transition t node.id (fun snap ~now ->
          Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_suspect_msg t msg] feeds a [Suspect] message through
    [Protocol_pure.handle_suspect] for the node it names. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_dead_msg t msg] feeds a [Dead] message through
    [Protocol_pure.handle_dead] for the node it names. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_dead snap msg ~now)
  | _ -> ()

(** [handle_user_msg t msg] calls every registered handler with the origin's
    node info, the topic and the payload. Messages whose origin is not a
    known member are dropped. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } -> (
      let handlers = Kcas.Loc.get t.user_handlers in
      match Membership.find t.members origin with
      | None -> ()
      | Some member ->
          let node = Membership.Member.node member in
          List.iter (fun h -> h node topic payload) handlers)
  | _ -> ()

(** [handle_message t ~src msg] dispatches [msg] to its handler. *)
let handle_message t ~src (msg : protocol_msg) =
  match msg with
  | Ping _ -> handle_ping t ~src msg
  | Ping_req _ -> handle_ping_req t ~src msg
  | Ack _ -> handle_ack t msg
  | Alive _ -> handle_alive_msg t msg
  | Suspect _ -> handle_suspect_msg t msg
  | Dead _ -> handle_dead_msg t msg
  | User_msg _ -> handle_user_msg t msg

(** [handle_packet t ~src packet] processes the primary message and then each
    piggybacked message, and increments [msgs_received]. Packets tagged with
    a different cluster name are ignored entirely. *)
let handle_packet t ~src (packet : packet) =
  if String.equal packet.cluster t.config.cluster_name then begin
    handle_message t ~src packet.primary;
    List.iter (handle_message t ~src) packet.piggyback;
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end

(** [process_udp_packet t ~buf ~src] decrypts [buf] when encryption is
    enabled, decodes it and handles the packet. A decryption or decoding
    failure increments [msgs_dropped]. *)
let process_udp_packet t ~buf ~src =
  let count_drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let decrypted_result =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match decrypted_result with
  | Error _ -> count_drop ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Error _ -> count_drop ()
      | Ok packet -> handle_packet t ~src packet)

(** [run_udp_receiver t] receives datagrams into pooled buffers and processes
    them until the shutdown flag is set. The flag is only checked between
    datagrams. *)
let run_udp_receiver t =
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool (fun buf ->
        let n, src = Transport.recv_udp t.udp_sock buf in
        let received = Cstruct.sub buf 0 n in
        process_udp_packet t ~buf:received ~src)
  done

(** [build_local_state t ~is_join] returns the push/pull header and node list
    describing the local view: the local node first (reported with state
    [0]), then every member. Unix-socket addresses are reported as an empty
    address with port [0]. *)
let build_local_state t ~is_join =
  (* Wire description of one node; shared by the local node and the members. *)
  let wire_state (node : node_info) ~incarnation ~state =
    let addr_bytes, port =
      match node.addr with
      | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
      | `Unix _ -> ("", 0)
    in
    Types.Wire.
      {
        pns_name = Types.node_id_to_string node.id;
        pns_addr = addr_bytes;
        pns_port = port;
        pns_meta = node.meta;
        pns_incarnation = Types.incarnation_to_int incarnation;
        pns_state = state;
        pns_vsn = Types.default_vsn;
      }
  in
  let members = Membership.to_list t.members in
  let self_node = wire_state t.self ~incarnation:(get_incarnation t) ~state:0 in
  let member_nodes =
    List.map
      (fun member ->
        let node = Membership.Member.node member in
        let snap = Membership.Member.snapshot_now member in
        wire_state node ~incarnation:snap.incarnation
          ~state:(Types.member_state_to_int snap.state))
      members
  in
  let all_nodes = self_node :: member_nodes in
  let header =
    Types.Wire.
      {
        pp_nodes = List.length all_nodes;
        pp_user_state_len = 0;
        pp_join = is_join;
      }
  in
  (header, all_nodes)

(** [merge_remote_state t nodes ~is_join] folds a remote node list into the
    membership table, skipping the entry for the local node:
    - an unknown node is added, and a [Join] event emitted, only when its
      wire state is [<= 1];
    - a known node is updated only when the remote incarnation is strictly
      greater than the local one.

    When [is_join] is set, [msgs_received] is incremented once. *)
let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
  let merge_one (pns : Types.Wire.push_node_state) =
    let node_id = Types.node_id_of_string pns.pns_name in
    if not (Types.equal_node_id node_id t.self.id) then (
      let ip = Types.ip_of_bytes pns.pns_addr in
      let node_info =
        Types.make_node_info ~id:node_id
          ~addr:(`Udp (ip, pns.pns_port))
          ~meta:pns.pns_meta
      in
      match Membership.find t.members node_id with
      | None ->
          if pns.pns_state <= 1 then begin
            let now = now_mtime t in
            let member = Membership.Member.create ~now node_info in
            Membership.add t.members member;
            emit_event t (Types.Join node_info)
          end
      | Some existing ->
          let snap = Membership.Member.snapshot_now existing in
          let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
          if Types.compare_incarnation remote_inc snap.incarnation > 0 then begin
            let now = now_mtime t in
            let new_state = Types.member_state_of_int pns.pns_state in
            Membership.update_member t.members node_id
              {
                update =
                  (fun m ~xt ->
                    match new_state with
                    | Types.Alive ->
                        Membership.Member.set_alive ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Suspect ->
                        Membership.Member.set_suspect ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Dead | Types.Left ->
                        Membership.Member.set_dead ~xt m
                          ~incarnation:remote_inc ~now);
              }
            |> ignore
          end)
  in
  List.iter merge_one nodes;
  if is_join then
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })

(** [read_exact flow buf n] reads exactly [n] bytes from [flow] into the
    start of [buf].

    @return [Error `Connection_closed] on end of stream or a zero-length
    read, [Error `Read_error] on any other read failure.
    @raise Eio.Cancel.Cancelled when the fiber is cancelled. Cancellation is
    re-raised rather than reported as a read error. *)
let read_exact flow buf n =
  let rec loop offset remaining =
    if remaining <= 0 then Ok ()
    else
      let chunk = Cstruct.sub buf offset remaining in
      match Eio.Flow.single_read flow chunk with
      | 0 -> Error `Connection_closed
      | read -> loop (offset + read) (remaining - read)
      | exception End_of_file -> Error `Connection_closed
      | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
      | exception _ -> Error `Read_error
  in
  loop 0 n

(** [read_available flow buf] performs one read into [buf] and returns the
    byte count, or [0] on end of stream or any read failure.

    @raise Eio.Cancel.Cancelled when the fiber is cancelled. *)
let read_available flow buf =
  match Eio.Flow.single_read flow buf with
  | n -> n
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception _ -> 0

(* Reads the framing of one TCP request and returns the push/pull payload.
   Three first bytes are understood: an encrypted message (the rest is
   decrypted), a labelled message (length-prefixed non-empty label, then a
   push/pull type byte) and a bare push/pull message. Anything else, and any
   read or decryption failure, yields [None].

   NOTE(review): the payload is whatever a single read returns, so a message
   that arrives in several TCP segments would be truncated -- confirm. *)
let read_tcp_payload t flow buf =
  let ( let* ) = Option.bind in
  let is_type byte ty = byte = Types.Wire.message_type_to_int ty in
  let read_byte () =
    match read_exact flow buf 1 with
    | Ok () -> Some (Cstruct.get_uint8 buf 0)
    | Error _ -> None
  in
  (* The payload is stored after byte 0, which holds the last type byte. *)
  let read_rest () =
    let n = read_available flow (Cstruct.shift buf 1) in
    if n > 0 then Some (Cstruct.sub buf 1 n) else None
  in
  let* msg_type = read_byte () in
  if is_type msg_type Types.Wire.Encrypt_msg then (
    let* encrypted = read_rest () in
    Result.to_option (Crypto.decrypt ~key:t.cipher_key encrypted))
  else if is_type msg_type Types.Wire.Has_label_msg then (
    let* label_len = read_byte () in
    if label_len > 0 then (
      (* The label bytes are read and discarded. *)
      let* () = Result.to_option (read_exact flow buf label_len) in
      let* inner_type = read_byte () in
      if is_type inner_type Types.Wire.Push_pull_msg then read_rest ()
      else None)
    else None)
  else if is_type msg_type Types.Wire.Push_pull_msg then read_rest ()
  else None

(** [handle_tcp_connection t flow] serves one push/pull exchange: it reads
    and decodes the remote state, merges it, and writes back the local state,
    encrypted when encryption is enabled. Malformed input ends the exchange
    silently, and a failed write is ignored.

    @raise Eio.Cancel.Cancelled when the fiber is cancelled. *)
let handle_tcp_connection t flow =
  let buf = Cstruct.create 65536 in
  match read_tcp_payload t flow buf with
  | None -> ()
  | Some payload -> (
      match Codec.decode_push_pull_msg (Cstruct.to_string payload) with
      | Error _ -> ()
      | Ok (header, nodes, _user_state) -> (
          merge_remote_state t nodes ~is_join:header.pp_join;
          let resp_header, resp_nodes = build_local_state t ~is_join:false in
          let response =
            Codec.encode_push_pull_msg ~header:resp_header ~nodes:resp_nodes
              ~user_state:""
          in
          let plain = Cstruct.of_string response in
          let resp_buf =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plain
            else plain
          in
          (* Best effort: the peer may already have gone away. *)
          try Eio.Flow.write flow [ resp_buf ] with
          | Eio.Cancel.Cancelled _ as ex -> raise ex
          | _ -> ()))

(** [run_tcp_listener t] accepts connections one at a time and serves each
    with {!handle_tcp_connection} until the shutdown flag is set. Failures
    while accepting or serving are ignored, and the connection is always
    closed.

    Cancellation is propagated. Swallowing it would leave the loop retrying
    [accept] on a cancelled fiber. *)
let run_tcp_listener t =
  while not (is_shutdown t) do
    match Eio.Net.accept ~sw:t.sw t.tcp_listener with
    | flow, _addr ->
        Fun.protect
          ~finally:(fun () -> Eio.Flow.close flow)
          (fun () ->
            try handle_tcp_connection t flow with
            | Eio.Cancel.Cancelled _ as ex -> raise ex
            | _ -> ())
    | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
    | exception _ -> ()
  done

(* Waits up to [config.probe_timeout] for the ack registered as [waiter].
   On success the ack is recorded on the member [target_id] and the result
   is [true]; on timeout the pending entry [seq] is cancelled and the result
   is [false]. *)
let await_ack t ~seq ~target_id waiter =
  match
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  with
  | Some _ ->
      let now = now_mtime t in
      Membership.update_member t.members target_id
        { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
      |> ignore;
      true
  | None ->
      Pending_acks.cancel t.pending_acks ~seq;
      false

(** [probe_member t member] sends a direct [Ping] with piggybacked gossip and
    waits for the matching ack. Returns [true] when the ack arrives within
    [config.probe_timeout]. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback = drain_piggyback t ~max_bytes:(piggyback_budget t) in
  let ping = Ping { seq; target = target.id; sender = t.self } in
  let packet = make_packet t ~primary:ping ~piggyback in
  (* Register before sending so the reply cannot arrive unobserved. *)
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr packet;
  await_ack t ~seq ~target_id:target.id waiter

(** [indirect_probe t member] asks up to [config.indirect_checks] other
    members to ping [member] and waits for an ack. Returns [true] when one
    arrives within [config.probe_timeout].

    Returns [false] immediately when no relay is available, since nothing is
    sent and no ack could arrive. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let all_members = Membership.to_node_list t.members in
  let indirect_targets =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks ~members:all_members
  in
  match indirect_targets with
  | [] -> false
  | _ :: _ ->
      let seq = next_seq t in
      let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
      (* Every relay receives the same request, so build the packet once. *)
      let packet = make_packet t ~primary:ping_req ~piggyback:[] in
      let waiter = Pending_acks.register t.pending_acks ~seq in
      List.iter (fun node -> send_packet t ~dst:node.addr packet)
        indirect_targets;
      await_ack t ~seq ~target_id:target.id waiter

(** [suspect_member t member] builds a [Suspect] message naming the local
    node as suspector and applies it through [Protocol_pure.handle_suspect].

    NOTE(review): the message carries the {e local} node's incarnation rather
    than the suspected member's -- confirm this is intended. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  let inc = get_incarnation t in
  let msg =
    Suspect { node = node.id; incarnation = inc; suspector = t.self.id }
  in
  apply_member_transition t node.id (fun snap ~now ->
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)

(** [probe_cycle t] runs one probe round: it picks the next target through
    [Protocol_pure.next_probe_target], stores the advanced index, probes the
    target directly, falls back to an indirect probe, and marks the member
    suspect when both fail. *)
let probe_cycle t =
  let members = Membership.to_list t.members in
  let member_nodes = List.map Membership.Member.node members in
  let probe_idx = Kcas.Loc.get t.probe_index in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, new_idx) -> (
      Kcas.Loc.set t.probe_index new_idx;
      match Membership.find t.members target_node.id with
      | None -> ()
      | Some member ->
          if not (probe_member t member) then
            if not (indirect_probe t member) then suspect_member t member)

(** [run_protocol t] runs {!probe_cycle} and then sleeps for
    [config.protocol_interval], repeating until the shutdown flag is set. *)
let run_protocol t =
  while not (is_shutdown t) do
    probe_cycle t;
    Eio.Time.sleep t.clock t.config.protocol_interval
  done

(** [create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random] builds the node state with empty membership, zeroed
    counters and an event stream of capacity 100.

    @return [Error `Invalid_key] when [config.secret_key] is rejected by
    [Crypto.init_key]. *)
let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      Ok
        {
          config;
          self;
          members = Membership.create ();
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          probe_index = Kcas.Loc.make 0;
          send_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.send_buffer_count;
          recv_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.recv_buffer_count;
          udp_sock;
          tcp_listener;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
          sw;
        }

(** [shutdown t] sets the shutdown flag. Each loop stops at its next check of
    the flag. *)
let shutdown t = Kcas.Loc.set t.shutdown true

(** [add_member t node_info] creates a member for [node_info], adds it to the
    membership table and emits a [Join] event. *)
let add_member t node_info =
  let now = now_mtime t in
  let member = Membership.Member.create ~now node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)

(** [remove_member t node_id] removes the member and emits a [Leave] event
    when the removal took effect. Returns [false] for an unknown member. *)
let remove_member t node_id =
  match Membership.find t.members node_id with
  | None -> false
  | Some member ->
      let node = Membership.Member.node member in
      let removed = Membership.remove t.members node_id in
      if removed then emit_event t (Leave node);
      removed

(** [local_node t] is the local node's info. *)
let local_node t = t.self

(** [members t] lists the current members. *)
let members t = Membership.to_list t.members

(** [member_count t] is the number of members in the table. *)
let member_count t = Membership.count t.members

(** [events t] is the stream on which membership events are published. *)
let events t = t.event_stream

(** [stats t] returns the stored counters completed with values computed on
    demand: member counts per state ([Dead] and [Left] are counted together),
    broadcast queue depth, and buffer availability summed over both pools. *)
let stats t =
  let base = Kcas.Loc.get t.stats in
  let alive, suspect, dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (a, s, d) snap ->
           match snap.state with
           | Alive -> (a + 1, s, d)
           | Suspect -> (a, s + 1, d)
           | Dead | Left -> (a, s, d + 1))
         (0, 0, 0)
  in
  {
    base with
    nodes_alive = alive;
    nodes_suspect = suspect;
    nodes_dead = dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available =
      Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
    buffers_total =
      Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
  }

(** [broadcast t ~topic ~payload] queues a user message originating from the
    local node for gossip. *)
let broadcast t ~topic ~payload =
  let msg = User_msg { topic; payload; origin = t.self.id } in
  enqueue_broadcast t msg

(** [on_message t handler] registers [handler] for user messages. Handlers
    are prepended, so the most recently registered one runs first. *)
let on_message t handler =
  Kcas.Loc.modify t.user_handlers (fun handlers -> handler :: handlers)