(* This repo has no description. *)
(** Lock-free buffer pool using Kcas and Eio.

    Provides pre-allocated buffers for zero-copy I/O operations. Uses
    [Kcas_data.Queue] for lock-free buffer storage and [Eio.Semaphore] for
    blocking acquire when the pool is exhausted. *)

type t = {
  buffers : Cstruct.t Kcas_data.Queue.t;
      (** Buffers currently sitting idle in the pool. *)
  buf_size : int;  (** Size passed to {!create}; used for every allocation. *)
  total : int;  (** Number of buffers allocated by {!create}. *)
  semaphore : Eio.Semaphore.t;
      (** Created with [count] permits; a permit is taken before a buffer is
          handed out and given back by {!release}. *)
}

(** [create ~size ~count] allocates [count] buffers of [size] bytes each,
    queues them, and returns a pool whose semaphore starts with [count]
    permits. *)
let create ~size ~count =
  let buffers = Kcas_data.Queue.create () in
  for _ = 1 to count do
    Kcas_data.Queue.add (Cstruct.create size) buffers
  done;
  {
    buffers;
    buf_size = size;
    total = count;
    semaphore = Eio.Semaphore.make count;
  }

(* Shared tail of [acquire] and [try_acquire]: pops an idle buffer and
   zero-fills it before handing it out. The caller is expected to have taken
   a semaphore permit first. An empty queue should not happen if the
   semaphore is properly synchronized, but it is handled gracefully by
   allocating a new buffer of the pool's size. *)
let take_buffer t =
  match Kcas_data.Queue.take_opt t.buffers with
  | Some buf ->
      Cstruct.memset buf 0;
      buf
  | None -> Cstruct.create t.buf_size

(** [acquire t] takes a semaphore permit, blocking the calling fiber until one
    is available, and returns a zero-filled buffer from the pool. *)
let acquire t =
  Eio.Semaphore.acquire t.semaphore;
  take_buffer t

(** [try_acquire t] returns [None] when the semaphore currently reports no
    available permits, and [Some buf] (zero-filled) otherwise.

    The permit check and the permit acquisition are two separate steps, so
    this is only best-effort non-blocking: if another party takes the last
    permit in between, the call blocks in [Eio.Semaphore.acquire] until a
    permit is released. Truly non-blocking behaviour would need an atomic
    try-acquire on the semaphore. *)
let try_acquire t =
  if Eio.Semaphore.get_value t.semaphore > 0 then begin
    Eio.Semaphore.acquire t.semaphore;
    Some (take_buffer t)
  end
  else None

(** [release t buf] puts [buf] back in the pool and returns one permit to the
    semaphore. The buffer is queued before the permit is released, so a
    waiter woken by the permit finds a buffer in the queue.

    No check is made that [buf] came from this pool or has the pool's buffer
    size; callers should release exactly the buffer they acquired, once. *)
let release t buf =
  Kcas_data.Queue.add buf t.buffers;
  Eio.Semaphore.release t.semaphore

(** [with_buffer t f] acquires a buffer, applies [f] to it, and releases the
    buffer whether [f] returns normally or raises. *)
let with_buffer t f =
  let buf = acquire t in
  Fun.protect ~finally:(fun () -> release t buf) (fun () -> f buf)

(** [available t] is the semaphore's current value, i.e. the number of permits
    not currently taken. *)
let available t = Eio.Semaphore.get_value t.semaphore

(** [total t] is the [count] the pool was created with. *)
let total t = t.total

(** [size t] is the [size] the pool was created with. *)
let size t = t.buf_size