this repo has no description
at main 47 lines 1.3 kB view raw
(* Worker pool *)
open Eio

(** A job handed to the pool. *)
type request = {
  description : string;
      (** Label supplied by the submitter; the workers in this file ignore
          it. *)
  request : Bos.Cmd.t;  (** Command forwarded to [Run.run]. *)
  output_file : Fpath.t option;
      (** Forwarded to [Run.run] unchanged. *)
}

(** Outcome of a job: [Ok run] when the run's status is [`Exited 0],
    [Error exn] otherwise (see {!run_worker}). *)
type response = (Run.t, exn) result

(** Write end of the promise on which a submitter waits for its response. *)
type resolver = response Eio.Promise.u

(** Queue of pending jobs, each paired with the resolver used to reply. *)
type t = (request * resolver) Eio.Stream.t

(** The single queue shared by {!submit} and the workers. It is created with
    capacity [0]; per the Eio documentation this makes [Eio.Stream.add] wait
    until a worker is ready to take the item. *)
let stream : t = Eio.Stream.create 0

(** [handle_job env request output_file] runs one job by delegating to
    [Run.run]. *)
let handle_job env request output_file = Run.run env request output_file

(** Carried in the [Error] case of a {!response} when a job completed with a
    status other than [`Exited 0]; the payload is the run that was returned. *)
exception Worker_failure of Run.t

(** [run_worker env id] serves jobs from {!stream} forever: it takes one job,
    runs it with {!handle_job}, resolves the submitter's promise, and loops.

    - Status [`Exited 0] resolves the promise with [Ok result].
    - Any other status resolves it with [Error (Worker_failure result)].
    - An exception raised while handling the job resolves it with
      [Error exn].

    [id] is only threaded through the recursion.

    @raise Eio.Cancel.Cancelled
      if the job was interrupted by cancellation. The submitter is still
      notified first, then the exception is re-raised so that the worker
      stops instead of taking further jobs. *)
let rec run_worker env id : unit =
  let { request; output_file; description = _ }, reply =
    Eio.Stream.take stream
  in
  (try
     let result = handle_job env request output_file in
     match result.status with
     | `Exited 0 -> Promise.resolve reply (Ok result)
     | _ -> Promise.resolve_error reply (Worker_failure result)
   with
   | Eio.Cancel.Cancelled _ as ex ->
       (* Unblock the submitter, but never swallow cancellation: let it
          unwind this fiber. *)
       Promise.resolve_error reply ex;
       raise ex
   | ex -> Promise.resolve_error reply ex);
  run_worker env id

(** [submit description request output_file] enqueues a job on {!stream} and
    waits for the worker's reply.

    @return
      the {!response} the worker resolved the promise with. Failures are
      reported in the [Error] case rather than raised. *)
let submit description request output_file =
  let reply, resolve_reply = Promise.create () in
  Eio.Stream.add stream ({ description; request; output_file }, resolve_reply);
  Promise.await reply

(** [start_workers env sw n] forks [n] daemon fibers on switch [sw], each
    running {!run_worker} with an index from [0] to [n - 1]. When [n <= 0] no
    worker is started. *)
let start_workers env sw n =
  let spawn_worker name =
    Fiber.fork_daemon ~sw (fun () ->
        try
          (* [run_worker] never returns normally; the value below only
             satisfies the type expected by [fork_daemon]. *)
          run_worker env name;
          `Stop_daemon
        with Stdlib.Exit -> `Stop_daemon)
  in
  for i = 0 to n - 1 do
    spawn_worker i
  done