this repo has no description
1open Types
2
(** Runtime state of the local protocol node: configuration, membership view,
    transactional counters, buffer pools, sockets and the Eio capabilities the
    protocol loops need. Built by [create]. *)
type t = {
  config : config;  (** Configuration record passed to [create]. *)
  self : node_info;  (** Identity of the local node. *)
  members : Membership.t;  (** Membership table of the other known nodes. *)
  incarnation : incarnation Kcas.Loc.t;
      (** Incarnation of the local node; starts at [zero_incarnation]. *)
  sequence : int Kcas.Loc.t;
      (** Counter from which [next_seq] hands out sequence numbers. *)
  broadcast_queue : Dissemination.t;
      (** Queue of messages waiting to be piggybacked on outgoing packets. *)
  pending_acks : Pending_acks.t;
      (** Waiters registered by probes, completed by incoming acks. *)
  probe_index : int Kcas.Loc.t;
      (** Cursor handed to [Protocol_pure.next_probe_target]. *)
  send_pool : Buffer_pool.t;  (** Buffers used to encode outgoing packets. *)
  recv_pool : Buffer_pool.t;  (** Buffers used to receive UDP datagrams. *)
  tcp_recv_pool : Buffer_pool.t;
      (** Buffers used to read inbound TCP messages. *)
  tcp_decompress_pool : Buffer_pool.t;
      (** Scratch buffers receiving decompressed TCP payloads. *)
  udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t;
      (** Datagram socket used for packets sent and received over UDP. *)
  tcp_listener : [ `Generic ] Eio.Net.listening_socket_ty Eio.Resource.t;
      (** Listening socket accepted on by [run_tcp_listener]. *)
  event_stream : node_event Eio.Stream.t;
      (** Stream of membership events, exposed through [events]. *)
  user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t;
      (** Callbacks invoked as [handler origin_node topic payload]. *)
  cipher_key : Crypto.key;
      (** Key obtained from [Crypto.init_key config.secret_key]. *)
  stats : stats Kcas.Loc.t;  (** Message counters, see [update_stats]. *)
  shutdown : bool Kcas.Loc.t;
      (** Flag polled by the run loops; set by [shutdown]. *)
  clock : float Eio.Time.clock_ty Eio.Resource.t;
      (** Clock used for probe timeouts and protocol sleeps. *)
  mono_clock : Eio.Time.Mono.ty Eio.Resource.t;
      (** Monotonic clock read by [now_mtime]. *)
  secure_random : Eio.Flow.source_ty Eio.Resource.t;
      (** Randomness source handed to [Crypto.encrypt]. *)
  sw : Eio.Switch.t;  (** Switch passed to [Eio.Net.accept]. *)
}
28
(** [next_seq t] returns the current sequence number of [t] and advances the
    counter by one, both in a single Kcas transaction. The value returned is
    the one observed before the increment. *)
let next_seq t =
  let tx ~xt =
    let current = Kcas.Xt.get ~xt t.sequence in
    Kcas.Xt.set ~xt t.sequence (succ current);
    current
  in
  Kcas.Xt.commit { tx }
38
(** [get_incarnation t] reads the local node's current incarnation. *)
let get_incarnation t =
  let tx ~xt = Kcas.Xt.get ~xt t.incarnation in
  Kcas.Xt.commit { tx }
41
(** [incr_my_incarnation t] replaces the local incarnation with
    [incr_incarnation] of its current value inside one Kcas transaction and
    returns the new incarnation. *)
let incr_my_incarnation t =
  let tx ~xt =
    let bumped = incr_incarnation (Kcas.Xt.get ~xt t.incarnation) in
    Kcas.Xt.set ~xt t.incarnation bumped;
    bumped
  in
  Kcas.Xt.commit { tx }
52
(** [is_shutdown t] is [true] once the shutdown flag of [t] has been set. *)
let is_shutdown t =
  let tx ~xt = Kcas.Xt.get ~xt t.shutdown in
  Kcas.Xt.commit { tx }
55
(** [now_mtime t] reads the monotonic clock of [t] and re-expresses the
    instant as an [Mtime.Span.t] carrying the same nanosecond count. *)
let now_mtime t =
  let instant = Eio.Time.Mono.now t.mono_clock in
  Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns instant)
59
(** [update_stats t f] replaces the stored statistics [s] with [f s] inside a
    single Kcas transaction. *)
let update_stats t f =
  let tx ~xt =
    let current = Kcas.Xt.get ~xt t.stats in
    Kcas.Xt.set ~xt t.stats (f current)
  in
  Kcas.Xt.commit { tx }
68
(** [emit_event t ev] appends [ev] to the event stream of [t].
    NOTE(review): the stream is created with a fixed capacity in [create];
    presumably [Eio.Stream.add] waits while it is full, so confirm that a
    consumer drains [events]. *)
let emit_event { event_stream; _ } ev = Eio.Stream.add event_stream ev
70
(** [send_packet t ~dst packet] encodes [packet] into a buffer borrowed from
    the send pool, encrypts the encoded bytes when
    [t.config.encryption_enabled] is set, sends the result to [dst] over UDP
    and increments [msgs_sent]. A packet that does not fit in the buffer is
    silently skipped. *)
let send_packet t ~dst (packet : packet) =
  (* Encrypt only when the configuration asks for it. *)
  let seal plaintext =
    if t.config.encryption_enabled then
      Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plaintext
    else plaintext
  in
  Buffer_pool.with_buffer t.send_pool (fun buf ->
      match Codec.encode_packet packet ~buf with
      | Ok encoded_len ->
          let wire = seal (Cstruct.sub buf 0 encoded_len) in
          Transport.send_udp t.udp_sock dst wire;
          update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 })
      | Error `Buffer_too_small -> ())
84
(** [make_packet t ~primary ~piggyback] builds a packet tagged with the
    configured cluster name. *)
let make_packet t ~primary ~piggyback =
  let cluster = t.config.cluster_name in
  { cluster; primary; piggyback }
87
(** [drain_piggyback t ~max_bytes] drains the broadcast queue of [t] with a
    budget of [max_bytes], sizing messages with [Codec.encoded_size]. *)
let drain_piggyback t ~max_bytes =
  let encode_size = Codec.encoded_size in
  Dissemination.drain t.broadcast_queue ~max_bytes ~encode_size
91
(** [enqueue_broadcast t msg] queues [msg] for dissemination with a
    retransmit budget computed from the configuration and the current member
    count, then asks the queue to invalidate entries according to
    [Protocol_pure.invalidates]. *)
let enqueue_broadcast t msg =
  let queue = t.broadcast_queue in
  let node_count = Membership.count t.members in
  let transmits = Protocol_pure.retransmit_limit t.config ~node_count in
  let created = now_mtime t in
  Dissemination.enqueue queue msg ~transmits ~created;
  (* Invalidation runs after the enqueue, exactly as before. *)
  Dissemination.invalidate queue ~invalidates:Protocol_pure.invalidates msg
100
(** [handle_ping t ~src ping] answers a [Ping] with an [Ack] carrying the same
    sequence number, sent to [src] together with drained piggyback messages.
    Any other message is ignored. *)
let handle_ping t ~src (ping : protocol_msg) =
  match ping with
  | Ping { seq; _ } ->
      (* 100 bytes of the UDP buffer are left out of the piggyback budget. *)
      let max_bytes = t.config.udp_buffer_size - 100 in
      let piggyback = drain_piggyback t ~max_bytes in
      let primary = Ack { seq; responder = t.self; payload = None } in
      send_packet t ~dst:src (make_packet t ~primary ~piggyback)
  | _ -> ()
111
(** [handle_ping_req t ~src ping_req] serves an indirect probe request: when
    the requested target is a known member, a [Ping] with the request's
    sequence number and the local node as sender is sent to that member.
    Unknown targets and other messages are ignored.

    NOTE(review): the source address is discarded and nothing here relays the
    target's ack back to the requester; confirm that relay happens elsewhere. *)
let handle_ping_req t ~src:_ (ping_req : protocol_msg) =
  match ping_req with
  | Ping_req { seq; target; sender = _ } ->
      Membership.find t.members target
      |> Option.iter (fun member ->
             let dst = (Membership.Member.node member).addr in
             let primary = Ping { seq; target; sender = t.self } in
             send_packet t ~dst (make_packet t ~primary ~piggyback:[]))
  | _ -> ()
123
(** [handle_ack t ack] completes the pending ack registered under the
    sequence number of an [Ack], passing along its payload. The result of the
    completion is discarded; other messages are ignored. *)
let handle_ack t (ack : protocol_msg) =
  match ack with
  | Ack { seq; payload; responder = _ } ->
      Pending_acks.complete t.pending_acks ~seq ~payload |> ignore
  | _ -> ()
129
(** [apply_member_transition t member_id transition_fn] looks up [member_id],
    feeds its current snapshot and the current time to [transition_fn], and
    applies the resulting transition: the member is updated only when the
    state differs from the snapshot, after which the transition's broadcasts
    are enqueued and its events emitted. Unknown members are ignored. *)
let apply_member_transition t member_id transition_fn =
  let now = now_mtime t in
  match Membership.find t.members member_id with
  | None -> ()
  | Some member ->
      let snap = Membership.Member.snapshot_now member in
      let transition = transition_fn snap ~now in
      let target = transition.Protocol_pure.new_state in
      let incarnation = target.incarnation in
      if target.state <> snap.state then
        ignore
          (Membership.update_member t.members member_id
             {
               update =
                 (fun m ~xt ->
                   match target.state with
                   | Alive ->
                       Membership.Member.set_alive ~xt m ~incarnation ~now
                   | Suspect ->
                       Membership.Member.set_suspect ~xt m ~incarnation ~now
                   | Dead | Left ->
                       Membership.Member.set_dead ~xt m ~incarnation ~now);
             });
      (* Broadcasts and events are published even when the state is unchanged. *)
      List.iter (enqueue_broadcast t) transition.broadcasts;
      List.iter (emit_event t) transition.events
157
(** [handle_alive_msg t msg] applies the transition computed by
    [Protocol_pure.handle_alive] to the member named in an [Alive] message.
    Other messages are ignored. *)
let handle_alive_msg t (msg : protocol_msg) =
  match msg with
  | Alive { node; incarnation = _ } ->
      let transition snap ~now =
        Protocol_pure.handle_alive ~self:t.self.id snap msg ~now
      in
      apply_member_transition t node.id transition
  | _ -> ()
164
(** [handle_suspect_msg t msg] applies the transition computed by
    [Protocol_pure.handle_suspect] to the member named in a [Suspect] message.
    Other messages are ignored. *)
let handle_suspect_msg t (msg : protocol_msg) =
  match msg with
  | Suspect { node; incarnation = _; suspector = _ } ->
      let transition snap ~now =
        Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now
      in
      apply_member_transition t node transition
  | _ -> ()
171
(** [handle_dead_msg t msg] applies the transition computed by
    [Protocol_pure.handle_dead] to the member named in a [Dead] message.
    Other messages are ignored. *)
let handle_dead_msg t (msg : protocol_msg) =
  match msg with
  | Dead { node; incarnation = _; declarator = _ } ->
      let transition snap ~now = Protocol_pure.handle_dead snap msg ~now in
      apply_member_transition t node transition
  | _ -> ()
178
(** [handle_user_msg t msg] delivers a [User_msg] to every registered user
    handler as [handler origin_node topic payload]. The message is dropped
    when its origin is not a known member; other messages are ignored. *)
let handle_user_msg t (msg : protocol_msg) =
  match msg with
  | User_msg { topic; payload; origin } ->
      (* The handler list is read before the membership lookup. *)
      let tx ~xt = Kcas.Xt.get ~xt t.user_handlers in
      let handlers = Kcas.Xt.commit { tx } in
      Membership.find t.members origin
      |> Option.iter (fun member ->
             let sender = Membership.Member.node member in
             List.iter (fun handler -> handler sender topic payload) handlers)
  | _ -> ()
191
(** [handle_message t ~src msg] routes [msg] to the handler matching its
    constructor. Only the ping handlers receive [src]. *)
let handle_message t ~src (msg : protocol_msg) =
  let handler =
    match msg with
    | Ping _ -> handle_ping t ~src
    | Ping_req _ -> handle_ping_req t ~src
    | Ack _ -> handle_ack t
    | Alive _ -> handle_alive_msg t
    | Suspect _ -> handle_suspect_msg t
    | Dead _ -> handle_dead_msg t
    | User_msg _ -> handle_user_msg t
  in
  handler msg
201
(** [handle_packet t ~src packet] processes the primary message and then each
    piggybacked message of [packet], and increments [msgs_received]. Packets
    whose cluster name differs from the configured one are ignored. *)
let handle_packet t ~src (packet : packet) =
  let same_cluster = String.equal packet.cluster t.config.cluster_name in
  if same_cluster then (
    List.iter (handle_message t ~src) (packet.primary :: packet.piggyback);
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 }))
208
(** [process_udp_packet t ~buf ~src] decrypts [buf] when encryption is
    enabled, decodes it as a packet and hands it to [handle_packet]. A
    decryption or decoding failure increments [msgs_dropped] instead. *)
let process_udp_packet t ~buf ~src =
  let drop () =
    update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 })
  in
  let plaintext =
    if t.config.encryption_enabled then Crypto.decrypt ~key:t.cipher_key buf
    else Ok buf
  in
  match plaintext with
  | Error _ -> drop ()
  | Ok data -> (
      match Codec.decode_packet data with
      | Ok packet -> handle_packet t ~src packet
      | Error _ -> drop ())
222
(** [run_udp_receiver t] loops until shutdown: each iteration borrows a
    receive buffer, reads one datagram into it and processes the bytes that
    were received. *)
let run_udp_receiver t =
  let receive_one buf =
    let len, src = Transport.recv_udp t.udp_sock buf in
    process_udp_packet t ~buf:(Cstruct.sub buf 0 len) ~src
  in
  while not (is_shutdown t) do
    Buffer_pool.with_buffer t.recv_pool receive_one
  done
230
(** [build_local_state t ~is_join] returns the push-pull [(header, nodes)]
    pair describing the local view: the local node first (state [0], current
    incarnation), followed by every known member with its snapshot state and
    incarnation. The header records the node count, a zero user-state length
    and [is_join]. *)
let build_local_state t ~is_join =
  let members = Membership.to_list t.members in
  (* Unix-socket addresses are encoded as an empty address with port 0. *)
  let wire_addr = function
    | `Udp (ip, port) -> (Types.ip_to_bytes ip, port)
    | `Unix _ -> ("", 0)
  in
  let to_wire (info : node_info) ~incarnation ~state =
    let addr_bytes, port = wire_addr info.addr in
    {
      Types.Wire.pns_name = Types.node_id_to_string info.id;
      pns_addr = addr_bytes;
      pns_port = port;
      pns_meta = info.meta;
      pns_incarnation = Types.incarnation_to_int incarnation;
      pns_state = state;
      pns_vsn = Types.default_vsn;
    }
  in
  let self_node = to_wire t.self ~incarnation:(get_incarnation t) ~state:0 in
  let member_nodes =
    List.map
      (fun member ->
        let snap = Membership.Member.snapshot_now member in
        to_wire
          (Membership.Member.node member)
          ~incarnation:snap.incarnation
          ~state:(Types.member_state_to_int snap.state))
      members
  in
  let all_nodes = self_node :: member_nodes in
  let header =
    {
      Types.Wire.pp_nodes = List.length all_nodes;
      pp_user_state_len = 0;
      pp_join = is_join;
    }
  in
  (header, all_nodes)
282
(** [merge_remote_state t nodes ~is_join] folds a remote push-pull node list
    into the local membership. Entries naming the local node are skipped.
    Unknown nodes are added (and a [Join] event emitted) only when their wire
    state is [<= 1]. Known nodes are updated only when the remote incarnation
    is strictly greater than the local one. When [is_join] is set,
    [msgs_received] is incremented once after the merge. *)
let merge_remote_state t (nodes : Types.Wire.push_node_state list) ~is_join =
  let merge_one (pns : Types.Wire.push_node_state) =
    let node_id = Types.node_id_of_string pns.pns_name in
    if not (Types.equal_node_id node_id t.self.id) then begin
      let ip = Types.ip_of_bytes pns.pns_addr in
      let node_info =
        Types.make_node_info ~id:node_id
          ~addr:(`Udp (ip, pns.pns_port))
          ~meta:pns.pns_meta
      in
      match Membership.find t.members node_id with
      | None ->
          if pns.pns_state <= 1 then begin
            let member =
              Membership.Member.create ~now:(now_mtime t) node_info
            in
            Membership.add t.members member;
            emit_event t (Types.Join node_info)
          end
      | Some existing ->
          let local = Membership.Member.snapshot_now existing in
          let incarnation = Types.incarnation_of_int pns.pns_incarnation in
          if Types.compare_incarnation incarnation local.incarnation > 0 then begin
            let now = now_mtime t in
            let new_state = Types.member_state_of_int pns.pns_state in
            ignore
              (Membership.update_member t.members node_id
                 {
                   update =
                     (fun m ~xt ->
                       match new_state with
                       | Types.Alive ->
                           Membership.Member.set_alive ~xt m ~incarnation ~now
                       | Types.Suspect ->
                           Membership.Member.set_suspect ~xt m ~incarnation
                             ~now
                       | Types.Dead | Types.Left ->
                           Membership.Member.set_dead ~xt m ~incarnation ~now);
                 })
          end
    end
  in
  List.iter merge_one nodes;
  if is_join then
    update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 })
328
(** [read_exact flow buf n] reads exactly [n] bytes from [flow] into the first
    [n] bytes of [buf].

    @return
      [Ok ()] once [n] bytes were read (immediately when [n <= 0]);
      [Error `Connection_closed] when the flow ends first;
      [Error `Read_error] on any other read failure, or when [n] exceeds the
      length of [buf].
    @raise Eio.Cancel.Cancelled
      when the calling fiber is cancelled; cancellation is never reported as
      a read error. *)
let read_exact flow buf n =
  (* Reject oversized requests up front instead of letting [Cstruct.sub]
     raise [Invalid_argument]. *)
  if n > Cstruct.length buf then Error `Read_error
  else
    let rec loop offset remaining =
      if remaining <= 0 then Ok ()
      else
        let chunk = Cstruct.sub buf offset remaining in
        match Eio.Flow.single_read flow chunk with
        | 0 -> Error `Connection_closed
        | read -> loop (offset + read) (remaining - read)
        | exception End_of_file -> Error `Connection_closed
        (* Cancellation must propagate so the fiber can unwind. *)
        | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
        | exception _ -> Error `Read_error
    in
    loop 0 n
341
(** [read_available flow buf] performs a single read from [flow] into [buf]
    and returns the number of bytes obtained. End of stream and read failures
    are reported as [0].

    @raise Eio.Cancel.Cancelled
      when the calling fiber is cancelled; cancellation is never reported as
      an empty read. *)
let read_available flow buf =
  match Eio.Flow.single_read flow buf with
  | n -> n
  | exception End_of_file -> 0
  (* Cancellation must propagate so the fiber can unwind. *)
  | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
  | exception _ -> 0
347
(** [decompress_payload data] parses [data] as a msgpack map, reads its
    ["Algo"] and ["Buf"] entries and, when the algorithm is [0] and a buffer
    is present, returns the result of [Lzw.decompress_lsb8] on it. Returns
    [None] for any other shape, algorithm or decompression error. *)
let decompress_payload data =
  match snd (Msgpck.String.read data) with
  | Msgpck.Map fields -> (
      let lookup key = List.assoc_opt (Msgpck.String key) fields in
      (* A missing or non-integer algorithm is treated as unsupported. *)
      let algo =
        match lookup "Algo" with
        | Some (Msgpck.Int i) -> i
        | Some (Msgpck.Int32 i) -> Int32.to_int i
        | _ -> -1
      in
      let compressed =
        match lookup "Buf" with
        | Some (Msgpck.Bytes s) | Some (Msgpck.String s) -> Some s
        | _ -> None
      in
      match (algo, compressed) with
      | 0, Some buf -> Result.to_option (Lzw.decompress_lsb8 buf)
      | _ -> None)
  | _ -> None
373
(** [decompress_payload_cstruct ~src ~dst] decodes the compression envelope in
    [src] and, when its algorithm is [0], decompresses the body into [dst],
    returning the length reported by [Lzw.decompress_to_buffer]. Returns
    [None] on a decode error, another algorithm or a decompression error. *)
let decompress_payload_cstruct ~src ~dst =
  match Codec.decode_compress_from_cstruct src with
  | Ok (0, compressed) ->
      Result.to_option (Lzw.decompress_to_buffer ~src:compressed ~dst)
  | Ok _ | Error _ -> None
383
(* Reads one inbound TCP message from [flow] and returns the push-pull payload
   it carries, or [None] when the message is of another kind, truncated, or
   fails decryption/decompression. [buf] receives the raw bytes; [decomp_buf]
   receives decompressed data. *)
let read_push_pull_payload t flow ~buf ~decomp_buf =
  let ( let* ) = Option.bind in
  let code = Types.Wire.message_type_to_int in
  let push_pull = code Types.Wire.Push_pull_msg in
  let exact n = Result.to_option (read_exact flow buf n) in
  (* One read of whatever follows the type byte kept at offset 0 of [buf]. *)
  let read_rest () =
    let n = read_available flow (Cstruct.shift buf 1) in
    if n > 0 then Some (Cstruct.sub buf 1 n) else None
  in
  let* () = exact 1 in
  let msg_type = Cstruct.get_uint8 buf 0 in
  if msg_type = code Types.Wire.Encrypt_msg then
    let* encrypted = read_rest () in
    Result.to_option (Crypto.decrypt ~key:t.cipher_key encrypted)
  else if msg_type = code Types.Wire.Compress_msg then
    let* compressed = read_rest () in
    let* len = decompress_payload_cstruct ~src:compressed ~dst:decomp_buf in
    (* The decompressed data must start with the push-pull type byte. *)
    if len > 0 && Cstruct.get_uint8 decomp_buf 0 = push_pull then
      Some (Cstruct.sub decomp_buf 1 (len - 1))
    else None
  else if msg_type = code Types.Wire.Has_label_msg then
    let* () = exact 1 in
    let label_len = Cstruct.get_uint8 buf 0 in
    if label_len > 0 then
      (* The label bytes are read and discarded; only the inner message
         type that follows them is inspected. *)
      let* () = exact label_len in
      let* () = exact 1 in
      if Cstruct.get_uint8 buf 0 = push_pull then read_rest () else None
    else None
  else if msg_type = push_pull then read_rest ()
  else None

(* Encodes the local state as a push-pull message (encrypted when encryption
   is enabled) and writes it to [flow]. Write failures are ignored as
   best-effort, but cancellation is re-raised. *)
let send_push_pull_response t flow =
  let header, nodes = build_local_state t ~is_join:false in
  let plain =
    Cstruct.of_string
      (Codec.encode_push_pull_msg ~header ~nodes ~user_state:"")
  in
  let response =
    if t.config.encryption_enabled then
      Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random plain
    else plain
  in
  try Eio.Flow.write flow [ response ] with
  | Eio.Cancel.Cancelled _ as ex -> raise ex
  | _ -> ()

(** [handle_tcp_connection t flow] serves one push-pull exchange on [flow]:
    it reads a single message, and when that message yields a decodable
    push-pull payload it merges the remote node list into the local
    membership and replies with the local state. Anything else is dropped
    without a reply.

    @raise Eio.Cancel.Cancelled
      when the fiber is cancelled while writing the response. *)
let handle_tcp_connection t flow =
  Buffer_pool.with_buffer t.tcp_recv_pool (fun buf ->
      Buffer_pool.with_buffer t.tcp_decompress_pool (fun decomp_buf ->
          match read_push_pull_payload t flow ~buf ~decomp_buf with
          | None -> ()
          | Some payload -> (
              match Codec.decode_push_pull_msg_cstruct payload with
              | Error _ -> ()
              | Ok (header, nodes, _user_state) ->
                  merge_remote_state t nodes ~is_join:header.pp_join;
                  send_push_pull_response t flow)))
482
(** [run_tcp_listener t] accepts TCP connections one at a time until shutdown.
    Each connection is handled by [handle_tcp_connection] and then closed,
    also when the handler fails. Handler and accept failures are ignored so
    one bad peer cannot stop the listener.

    @raise Eio.Cancel.Cancelled
      when the fiber is cancelled. Swallowing it would leave the loop
      retrying [accept] forever on a cancelled fiber. *)
let run_tcp_listener t =
  while not (is_shutdown t) do
    match Eio.Net.accept ~sw:t.sw t.tcp_listener with
    | flow, _addr ->
        Fun.protect
          ~finally:(fun () -> Eio.Flow.close flow)
          (fun () ->
            try handle_tcp_connection t flow with
            | Eio.Cancel.Cancelled _ as ex -> raise ex
            | _ -> ())
    | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
    | exception _ -> ()
  done
491
(** [probe_member t member] sends a direct [Ping] (with piggybacked
    broadcasts) to [member] and waits up to [t.config.probe_timeout] for the
    matching ack. Returns [true] and records the ack on the member when one
    arrives; otherwise cancels the pending ack and returns [false]. *)
let probe_member t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let max_bytes = t.config.udp_buffer_size - 100 in
  let piggyback = drain_piggyback t ~max_bytes in
  let primary = Ping { seq; target = target.id; sender = t.self } in
  let packet = make_packet t ~primary ~piggyback in
  (* The waiter is registered before sending. *)
  let waiter = Pending_acks.register t.pending_acks ~seq in
  send_packet t ~dst:target.addr packet;
  let outcome =
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  in
  let acked = Option.is_some outcome in
  if acked then begin
    let now = now_mtime t in
    ignore
      (Membership.update_member t.members target.id
         { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) })
  end
  else Pending_acks.cancel t.pending_acks ~seq;
  acked
516
(** [indirect_probe t member] sends a [Ping_req] for [member] to the nodes
    chosen by [Protocol_pure.select_indirect_targets] and waits up to
    [t.config.probe_timeout] for an ack on the request's sequence number.
    Returns [true] and records the ack on the member when one arrives;
    otherwise cancels the pending ack and returns [false]. *)
let indirect_probe t (member : Membership.Member.t) =
  let target = Membership.Member.node member in
  let seq = next_seq t in
  let primary = Ping_req { seq; target = target.id; sender = t.self } in
  let relays =
    Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id
      ~count:t.config.indirect_checks
      ~members:(Membership.to_node_list t.members)
  in
  let waiter = Pending_acks.register t.pending_acks ~seq in
  List.iter
    (fun relay ->
      send_packet t ~dst:relay.addr (make_packet t ~primary ~piggyback:[]))
    relays;
  let outcome =
    Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock
  in
  let acked = Option.is_some outcome in
  if acked then begin
    let now = now_mtime t in
    ignore
      (Membership.update_member t.members target.id
         { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) })
  end
  else Pending_acks.cancel t.pending_acks ~seq;
  acked
547
(** [suspect_member t member] raises a local suspicion against [member]: it
    builds a [Suspect] message naming the local node as suspector and applies
    the transition computed by [Protocol_pure.handle_suspect].

    The message carries the incarnation currently recorded for [member], not
    the local node's own incarnation. A suspicion is a claim about a specific
    incarnation of the suspected node, and comparing or refuting it against
    an unrelated counter would be meaningless. *)
let suspect_member t (member : Membership.Member.t) =
  let node = Membership.Member.node member in
  let known = Membership.Member.snapshot_now member in
  let msg =
    Suspect
      {
        node = node.id;
        incarnation = known.incarnation;
        suspector = t.self.id;
      }
  in
  apply_member_transition t node.id (fun snap ~now ->
      Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now)
556
(** [probe_cycle t] runs one probe round: it asks
    [Protocol_pure.next_probe_target] for the next node, stores the returned
    probe index, then probes that member directly, indirectly if the direct
    probe fails, and marks it suspect when both fail. Does nothing when no
    target is available or the target is no longer a member. *)
let probe_cycle t =
  let members =
    Membership.to_list t.members |> List.map Membership.Member.node
  in
  let read_index ~xt = Kcas.Xt.get ~xt t.probe_index in
  let probe_index = Kcas.Xt.commit { tx = read_index } in
  match
    Protocol_pure.next_probe_target ~self:t.self.id ~probe_index ~members
  with
  | None -> ()
  | Some (target_node, new_idx) ->
      let store_index ~xt = Kcas.Xt.set ~xt t.probe_index new_idx in
      Kcas.Xt.commit { tx = store_index };
      Membership.find t.members target_node.id
      |> Option.iter (fun member ->
             (* [&&] short-circuits: the indirect probe only runs after a
                failed direct probe. *)
             let unreachable =
               (not (probe_member t member))
               && not (indirect_probe t member)
             in
             if unreachable then suspect_member t member)
578
(** [run_protocol t] repeats a probe cycle followed by a sleep of
    [t.config.protocol_interval] until shutdown is requested. *)
let run_protocol t =
  let rec loop () =
    if not (is_shutdown t) then begin
      probe_cycle t;
      Eio.Time.sleep t.clock t.config.protocol_interval;
      loop ()
    end
  in
  loop ()
584
(** [create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random] builds a fresh node state with empty membership, zeroed
    counters and newly allocated buffer pools.

    @return
      [Error `Invalid_key] when [Crypto.init_key] rejects
      [config.secret_key], otherwise [Ok state]. *)
let create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock ~mono_clock
    ~secure_random =
  match Crypto.init_key config.secret_key with
  | Error _ -> Error `Invalid_key
  | Ok cipher_key ->
      (* UDP pools share the configured buffer size; TCP pools hold 4
         buffers of a fixed size. *)
      let udp_pool count =
        Buffer_pool.create ~size:config.udp_buffer_size ~count
      in
      let tcp_pool size = Buffer_pool.create ~size ~count:4 in
      Ok
        {
          config;
          self;
          members = Membership.create ();
          incarnation = Kcas.Loc.make zero_incarnation;
          sequence = Kcas.Loc.make 0;
          broadcast_queue = Dissemination.create ();
          pending_acks = Pending_acks.create ();
          probe_index = Kcas.Loc.make 0;
          send_pool = udp_pool config.send_buffer_count;
          recv_pool = udp_pool config.recv_buffer_count;
          tcp_recv_pool = tcp_pool 65536;
          tcp_decompress_pool = tcp_pool 131072;
          udp_sock;
          tcp_listener;
          event_stream = Eio.Stream.create 100;
          user_handlers = Kcas.Loc.make [];
          cipher_key;
          stats = Kcas.Loc.make empty_stats;
          shutdown = Kcas.Loc.make false;
          clock;
          mono_clock;
          secure_random;
          sw;
        }
620
(** [shutdown t] sets the shutdown flag that the run loops poll. *)
let shutdown t =
  let tx ~xt = Kcas.Xt.set ~xt t.shutdown true in
  Kcas.Xt.commit { tx }
623
(** [add_member t node_info] creates a member for [node_info] stamped with
    the current time, adds it to the membership and emits a [Join] event. *)
let add_member t node_info =
  let member = Membership.Member.create ~now:(now_mtime t) node_info in
  Membership.add t.members member;
  emit_event t (Join node_info)
629
(** [remove_member t node_id] removes [node_id] from the membership and
    returns whether a removal took place. A [Leave] event is emitted only
    when [Membership.remove] reports success; unknown ids yield [false]. *)
let remove_member t node_id =
  Membership.find t.members node_id
  |> Option.fold ~none:false ~some:(fun member ->
         (* Capture the node before removal so the event can carry it. *)
         let departed = Membership.Member.node member in
         let removed = Membership.remove t.members node_id in
         if removed then emit_event t (Leave departed);
         removed)
638
(** [local_node t] is the identity of the local node. *)
let local_node { self; _ } = self

(** [members t] lists the members currently held in the membership table. *)
let members { members; _ } = Membership.to_list members

(** [member_count t] is the count reported by the membership table. *)
let member_count { members; _ } = Membership.count members

(** [events t] is the stream on which membership events are published. *)
let events { event_stream; _ } = event_stream
643
(** [stats t] returns the stored counters completed with values computed at
    call time: alive/suspect/dead member tallies ([Left] counts as dead), the
    broadcast queue depth, and the available/total buffer counts of the send
    and receive pools. *)
let stats t =
  let read ~xt = Kcas.Xt.get ~xt t.stats in
  let counters = Kcas.Xt.commit { tx = read } in
  let nodes_alive, nodes_suspect, nodes_dead =
    Membership.snapshot_all t.members
    |> List.fold_left
         (fun (alive, suspect, dead) snap ->
           match snap.state with
           | Alive -> (succ alive, suspect, dead)
           | Suspect -> (alive, succ suspect, dead)
           | Dead | Left -> (alive, suspect, succ dead))
         (0, 0, 0)
  in
  let queue_depth = Dissemination.depth t.broadcast_queue in
  (* Only the UDP send and receive pools are counted. *)
  let buffers_available =
    Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool
  in
  let buffers_total =
    Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool
  in
  {
    counters with
    nodes_alive;
    nodes_suspect;
    nodes_dead;
    queue_depth;
    buffers_available;
    buffers_total;
  }
667
(** [broadcast t ~topic ~payload] queues a [User_msg] originating from the
    local node for dissemination. *)
let broadcast t ~topic ~payload =
  enqueue_broadcast t (User_msg { topic; payload; origin = t.self.id })
671
(** [on_message t handler] registers [handler] by prepending it to the list
    of user handlers, in one Kcas transaction. *)
let on_message t handler =
  let tx ~xt =
    let existing = Kcas.Xt.get ~xt t.user_handlers in
    Kcas.Xt.set ~xt t.user_handlers (handler :: existing)
  in
  Kcas.Xt.commit { tx }