open Types

(** A queued message together with its retransmission bookkeeping. *)
type item = {
  msg : protocol_msg;  (** Payload handed back to the caller by [drain]. *)
  transmits : int Kcas.Loc.t;
      (** Number of transmissions still owed; decremented by [drain]. *)
  created : Mtime.span;
      (** Timestamp supplied by the caller of [enqueue]; stored as given. *)
}

(** A transmit queue with an explicit item counter.

    Every operation below updates [queue] and [depth] inside the same
    transaction so that [depth] always equals the number of queued items. *)
type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t }

(** [create ()] returns an empty queue whose depth is [0]. *)
let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 }

(** [enqueue t msg ~transmits ~created ~limit] appends [msg] to the back of
    [t] with [transmits] transmissions owed.

    When the current depth has reached [limit], the item at the front is
    evicted first so the depth does not grow. If there is nothing to evict
    (e.g. [limit <= 0] on an empty queue) the message is still added and the
    depth is incremented, keeping the counter in sync with the queue. *)
let enqueue t msg ~transmits ~created ~limit =
  let item = { msg; transmits = Kcas.Loc.make transmits; created } in
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let d = Kcas.Xt.get ~xt t.depth in
          (* Only treat the slot as reused when an item really was removed;
             otherwise the counter would fall behind the queue length. *)
          let evicted =
            d >= limit
            && Option.is_some (Kcas_data.Queue.Xt.take_opt ~xt t.queue)
          in
          if not evicted then Kcas.Xt.set ~xt t.depth (d + 1);
          Kcas_data.Queue.Xt.add ~xt item t.queue);
    }

(** [depth t] is the number of items currently accounted for in [t]. *)
let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) }

(** [drain t ~max_bytes ~encode_size] takes messages from the front of [t]
    until the next one would push the running total of [encode_size msg]
    above [max_bytes], and returns them in the order they were taken.

    - The first message is always returned, even if it alone exceeds
      [max_bytes], so an oversized message cannot block the queue.
    - Each returned item has its transmit count decremented; it is re-queued
      at the back while transmissions remain and dropped (depth decremented)
      otherwise.
    - A message that does not fit is put back at the back of the queue.
    - At most [depth t] items (as observed when the call starts) are taken,
      so an item re-queued during this call is never returned twice by it.

    Each item is processed in its own transaction; [encode_size] runs inside
    that transaction and should therefore be free of side effects. *)
let drain t ~max_bytes ~encode_size =
  let rec loop acc bytes_used budget =
    if budget <= 0 then List.rev acc
    else
      let step =
        Kcas.Xt.commit
          {
            tx =
              (fun ~xt ->
                match Kcas_data.Queue.Xt.take_opt ~xt t.queue with
                | None -> `Done
                | Some item ->
                    let msg_size = encode_size item.msg in
                    let batch_started =
                      match acc with [] -> false | _ :: _ -> true
                    in
                    if batch_started && bytes_used + msg_size > max_bytes
                    then begin
                      (* Does not fit: return it to the queue untouched. *)
                      Kcas_data.Queue.Xt.add ~xt item t.queue;
                      `Done
                    end
                    else begin
                      let remaining = Kcas.Xt.get ~xt item.transmits - 1 in
                      if remaining > 0 then begin
                        Kcas.Xt.set ~xt item.transmits remaining;
                        Kcas_data.Queue.Xt.add ~xt item t.queue
                      end
                      else Kcas.Xt.modify ~xt t.depth pred;
                      `Sent (item.msg, msg_size)
                    end);
          }
      in
      match step with
      | `Done -> List.rev acc
      | `Sent (msg, msg_size) ->
          loop (msg :: acc) (bytes_used + msg_size) (budget - 1)
  in
  (* The budget stops the loop once every item present at the start has been
     visited; without it, re-queued items would be taken again and again. *)
  loop [] 0 (depth t)

(** [invalidate t ~invalidates newer_msg] removes every queued item [i] for
    which [invalidates ~newer:newer_msg ~older:i.msg] is [true], preserving
    the relative order of the remaining items and lowering the depth by the
    number removed.

    The snapshot, filtering and rewrite happen in a single transaction, so
    items enqueued or drained concurrently are neither lost nor resurrected.
    [invalidates] is called inside that transaction and may be invoked again
    if the transaction is retried, so it should be free of side effects. *)
let invalidate t ~invalidates newer_msg =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let items = Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq in
          let survivors =
            List.filter
              (fun item -> not (invalidates ~newer:newer_msg ~older:item.msg))
              items
          in
          let removed = List.length items - List.length survivors in
          if removed > 0 then begin
            Kcas_data.Queue.Xt.clear ~xt t.queue;
            List.iter
              (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue)
              survivors;
            Kcas.Xt.modify ~xt t.depth (fun d -> d - removed)
          end);
    }