this repo has no description
1open Types 2 3type item = { 4 msg : protocol_msg; 5 transmits : int Kcas.Loc.t; 6 created : Mtime.span; 7} 8 9type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t } 10 11let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 } 12 13let enqueue t msg ~transmits ~created = 14 let item = { msg; transmits = Kcas.Loc.make transmits; created } in 15 Kcas.Xt.commit 16 { 17 tx = 18 (fun ~xt -> 19 Kcas_data.Queue.Xt.add ~xt item t.queue; 20 Kcas.Xt.modify ~xt t.depth succ); 21 } 22 23let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) } 24 25let drain t ~max_bytes ~encode_size = 26 let rec loop acc bytes_used = 27 let result = 28 Kcas.Xt.commit 29 { 30 tx = 31 (fun ~xt -> 32 match Kcas_data.Queue.Xt.take_opt ~xt t.queue with 33 | None -> `Done (List.rev acc) 34 | Some item -> 35 let msg_size = encode_size item.msg in 36 if bytes_used + msg_size > max_bytes && acc <> [] then begin 37 Kcas_data.Queue.Xt.add ~xt item t.queue; 38 `Done (List.rev acc) 39 end 40 else begin 41 let remaining = Kcas.Xt.get ~xt item.transmits - 1 in 42 if remaining > 0 then begin 43 Kcas.Xt.set ~xt item.transmits remaining; 44 Kcas_data.Queue.Xt.add ~xt item t.queue 45 end 46 else Kcas.Xt.modify ~xt t.depth pred; 47 `Continue (item.msg, msg_size) 48 end); 49 } 50 in 51 match result with 52 | `Done msgs -> msgs 53 | `Continue (msg, msg_size) -> loop (msg :: acc) (bytes_used + msg_size) 54 in 55 loop [] 0 56 57let invalidate t ~invalidates newer_msg = 58 let items = 59 Kcas.Xt.commit 60 { tx = (fun ~xt -> Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq) } 61 in 62 let valid_items, removed_count = 63 List.fold_left 64 (fun (valid, removed) item -> 65 if invalidates ~newer:newer_msg ~older:item.msg then (valid, removed + 1) 66 else (item :: valid, removed)) 67 ([], 0) items 68 in 69 if removed_count > 0 then begin 70 Kcas.Xt.commit 71 { 72 tx = 73 (fun ~xt -> 74 Kcas_data.Queue.Xt.clear ~xt t.queue; 75 List.iter 76 (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue) 77 (List.rev valid_items); 78 Kcas.Xt.modify ~xt t.depth (fun d -> d - removed_count)); 79 } 80 end