this repo has no description
1open Types
2
3type item = {
4 msg : protocol_msg;
5 transmits : int Kcas.Loc.t;
6 created : Mtime.span;
7}
8
9type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t }
10
11let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 }
12
13let enqueue t msg ~transmits ~created =
14 let item = { msg; transmits = Kcas.Loc.make transmits; created } in
15 Kcas.Xt.commit
16 {
17 tx =
18 (fun ~xt ->
19 Kcas_data.Queue.Xt.add ~xt item t.queue;
20 Kcas.Xt.modify ~xt t.depth succ);
21 }
22
23let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) }
24
25let drain t ~max_bytes ~encode_size =
26 let rec loop acc bytes_used =
27 let result =
28 Kcas.Xt.commit
29 {
30 tx =
31 (fun ~xt ->
32 match Kcas_data.Queue.Xt.take_opt ~xt t.queue with
33 | None -> `Done (List.rev acc)
34 | Some item ->
35 let msg_size = encode_size item.msg in
36 if bytes_used + msg_size > max_bytes && acc <> [] then begin
37 Kcas_data.Queue.Xt.add ~xt item t.queue;
38 `Done (List.rev acc)
39 end
40 else begin
41 let remaining = Kcas.Xt.get ~xt item.transmits - 1 in
42 if remaining > 0 then begin
43 Kcas.Xt.set ~xt item.transmits remaining;
44 Kcas_data.Queue.Xt.add ~xt item t.queue
45 end
46 else Kcas.Xt.modify ~xt t.depth pred;
47 `Continue (item.msg, msg_size)
48 end);
49 }
50 in
51 match result with
52 | `Done msgs -> msgs
53 | `Continue (msg, msg_size) -> loop (msg :: acc) (bytes_used + msg_size)
54 in
55 loop [] 0
56
57let invalidate t ~invalidates newer_msg =
58 let items =
59 Kcas.Xt.commit
60 { tx = (fun ~xt -> Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq) }
61 in
62 let valid_items, removed_count =
63 List.fold_left
64 (fun (valid, removed) item ->
65 if invalidates ~newer:newer_msg ~older:item.msg then (valid, removed + 1)
66 else (item :: valid, removed))
67 ([], 0) items
68 in
69 if removed_count > 0 then begin
70 Kcas.Xt.commit
71 {
72 tx =
73 (fun ~xt ->
74 Kcas_data.Queue.Xt.clear ~xt t.queue;
75 List.iter
76 (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue)
77 (List.rev valid_items);
78 Kcas.Xt.modify ~xt t.depth (fun d -> d - removed_count));
79 }
80 end