this repo has no description
at main 52 lines 1.5 kB view raw
(* Worker pool *)
open Eio

(** A unit of work handed to the pool. *)
type request = {
  description : string;
      (** Label published in [Stats.stats.process_activity] while the job
          is being run by a worker. *)
  request : Bos.Cmd.t;  (** Command passed to [Run.run]. *)
  output_file : Fpath.t option;  (** Forwarded unchanged to [Run.run]. *)
}

(** Outcome of a job: [Ok run] when the process exited with status 0,
    [Error exn] otherwise (see {!Worker_failure}). *)
type response = (Run.t, exn) result

(** Write end of the promise on which a submitter awaits its response. *)
type resolver = response Eio.Promise.u

(** The job queue: each entry pairs a request with the resolver used to
    send the answer back. *)
type t = (request * resolver) Eio.Stream.t

(* Capacity 0: per the Eio.Stream documentation, adding then blocks until a
   reader is ready to take the item, so jobs are handed directly to workers
   rather than buffered. *)
let stream : t = Eio.Stream.create 0

(** [handle_job env request output_file] runs one command via [Run.run]. *)
let handle_job env request output_file = Run.run env request output_file

(** Carried in the [Error] side of a {!response} when the command finished
    with anything other than exit status 0; holds the full run record. *)
exception Worker_failure of Run.t

(** [run_worker env id] loops forever: take a job from {!stream}, run it,
    and resolve the reply promise of the submitter.

    [id] indexes [Stats.stats.process_activity]; the slot shows the job
    description while the job runs and ["idle"] otherwise.
    [Stats.stats.processes] counts the jobs currently running.

    The statistics are restored on {e every} outcome (success, non-zero
    exit, or exception) before the promise is resolved, so a failed job
    cannot leave the running-process counter permanently inflated or a
    stale description in the activity slot. *)
let rec run_worker env id : unit =
  let { request; output_file; description }, reply =
    Eio.Stream.take stream
  in
  Atomic.incr Stats.stats.processes;
  Atomic.set Stats.stats.process_activity.(id) description;
  let outcome : response =
    try
      let result = handle_job env request output_file in
      match result.status with
      | `Exited 0 -> Ok result
      | _ -> Error (Worker_failure result)
    with e ->
      (* Any exception raised while running the job is reported to the
         submitter instead of killing the worker. *)
      Error e
  in
  (* Bookkeeping happens before the submitter is woken up, matching the
     order the success path has always used. *)
  Atomic.decr Stats.stats.processes;
  Atomic.set Stats.stats.process_activity.(id) "idle";
  Promise.resolve reply outcome;
  run_worker env id

(** [submit description request output_file] enqueues a job and waits for
    a worker to answer. Returns the {!response} produced by the worker;
    failures are reported as [Error _] rather than raised. *)
let submit description request output_file =
  let reply, resolve_reply = Promise.create () in
  Eio.Stream.add stream ({ description; request; output_file }, resolve_reply);
  Promise.await reply

(** [start_workers env sw n] forks [n] daemon worker fibers on [sw], with
    ids [0 .. n - 1].

    Each id is used as an index into [Stats.stats.process_activity], so
    that array is assumed to hold at least [n] slots (not checked here). *)
let start_workers env sw n =
  let spawn_worker name =
    Fiber.fork_daemon ~sw (fun () ->
        try
          run_worker env name;
          `Stop_daemon
        with Stdlib.Exit ->
          (* An [Exit] escaping the worker loop ends this daemon quietly. *)
          `Stop_daemon)
  in
  for i = 0 to n - 1 do
    spawn_worker i
  done