this repo has no description
(** Lock-free buffer pool using Kcas and Eio.

    Provides pre-allocated buffers for zero-copy I/O operations. Uses
    [Kcas_data.Queue] for lock-free buffer storage and [Eio.Semaphore] for
    blocking acquire when the pool is exhausted. *)

(** A pool of equally sized buffers. *)
type t = {
  buffers : Cstruct.t Kcas_data.Queue.t;
      (** Buffers that are currently idle and can be handed out. *)
  buf_size : int;
      (** Length passed to [Cstruct.create] for every buffer of this pool. *)
  total : int;  (** Number of buffers allocated by {!create}. *)
  semaphore : Eio.Semaphore.t;
      (** Counts the buffers that may still be handed out; starts at
          [total]. *)
}

(** [create ~size ~count] allocates [count] buffers of length [size] up
    front and returns a pool holding all of them. *)
let create ~size ~count =
  let buffers = Kcas_data.Queue.create () in
  for _ = 1 to count do
    (* A single enqueue needs no explicit transaction. *)
    Kcas_data.Queue.add (Cstruct.create size) buffers
  done;
  {
    buffers;
    buf_size = size;
    total = count;
    semaphore = Eio.Semaphore.make count;
  }

(* Pops an idle buffer from the pool. The caller must already hold a
   semaphore permit. An empty queue is not expected while a permit is held;
   if it happens anyway, a fresh buffer of the pool's size is allocated
   instead of failing. *)
let take_or_allocate t =
  match Kcas_data.Queue.take_opt t.buffers with
  | Some buf -> buf
  | None -> Cstruct.create t.buf_size

(** [acquire t] takes a buffer out of [t], waiting on the pool's semaphore
    until a permit is available. The buffer should be handed back with
    {!release}. *)
let acquire t =
  Eio.Semaphore.acquire t.semaphore;
  take_or_allocate t

(** [try_acquire t] returns [Some buf] when the semaphore reports a free
    permit and [None] otherwise.

    The check ([Eio.Semaphore.get_value]) and the subsequent
    [Eio.Semaphore.acquire] are two separate steps. If the permit is taken
    by someone else in between, this call can still block until a buffer is
    released, so it is best-effort rather than strictly non-blocking. *)
let try_acquire t =
  if Eio.Semaphore.get_value t.semaphore > 0 then begin
    Eio.Semaphore.acquire t.semaphore;
    Some (take_or_allocate t)
  end
  else None

(** [release t buf] puts [buf] back into [t] and returns its permit.

    The buffer is enqueued before the semaphore is released, so a fiber
    woken by the release finds a buffer in the queue. No check is made that
    [buf] originally came from [t]. *)
let release t buf =
  Kcas_data.Queue.add buf t.buffers;
  Eio.Semaphore.release t.semaphore

(** [with_buffer t f] acquires a buffer, applies [f] to it and releases the
    buffer afterwards, whether [f] returns normally or raises. *)
let with_buffer t f =
  let buf = acquire t in
  Fun.protect ~finally:(fun () -> release t buf) (fun () -> f buf)

(** [available t] is the current value of the pool's semaphore, i.e. the
    number of permits that can be acquired without waiting. *)
let available t = Eio.Semaphore.get_value t.semaphore

(** [total t] is the number of buffers [t] was created with. *)
let total t = t.total

(** [size t] is the length of each buffer in [t]. *)
let size t = t.buf_size