this repo has no description
1open Types
2
3type t = {
4 config : config;
5 self : node_info;
6 members : Membership.t;
7 incarnation : incarnation Kcas.Loc.t;
8 sequence : int Kcas.Loc.t;
9 broadcast_queue : Dissemination.t;
10 pending_acks : Pending_acks.t;
11 probe_index : int Kcas.Loc.t;
12 send_pool : Buffer_pool.t;
13 recv_pool : Buffer_pool.t;
14 tcp_recv_pool : Buffer_pool.t;
15 tcp_decompress_pool : Buffer_pool.t;
16 udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
17 tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
18 event_stream : node_event Eio.Stream.t;
19 user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
20 cipher_key : Crypto.key;
21 stats : stats Kcas.Loc.t;
22 shutdown : bool Kcas.Loc.t;
23 clock : float Eio.Time.clock_ty Eio.Resource.t;
24 mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
25 secure_random : Eio.Flow.source_ty Eio.Resource.t;
26 sw : Eio.Switch.t;
27}
28
29let next_seq t =
30 Kcas.Xt.commit
31 {
32 tx =
33 (fun ~xt ->
34 let seq = Kcas.Xt.get ~xt t.sequence in
35 Kcas.Xt.set ~xt t.sequence (seq + 1);
36 seq);
37 }
38
39let get_incarnation t =
40 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.incarnation) }
41
42let incr_my_incarnation t =
43 Kcas.Xt.commit
44 {
45 tx =
46 (fun ~xt ->
47 let inc = Kcas.Xt.get ~xt t.incarnation in
48 let new_inc = incr_incarnation inc in
49 Kcas.Xt.set ~xt t.incarnation new_inc;
50 new_inc);
51 }
52
53let is_shutdown t =
54 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.shutdown) }
55
56let now_mtime t =
57 Eio.Time.Mono.now t.mono_clock
58 |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns
59
60let update_stats t f =
61 Kcas.Xt.commit
62 {
63 tx =
64 (fun ~xt ->
65 let s = Kcas.Xt.get ~xt t.stats in
66 Kcas.Xt.set ~xt t.stats (f s));
67 }
68
69let emit_event t ev = Eio.Stream.add t.event_stream ev
70
71let send_packet t ~dst (packet : packet) =
72 Buffer_pool.with_buffer t.send_pool (fun buf ->
73 match Codec.encode_packet packet ~buf with
74 | Error `Buffer_too_small -> ()
75 | Ok encoded_len ->
76 let encoded = Cstruct.sub buf 0 encoded_len in
77 let to_send =
78 if t.config.encryption_enabled then
79 Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded
80 else encoded
81 in
82 Transport.send_udp t.udp_sock dst to_send;
83 update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 }))
84
85let make_packet t ~primary ~piggyback =
86 { cluster = t.config.cluster_name; primary; piggyback }
87
88let drain_piggyback t ~max_bytes =
89 Dissemination.drain t.broadcast_queue ~max_bytes
90 ~encode_size:Codec.encoded_size
91
92let enqueue_broadcast t msg =
93 let transmits =
94 Protocol_pure.retransmit_limit t.config
95 ~node_count:(Membership.count t.members)
96 in
97 Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t);
98 Dissemination.invalidate t.broadcast_queue
99 ~invalidates:Protocol_pure.invalidates msg
100
101let handle_ping t ~src (ping : protocol_msg) =
102 match ping with
103 | Ping { seq; _ } ->
104 let piggyback =
105 drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
106 in
107 let ack = Ack { seq; responder = t.self; payload = None } in
108 let packet = make_packet t ~primary:ack ~piggyback in
109 send_packet t ~dst:src packet
110 | _ -> ()
111
112let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
113 match ping_req with
114 | Ping_req { seq; target; sender = _ } -> (
115 match Membership.find t.members target with
116 | None -> ()
117 | Some member ->
118 let target_addr = (Membership.Member.node member).addr in
119 let ping = Ping { seq; target; sender = t.self } in
120 let packet = make_packet t ~primary:ping ~piggyback:[] in
121 send_packet t ~dst:target_addr packet)
122 | _ -> ()
123
124let handle_ack t (ack : protocol_msg) =
125 match ack with
126 | Ack { seq; responder = _; payload } ->
127 ignore (Pending_acks.complete t.pending_acks ~seq ~payload)
128 | _ -> ()
129
130let apply_member_transition t member_id transition_fn =
131 let now = now_mtime t in
132 match Membership.find t.members member_id with
133 | None -> ()
134 | Some member ->
135 let snap = Membership.Member.snapshot_now member in
136 let transition = transition_fn snap ~now in
137 if transition.Protocol_pure.new_state.state <> snap.state then begin
138 Membership.update_member t.members member_id
139 {
140 update =
141 (fun m ~xt ->
142 match transition.new_state.state with
143 | Alive ->
144 Membership.Member.set_alive ~xt m
145 ~incarnation:transition.new_state.incarnation ~now
146 | Suspect ->
147 Membership.Member.set_suspect ~xt m
148 ~incarnation:transition.new_state.incarnation ~now
149 | Dead | Left ->
150 Membership.Member.set_dead ~xt m
151 ~incarnation:transition.new_state.incarnation ~now);
152 }
153 |> ignore
154 end;
155 List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts;
156 List.iter (emit_event t) transition.events
157
158let handle_alive_msg t (msg : protocol_msg) =
159 match msg with
160 | Alive { node; incarnation = _ } ->
161 apply_member_transition t node.id (fun snap ~now ->
162 Protocol_pure.handle_alive ~self:t.self.id snap msg ~now)
163 | _ -> ()
164
165let handle_suspect_msg t (msg : protocol_msg) =
166 match msg with
167 | Suspect { node; incarnation = _; suspector = _ } ->
168 apply_member_transition t node (fun snap ~now ->
169 Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
170 | _ -> ()
171
172let handle_dead_msg t (msg : protocol_msg) =
173 match msg with
174 | Dead { node; incarnation = _; declarator = _ } ->
175 apply_member_transition t node (fun snap ~now ->
176 Protocol_pure.handle_dead snap msg ~now)
177 | _ -> ()
178
179let handle_user_msg t (msg : protocol_msg) =
180 match msg with
181 | User_msg { topic; payload; origin } -> (
182 let handlers =
183 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) }
184 in
185 if List.length handlers = 0 then ()
186 else
187 match Membership.find t.members origin with
188 | None -> ()
189 | Some member ->
190 let node = Membership.Member.node member in
191 List.iter (fun h -> h node topic payload) handlers)
192 | _ -> ()
193
194let handle_message t ~src (msg : protocol_msg) =
195 match msg with
196 | Ping _ -> handle_ping t ~src msg
197 | Ping_req _ -> handle_ping_req t ~src msg
198 | Ack _ -> handle_ack t msg
199 | Alive _ -> handle_alive_msg t msg
200 | Suspect _ -> handle_suspect_msg t msg
201 | Dead _ -> handle_dead_msg t msg
202 | User_msg _ -> handle_user_msg t msg
203
204let handle_packet t ~src (packet : packet) =
205 if String.equal packet.cluster t.config.cluster_name then begin
206 handle_message t ~src packet.primary;
207 List.iter (handle_message t ~src) packet.piggyback;
208 update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
209 end
210
211let process_udp_packet t ~buf ~src =
212 let decrypted_result =
213 if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
214 else Ok buf
215 in
216 match decrypted_result with
217 | Error _ ->
218 update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
219 | Ok decrypted -> (
220 match Codec.decode_packet decrypted with
221 | Error _ ->
222 update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
223 | Ok packet -> handle_packet t ~src packet)
224
225let run_udp_receiver t =
226 while not (is_shutdown t) do
227 Buffer_pool.with_buffer t.recv_pool (fun buf ->
228 let n, src = Transport.recv_udp t.udp_sock buf in
229 let received = Cstruct.sub buf 0 n in
230 process_udp_packet t ~buf:received ~src)
231 done
232
233let build_local_state t ~is_join =
234 let members = Membership.to_list t.members in
235 let self_node =
236 let addr_bytes, port =
237 match t.self.addr with
238 | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
239 | `Unix _ -> ("", 0)
240 in
241 Types.Wire.
242 {
243 pns_name = Types.node_id_to_string t.self.id;
244 pns_addr = addr_bytes;
245 pns_port = port;
246 pns_meta = t.self.meta;
247 pns_incarnation = Types.incarnation_to_int (get_incarnation t);
248 pns_state = 0;
249 pns_vsn = Types.default_vsn;
250 }
251 in
252 let member_nodes =
253 List.map
254 (fun member ->
255 let node = Membership.Member.node member in
256 let snap = Membership.Member.snapshot_now member in
257 let addr_bytes, port =
258 match node.addr with
259 | `Udp (ip, p) -> (Types.ip_to_bytes ip, p)
260 | `Unix _ -> ("", 0)
261 in
262 Types.Wire.
263 {
264 pns_name = Types.node_id_to_string node.id;
265 pns_addr = addr_bytes;
266 pns_port = port;
267 pns_meta = node.meta;
268 pns_incarnation = Types.incarnation_to_int snap.incarnation;
269 pns_state = Types.member_state_to_int snap.state;
270 pns_vsn = Types.default_vsn;
271 })
272 members
273 in
274 let all_nodes = self_node :: member_nodes in
275 let header =
276 Types.Wire.
277 {
278 pp_nodes = List.length all_nodes;
279 pp_user_state_len = 0;
280 pp_join = is_join;
281 }
282 in
283 (header, all_nodes)
284
285let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
286 List.iter
287 (fun (pns : Types.Wire.push_node_state) ->
288 let node_id = Types.node_id_of_string pns.pns_name in
289 if not (Types.equal_node_id node_id t.self.id) then
290 let ip = Types.ip_of_bytes pns.pns_addr in
291 let node_info =
292 Types.make_node_info ~id:node_id
293 ~addr:(`Udp (ip, pns.pns_port))
294 ~meta:pns.pns_meta
295 in
296 match Membership.find t.members node_id with
297 | None ->
298 if pns.pns_state <= 1 then begin
299 let now = now_mtime t in
300 let member = Membership.Member.create ~now node_info in
301 Membership.add t.members member;
302 emit_event t (Types.Join node_info)
303 end
304 | Some existing ->
305 let snap = Membership.Member.snapshot_now existing in
306 let remote_inc = Types.incarnation_of_int pns.pns_incarnation in
307 if Types.compare_incarnation remote_inc snap.incarnation > 0 then begin
308 let now = now_mtime t in
309 let new_state = Types.member_state_of_int pns.pns_state in
310 Membership.update_member t.members node_id
311 {
312 update =
313 (fun m ~xt ->
314 match new_state with
315 | Types.Alive ->
316 Membership.Member.set_alive ~xt m
317 ~incarnation:remote_inc ~now
318 | Types.Suspect ->
319 Membership.Member.set_suspect ~xt m
320 ~incarnation:remote_inc ~now
321 | Types.Dead | Types.Left ->
322 Membership.Member.set_dead ~xt m
323 ~incarnation:remote_inc ~now);
324 }
325 |> ignore
326 end)
327 nodes;
328 if is_join then
329 update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
330
331let read_exact flow buf n =
332 let rec loop offset remaining =
333 if remaining <= 0 then Ok ()
334 else
335 let chunk = Cstruct.sub buf offset remaining in
336 match Eio.Flow.single_read flow chunk with
337 | 0 -> Error `Connection_closed
338 | read -> loop (offset + read) (remaining - read)
339 | exception End_of_file -> Error `Connection_closed
340 | exception _ -> Error `Read_error
341 in
342 loop 0 n
343
344let read_available flow buf =
345 match Eio.Flow.single_read flow buf with
346 | n -> n
347 | exception End_of_file -> 0
348 | exception _ -> 0
349
350let decompress_payload data =
351 let _, msgpack = Msgpck.String.read data in
352 match msgpack with
353 | Msgpck.Map fields ->
354 let algo =
355 match List.assoc_opt (Msgpck.String "Algo") fields with
356 | Some (Msgpck.Int i) -> i
357 | Some (Msgpck.Int32 i) -> Int32.to_int i
358 | _ -> -1
359 in
360 let compressed_buf =
361 match List.assoc_opt (Msgpck.String "Buf") fields with
362 | Some (Msgpck.Bytes s) -> Some s
363 | Some (Msgpck.String s) -> Some s
364 | _ -> None
365 in
366 if algo = 0 then
367 match compressed_buf with
368 | Some buf -> (
369 match Lzw.decompress_lsb8 buf with
370 | Ok decompressed -> Some decompressed
371 | Error _ -> None)
372 | None -> None
373 else None
374 | _ -> None
375
376let decompress_payload_cstruct ~src ~dst =
377 match Codec.decode_compress_from_cstruct src with
378 | Error _ -> None
379 | Ok (algo, compressed) ->
380 if algo = 0 then
381 match Lzw.decompress_to_buffer ~src:compressed ~dst with
382 | Ok len -> Some len
383 | Error _ -> None
384 else None
385
386let handle_tcp_connection t flow =
387 Buffer_pool.with_buffer t.tcp_recv_pool (fun buf ->
388 Buffer_pool.with_buffer t.tcp_decompress_pool (fun decomp_buf ->
389 match read_exact flow buf 1 with
390 | Error _ -> ()
391 | Ok () -> (
392 let msg_type_byte = Cstruct.get_uint8 buf 0 in
393 let get_push_pull_payload () =
394 let n = read_available flow (Cstruct.shift buf 1) in
395 if n > 0 then Some (Cstruct.sub buf 1 n) else None
396 in
397 let payload_opt =
398 if
399 msg_type_byte
400 = Types.Wire.message_type_to_int Types.Wire.Encrypt_msg
401 then
402 match get_push_pull_payload () with
403 | Some encrypted -> (
404 match Crypto.decrypt ~key:t.cipher_key encrypted with
405 | Ok decrypted -> Some decrypted
406 | Error _ -> None)
407 | None -> None
408 else if
409 msg_type_byte
410 = Types.Wire.message_type_to_int Types.Wire.Compress_msg
411 then
412 match get_push_pull_payload () with
413 | Some compressed -> (
414 match
415 decompress_payload_cstruct ~src:compressed
416 ~dst:decomp_buf
417 with
418 | Some len ->
419 if len > 0 then
420 let inner_type = Cstruct.get_uint8 decomp_buf 0 in
421 if
422 inner_type
423 = Types.Wire.message_type_to_int
424 Types.Wire.Push_pull_msg
425 then Some (Cstruct.sub decomp_buf 1 (len - 1))
426 else None
427 else None
428 | None -> None)
429 | None -> None
430 else if
431 msg_type_byte
432 = Types.Wire.message_type_to_int Types.Wire.Has_label_msg
433 then
434 match read_exact flow buf 1 with
435 | Error _ -> None
436 | Ok () ->
437 let label_len = Cstruct.get_uint8 buf 0 in
438 if label_len > 0 then
439 match read_exact flow buf label_len with
440 | Error _ -> None
441 | Ok () -> (
442 match read_exact flow buf 1 with
443 | Error _ -> None
444 | Ok () ->
445 let inner_type = Cstruct.get_uint8 buf 0 in
446 if
447 inner_type
448 = Types.Wire.message_type_to_int
449 Types.Wire.Push_pull_msg
450 then get_push_pull_payload ()
451 else None)
452 else None
453 else if
454 msg_type_byte
455 = Types.Wire.message_type_to_int Types.Wire.Push_pull_msg
456 then get_push_pull_payload ()
457 else None
458 in
459 match payload_opt with
460 | None -> ()
461 | Some payload -> (
462 match Codec.decode_push_pull_msg_cstruct payload with
463 | Error _ -> ()
464 | Ok (header, nodes, _user_state) -> (
465 merge_remote_state t nodes ~is_join:header.pp_join;
466 let resp_header, resp_nodes =
467 build_local_state t ~is_join:false
468 in
469 let response =
470 Codec.encode_push_pull_msg ~header:resp_header
471 ~nodes:resp_nodes ~user_state:""
472 in
473 let resp_buf =
474 if t.config.encryption_enabled then
475 let plain = Cstruct.of_string response in
476 let encrypted =
477 Crypto.encrypt ~key:t.cipher_key
478 ~random:t.secure_random plain
479 in
480 encrypted
481 else Cstruct.of_string response
482 in
483 try Eio.Flow.write flow [ resp_buf ] with _ -> ())))))
484
485let run_tcp_listener t =
486 while not (is_shutdown t) do
487 match Eio.Net.accept ~sw:t.sw t.tcp_listener with
488 | flow, _addr ->
489 (try handle_tcp_connection t flow with _ -> ());
490 Eio.Flow.close flow
491 | exception _ -> ()
492 done
493
494let probe_member t (member : Membership.Member.t) =
495 let target = Membership.Member.node member in
496 let seq = next_seq t in
497 let piggyback =
498 drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100)
499 in
500 let ping = Ping { seq; target = target.id; sender = t.self } in
501 let packet = make_packet t ~primary:ping ~piggyback in
502
503 let waiter = Pending_acks.register t.pending_acks ~seq in
504 send_packet t ~dst:target.addr packet;
505
506 match
507 Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
508 with
509 | Some _ ->
510 let now = now_mtime t in
511 Membership.update_member t.members target.id
512 { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
513 |> ignore;
514 true
515 | None ->
516 Pending_acks.cancel t.pending_acks ~seq;
517 false
518
519let indirect_probe t (member : Membership.Member.t) =
520 let target = Membership.Member.node member in
521 let seq = next_seq t in
522 let ping_req = Ping_req { seq; target = target.id; sender = t.self } in
523
524 let all_members = Membership.to_node_list t.members in
525 let indirect_targets =
526 Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
527 ~count:t.config.indirect_checks ~members:all_members
528 in
529
530 let waiter = Pending_acks.register t.pending_acks ~seq in
531 List.iter
532 (fun node ->
533 let packet = make_packet t ~primary:ping_req ~piggyback:[] in
534 send_packet t ~dst:node.addr packet)
535 indirect_targets;
536
537 match
538 Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
539 with
540 | Some _ ->
541 let now = now_mtime t in
542 Membership.update_member t.members target.id
543 { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) }
544 |> ignore;
545 true
546 | None ->
547 Pending_acks.cancel t.pending_acks ~seq;
548 false
549
550let suspect_member t (member : Membership.Member.t) =
551 let node = Membership.Member.node member in
552 let inc = get_incarnation t in
553 let msg =
554 Suspect { node = node.id; incarnation = inc; suspector = t.self.id }
555 in
556 apply_member_transition t node.id (fun snap ~now ->
557 Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
558
559let probe_cycle t =
560 let members = Membership.to_list t.members in
561 let member_nodes = List.map Membership.Member.node members in
562 let probe_idx =
563 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.probe_index) }
564 in
565
566 match
567 Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx
568 ~members:member_nodes
569 with
570 | None -> ()
571 | Some (target_node, new_idx) -> (
572 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.probe_index new_idx) };
573 match Membership.find t.members target_node.id with
574 | None -> ()
575 | Some member ->
576 let direct_ok = probe_member t member in
577 if not direct_ok then
578 let indirect_ok = indirect_probe t member in
579 if not indirect_ok then suspect_member t member)
580
581let run_protocol t =
582 while not (is_shutdown t) do
583 probe_cycle t;
584 Eio.Time.sleep t.clock t.config.protocol_interval
585 done
586
587let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
588 ~secure_random =
589 match Crypto.init_key config.secret_key with
590 | Error _ -> Error `Invalid_key
591 | Ok cipher_key ->
592 Ok
593 {
594 config;
595 self;
596 members = Membership.create ();
597 incarnation = Kcas.Loc.make zero_incarnation;
598 sequence = Kcas.Loc.make 0;
599 broadcast_queue = Dissemination.create ();
600 pending_acks = Pending_acks.create ();
601 probe_index = Kcas.Loc.make 0;
602 send_pool =
603 Buffer_pool.create ~size:config.udp_buffer_size
604 ~count:config.send_buffer_count;
605 recv_pool =
606 Buffer_pool.create ~size:config.udp_buffer_size
607 ~count:config.recv_buffer_count;
608 tcp_recv_pool = Buffer_pool.create ~size:65536 ~count:4;
609 tcp_decompress_pool = Buffer_pool.create ~size:131072 ~count:4;
610 udp_sock;
611 tcp_listener;
612 event_stream = Eio.Stream.create 100;
613 user_handlers = Kcas.Loc.make [];
614 cipher_key;
615 stats = Kcas.Loc.make empty_stats;
616 shutdown = Kcas.Loc.make false;
617 clock;
618 mono_clock;
619 secure_random;
620 sw;
621 }
622
623let shutdown t =
624 Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.shutdown true) }
625
626let add_member t node_info =
627 let now = now_mtime t in
628 let member = Membership.Member.create ~now node_info in
629 Membership.add t.members member;
630 emit_event t (Join node_info)
631
632let remove_member t node_id =
633 match Membership.find t.members node_id with
634 | None -> false
635 | Some member ->
636 let node = Membership.Member.node member in
637 let removed = Membership.remove t.members node_id in
638 if removed then emit_event t (Leave node);
639 removed
640
641let local_node t = t.self
642let members t = Membership.to_list t.members
643let member_count t = Membership.count t.members
644let events t = t.event_stream
645
646let stats t =
647 let base = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.stats) } in
648 let alive, suspect, dead =
649 Membership.snapshot_all t.members
650 |> List.fold_left
651 (fun (a, s, d) snap ->
652 match snap.state with
653 | Alive -> (a + 1, s, d)
654 | Suspect -> (a, s + 1, d)
655 | Dead | Left -> (a, s, d + 1))
656 (0, 0, 0)
657 in
658 {
659 base with
660 nodes_alive = alive;
661 nodes_suspect = suspect;
662 nodes_dead = dead;
663 queue_depth = Dissemination.depth t.broadcast_queue;
664 buffers_available =
665 Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool;
666 buffers_total =
667 Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool;
668 }
669
670let broadcast t ~topic ~payload =
671 let msg = User_msg { topic; payload; origin = t.self.id } in
672 enqueue_broadcast t msg
673
674let send_direct t ~target ~topic ~payload =
675 match Membership.find t.members target with
676 | None -> Error `Unknown_node
677 | Some member ->
678 let node = Membership.Member.node member in
679 let msg = User_msg { topic; payload; origin = t.self.id } in
680 let packet = make_packet t ~primary:msg ~piggyback:[] in
681 send_packet t ~dst:node.addr packet;
682 Ok ()
683
684let send_to_addr t ~addr ~topic ~payload =
685 let msg = User_msg { topic; payload; origin = t.self.id } in
686 let packet = make_packet t ~primary:msg ~piggyback:[] in
687 send_packet t ~dst:addr packet
688
689let on_message t handler =
690 Kcas.Xt.commit
691 {
692 tx =
693 (fun ~xt ->
694 let handlers = Kcas.Xt.get ~xt t.user_handlers in
695 Kcas.Xt.set ~xt t.user_handlers (handler :: handlers));
696 }