The unpac monorepo manager self-hosting as a monorepo using unpac

eio_linux: refactor fixed buffer code

Instead of having separate Alloc, Alloc_or_wait and Free effects,
the scheduler now provides a single Get effect to return itself,
and the actual work is now done in the calling fiber. This is cleaner,
and seems to be slightly faster too.

Note that `alloc_fixed_or_wait` is currently not cancellable (it wasn't
before either, but it's more obvious now).

It would be possible to use DLS to store the scheduler rather than using
an effect. However, the improvement in speed is minimal and there are
some complications with sys-threads, so probably better to wait for
OCaml to support thread-local-storage first.

+34 -39
+1
lib_eio/core/eio__core.ml
··· 7 module Suspend = Suspend 8 module Cells = Cells 9 module Broadcast = Broadcast 10 module Trace = Trace 11 module Fiber_context = Cancel.Fiber_context 12 module Debug = Debug
··· 7 module Suspend = Suspend 8 module Cells = Cells 9 module Broadcast = Broadcast 10 + module Single_waiter = Single_waiter 11 module Trace = Trace 12 module Fiber_context = Cancel.Fiber_context 13 module Debug = Debug
+1
lib_eio/core/eio__core.mli
··· 606 607 module Cells = Cells 608 module Broadcast = Broadcast 609 610 (** Every fiber has an associated context. *) 611 module Fiber_context : sig
··· 606 607 module Cells = Cells 608 module Broadcast = Broadcast 609 + module Single_waiter = Single_waiter 610 611 (** Every fiber has an associated context. *) 612 module Fiber_context : sig
+26 -3
lib_eio_linux/low_level.ml
··· 207 raise @@ Err.wrap (Uring.error_of_errno res) "write" "" 208 ) 209 210 - let alloc_fixed () = Effect.perform Sched.Alloc 211 212 - let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait 213 214 - let free_fixed buf = Effect.perform (Sched.Free buf) 215 216 let splice src ~dst ~len = 217 Fd.use_exn "splice-src" src @@ fun src ->
··· 207 raise @@ Err.wrap (Uring.error_of_errno res) "write" "" 208 ) 209 210 + let alloc_fixed () = 211 + let s = Sched.get () in 212 + match s.mem with 213 + | None -> None 214 + | Some mem -> 215 + match Uring.Region.alloc mem with 216 + | buf -> Some buf 217 + | exception Uring.Region.No_space -> None 218 219 + let alloc_fixed_or_wait () = 220 + let s = Sched.get () in 221 + match s.mem with 222 + | None -> failwith "No fixed buffer available" 223 + | Some mem -> 224 + match Uring.Region.alloc mem with 225 + | buf -> buf 226 + | exception Uring.Region.No_space -> 227 + let id = Eio.Private.Trace.mint_id () in 228 + let trigger = Eio.Private.Single_waiter.create () in 229 + Queue.push trigger s.mem_q; 230 + (* todo: remove protect; but needs to remove from queue on cancel *) 231 + Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id 232 233 + let free_fixed buf = 234 + let s = Sched.get () in 235 + match Queue.take_opt s.mem_q with 236 + | None -> Uring.Region.free buf 237 + | Some k -> Eio.Private.Single_waiter.wake k (Ok buf) 238 239 let splice src ~dst ~len = 240 Fd.use_exn "splice-src" src @@ fun src ->
+6 -36
lib_eio_linux/sched.ml
··· 50 uring: io_job Uring.t; 51 mem: Uring.Region.t option; 52 io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) 53 - mem_q : Uring.Region.chunk Suspended.t Queue.t; 54 55 (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) 56 run_q : runnable Lf_queue.t; ··· 74 type _ Effect.t += 75 | Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t 76 | Cancel : io_job Uring.job -> unit Effect.t 77 - | Alloc : Uring.Region.chunk option Effect.t 78 - | Alloc_or_wait : Uring.Region.chunk Effect.t 79 - | Free : Uring.Region.chunk -> unit Effect.t 80 81 let wake_buffer = 82 let b = Bytes.create 8 in ··· 339 | _, Exactly len -> Suspended.continue action len 340 | n, Upto _ -> Suspended.continue action n 341 342 - let alloc_buf_or_wait st k = 343 - match st.mem with 344 - | None -> Suspended.discontinue k (Failure "No fixed buffer available") 345 - | Some mem -> 346 - match Uring.Region.alloc mem with 347 - | buf -> Suspended.continue k buf 348 - | exception Uring.Region.No_space -> 349 - Queue.push k st.mem_q; 350 - schedule st 351 - 352 - let free_buf st buf = 353 - match Queue.take_opt st.mem_q with 354 - | None -> Uring.Region.free buf 355 - | Some k -> enqueue_thread st k buf 356 - 357 let rec enqueue_poll_add fd poll_mask st action = 358 Trace.log "poll_add"; 359 let retry = with_cancel_hook ~action st (fun () -> ··· 411 Fiber_context.destroy fiber; 412 Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) 413 ); 414 - effc = fun (type a) (e : a Effect.t) -> 415 match e with 416 | Enter fn -> Some (fun k -> 417 match Fiber_context.get_error fiber with 418 | Some e -> discontinue k e ··· 466 let enqueue x = enqueue_thread st k (x, st.thread_pool) in 467 Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn; 468 schedule st 469 - ) 470 - | Alloc -> Some (fun k -> 471 - match st.mem with 472 - | None -> continue k None 473 - | Some mem -> 474 - match Uring.Region.alloc mem with 475 - | buf -> continue k (Some buf) 476 - | exception Uring.Region.No_space -> continue k None 477 - ) 478 - | Alloc_or_wait -> Some (fun k -> 479 - let k = { Suspended.k; fiber } in 480 - alloc_buf_or_wait st k 481 - ) 482 - | Free buf -> Some (fun k -> 483 - free_buf st buf; 484 - continue k () 485 ) 486 | e -> extra_effects.effc e 487 }
··· 50 uring: io_job Uring.t; 51 mem: Uring.Region.t option; 52 io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) 53 + mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t; 54 55 (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) 56 run_q : runnable Lf_queue.t; ··· 74 type _ Effect.t += 75 | Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t 76 | Cancel : io_job Uring.job -> unit Effect.t 77 + | Get : t Effect.t 78 + 79 + let get () = Effect.perform Get 80 81 let wake_buffer = 82 let b = Bytes.create 8 in ··· 339 | _, Exactly len -> Suspended.continue action len 340 | n, Upto _ -> Suspended.continue action n 341 342 let rec enqueue_poll_add fd poll_mask st action = 343 Trace.log "poll_add"; 344 let retry = with_cancel_hook ~action st (fun () -> ··· 396 Fiber_context.destroy fiber; 397 Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) 398 ); 399 + effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option -> 400 match e with 401 + | Get -> Some (fun k -> continue k st) 402 | Enter fn -> Some (fun k -> 403 match Fiber_context.get_error fiber with 404 | Some e -> discontinue k e ··· 452 let enqueue x = enqueue_thread st k (x, st.thread_pool) in 453 Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn; 454 schedule st 455 ) 456 | e -> extra_effects.effc e 457 }