this repo has no description
at main 24 kB view raw
open Types

(** Runtime state of one local protocol node: configuration, membership
    table, counters held in Kcas locations, buffer pools, sockets and the
    Eio capabilities (clocks, randomness, switch) passed in at creation. *)
type t = {
  config : config;
  self : node_info;
  members : Membership.t;
  (* Local incarnation number, bumped by [incr_my_incarnation]. *)
  incarnation : incarnation Kcas.Loc.t;
  (* Next sequence number handed out by [next_seq]. *)
  sequence : int Kcas.Loc.t;
  broadcast_queue : Dissemination.t;
  pending_acks : Pending_acks.t;
  (* Cursor passed to [Protocol_pure.next_probe_target]. *)
  probe_index : int Kcas.Loc.t;
  send_pool : Buffer_pool.t;
  recv_pool : Buffer_pool.t;
  tcp_recv_pool : Buffer_pool.t;
  tcp_decompress_pool : Buffer_pool.t;
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
  event_stream : node_event Eio.Stream.t;
  (* Callbacks invoked as [h node topic payload] by [handle_user_msg]. *)
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
  cipher_key : Crypto.key;
  stats : stats Kcas.Loc.t;
  shutdown : bool Kcas.Loc.t;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
  sw : Eio.Switch.t;
}

(** [next_seq t] returns the current sequence number and stores its
    successor, both inside a single Kcas transaction. *)
let next_seq t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let seq = Kcas.Xt.get ~xt t.sequence in
          Kcas.Xt.set ~xt t.sequence (seq + 1);
          seq);
    }

(** [get_incarnation t] reads the local incarnation number. *)
let get_incarnation t =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.incarnation) }

(** [incr_my_incarnation t] replaces the local incarnation with
    [incr_incarnation] of its current value and returns the new value. *)
let incr_my_incarnation t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let inc = Kcas.Xt.get ~xt t.incarnation in
          let new_inc = incr_incarnation inc in
          Kcas.Xt.set ~xt t.incarnation new_inc;
          new_inc);
    }

(** [is_shutdown t] is [true] once [shutdown] has been called on [t]. *)
let is_shutdown t =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.shutdown) }

(** [now_mtime t] reads the monotonic clock and re-expresses the timestamp
    as an [Mtime.Span.t] holding the same nanosecond count. *)
let now_mtime t =
  Eio.Time.Mono.now t.mono_clock
  |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns

(** [update_stats t f] replaces the stored statistics [s] with [f s] inside
    one Kcas transaction. *)
let update_stats t f =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let s = Kcas.Xt.get ~xt t.stats in
          Kcas.Xt.set ~xt t.stats (f s));
    }

(** [emit_event t ev] appends [ev] to the event stream returned by
    [events].

    NOTE(review): the stream is created with capacity 100 in [create];
    presumably this call waits when the stream is full and nobody is
    consuming it -- confirm against the Eio.Stream documentation. *)
let emit_event t ev = Eio.Stream.add t.event_stream ev

(** [send_packet t ~dst packet] encodes [packet] into a buffer borrowed from
    the send pool, encrypts it when [config.encryption_enabled] is set,
    sends it to [dst] over UDP and increments [msgs_sent].

    When the encoder reports [`Buffer_too_small] nothing is sent and no
    counter is touched.
    NOTE(review): that drop is silent; consider counting or logging it. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Error `Buffer_too_small -> ()
      | Ok encoded_len ->
          let encoded = Cstruct.sub buf 0 encoded_len in
          let to_send =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
            else encoded
          in
          Transport.send_udp t.udp_sock dst to_send;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))

(** [make_packet t ~primary ~piggyback] builds a packet tagged with the
    configured cluster name. *)
let make_packet t ~primary ~piggyback =
  { cluster = t.config.cluster_name; primary; piggyback }

(** [drain_piggyback t ~max_bytes] drains queued broadcasts from the
    dissemination queue, passing [max_bytes] as the budget and
    [Codec.encoded_size] as the size function. *)
let drain_piggyback t ~max_bytes =
  Dissemination.drain t.broadcast_queue ~max_bytes
    ~encode_size:Codec.encoded_size

(** [piggyback_budget t] is the byte budget given to [drain_piggyback] when
    building pings and acks: the UDP buffer size minus 100.

    NOTE(review): the 100 bytes are presumably headroom for the primary
    message and framing -- confirm. *)
let piggyback_budget t = t.config.udp_buffer_size - 100

(** [enqueue_broadcast t msg] queues [msg] for gossip with a transmit count
    computed by [Protocol_pure.retransmit_limit] from the current member
    count, then calls [Dissemination.invalidate] with [msg] and
    [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t)
    ~limit:t.config.max_gossip_queue_depth;
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg

(** [handle_ping t ~src ping] answers a [Ping] by sending an [Ack] carrying
    the same sequence number, plus drained piggyback messages, back to
    [src]. Any other message is ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; _ } ->
      let piggyback = drain_piggyback t ~max_bytes:(piggyback_budget t) in
      let ack = Ack { seq; responder = t.self; payload = None } in
      let packet = make_packet t ~primary:ack ~piggyback in
      send_packet t ~dst:src packet
  | _ -> ()

(** [handle_ping_req t ~src ping_req] handles a [Ping_req]: when the target
    is a known member, a [Ping] with the same sequence number and this node
    as sender is sent to the target address. Unknown targets and other
    messages are ignored.

    NOTE(review): the requester ([src] and the message sender) is discarded
    here, and nothing visible in this file relays the resulting ack back to
    it -- verify that this is intended. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let target_addr = (Membership.Member.node member).addr in
          let ping = Ping { seq; target; sender = t.self } in
          let packet = make_packet t ~primary:ping ~piggyback:[] in
          send_packet t ~dst:target_addr packet)
  | _ -> ()

(** [handle_ack t ack] completes the pending ack registered under the
    sequence number of an [Ack]; the result of [Pending_acks.complete] is
    ignored. Other messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; responder = _; payload } ->
      ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
  | _ -> ()

(** [apply_member_transition t member_id transition_fn] runs
    [transition_fn] on a snapshot of the member [member_id] and applies the
    outcome: the stored member is updated only when the state changed, and
    the broadcasts and events of the transition are always enqueued and
    emitted. Does nothing when the member is unknown. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      if transition.Protocol_pure.new_state.state <> snap.state then begin
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                match transition.new_state.state with
                | Alive ->
                    Membership.Member.set_alive ~xt m
                      ~incarnation:transition.new_state.incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m
                      ~incarnation:transition.new_state.incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m
                      ~incarnation:transition.new_state.incarnation ~now);
          }
        |> ignore
      end;
      List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts;
      List.iter (emit_event t) transition.events

(** [handle_alive_msg t msg] applies [Protocol_pure.handle_alive] to the
    member named in an [Alive] message. Other messages are ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      apply_member_transition t node.id (fun snap ~now ->
          Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_suspect_msg t msg] applies [Protocol_pure.handle_suspect] to the
    member named in a [Suspect] message. Other messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_dead_msg t msg] applies [Protocol_pure.handle_dead] to the
    member named in a [Dead] message. Other messages are ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_dead snap msg ~now)
  | _ -> ()

(** [handle_user_msg t msg] delivers a [User_msg] to every registered
    handler as [h node topic payload], where [node] is the member record of
    the origin. The message is dropped when no handler is registered or
    when the origin is not a known member. Other messages are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } -> (
      match
        Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) }
      with
      | [] -> ()
      | handlers -> (
          match Membership.find t.members origin with
          | None -> ()
          | Some member ->
              let node = Membership.Member.node member in
              List.iter (fun h -> h node topic payload) handlers))
  | _ -> ()

(** [handle_message t ~src msg] dispatches [msg] to the handler for its
    constructor. *)
let handle_message t ~src (msg : protocol_msg) =
  match msg with
  | Ping _ -> handle_ping t ~src msg
  | Ping_req _ -> handle_ping_req t ~src msg
  | Ack _ -> handle_ack t msg
  | Alive _ -> handle_alive_msg t msg
  | Suspect _ -> handle_suspect_msg t msg
  | Dead _ -> handle_dead_msg t msg
  | User_msg _ -> handle_user_msg t msg

(** [handle_packet t ~src packet] processes the primary message and then
    every piggybacked message of [packet], and increments [msgs_received].
    Packets whose cluster name differs from the configured one are ignored
    without touching any counter. *)
let handle_packet t ~src (packet : packet) =
  if String.equal packet.cluster t.config.cluster_name then begin
    handle_message t ~src packet.primary;
    List.iter (handle_message t ~src) packet.piggyback;
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end

(** [process_udp_packet t ~buf ~src] decrypts [buf] when encryption is
    enabled, decodes it and hands the packet to [handle_packet]. A
    decryption or decoding failure increments [msgs_dropped]. *)
let process_udp_packet t ~buf ~src =
  let drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let decrypted_result =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match decrypted_result with
  | Error _ -> drop ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Error _ -> drop ()
      | Ok packet -> handle_packet t ~src packet)

(** [run_udp_receiver t] receives and processes UDP datagrams one at a time
    until [is_shutdown t] holds. The shutdown flag is only checked between
    datagrams. Each datagram is processed while its receive buffer is still
    borrowed from the pool. *)
let run_udp_receiver t =
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool (fun buf ->
        let n, src = Transport.recv_udp t.udp_sock buf in
        let received = Cstruct.sub buf 0 n in
        process_udp_packet t ~buf:received ~src)
  done

(** [build_local_state t ~is_join] builds the push/pull header and node
    list describing this node (always first, with state code 0) followed by
    every known member. [`Unix] addresses are encoded as an empty address
    with port 0. *)
let build_local_state t ~is_join =
  let wire_addr = function
    | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
    | `Unix _ -> ("", 0)
  in
  (* Shared record builder for the local node and for members. *)
  let to_wire (node : node_info) ~incarnation ~state_code =
    let addr_bytes, port = wire_addr node.addr in
    let node_name = Types.node_id_to_string node.id in
    let inc_int = Types.incarnation_to_int incarnation in
    Types.Wire.
      {
        pns_name = node_name;
        pns_addr = addr_bytes;
        pns_port = port;
        pns_meta = node.meta;
        pns_incarnation = inc_int;
        pns_state = state_code;
        pns_vsn = Types.default_vsn;
      }
  in
  let members = Membership.to_list t.members in
  let self_node =
    to_wire t.self ~incarnation:(get_incarnation t) ~state_code:0
  in
  let member_nodes =
    List.map
      (fun member ->
        let node = Membership.Member.node member in
        let snap = Membership.Member.snapshot_now member in
        to_wire node ~incarnation:snap.incarnation
          ~state_code:(Types.member_state_to_int snap.state))
      members
  in
  let all_nodes = self_node :: member_nodes in
  let header =
    Types.Wire.
      {
        pp_nodes = List.length all_nodes;
        pp_user_state_len = 0;
        pp_join = is_join;
      }
  in
  (header, all_nodes)

(** [merge_remote_state t nodes ~is_join] merges a remote node list into
    the membership table. Entries naming this node are skipped. An unknown
    node is added, and a [Join] event emitted, only when its state code is
    at most 1. A known node is updated only when the remote incarnation is
    strictly greater than the local one. When [is_join] is set,
    [msgs_received] is incremented once.

    NOTE(review): the threshold 1 presumably means alive or suspect, given
    that the local node is encoded with state code 0 -- confirm against
    [Types.member_state_to_int]. *)
let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
  List.iter
    (fun (pns : Types.Wire.push_node_state) ->
      let node_id = Types.node_id_of_string pns.pns_name in
      if not (Types.equal_node_id node_id t.self.id) then
        let ip = Types.ip_of_bytes pns.pns_addr in
        let node_info =
          Types.make_node_info ~id:node_id
            ~addr:(`Udp (ip, pns.pns_port))
            ~meta:pns.pns_meta
        in
        match Membership.find t.members node_id with
        | None ->
            if pns.pns_state <= 1 then begin
              let now = now_mtime t in
              let member = Membership.Member.create ~now node_info in
              Membership.add t.members member;
              emit_event t (Types.Join node_info)
            end
        | Some existing ->
            let snap = Membership.Member.snapshot_now existing in
            let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
            if Types.compare_incarnation remote_inc snap.incarnation > 0 then begin
              let now = now_mtime t in
              let new_state = Types.member_state_of_int pns.pns_state in
              Membership.update_member t.members node_id
                {
                  update =
                    (fun m ~xt ->
                      match new_state with
                      | Types.Alive ->
                          Membership.Member.set_alive ~xt m
                            ~incarnation:remote_inc ~now
                      | Types.Suspect ->
                          Membership.Member.set_suspect ~xt m
                            ~incarnation:remote_inc ~now
                      | Types.Dead | Types.Left ->
                          Membership.Member.set_dead ~xt m
                            ~incarnation:remote_inc ~now);
                }
              |> ignore
            end)
    nodes;
  if is_join then
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })

(** [read_exact flow buf n] reads exactly [n] bytes from [flow] into the
    start of [buf].

    Returns [Error `Connection_closed] on end of file or a zero-length
    read, and [Error `Read_error] on any other exception except
    cancellation.
    @raise Eio.Cancel.Cancelled when the fiber is cancelled; cancellation
    is re-raised rather than reported as a read error. *)
let read_exact flow buf n =
  let rec loop offset remaining =
    if remaining <= 0 then Ok ()
    else
      let chunk = Cstruct.sub buf offset remaining in
      match Eio.Flow.single_read flow chunk with
      | 0 -> Error `Connection_closed
      | read -> loop (offset + read) (remaining - read)
      | exception End_of_file -> Error `Connection_closed
      | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
      | exception _ -> Error `Read_error
  in
  loop 0 n

(** [read_available flow buf] performs one read into [buf] and returns the
    number of bytes read, or 0 on end of file or on any other exception
    except cancellation.
    @raise Eio.Cancel.Cancelled when the fiber is cancelled. *)
let read_available flow buf =
  match Eio.Flow.single_read flow buf with
  | n -> n
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception _ -> 0

(** [decompress_payload data] parses [data] as a msgpack map with an [Algo]
    integer and a [Buf] byte string, and returns the LZW-decompressed
    buffer when [Algo] is 0. Returns [None] for any other shape, algorithm
    or decompression failure.

    NOTE(review): [Msgpck.String.read] is called unguarded; presumably it
    can raise on malformed input -- verify. *)
let decompress_payload data =
  let _, msgpack = Msgpck.String.read data in
  match msgpack with
  | Msgpck.Map fields -> (
      let algo =
        match List.assoc_opt (Msgpck.String "Algo") fields with
        | Some (Msgpck.Int i) -> i
        | Some (Msgpck.Int32 i) -> Int32.to_int i
        | _ -> -1
      in
      let compressed_buf =
        match List.assoc_opt (Msgpck.String "Buf") fields with
        | Some (Msgpck.Bytes s) -> Some s
        | Some (Msgpck.String s) -> Some s
        | _ -> None
      in
      match (algo, compressed_buf) with
      | 0, Some buf -> Result.to_option (Lzw.decompress_lsb8 buf)
      | _ -> None)
  | _ -> None

(** [decompress_payload_cstruct ~src ~dst] decodes the compression envelope
    in [src] and, when the algorithm is 0, decompresses into [dst] and
    returns the value produced by [Lzw.decompress_to_buffer] (used by the
    caller as the decompressed length). Returns [None] on any failure or
    other algorithm. *)
let decompress_payload_cstruct ~src ~dst =
  match Codec.decode_compress_from_cstruct src with
  | Ok (0, compressed) ->
      Result.to_option (Lzw.decompress_to_buffer ~src:compressed ~dst)
  | Ok _ | Error _ -> None

(** [read_push_pull_payload t flow ~buf ~decomp_buf] reads the leading
    message-type byte from [flow] and returns the push/pull payload found
    behind it, unwrapping one level of encryption, compression or label
    framing. Returns [None] when the type is not recognised or any read,
    decryption or decompression step fails.

    NOTE(review): the body is fetched with a single read, so a payload that
    arrives in several reads would be truncated -- confirm whether the
    peers guarantee one write per message. *)
let read_push_pull_payload t flow ~buf ~decomp_buf =
  let type_code = Types.Wire.message_type_to_int in
  let push_pull = type_code Types.Wire.Push_pull_msg in
  (* Body bytes are stored after the first byte of [buf]. *)
  let read_body () =
    let n = read_available flow (Cstruct.shift buf 1) in
    if n > 0 then Some (Cstruct.sub buf 1 n) else None
  in
  let encrypted_payload () =
    match read_body () with
    | None -> None
    | Some encrypted ->
        Result.to_option (Crypto.decrypt ~key:t.cipher_key encrypted)
  in
  let compressed_payload () =
    match read_body () with
    | None -> None
    | Some compressed -> (
        match decompress_payload_cstruct ~src:compressed ~dst:decomp_buf with
        | Some len when len > 0 && Cstruct.get_uint8 decomp_buf 0 = push_pull
          ->
            (* Strip the inner message-type byte. *)
            Some (Cstruct.sub decomp_buf 1 (len - 1))
        | Some _ | None -> None)
  in
  let labelled_payload () =
    match read_exact flow buf 1 with
    | Error _ -> None
    | Ok () -> (
        let label_len = Cstruct.get_uint8 buf 0 in
        if label_len > 0 then
          (* The label bytes are read and discarded. *)
          match read_exact flow buf label_len with
          | Error _ -> None
          | Ok () -> (
              match read_exact flow buf 1 with
              | Error _ -> None
              | Ok () ->
                  if Cstruct.get_uint8 buf 0 = push_pull then read_body ()
                  else None)
        else None)
  in
  match read_exact flow buf 1 with
  | Error _ -> None
  | Ok () ->
      let msg_type_byte = Cstruct.get_uint8 buf 0 in
      if msg_type_byte = type_code Types.Wire.Encrypt_msg then
        encrypted_payload ()
      else if msg_type_byte = type_code Types.Wire.Compress_msg then
        compressed_payload ()
      else if msg_type_byte = type_code Types.Wire.Has_label_msg then
        labelled_payload ()
      else if msg_type_byte = push_pull then read_body ()
      else None

(** [handle_tcp_connection t flow] serves one push/pull exchange on [flow]:
    reads and decodes the remote state, merges it, and writes back the
    local state, encrypted when [config.encryption_enabled] is set.
    Undecodable input is ignored and write failures are dropped.
    @raise Eio.Cancel.Cancelled when the fiber is cancelled. *)
let handle_tcp_connection t flow =
  Buffer_pool.with_buffer t.tcp_recv_pool (fun buf ->
      Buffer_pool.with_buffer t.tcp_decompress_pool (fun decomp_buf ->
          match read_push_pull_payload t flow ~buf ~decomp_buf with
          | None -> ()
          | Some payload -> (
              match Codec.decode_push_pull_msg_cstruct payload with
              | Error _ -> ()
              | Ok (header, nodes, _user_state) -> (
                  merge_remote_state t nodes ~is_join:header.pp_join;
                  let resp_header, resp_nodes =
                    build_local_state t ~is_join:false
                  in
                  let response =
                    Codec.encode_push_pull_msg ~header:resp_header
                      ~nodes:resp_nodes ~user_state:""
                  in
                  let plain = Cstruct.of_string response in
                  let resp_buf =
                    if t.config.encryption_enabled then
                      Crypto.encrypt ~key:t.cipher_key
                        ~random:t.secure_random plain
                    else plain
                  in
                  (* Best-effort reply, but never swallow cancellation. *)
                  try Eio.Flow.write flow [ resp_buf ] with
                  | Eio.Cancel.Cancelled _ as ex -> raise ex
                  | _ -> ()))))

(** [run_tcp_listener t] accepts TCP connections one at a time and serves
    each with [handle_tcp_connection] until [is_shutdown t] holds. The flow
    is always closed after handling. Errors from accepting or handling a
    connection are dropped, except cancellation, which is re-raised so the
    loop stops when its fiber is cancelled. *)
let run_tcp_listener t =
  while not (is_shutdown t) do
    match Eio.Net.accept ~sw:t.sw t.tcp_listener with
    | flow, _addr ->
        Fun.protect
          ~finally:(fun () -> Eio.Flow.close flow)
          (fun () ->
            try handle_tcp_connection t flow with
            | Eio.Cancel.Cancelled _ as ex -> raise ex
            | _ -> ())
    | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
    | exception _ -> ()
  done

(** [await_ack t ~seq ~target_id waiter] waits on [waiter] for up to
    [config.probe_timeout]. On an ack it records the ack on the member
    [target_id] and returns [true]; on timeout it cancels the pending entry
    for [seq] and returns [false]. *)
let await_ack t ~seq ~target_id waiter =
  match
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  with
  | Some _ ->
      let now = now_mtime t in
      Membership.update_member t.members target_id
        { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
      |> ignore;
      true
  | None ->
      Pending_acks.cancel t.pending_acks ~seq;
      false

(** [probe_member t member] sends a direct [Ping] with piggybacked
    broadcasts to [member] and returns whether an ack arrived within the
    probe timeout. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback = drain_piggyback t ~max_bytes:(piggyback_budget t) in
  let ping = Ping { seq; target = target.id; sender = t.self } in
  let packet = make_packet t ~primary:ping ~piggyback in
  (* Register before sending so a fast ack cannot be missed. *)
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr packet;
  await_ack t ~seq ~target_id:target.id waiter

(** [indirect_probe t member] sends a [Ping_req] for [member] to the peers
    chosen by [Protocol_pure.select_indirect_targets] and returns whether
    an ack arrived within the probe timeout. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
  let all_members = Membership.to_node_list t.members in
  let indirect_targets =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks ~members:all_members
  in
  (* The same packet goes to every relay, so build it once. *)
  let packet = make_packet t ~primary:ping_req ~piggyback:[] in
  let waiter = Pending_acks.register t.pending_acks ~seq in
  List.iter (fun node -> send_packet t ~dst:node.addr packet) indirect_targets;
  await_ack t ~seq ~target_id:target.id waiter

(** [suspect_member t member] applies a locally generated [Suspect]
    message, carrying this node as suspector and the local incarnation
    number, to [member] through [Protocol_pure.handle_suspect]. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  let inc = get_incarnation t in
  let msg =
    Suspect { node = node.id; incarnation = inc; suspector = t.self.id }
  in
  apply_member_transition t node.id (fun snap ~now ->
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)

(** [probe_cycle t] picks the next probe target, stores the advanced probe
    index, and probes the target directly and then indirectly. The target
    is marked suspect only when both probes fail. Does nothing when there
    is no target or the target has left the membership table. *)
let probe_cycle t =
  let members = Membership.to_list t.members in
  let member_nodes = List.map Membership.Member.node members in
  let probe_idx =
    Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.probe_index) }
  in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx
      ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, new_idx) -> (
      Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.probe_index new_idx) };
      match Membership.find t.members target_node.id with
      | None -> ()
      | Some member ->
          (* [||] short-circuits: the indirect probe only runs when the
             direct probe failed. *)
          if not (probe_member t member || indirect_probe t member) then
            suspect_member t member)

(** [run_protocol t] runs [probe_cycle] and then sleeps for
    [config.protocol_interval], repeating until [is_shutdown t] holds. *)
let run_protocol t =
  while not (is_shutdown t) do
    probe_cycle t;
    Eio.Time.sleep t.clock t.config.protocol_interval
  done

(** [create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random] builds a fresh protocol state with an empty membership
    table and zeroed counters. Returns [Error `Invalid_key] when
    [Crypto.init_key] rejects [config.secret_key]. *)
let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      Ok
        {
          config;
          self;
          members = Membership.create ();
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          probe_index = Kcas.Loc.make 0;
          send_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.send_buffer_count;
          recv_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.recv_buffer_count;
          tcp_recv_pool = Buffer_pool.create ~size:65536 ~count:4;
          tcp_decompress_pool = Buffer_pool.create ~size:131072 ~count:4;
          udp_sock;
          tcp_listener;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
          sw;
        }

(** [shutdown t] sets the shutdown flag that the [run_*] loops poll. *)
let shutdown t =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.shutdown true) }

(** [add_member t node_info] adds a freshly created member for [node_info]
    and emits a [Join] event. *)
let add_member t node_info =
  let now = now_mtime t in
  let member = Membership.Member.create ~now node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)

(** [remove_member t node_id] removes the member [node_id] and emits a
    [Leave] event when the removal succeeded. Returns [false] when the
    member is unknown, otherwise the result of [Membership.remove]. *)
let remove_member t node_id =
  match Membership.find t.members node_id with
  | None -> false
  | Some member ->
      let node = Membership.Member.node member in
      let removed = Membership.remove t.members node_id in
      if removed then emit_event t (Leave node);
      removed

(** [local_node t] is the node description of this node. *)
let local_node t = t.self

(** [members t] lists the members currently in the membership table. *)
let members t = Membership.to_list t.members

(** [member_count t] is the number of members in the membership table. *)
let member_count t = Membership.count t.members

(** [events t] is the stream on which node events are published. *)
let events t = t.event_stream

(** [stats t] returns the stored counters completed with values computed at
    call time: per-state member counts ([Dead] and [Left] are counted
    together as dead), broadcast queue depth, and availability of the UDP
    send and receive buffer pools. *)
let stats t =
  let base = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.stats) } in
  let alive, suspect, dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (a, s, d) snap ->
           match snap.state with
           | Alive -> (a + 1, s, d)
           | Suspect -> (a, s + 1, d)
           | Dead | Left -> (a, s, d + 1))
         (0, 0, 0)
  in
  {
    base with
    nodes_alive = alive;
    nodes_suspect = suspect;
    nodes_dead = dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available =
      Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
    buffers_total =
      Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
  }

(** [broadcast t ~topic ~payload] queues a [User_msg] originating from this
    node for gossip dissemination. *)
let broadcast t ~topic ~payload =
  let msg = User_msg { topic; payload; origin = t.self.id } in
  enqueue_broadcast t msg

(** [send_direct t ~target ~topic ~payload] sends a [User_msg] straight to
    the member [target]. Returns [Error `Unknown_node] when [target] is not
    in the membership table. [Ok ()] means the packet was handed to
    [send_packet]; delivery is not confirmed. *)
let send_direct t ~target ~topic ~payload =
  match Membership.find t.members target with
  | None -> Error `Unknown_node
  | Some member ->
      let node = Membership.Member.node member in
      let msg = User_msg { topic; payload; origin = t.self.id } in
      let packet = make_packet t ~primary:msg ~piggyback:[] in
      send_packet t ~dst:node.addr packet;
      Ok ()

(** [send_to_addr t ~addr ~topic ~payload] sends a [User_msg] to [addr]
    without consulting the membership table. *)
let send_to_addr t ~addr ~topic ~payload =
  let msg = User_msg { topic; payload; origin = t.self.id } in
  let packet = make_packet t ~primary:msg ~piggyback:[] in
  send_packet t ~dst:addr packet

(** [on_message t handler] registers [handler] at the front of the handler
    list used by [handle_user_msg]. *)
let on_message t handler =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let handlers = Kcas.Xt.get ~xt t.user_handlers in
          Kcas.Xt.set ~xt t.user_handlers (handler :: handlers));
    }