this repo has no description
at main 1.2 kB view raw
1type waiter = { 2 promise : string option Eio.Promise.t; 3 resolver : string option Eio.Promise.u; 4} 5 6type t = { table : (int, waiter) Kcas_data.Hashtbl.t } 7 8let create () = { table = Kcas_data.Hashtbl.create () } 9 10let register t ~seq = 11 let promise, resolver = Eio.Promise.create () in 12 let w = { promise; resolver } in 13 Kcas.Xt.commit 14 { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.replace ~xt t.table seq w) }; 15 w 16 17let complete t ~seq ~payload = 18 let found = 19 Kcas.Xt.commit 20 { 21 tx = 22 (fun ~xt -> 23 match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table seq with 24 | None -> None 25 | Some w -> 26 Kcas_data.Hashtbl.Xt.remove ~xt t.table seq; 27 Some w); 28 } 29 in 30 match found with 31 | None -> false 32 | Some w -> 33 Eio.Promise.resolve w.resolver payload; 34 true 35 36let wait w ~timeout ~clock = 37 try 38 Some 39 (Eio.Time.with_timeout_exn clock timeout (fun () -> 40 Eio.Promise.await w.promise)) 41 with Eio.Time.Timeout -> None 42 43let cancel t ~seq = 44 Kcas.Xt.commit 45 { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.remove ~xt t.table seq) } 46 47let pending_count t = Kcas_data.Hashtbl.length t.table