this repo has no description
(** Lock-free buffer pool using Kcas and Eio.

    Provides pre-allocated buffers for zero-copy I/O operations. Uses
    [Kcas_data.Queue] for lock-free buffer storage and [Eio.Semaphore] for a
    blocking acquire when the pool is exhausted. *)
6
(** Pool state: the idle buffers plus the bookkeeping needed to hand them
    out. *)
type t = {
  buffers : Cstruct.t Kcas_data.Queue.t;
      (** Queue of buffers that are currently idle. *)
  buf_size : int;
      (** Size given to [Cstruct.create] for every buffer of this pool. *)
  total : int;
      (** Number of buffers allocated when the pool was created. *)
  semaphore : Eio.Semaphore.t;
      (** Permit counter, created with [total] permits. *)
}
13
(** [create ~size ~count] builds a pool holding [count] buffers, each
    allocated with [Cstruct.create size]. The pool's semaphore is created
    with [count] permits, one per pooled buffer.

    @raise Invalid_argument if [size] or [count] is negative. *)
let create ~size ~count =
  (* Validate before allocating anything so the error names the offending
     argument instead of surfacing from a library call part-way through. *)
  if size < 0 then invalid_arg "create: size must be non-negative";
  if count < 0 then invalid_arg "create: count must be non-negative";
  let buffers = Kcas_data.Queue.create () in
  for _ = 1 to count do
    (* A single enqueue needs no explicit transaction. *)
    Kcas_data.Queue.add (Cstruct.create size) buffers
  done;
  {
    buffers;
    buf_size = size;
    total = count;
    semaphore = Eio.Semaphore.make count;
  }
29
(** [acquire t] takes one permit from the pool's semaphore (waiting if none
    is available) and then returns a buffer taken from the idle queue. *)
let acquire t =
  Eio.Semaphore.acquire t.semaphore;
  match Kcas_data.Queue.take_opt t.buffers with
  | Some pooled -> pooled
  | None ->
      (* A permit without a queued buffer is not expected; rather than fail,
         hand out a freshly allocated buffer of the pool's size. *)
      Cstruct.create t.buf_size
42
(** [try_acquire t] returns [Some buf] when the semaphore currently reports
    at least one free permit, and [None] otherwise.

    The permit check and the actual acquisition are two separate steps. If
    another acquirer takes the last permit in between, this call waits
    inside [acquire] until a buffer is released; a strictly non-blocking
    version would need an atomic check-and-take on the semaphore. *)
let try_acquire t =
  let permits = Eio.Semaphore.get_value t.semaphore in
  if permits <= 0 then None
  else
    (* [acquire] performs the same steps: take a permit, dequeue a buffer,
       and fall back to a fresh allocation if the queue is empty. *)
    Some (acquire t)
59
(** [release t buf] puts [buf] back on the idle queue and then returns one
    permit to the semaphore. *)
let release { buffers; semaphore; _ } buf =
  (* Enqueue first, so the buffer is already in the queue when the permit
     is handed back. *)
  let tx ~xt = Kcas_data.Queue.Xt.add ~xt buf buffers in
  Kcas.Xt.commit { tx };
  Eio.Semaphore.release semaphore
63
(** [with_buffer t f] acquires a buffer, applies [f] to it and returns the
    result. The buffer is released whether [f] returns normally or
    raises. *)
let with_buffer t f =
  let borrowed = acquire t in
  let give_back () = release t borrowed in
  Fun.protect ~finally:give_back @@ fun () -> f borrowed
67
(** [available t] is the current value of the pool's semaphore. *)
let available { semaphore; _ } = Eio.Semaphore.get_value semaphore
(** [total t] is the buffer count recorded when the pool was created. *)
let total { total; _ } = total
(** [size t] is the per-buffer size recorded when the pool was created. *)
let size { buf_size; _ } = buf_size