this repo has no description
(** Lock-free buffer pool using Kcas and Eio.

    Provides pre-allocated buffers for zero-copy I/O operations. Uses
    [Kcas_data.Queue] for lock-free buffer storage and [Eio.Semaphore] to make
    acquisition block when the pool is exhausted. *)
6
(** A pool of equally sized buffers guarded by a permit counter. *)
type t = {
  buffers : Cstruct.t Kcas_data.Queue.t;
      (** Idle buffers currently waiting to be handed out. *)
  buf_size : int;
      (** Length given to [Cstruct.create] whenever a buffer is allocated. *)
  total : int;
      (** Number of buffers the pool was created with. *)
  semaphore : Eio.Semaphore.t;
      (** Permit counter: one permit is taken for every buffer handed out
          and given back when the buffer is released. *)
}
13
(** [create ~size ~count] builds a pool pre-filled with [count] buffers, each
    obtained from [Cstruct.create size], and a semaphore initialised with
    [count] permits.

    @param size length passed to [Cstruct.create] for every buffer.
    @param count number of buffers to pre-allocate; also the initial number
      of semaphore permits. *)
let create ~size ~count =
  let buffers = Kcas_data.Queue.create () in
  (* Transaction body: allocate one fresh buffer and append it to the queue. *)
  let enqueue_fresh ~xt =
    Kcas_data.Queue.Xt.add ~xt (Cstruct.create size) buffers
  in
  let rec fill remaining =
    if remaining > 0 then begin
      Kcas.Xt.commit { tx = enqueue_fresh };
      fill (remaining - 1)
    end
  in
  fill count;
  let semaphore = Eio.Semaphore.make count in
  { buffers; buf_size = size; total = count; semaphore }
29
(** [acquire t] takes one permit from [t]'s semaphore with
    [Eio.Semaphore.acquire], then removes a buffer from the queue, fills it
    with zero bytes and returns it.

    If the queue turns out to be empty even though a permit was obtained, a
    fresh buffer of [t.buf_size] bytes is allocated and returned instead. *)
let acquire t =
  Eio.Semaphore.acquire t.semaphore;
  let dequeue ~xt = Kcas_data.Queue.Xt.take_opt ~xt t.buffers in
  match Kcas.Xt.commit { tx = dequeue } with
  | None ->
      (* Not expected while permits and queued buffers stay in step;
         fall back to a fresh allocation rather than failing. *)
      Cstruct.create t.buf_size
  | Some recycled ->
      (* Wipe whatever the previous user left in the buffer. *)
      Cstruct.memset recycled 0;
      recycled
44
(** [try_acquire t] returns [None] when the semaphore currently reports no
    available permit. Otherwise it takes a permit and returns [Some buf],
    where [buf] is a zero-filled buffer from the queue (or a freshly
    allocated one of [t.buf_size] bytes if the queue is unexpectedly empty).

    NOTE: the permit count is read and the permit is taken in two separate
    steps. If another party takes the last permit in between, the
    [Eio.Semaphore.acquire] call waits, so this function is not strictly
    non-blocking. *)
let try_acquire t =
  if Eio.Semaphore.get_value t.semaphore <= 0 then None
  else begin
    Eio.Semaphore.acquire t.semaphore;
    let dequeue ~xt = Kcas_data.Queue.Xt.take_opt ~xt t.buffers in
    let buf =
      match Kcas.Xt.commit { tx = dequeue } with
      | Some recycled ->
          Cstruct.memset recycled 0;
          recycled
      | None -> Cstruct.create t.buf_size
    in
    Some buf
  end
63
(** [release t buf] appends [buf] to [t]'s queue and then returns one permit
    to the semaphore. No check is made that [buf] was handed out by [t]. *)
let release t buf =
  let enqueue ~xt = Kcas_data.Queue.Xt.add ~xt buf t.buffers in
  (* Enqueue first, so the buffer is already in the queue by the time the
     permit becomes available. *)
  Kcas.Xt.commit { tx = enqueue };
  Eio.Semaphore.release t.semaphore
67
(** [with_buffer t f] acquires a buffer from [t], applies [f] to it and
    returns [f]'s result. The buffer is released back to [t] whether [f]
    returns normally or raises. *)
let with_buffer t f =
  let buf = acquire t in
  let give_back () = release t buf in
  Fun.protect ~finally:give_back @@ fun () -> f buf
71
(** [available t] is the current value of [t]'s semaphore, as reported by
    [Eio.Semaphore.get_value]. *)
let available { semaphore; _ } = Eio.Semaphore.get_value semaphore
(** [total t] is the buffer count recorded in [t]'s [total] field. *)
let total { total; _ } = total
(** [size t] is the per-buffer length recorded in [t]'s [buf_size] field. *)
let size { buf_size; _ } = buf_size