this repo has no description
open Types

(** Runtime state of the local node: configuration, membership table,
    protocol counters, buffer pools, sockets and the Eio capabilities
    (clocks, randomness, switch) the protocol loops need. Mutable cells are
    [Kcas.Loc.t] values and are only accessed through transactions. *)
type t = {
  config : config;
  self : node_info;
  members : Membership.t;
  incarnation : incarnation Kcas.Loc.t;
  sequence : int Kcas.Loc.t;
  broadcast_queue : Dissemination.t;
  pending_acks : Pending_acks.t;
  probe_index : int Kcas.Loc.t;
  send_pool : Buffer_pool.t;
  recv_pool : Buffer_pool.t;
  tcp_recv_pool : Buffer_pool.t;
  tcp_decompress_pool : Buffer_pool.t;
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
  event_stream : node_event Eio.Stream.t;
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
  cipher_key : Crypto.key;
  stats : stats Kcas.Loc.t;
  shutdown : bool Kcas.Loc.t;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
  sw : Eio.Switch.t;
}

(* [read_loc loc] reads [loc] in a single-location transaction. *)
let read_loc loc = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt loc) }

(* [write_loc loc v] stores [v] in [loc] in a single-location transaction. *)
let write_loc loc v =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt loc v) }

(** [next_seq t] atomically returns the current sequence number and
    increments the stored counter. *)
let next_seq t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let current = Kcas.Xt.get ~xt t.sequence in
          Kcas.Xt.set ~xt t.sequence (current + 1);
          current);
    }

(** [get_incarnation t] is the local node's current incarnation. *)
let get_incarnation t = read_loc t.incarnation

(** [incr_my_incarnation t] atomically bumps the local incarnation with
    [incr_incarnation] and returns the new value. *)
let incr_my_incarnation t =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let bumped = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
          Kcas.Xt.set ~xt t.incarnation bumped;
          bumped);
    }

(** [is_shutdown t] is [true] once {!shutdown} has been called. *)
let is_shutdown t = read_loc t.shutdown

(** [now_mtime t] is the current monotonic clock reading, converted to an
    [Mtime.Span.t] through its nanosecond count. *)
let now_mtime t =
  Eio.Time.Mono.now t.mono_clock
  |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns

(** [update_stats t f] atomically replaces the statistics record with
    [f current]. *)
let update_stats t f =
  Kcas.Xt.commit
    {
      tx = (fun ~xt -> Kcas.Xt.set ~xt t.stats (f (Kcas.Xt.get ~xt t.stats)));
    }

(** [emit_event t ev] pushes [ev] onto the event stream.
    NOTE(review): [Eio.Stream.add] waits while a bounded stream is full, so
    a consumer is presumably expected to drain {!events}. *)
let emit_event t ev = Eio.Stream.add t.event_stream ev

(** [send_packet t ~dst packet] encodes [packet] into a pooled buffer,
    encrypts it when [config.encryption_enabled] is set, sends it over UDP
    to [dst] and bumps [msgs_sent]. A packet that does not fit the buffer is
    dropped without being counted. *)
let send_packet t ~dst (packet : packet) =
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Error `Buffer_too_small -> ()
      | Ok encoded_len ->
          let plain = Cstruct.sub buf 0 encoded_len in
          let wire =
            if t.config.encryption_enabled then
              Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plain
            else plain
          in
          Transport.send_udp t.udp_sock dst wire;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))

(** [make_packet t ~primary ~piggyback] builds a packet stamped with the
    configured cluster name. *)
let make_packet t ~primary ~piggyback =
  { cluster = t.config.cluster_name; primary; piggyback }

(** [drain_piggyback t ~max_bytes] takes queued broadcasts from the
    dissemination queue, bounded by [max_bytes] as measured by
    [Codec.encoded_size]. *)
let drain_piggyback t ~max_bytes =
  Dissemination.drain t.broadcast_queue ~max_bytes
    ~encode_size:Codec.encoded_size

(** [enqueue_broadcast t msg] queues [msg] for gossip with a retransmit
    budget derived from the current member count, then asks the queue to
    invalidate entries according to [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let transmits =
    Protocol_pure.retransmit_limit t.config
      ~node_count:(Membership.count t.members)
  in
  Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t);
  Dissemination.invalidate t.broadcast_queue
    ~invalidates:Protocol_pure.invalidates msg

(** [handle_ping t ~src ping] answers a [Ping] with an [Ack] carrying the
    same sequence number plus piggybacked broadcasts. Other messages are
    ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; _ } ->
      (* Leave 100 bytes of the UDP buffer for the primary message. *)
      let piggyback =
        drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
      in
      let ack = Ack { seq; responder = t.self; payload = None } in
      send_packet t ~dst:src (make_packet t ~primary:ack ~piggyback)
  | _ -> ()

(** [handle_ping_req t ~src ping_req] pings the requested target on behalf
    of the requester, reusing the request's sequence number. Unknown targets
    and other messages are ignored.
    NOTE(review): nothing in this file relays the resulting [Ack] back to
    the original requester; confirm whether that happens elsewhere. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } -> (
      match Membership.find t.members target with
      | None -> ()
      | Some member ->
          let dst = (Membership.Member.node member).addr in
          let ping = Ping { seq; target; sender = t.self } in
          send_packet t ~dst (make_packet t ~primary:ping ~piggyback:[]))
  | _ -> ()

(** [handle_ack t ack] completes the pending-ack entry matching the ack's
    sequence number. Other messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; responder = _; payload } ->
      ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
  | _ -> ()

(** [apply_member_transition t member_id transition_fn] runs the pure
    [transition_fn] on the member's current snapshot, stores the resulting
    state, then enqueues the transition's broadcasts and emits its events.
    Unknown members are ignored.

    The new snapshot is stored when either the state or the incarnation
    changed. Storing only on a state change would drop a same-state
    incarnation bump, letting a stale message be accepted later. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let next = transition.Protocol_pure.new_state in
      let changed =
        next.state <> snap.state
        || Types.compare_incarnation next.incarnation snap.incarnation <> 0
      in
      if changed then
        Membership.update_member t.members member_id
          {
            update =
              (fun m ~xt ->
                match next.state with
                | Alive ->
                    Membership.Member.set_alive ~xt m
                      ~incarnation:next.incarnation ~now
                | Suspect ->
                    Membership.Member.set_suspect ~xt m
                      ~incarnation:next.incarnation ~now
                | Dead | Left ->
                    Membership.Member.set_dead ~xt m
                      ~incarnation:next.incarnation ~now);
          }
        |> ignore;
      List.iter (enqueue_broadcast t) transition.broadcasts;
      List.iter (emit_event t) transition.events

(** [handle_alive_msg t msg] applies an [Alive] message to the member it
    names. Other messages are ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      apply_member_transition t node.id (fun snap ~now ->
          Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_suspect_msg t msg] applies a [Suspect] message to the member it
    names. Other messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
  | _ -> ()

(** [handle_dead_msg t msg] applies a [Dead] message to the member it names.
    Other messages are ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      apply_member_transition t node (fun snap ~now ->
          Protocol_pure.handle_dead snap msg ~now)
  | _ -> ()

(** [handle_user_msg t msg] delivers a [User_msg] to every registered
    handler, provided its origin is a known member. Nothing is looked up
    when no handler is registered. Other messages are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } -> (
      match read_loc t.user_handlers with
      | [] -> ()
      | handlers -> (
          match Membership.find t.members origin with
          | None -> ()
          | Some member ->
              let node = Membership.Member.node member in
              List.iter (fun handler -> handler node topic payload) handlers))
  | _ -> ()

(** [handle_message t ~src msg] dispatches [msg] to its handler. *)
let handle_message t ~src (msg : protocol_msg) =
  match msg with
  | Ping _ -> handle_ping t ~src msg
  | Ping_req _ -> handle_ping_req t ~src msg
  | Ack _ -> handle_ack t msg
  | Alive _ -> handle_alive_msg t msg
  | Suspect _ -> handle_suspect_msg t msg
  | Dead _ -> handle_dead_msg t msg
  | User_msg _ -> handle_user_msg t msg

(** [handle_packet t ~src packet] processes the primary message and then
    every piggybacked message, and bumps [msgs_received]. Packets from
    another cluster name are ignored without being counted. *)
let handle_packet t ~src (packet : packet) =
  if String.equal packet.cluster t.config.cluster_name then begin
    handle_message t ~src packet.primary;
    List.iter (handle_message t ~src) packet.piggyback;
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
  end

(** [process_udp_packet t ~buf ~src] decrypts [buf] when encryption is
    enabled, decodes it and hands the packet to {!handle_packet}. A
    decryption or decoding failure bumps [msgs_dropped]. *)
let process_udp_packet t ~buf ~src =
  let dropped () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let plaintext =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match plaintext with
  | Error _ -> dropped ()
  | Ok decrypted -> (
      match Codec.decode_packet decrypted with
      | Error _ -> dropped ()
      | Ok packet -> handle_packet t ~src packet)

(** [run_udp_receiver t] receives and processes UDP datagrams until the
    shutdown flag is set. Each datagram uses one buffer from [recv_pool]. *)
let run_udp_receiver t =
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool (fun buf ->
        let len, src = Transport.recv_udp t.udp_sock buf in
        process_udp_packet t ~buf:(Cstruct.sub buf 0 len) ~src)
  done

(* Wire form of a node's address: raw IP bytes and port for UDP endpoints,
   an empty address and port 0 for Unix-domain endpoints. *)
let wire_endpoint (node : node_info) =
  match node.addr with
  | `Udp (ip, port) -> (Types.ip_to_bytes ip, port)
  | `Unix _ -> ("", 0)

(* Push/pull wire record for [node] with the given incarnation and integer
   state code. *)
let wire_node_state (node : node_info) ~incarnation ~state =
  let pns_addr, pns_port = wire_endpoint node in
  {
    Types.Wire.pns_name = Types.node_id_to_string node.id;
    pns_addr;
    pns_port;
    pns_meta = node.meta;
    pns_incarnation = Types.incarnation_to_int incarnation;
    pns_state = state;
    pns_vsn = Types.default_vsn;
  }

(** [build_local_state t ~is_join] is the push/pull header and node list
    describing the local node (always first, state code [0]) followed by
    every known member. *)
let build_local_state t ~is_join =
  let self_node =
    wire_node_state t.self ~incarnation:(get_incarnation t) ~state:0
  in
  let member_nodes =
    List.map
      (fun member ->
        let node = Membership.Member.node member in
        let snap = Membership.Member.snapshot_now member in
        wire_node_state node ~incarnation:snap.incarnation
          ~state:(Types.member_state_to_int snap.state))
      (Membership.to_list t.members)
  in
  let all_nodes = self_node :: member_nodes in
  let header =
    {
      Types.Wire.pp_nodes = List.length all_nodes;
      pp_user_state_len = 0;
      pp_join = is_join;
    }
  in
  (header, all_nodes)

(** [merge_remote_state t nodes ~is_join] folds a remote push/pull node
    list into the membership table. Entries for the local node are skipped.
    Unknown nodes are added, with a [Join] event, only when their state code
    is at most [1]. Known nodes are updated only when the remote incarnation
    is strictly newer. When [is_join] is set, [msgs_received] is bumped
    once.
    NOTE(review): state codes [0] and [1] presumably mean alive and suspect;
    confirm against [Types.member_state_of_int]. *)
let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
  let merge_one (pns : Types.Wire.push_node_state) =
    let node_id = Types.node_id_of_string pns.pns_name in
    if Types.equal_node_id node_id t.self.id then ()
    else
      let ip = Types.ip_of_bytes pns.pns_addr in
      let node_info =
        Types.make_node_info ~id:node_id
          ~addr:(`Udp (ip, pns.pns_port))
          ~meta:pns.pns_meta
      in
      match Membership.find t.members node_id with
      | None ->
          if pns.pns_state <= 1 then begin
            let member =
              Membership.Member.create ~now:(now_mtime t) node_info
            in
            Membership.add t.members member;
            emit_event t (Types.Join node_info)
          end
      | Some existing ->
          let snap = Membership.Member.snapshot_now existing in
          let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
          if Types.compare_incarnation remote_inc snap.incarnation > 0 then begin
            let now = now_mtime t in
            let new_state = Types.member_state_of_int pns.pns_state in
            Membership.update_member t.members node_id
              {
                update =
                  (fun m ~xt ->
                    match new_state with
                    | Types.Alive ->
                        Membership.Member.set_alive ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Suspect ->
                        Membership.Member.set_suspect ~xt m
                          ~incarnation:remote_inc ~now
                    | Types.Dead | Types.Left ->
                        Membership.Member.set_dead ~xt m
                          ~incarnation:remote_inc ~now);
              }
            |> ignore
          end
  in
  List.iter merge_one nodes;
  if is_join then
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })

(** [read_exact flow buf n] reads exactly [n] bytes from [flow] into the
    start of [buf]. Returns [Error `Connection_closed] on end of stream and
    [Error `Read_error] on any other failure.
    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled; this is
    re-raised rather than reported as a read error. *)
let read_exact flow buf n =
  let rec loop offset remaining =
    if remaining <= 0 then Ok ()
    else
      match Eio.Flow.single_read flow (Cstruct.sub buf offset remaining) with
      | 0 -> Error `Connection_closed
      | got -> loop (offset + got) (remaining - got)
      | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
      | exception End_of_file -> Error `Connection_closed
      | exception _ -> Error `Read_error
  in
  loop 0 n

(** [read_available flow buf] performs one read into [buf] and returns the
    number of bytes read, or [0] on end of stream or any read failure.
    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. *)
let read_available flow buf =
  match Eio.Flow.single_read flow buf with
  | n -> n
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception End_of_file -> 0
  | exception _ -> 0

(** [decompress_payload data] parses [data] as a msgpack map with an
    ["Algo"] integer and a ["Buf"] byte string. When the algorithm is [0] it
    returns the LZW-decompressed buffer, otherwise [None].
    NOTE(review): [Msgpck.String.read] is not guarded here, so malformed
    input may raise; confirm what callers expect. *)
let decompress_payload data =
  let _, msgpack = Msgpck.String.read data in
  match msgpack with
  | Msgpck.Map fields -> (
      let algo =
        match List.assoc_opt (Msgpck.String "Algo") fields with
        | Some (Msgpck.Int i) -> i
        | Some (Msgpck.Int32 i) -> Int32.to_int i
        | _ -> -1
      in
      let compressed =
        match List.assoc_opt (Msgpck.String "Buf") fields with
        | Some (Msgpck.Bytes s) | Some (Msgpck.String s) -> Some s
        | _ -> None
      in
      match (algo, compressed) with
      | 0, Some buf -> Result.to_option (Lzw.decompress_lsb8 buf)
      | _ -> None)
  | _ -> None

(** [decompress_payload_cstruct ~src ~dst] decodes the compression envelope
    in [src] and, for algorithm [0], decompresses into [dst] and returns the
    decompressed length. Returns [None] on any failure or other algorithm. *)
let decompress_payload_cstruct ~src ~dst =
  match Codec.decode_compress_from_cstruct src with
  | Ok (0, compressed) ->
      Result.to_option (Lzw.decompress_to_buffer ~src:compressed ~dst)
  | Ok _ | Error _ -> None

(* Reads one byte from [flow] into [buf.(0)] and returns it. *)
let read_byte flow buf =
  match read_exact flow buf 1 with
  | Ok () -> Some (Cstruct.get_uint8 buf 0)
  | Error _ -> None

(* Reads the leading message-type byte from [flow] and returns the push/pull
   payload it introduces, unwrapping an encrypted, compressed or labelled
   envelope. Returns [None] for unknown types and on any read, decryption or
   decompression failure. The result aliases [buf] or [decomp_buf]. *)
let read_push_pull_payload t flow ~buf ~decomp_buf =
  let ( let* ) = Option.bind in
  let is_type byte ty = byte = Types.Wire.message_type_to_int ty in
  (* A single read of whatever is available, stored after the type byte. *)
  let read_body () =
    let n = read_available flow (Cstruct.shift buf 1) in
    if n > 0 then Some (Cstruct.sub buf 1 n) else None
  in
  let* msg_type = read_byte flow buf in
  if is_type msg_type Types.Wire.Encrypt_msg then
    let* encrypted = read_body () in
    Result.to_option (Crypto.decrypt ~key:t.cipher_key encrypted)
  else if is_type msg_type Types.Wire.Compress_msg then
    let* compressed = read_body () in
    let* len = decompress_payload_cstruct ~src:compressed ~dst:decomp_buf in
    if
      len > 0
      && is_type (Cstruct.get_uint8 decomp_buf 0) Types.Wire.Push_pull_msg
    then Some (Cstruct.sub decomp_buf 1 (len - 1))
    else None
  else if is_type msg_type Types.Wire.Has_label_msg then
    let* label_len = read_byte flow buf in
    if label_len = 0 then None
    else
      (* The label bytes are read and discarded. *)
      let* () = Result.to_option (read_exact flow buf label_len) in
      let* inner_type = read_byte flow buf in
      if is_type inner_type Types.Wire.Push_pull_msg then read_body ()
      else None
  else if is_type msg_type Types.Wire.Push_pull_msg then read_body ()
  else None

(* Writes the local push/pull state to [flow], encrypted when enabled.
   Write failures are ignored (best effort); cancellation is re-raised. *)
let send_push_pull_response t flow =
  let header, nodes = build_local_state t ~is_join:false in
  let plain =
    Codec.encode_push_pull_msg ~header ~nodes ~user_state:""
    |> Cstruct.of_string
  in
  let out =
    if t.config.encryption_enabled then
      Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plain
    else plain
  in
  try Eio.Flow.write flow [ out ] with
  | Eio.Cancel.Cancelled _ as ex -> raise ex
  | _ -> ()

(** [handle_tcp_connection t flow] serves one push/pull exchange: it reads
    the remote state, merges it into the membership table and replies with
    the local state. Malformed or unsupported requests are ignored. *)
let handle_tcp_connection t flow =
  Buffer_pool.with_buffer t.tcp_recv_pool (fun buf ->
      Buffer_pool.with_buffer t.tcp_decompress_pool (fun decomp_buf ->
          match read_push_pull_payload t flow ~buf ~decomp_buf with
          | None -> ()
          | Some payload -> (
              match Codec.decode_push_pull_msg_cstruct payload with
              | Error _ -> ()
              | Ok (header, nodes, _user_state) ->
                  merge_remote_state t nodes ~is_join:header.pp_join;
                  send_push_pull_response t flow)))

(** [run_tcp_listener t] accepts and serves TCP connections one at a time
    until the shutdown flag is set. Errors on a single connection or accept
    are ignored. The flow is closed however the handler exits.
    @raise Eio.Cancel.Cancelled if the fiber is cancelled. Swallowing it
    would make the loop spin, because every later [accept] fails
    immediately. *)
let run_tcp_listener t =
  while not (is_shutdown t) do
    match Eio.Net.accept ~sw:t.sw t.tcp_listener with
    | flow, _addr ->
        Fun.protect
          ~finally:(fun () -> Eio.Flow.close flow)
          (fun () ->
            try handle_tcp_connection t flow with
            | Eio.Cancel.Cancelled _ as ex -> raise ex
            | _ -> ())
    | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
    | exception _ -> ()
  done

(* Registers a waiter for [seq], runs [send], then waits up to
   [config.probe_timeout] for the ack. On success records the ack on
   [target_id] and returns [true]; on timeout cancels the registration and
   returns [false]. The registration is also cancelled if sending or waiting
   raises, so it cannot leak. *)
let await_ack t ~seq ~target_id ~send =
  let waiter = Pending_acks.register t.pending_acks ~seq in
  let outcome =
    try
      send ();
      Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
    with ex ->
      Pending_acks.cancel t.pending_acks ~seq;
      raise ex
  in
  match outcome with
  | Some _ ->
      let now = now_mtime t in
      Membership.update_member t.members target_id
        { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
      |> ignore;
      true
  | None ->
      Pending_acks.cancel t.pending_acks ~seq;
      false

(** [probe_member t member] sends a direct [Ping], with piggybacked
    broadcasts, to [member] and returns whether an ack arrived within
    [config.probe_timeout]. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let piggyback =
    drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
  in
  let ping = Ping { seq; target = target.id; sender = t.self } in
  let packet = make_packet t ~primary:ping ~piggyback in
  await_ack t ~seq ~target_id:target.id ~send:(fun () ->
      send_packet t ~dst:target.addr packet)

(** [indirect_probe t member] asks up to [config.indirect_checks] other
    members to ping [member] and returns whether an ack arrived within
    [config.probe_timeout]. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
  let relays =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks
      ~members:(Membership.to_node_list t.members)
  in
  await_ack t ~seq ~target_id:target.id ~send:(fun () ->
      List.iter
        (fun node ->
          let packet = make_packet t ~primary:ping_req ~piggyback:[] in
          send_packet t ~dst:node.addr packet)
        relays)

(** [suspect_member t member] marks [member] as suspected by the local
    node. The message carries the member's last known incarnation, not the
    local node's, so that [Protocol_pure.handle_suspect] compares it against
    the right value. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  let snap = Membership.Member.snapshot_now member in
  let msg =
    Suspect
      { node = node.id; incarnation = snap.incarnation; suspector = t.self.id }
  in
  apply_member_transition t node.id (fun snap ~now ->
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)

(** [probe_cycle t] picks the next probe target, stores the advanced probe
    index, probes the target directly, then indirectly, and suspects it if
    both fail. *)
let probe_cycle t =
  let member_nodes =
    Membership.to_list t.members |> List.map Membership.Member.node
  in
  match
    Protocol_pure.next_probe_target ~self:t.self.id
      ~probe_index:(read_loc t.probe_index) ~members:member_nodes
  with
  | None -> ()
  | Some (target_node, new_idx) -> (
      write_loc t.probe_index new_idx;
      match Membership.find t.members target_node.id with
      | None -> ()
      | Some member ->
          (* [&&] short-circuits, so the indirect probe only runs after a
             failed direct probe. *)
          if (not (probe_member t member)) && not (indirect_probe t member)
          then suspect_member t member)

(** [run_protocol t] runs one probe cycle every [config.protocol_interval]
    until the shutdown flag is set. *)
let run_protocol t =
  while not (is_shutdown t) do
    probe_cycle t;
    Eio.Time.sleep t.clock t.config.protocol_interval
  done

(** [create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random] builds a fresh node state with an empty membership
    table. Returns [Error `Invalid_key] when [config.secret_key] is rejected
    by [Crypto.init_key]. *)
let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      Ok
        {
          config;
          self;
          members = Membership.create ();
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          probe_index = Kcas.Loc.make 0;
          send_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.send_buffer_count;
          recv_pool =
            Buffer_pool.create ~size:config.udp_buffer_size
              ~count:config.recv_buffer_count;
          tcp_recv_pool = Buffer_pool.create ~size:65536 ~count:4;
          tcp_decompress_pool = Buffer_pool.create ~size:131072 ~count:4;
          udp_sock;
          tcp_listener;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
          sw;
        }

(** [shutdown t] sets the shutdown flag. The run loops check it between
    iterations. *)
let shutdown t = write_loc t.shutdown true

(** [add_member t node_info] adds a new member created at the current
    monotonic time and emits a [Join] event. *)
let add_member t node_info =
  let member = Membership.Member.create ~now:(now_mtime t) node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)

(** [remove_member t node_id] removes the member and emits a [Leave] event.
    Returns [false] when the member is unknown or was not removed. *)
let remove_member t node_id =
  match Membership.find t.members node_id with
  | None -> false
  | Some member ->
      let node = Membership.Member.node member in
      let removed = Membership.remove t.members node_id in
      if removed then emit_event t (Leave node);
      removed

(** [local_node t] is the local node's info. *)
let local_node t = t.self

(** [members t] is the list of known members. *)
let members t = Membership.to_list t.members

(** [member_count t] is the number of known members. *)
let member_count t = Membership.count t.members

(** [events t] is the stream on which node events are published. *)
let events t = t.event_stream

(** [stats t] is the stored counters completed with live figures: member
    counts per state ([Dead] and [Left] are counted together), broadcast
    queue depth, and availability of the UDP send and receive pools. *)
let stats t =
  let base = read_loc t.stats in
  let alive, suspect, dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (a, s, d) snap ->
           match snap.state with
           | Alive -> (a + 1, s, d)
           | Suspect -> (a, s + 1, d)
           | Dead | Left -> (a, s, d + 1))
         (0, 0, 0)
  in
  {
    base with
    nodes_alive = alive;
    nodes_suspect = suspect;
    nodes_dead = dead;
    queue_depth = Dissemination.depth t.broadcast_queue;
    buffers_available =
      Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
    buffers_total =
      Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
  }

(* User message originating from the local node. *)
let user_msg t ~topic ~payload = User_msg { topic; payload; origin = t.self.id }

(** [broadcast t ~topic ~payload] queues a user message for gossip. *)
let broadcast t ~topic ~payload =
  enqueue_broadcast t (user_msg t ~topic ~payload)

(** [send_to_addr t ~addr ~topic ~payload] sends a user message straight to
    [addr], without piggybacked broadcasts. *)
let send_to_addr t ~addr ~topic ~payload =
  let packet =
    make_packet t ~primary:(user_msg t ~topic ~payload) ~piggyback:[]
  in
  send_packet t ~dst:addr packet

(** [send_direct t ~target ~topic ~payload] sends a user message straight
    to the known member [target]. Returns [Error `Unknown_node] when
    [target] is not in the membership table. *)
let send_direct t ~target ~topic ~payload =
  match Membership.find t.members target with
  | None -> Error `Unknown_node
  | Some member ->
      let node = Membership.Member.node member in
      send_to_addr t ~addr:node.addr ~topic ~payload;
      Ok ()

(** [on_message t handler] registers [handler] for incoming user messages.
    The newest handler is placed first in the list. *)
let on_message t handler =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let existing = Kcas.Xt.get ~xt t.user_handlers in
          Kcas.Xt.set ~xt t.user_handlers (handler :: existing));
    }