this repo has no description
1open Types 2 3type item = { 4 msg : protocol_msg; 5 transmits : int Kcas.Loc.t; 6 created : Mtime.span; 7} 8 9type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t } 10 11let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 } 12 13let enqueue t msg ~transmits ~created = 14 let item = { msg; transmits = Kcas.Loc.make transmits; created } in 15 Kcas.Xt.commit 16 { 17 tx = 18 (fun ~xt -> 19 Kcas_data.Queue.Xt.add ~xt item t.queue; 20 Kcas.Xt.modify ~xt t.depth succ); 21 } 22 23let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) } 24 25let drain t ~max_bytes ~encode_size = 26 let rec loop acc bytes_used = 27 Kcas.Xt.commit 28 { 29 tx = 30 (fun ~xt -> 31 match Kcas_data.Queue.Xt.take_opt ~xt t.queue with 32 | None -> List.rev acc 33 | Some item -> 34 let msg_size = encode_size item.msg in 35 if bytes_used + msg_size > max_bytes && acc <> [] then begin 36 Kcas_data.Queue.Xt.add ~xt item t.queue; 37 List.rev acc 38 end 39 else 40 let remaining = Kcas.Xt.get ~xt item.transmits - 1 in 41 if remaining > 0 then begin 42 Kcas.Xt.set ~xt item.transmits remaining; 43 Kcas_data.Queue.Xt.add ~xt item t.queue 44 end 45 else Kcas.Xt.modify ~xt t.depth pred; 46 loop (item.msg :: acc) (bytes_used + msg_size)); 47 } 48 in 49 loop [] 0 50 51let invalidate t ~invalidates newer_msg = 52 let items = 53 Kcas.Xt.commit 54 { tx = (fun ~xt -> Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq) } 55 in 56 let valid_items, removed_count = 57 List.fold_left 58 (fun (valid, removed) item -> 59 if invalidates ~newer:newer_msg ~older:item.msg then (valid, removed + 1) 60 else (item :: valid, removed)) 61 ([], 0) items 62 in 63 if removed_count > 0 then begin 64 Kcas.Xt.commit 65 { 66 tx = 67 (fun ~xt -> 68 Kcas_data.Queue.Xt.clear ~xt t.queue; 69 List.iter 70 (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue) 71 (List.rev valid_items); 72 Kcas.Xt.modify ~xt t.depth (fun d -> d - removed_count)); 73 } 74 end