this repo has no description
1type waiter = {
2 promise : string option Eio.Promise.t;
3 resolver : string option Eio.Promise.u;
4}
5
6type t = { table : (int, waiter) Kcas_data.Hashtbl.t }
7
8let create () = { table = Kcas_data.Hashtbl.create () }
9
10let register t ~seq =
11 let promise, resolver = Eio.Promise.create () in
12 let w = { promise; resolver } in
13 Kcas.Xt.commit
14 { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.replace ~xt t.table seq w) };
15 w
16
17let complete t ~seq ~payload =
18 let found =
19 Kcas.Xt.commit
20 {
21 tx =
22 (fun ~xt ->
23 match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table seq with
24 | None -> None
25 | Some w ->
26 Kcas_data.Hashtbl.Xt.remove ~xt t.table seq;
27 Some w);
28 }
29 in
30 match found with
31 | None -> false
32 | Some w ->
33 Eio.Promise.resolve w.resolver payload;
34 true
35
36let wait w ~timeout ~clock =
37 try
38 Some
39 (Eio.Time.with_timeout_exn clock timeout (fun () ->
40 Eio.Promise.await w.promise))
41 with Eio.Time.Timeout -> None
42
43let cancel t ~seq =
44 Kcas.Xt.commit
45 { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.remove ~xt t.table seq) }
46
47let pending_count t = Kcas_data.Hashtbl.length t.table