this repo has no description
1open Types
2
3type t = {
4 config : config;
5 self : node_info;
6 members : Membership.t;
7 incarnation : incarnation Kcas.Loc.t;
8 sequence : int Kcas.Loc.t;
9 broadcast_queue : Dissemination.t;
10 pending_acks : Pending_acks.t;
11 probe_index : int Kcas.Loc.t;
12 send_pool : Buffer_pool.t;
13 recv_pool : Buffer_pool.t;
14 tcp_recv_pool : Buffer_pool.t;
15 tcp_decompress_pool : Buffer_pool.t;
16 udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
17 tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
18 event_stream : node_event Eio.Stream.t;
19 user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
20 cipher_key : Crypto.key;
21 stats : stats Kcas.Loc.t;
22 shutdown : bool Kcas.Loc.t;
23 clock : float Eio.Time.clock_ty Eio.Resource.t;
24 mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
25 secure_random : Eio.Flow.source_ty Eio.Resource.t;
26 sw : Eio.Switch.t;
27}
28
29let next_seq t =
30 Kcas.Xt.commit
31 {
32 tx =
33 (fun ~xt ->
34 let seq = Kcas.Xt.get ~xt t.sequence in
35 Kcas.Xt.set ~xt t.sequence (seq + 1);
36 seq);
37 }
38
39let get_incarnation t =
40 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.incarnation) }
41
42let incr_my_incarnation t =
43 Kcas.Xt.commit
44 {
45 tx =
46 (fun ~xt ->
47 let inc = Kcas.Xt.get ~xt t.incarnation in
48 let new_inc = incr_incarnation inc in
49 Kcas.Xt.set ~xt t.incarnation new_inc;
50 new_inc);
51 }
52
53let is_shutdown t =
54 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.shutdown) }
55
56let now_mtime t =
57 Eio.Time.Mono.now t.mono_clock
58 |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns
59
60let update_stats t f =
61 Kcas.Xt.commit
62 {
63 tx =
64 (fun ~xt ->
65 let s = Kcas.Xt.get ~xt t.stats in
66 Kcas.Xt.set ~xt t.stats (f s));
67 }
68
69let emit_event t ev = Eio.Stream.add t.event_stream ev
70
71let send_packet t ~dst (packet : packet) =
72 Buffer_pool.with_buffer t.send_pool (fun buf ->
73 match Codec.encode_packet packet ~buf with
74 | Error `Buffer_too_small -> ()
75 | Ok encoded_len ->
76 let encoded = Cstruct.sub buf 0 encoded_len in
77 let to_send =
78 if t.config.encryption_enabled then
79 Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
80 else encoded
81 in
82 Transport.send_udp t.udp_sock dst to_send;
83 update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))
84
85let make_packet t ~primary ~piggyback =
86 { cluster = t.config.cluster_name; primary; piggyback }
87
88let drain_piggyback t ~max_bytes =
89 Dissemination.drain t.broadcast_queue ~max_bytes
90 ~encode_size:Codec.encoded_size
91
92let enqueue_broadcast t msg =
93 let transmits =
94 Protocol_pure.retransmit_limit t.config
95 ~node_count:(Membership.count t.members)
96 in
97 Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t)
98 ~limit:t.config.max_gossip_queue_depth;
99 Dissemination.invalidate t.broadcast_queue
100 ~invalidates:Protocol_pure.invalidates msg
101
102let handle_ping t ~src (ping : protocol_msg) =
103 match ping with
104 | Ping { seq; _ } ->
105 let piggyback =
106 drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
107 in
108 let ack = Ack { seq; responder = t.self; payload = None } in
109 let packet = make_packet t ~primary:ack ~piggyback in
110 send_packet t ~dst:src packet
111 | _ -> ()
112
113let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
114 match ping_req with
115 | Ping_req { seq; target; sender = _ } -> (
116 match Membership.find t.members target with
117 | None -> ()
118 | Some member ->
119 let target_addr = (Membership.Member.node member).addr in
120 let ping = Ping { seq; target; sender = t.self } in
121 let packet = make_packet t ~primary:ping ~piggyback:[] in
122 send_packet t ~dst:target_addr packet)
123 | _ -> ()
124
125let handle_ack t (ack : protocol_msg) =
126 match ack with
127 | Ack { seq; responder = _; payload } ->
128 ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
129 | _ -> ()
130
131let apply_member_transition t member_id transition_fn =
132 let now = now_mtime t in
133 match Membership.find t.members member_id with
134 | None -> ()
135 | Some member ->
136 let snap = Membership.Member.snapshot_now member in
137 let transition = transition_fn snap ~now in
138 if transition.Protocol_pure.new_state.state <> snap.state then begin
139 Membership.update_member t.members member_id
140 {
141 update =
142 (fun m ~xt ->
143 match transition.new_state.state with
144 | Alive ->
145 Membership.Member.set_alive ~xt m
146 ~incarnation:transition.new_state.incarnation ~now
147 | Suspect ->
148 Membership.Member.set_suspect ~xt m
149 ~incarnation:transition.new_state.incarnation ~now
150 | Dead | Left ->
151 Membership.Member.set_dead ~xt m
152 ~incarnation:transition.new_state.incarnation ~now);
153 }
154 |> ignore
155 end;
156 List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts;
157 List.iter (emit_event t) transition.events
158
159let handle_alive_msg t (msg : protocol_msg) =
160 match msg with
161 | Alive { node; incarnation = _ } ->
162 apply_member_transition t node.id (fun snap ~now ->
163 Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
164 | _ -> ()
165
166let handle_suspect_msg t (msg : protocol_msg) =
167 match msg with
168 | Suspect { node; incarnation = _; suspector = _ } ->
169 apply_member_transition t node (fun snap ~now ->
170 Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
171 | _ -> ()
172
173let handle_dead_msg t (msg : protocol_msg) =
174 match msg with
175 | Dead { node; incarnation = _; declarator = _ } ->
176 apply_member_transition t node (fun snap ~now ->
177 Protocol_pure.handle_dead snap msg ~now)
178 | _ -> ()
179
180let handle_user_msg t (msg : protocol_msg) =
181 match msg with
182 | User_msg { topic; payload; origin } -> (
183 let handlers =
184 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) }
185 in
186 if List.length handlers = 0 then ()
187 else
188 match Membership.find t.members origin with
189 | None -> ()
190 | Some member ->
191 let node = Membership.Member.node member in
192 List.iter (fun h -> h node topic payload) handlers)
193 | _ -> ()
194
195let handle_message t ~src (msg : protocol_msg) =
196 match msg with
197 | Ping _ -> handle_ping t ~src msg
198 | Ping_req _ -> handle_ping_req t ~src msg
199 | Ack _ -> handle_ack t msg
200 | Alive _ -> handle_alive_msg t msg
201 | Suspect _ -> handle_suspect_msg t msg
202 | Dead _ -> handle_dead_msg t msg
203 | User_msg _ -> handle_user_msg t msg
204
205let handle_packet t ~src (packet : packet) =
206 if String.equal packet.cluster t.config.cluster_name then begin
207 handle_message t ~src packet.primary;
208 List.iter (handle_message t ~src) packet.piggyback;
209 update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
210 end
211
212let process_udp_packet t ~buf ~src =
213 let decrypted_result =
214 if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
215 else Ok buf
216 in
217 match decrypted_result with
218 | Error _ ->
219 update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
220 | Ok decrypted -> (
221 match Codec.decode_packet decrypted with
222 | Error _ ->
223 update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
224 | Ok packet -> handle_packet t ~src packet)
225
226let run_udp_receiver t =
227 while not (is_shutdown t) do
228 Buffer_pool.with_buffer t.recv_pool (fun buf ->
229 let n, src = Transport.recv_udp t.udp_sock buf in
230 let received = Cstruct.sub buf 0 n in
231 process_udp_packet t ~buf:received ~src)
232 done
233
234let build_local_state t ~is_join =
235 let members = Membership.to_list t.members in
236 let self_node =
237 let addr_bytes, port =
238 match t.self.addr with
239 | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
240 | `Unix _ -> ("", 0)
241 in
242 Types.Wire.
243 {
244 pns_name = Types.node_id_to_string t.self.id;
245 pns_addr = addr_bytes;
246 pns_port = port;
247 pns_meta = t.self.meta;
248 pns_incarnation = Types.incarnation_to_int (get_incarnation t);
249 pns_state = 0;
250 pns_vsn = Types.default_vsn;
251 }
252 in
253 let member_nodes =
254 List.map
255 (fun member ->
256 let node = Membership.Member.node member in
257 let snap = Membership.Member.snapshot_now member in
258 let addr_bytes, port =
259 match node.addr with
260 | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
261 | `Unix _ -> ("", 0)
262 in
263 Types.Wire.
264 {
265 pns_name = Types.node_id_to_string node.id;
266 pns_addr = addr_bytes;
267 pns_port = port;
268 pns_meta = node.meta;
269 pns_incarnation = Types.incarnation_to_int snap.incarnation;
270 pns_state = Types.member_state_to_int snap.state;
271 pns_vsn = Types.default_vsn;
272 })
273 members
274 in
275 let all_nodes = self_node :: member_nodes in
276 let header =
277 Types.Wire.
278 {
279 pp_nodes = List.length all_nodes;
280 pp_user_state_len = 0;
281 pp_join = is_join;
282 }
283 in
284 (header, all_nodes)
285
286let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
287 List.iter
288 (fun (pns : Types.Wire.push_node_state) ->
289 let node_id = Types.node_id_of_string pns.pns_name in
290 if not (Types.equal_node_id node_id t.self.id) then
291 let ip = Types.ip_of_bytes pns.pns_addr in
292 let node_info =
293 Types.make_node_info ~id:node_id
294 ~addr:(`Udp (ip, pns.pns_port))
295 ~meta:pns.pns_meta
296 in
297 match Membership.find t.members node_id with
298 | None ->
299 if pns.pns_state <= 1 then begin
300 let now = now_mtime t in
301 let member = Membership.Member.create ~now node_info in
302 Membership.add t.members member;
303 emit_event t (Types.Join node_info)
304 end
305 | Some existing ->
306 let snap = Membership.Member.snapshot_now existing in
307 let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
308 if Types.compare_incarnation remote_inc snap.incarnation > 0 then begin
309 let now = now_mtime t in
310 let new_state = Types.member_state_of_int pns.pns_state in
311 Membership.update_member t.members node_id
312 {
313 update =
314 (fun m ~xt ->
315 match new_state with
316 | Types.Alive ->
317 Membership.Member.set_alive ~xt m
318 ~incarnation:remote_inc ~now
319 | Types.Suspect ->
320 Membership.Member.set_suspect ~xt m
321 ~incarnation:remote_inc ~now
322 | Types.Dead | Types.Left ->
323 Membership.Member.set_dead ~xt m
324 ~incarnation:remote_inc ~now);
325 }
326 |> ignore
327 end)
328 nodes;
329 if is_join then
330 update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
331
332let read_exact flow buf n =
333 let rec loop offset remaining =
334 if remaining <= 0 then Ok ()
335 else
336 let chunk = Cstruct.sub buf offset remaining in
337 match Eio.Flow.single_read flow chunk with
338 | 0 -> Error `Connection_closed
339 | read -> loop (offset + read) (remaining - read)
340 | exception End_of_file -> Error `Connection_closed
341 | exception _ -> Error `Read_error
342 in
343 loop 0 n
344
345let read_available flow buf =
346 match Eio.Flow.single_read flow buf with
347 | n -> n
348 | exception End_of_file -> 0
349 | exception _ -> 0
350
351let decompress_payload data =
352 let _, msgpack = Msgpck.String.read data in
353 match msgpack with
354 | Msgpck.Map fields ->
355 let algo =
356 match List.assoc_opt (Msgpck.String "Algo") fields with
357 | Some (Msgpck.Int i) -> i
358 | Some (Msgpck.Int32 i) -> Int32.to_int i
359 | _ -> -1
360 in
361 let compressed_buf =
362 match List.assoc_opt (Msgpck.String "Buf") fields with
363 | Some (Msgpck.Bytes s) -> Some s
364 | Some (Msgpck.String s) -> Some s
365 | _ -> None
366 in
367 if algo = 0 then
368 match compressed_buf with
369 | Some buf -> (
370 match Lzw.decompress_lsb8 buf with
371 | Ok decompressed -> Some decompressed
372 | Error _ -> None)
373 | None -> None
374 else None
375 | _ -> None
376
377let decompress_payload_cstruct ~src ~dst =
378 match Codec.decode_compress_from_cstruct src with
379 | Error _ -> None
380 | Ok (algo, compressed) ->
381 if algo = 0 then
382 match Lzw.decompress_to_buffer ~src:compressed ~dst with
383 | Ok len -> Some len
384 | Error _ -> None
385 else None
386
387let handle_tcp_connection t flow =
388 Buffer_pool.with_buffer t.tcp_recv_pool (fun buf ->
389 Buffer_pool.with_buffer t.tcp_decompress_pool (fun decomp_buf ->
390 match read_exact flow buf 1 with
391 | Error _ -> ()
392 | Ok () -> (
393 let msg_type_byte = Cstruct.get_uint8 buf 0 in
394 let get_push_pull_payload () =
395 let n = read_available flow (Cstruct.shift buf 1) in
396 if n > 0 then Some (Cstruct.sub buf 1 n) else None
397 in
398 let payload_opt =
399 if
400 msg_type_byte
401 = Types.Wire.message_type_to_int Types.Wire.Encrypt_msg
402 then
403 match get_push_pull_payload () with
404 | Some encrypted -> (
405 match Crypto.decrypt ~key:t.cipher_key encrypted with
406 | Ok decrypted -> Some decrypted
407 | Error _ -> None)
408 | None -> None
409 else if
410 msg_type_byte
411 = Types.Wire.message_type_to_int Types.Wire.Compress_msg
412 then
413 match get_push_pull_payload () with
414 | Some compressed -> (
415 match
416 decompress_payload_cstruct ~src:compressed
417 ~dst:decomp_buf
418 with
419 | Some len ->
420 if len > 0 then
421 let inner_type = Cstruct.get_uint8 decomp_buf 0 in
422 if
423 inner_type
424 = Types.Wire.message_type_to_int
425 Types.Wire.Push_pull_msg
426 then Some (Cstruct.sub decomp_buf 1 (len - 1))
427 else None
428 else None
429 | None -> None)
430 | None -> None
431 else if
432 msg_type_byte
433 = Types.Wire.message_type_to_int Types.Wire.Has_label_msg
434 then
435 match read_exact flow buf 1 with
436 | Error _ -> None
437 | Ok () ->
438 let label_len = Cstruct.get_uint8 buf 0 in
439 if label_len > 0 then
440 match read_exact flow buf label_len with
441 | Error _ -> None
442 | Ok () -> (
443 match read_exact flow buf 1 with
444 | Error _ -> None
445 | Ok () ->
446 let inner_type = Cstruct.get_uint8 buf 0 in
447 if
448 inner_type
449 = Types.Wire.message_type_to_int
450 Types.Wire.Push_pull_msg
451 then get_push_pull_payload ()
452 else None)
453 else None
454 else if
455 msg_type_byte
456 = Types.Wire.message_type_to_int Types.Wire.Push_pull_msg
457 then get_push_pull_payload ()
458 else None
459 in
460 match payload_opt with
461 | None -> ()
462 | Some payload -> (
463 match Codec.decode_push_pull_msg_cstruct payload with
464 | Error _ -> ()
465 | Ok (header, nodes, _user_state) -> (
466 merge_remote_state t nodes ~is_join:header.pp_join;
467 let resp_header, resp_nodes =
468 build_local_state t ~is_join:false
469 in
470 let response =
471 Codec.encode_push_pull_msg ~header:resp_header
472 ~nodes:resp_nodes ~user_state:""
473 in
474 let resp_buf =
475 if t.config.encryption_enabled then
476 let plain = Cstruct.of_string response in
477 let encrypted =
478 Crypto.encrypt ~key:t.cipher_key
479 ~random:t.secure_random plain
480 in
481 encrypted
482 else Cstruct.of_string response
483 in
484 try Eio.Flow.write flow [ resp_buf ] with _ -> ())))))
485
486let run_tcp_listener t =
487 while not (is_shutdown t) do
488 match Eio.Net.accept ~sw:t.sw t.tcp_listener with
489 | flow, _addr ->
490 (try handle_tcp_connection t flow with _ -> ());
491 Eio.Flow.close flow
492 | exception _ -> ()
493 done
494
495let probe_member t (member : Membership.Member.t) =
496 let target = Membership.Member.node member in
497 let seq = next_seq t in
498 let piggyback =
499 drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
500 in
501 let ping = Ping { seq; target = target.id; sender = t.self } in
502 let packet = make_packet t ~primary:ping ~piggyback in
503
504 let waiter = Pending_acks.register t.pending_acks ~seq in
505 send_packet t ~dst:target.addr packet;
506
507 match
508 Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
509 with
510 | Some _ ->
511 let now = now_mtime t in
512 Membership.update_member t.members target.id
513 { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
514 |> ignore;
515 true
516 | None ->
517 Pending_acks.cancel t.pending_acks ~seq;
518 false
519
520let indirect_probe t (member : Membership.Member.t) =
521 let target = Membership.Member.node member in
522 let seq = next_seq t in
523 let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
524
525 let all_members = Membership.to_node_list t.members in
526 let indirect_targets =
527 Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
528 ~count:t.config.indirect_checks ~members:all_members
529 in
530
531 let waiter = Pending_acks.register t.pending_acks ~seq in
532 List.iter
533 (fun node ->
534 let packet = make_packet t ~primary:ping_req ~piggyback:[] in
535 send_packet t ~dst:node.addr packet)
536 indirect_targets;
537
538 match
539 Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
540 with
541 | Some _ ->
542 let now = now_mtime t in
543 Membership.update_member t.members target.id
544 { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
545 |> ignore;
546 true
547 | None ->
548 Pending_acks.cancel t.pending_acks ~seq;
549 false
550
551let suspect_member t (member : Membership.Member.t) =
552 let node = Membership.Member.node member in
553 let inc = get_incarnation t in
554 let msg =
555 Suspect { node = node.id; incarnation = inc; suspector = t.self.id }
556 in
557 apply_member_transition t node.id (fun snap ~now ->
558 Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
559
560let probe_cycle t =
561 let members = Membership.to_list t.members in
562 let member_nodes = List.map Membership.Member.node members in
563 let probe_idx =
564 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.probe_index) }
565 in
566
567 match
568 Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx
569 ~members:member_nodes
570 with
571 | None -> ()
572 | Some (target_node, new_idx) -> (
573 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.probe_index new_idx) };
574 match Membership.find t.members target_node.id with
575 | None -> ()
576 | Some member ->
577 let direct_ok = probe_member t member in
578 if not direct_ok then
579 let indirect_ok = indirect_probe t member in
580 if not indirect_ok then suspect_member t member)
581
582let run_protocol t =
583 while not (is_shutdown t) do
584 probe_cycle t;
585 Eio.Time.sleep t.clock t.config.protocol_interval
586 done
587
588let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
589 ~secure_random =
590 match Crypto.init_key config.secret_key with
591 | Error _ -> Error `Invalid_key
592 | Ok cipher_key ->
593 Ok
594 {
595 config;
596 self;
597 members = Membership.create ();
598 incarnation = Kcas.Loc.make zero_incarnation;
599 sequence = Kcas.Loc.make 0;
600 broadcast_queue = Dissemination.create ();
601 pending_acks = Pending_acks.create ();
602 probe_index = Kcas.Loc.make 0;
603 send_pool =
604 Buffer_pool.create ~size:config.udp_buffer_size
605 ~count:config.send_buffer_count;
606 recv_pool =
607 Buffer_pool.create ~size:config.udp_buffer_size
608 ~count:config.recv_buffer_count;
609 tcp_recv_pool = Buffer_pool.create ~size:65536 ~count:4;
610 tcp_decompress_pool = Buffer_pool.create ~size:131072 ~count:4;
611 udp_sock;
612 tcp_listener;
613 event_stream = Eio.Stream.create 100;
614 user_handlers = Kcas.Loc.make [];
615 cipher_key;
616 stats = Kcas.Loc.make empty_stats;
617 shutdown = Kcas.Loc.make false;
618 clock;
619 mono_clock;
620 secure_random;
621 sw;
622 }
623
624let shutdown t =
625 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.shutdown true) }
626
627let add_member t node_info =
628 let now = now_mtime t in
629 let member = Membership.Member.create ~now node_info in
630 Membership.add t.members member;
631 emit_event t (Join node_info)
632
633let remove_member t node_id =
634 match Membership.find t.members node_id with
635 | None -> false
636 | Some member ->
637 let node = Membership.Member.node member in
638 let removed = Membership.remove t.members node_id in
639 if removed then emit_event t (Leave node);
640 removed
641
642let local_node t = t.self
643let members t = Membership.to_list t.members
644let member_count t = Membership.count t.members
645let events t = t.event_stream
646
647let stats t =
648 let base = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.stats) } in
649 let alive, suspect, dead =
650 Membership.snapshot_all t.members
651 |> List.fold_left
652 (fun (a, s, d) snap ->
653 match snap.state with
654 | Alive -> (a + 1, s, d)
655 | Suspect -> (a, s + 1, d)
656 | Dead | Left -> (a, s, d + 1))
657 (0, 0, 0)
658 in
659 {
660 base with
661 nodes_alive = alive;
662 nodes_suspect = suspect;
663 nodes_dead = dead;
664 queue_depth = Dissemination.depth t.broadcast_queue;
665 buffers_available =
666 Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
667 buffers_total =
668 Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
669 }
670
671let broadcast t ~topic ~payload =
672 let msg = User_msg { topic; payload; origin = t.self.id } in
673 enqueue_broadcast t msg
674
675let send_direct t ~target ~topic ~payload =
676 match Membership.find t.members target with
677 | None -> Error `Unknown_node
678 | Some member ->
679 let node = Membership.Member.node member in
680 let msg = User_msg { topic; payload; origin = t.self.id } in
681 let packet = make_packet t ~primary:msg ~piggyback:[] in
682 send_packet t ~dst:node.addr packet;
683 Ok ()
684
685let send_to_addr t ~addr ~topic ~payload =
686 let msg = User_msg { topic; payload; origin = t.self.id } in
687 let packet = make_packet t ~primary:msg ~piggyback:[] in
688 send_packet t ~dst:addr packet
689
690let on_message t handler =
691 Kcas.Xt.commit
692 {
693 tx =
694 (fun ~xt ->
695 let handlers = Kcas.Xt.get ~xt t.user_handlers in
696 Kcas.Xt.set ~xt t.user_handlers (handler :: handlers));
697 }