(** Lock-free buffer pool using Kcas and Eio. Provides pre-allocated buffers for zero-copy I/O operations. Uses Kcas_data.Queue for lock-free buffer storage and Eio.Semaphore for blocking acquire when pool is exhausted. *) type t = { buffers : Cstruct.t Kcas_data.Queue.t; buf_size : int; total : int; semaphore : Eio.Semaphore.t; } let create ~size ~count = let buffers = Kcas_data.Queue.create () in for _ = 1 to count do Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.add ~xt (Cstruct.create size) buffers); } done; { buffers; buf_size = size; total = count; semaphore = Eio.Semaphore.make count; } let acquire t = Eio.Semaphore.acquire t.semaphore; let buf_opt = Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) } in match buf_opt with | Some buf -> Cstruct.memset buf 0; buf | None -> (* Should not happen if semaphore is properly synchronized, but handle gracefully by allocating a new buffer *) Cstruct.create t.buf_size let try_acquire t = (* Check if semaphore has available permits without blocking *) if Eio.Semaphore.get_value t.semaphore > 0 then begin (* Race condition possible here - another fiber might acquire between get_value and acquire. In that case, acquire will block briefly. For truly non-blocking behavior, we'd need atomic CAS on semaphore. *) Eio.Semaphore.acquire t.semaphore; let buf_opt = Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) } in match buf_opt with | Some buf -> Cstruct.memset buf 0; Some buf | None -> Some (Cstruct.create t.buf_size) end else None let release t buf = Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.add ~xt buf t.buffers) }; Eio.Semaphore.release t.semaphore let with_buffer t f = let buf = acquire t in Fun.protect ~finally:(fun () -> release t buf) (fun () -> f buf) let available t = Eio.Semaphore.get_value t.semaphore let total t = t.total let size t = t.buf_size