this repo has no description
1(* Worker pool *)
2open Eio
3
4type request = {
5 description : string;
6 request : Bos.Cmd.t;
7 output_file : Fpath.t option;
8}
9
10type response = (Run.t, exn) result
11type resolver = response Eio.Promise.u
12type t = (request * resolver) Eio.Stream.t
13
14let stream : t = Eio.Stream.create 0
15let handle_job env request output_file = Run.run env request output_file
16
17exception Worker_failure of Run.t
18
19let rec run_worker env id : unit =
20 let { request; output_file; description = _ }, reply =
21 Eio.Stream.take stream
22 in
23 (try
24 let result = handle_job env request output_file in
25 match result.status with
26 | `Exited 0 -> Promise.resolve reply (Ok result)
27 | _ -> Promise.resolve_error reply (Worker_failure result)
28 with e -> Promise.resolve_error reply e);
29 run_worker env id
30
31let submit description request output_file =
32 let reply, resolve_reply = Promise.create () in
33 Eio.Stream.add stream ({ description; request; output_file }, resolve_reply);
34 Promise.await reply
35
36let start_workers env sw n =
37 let spawn_worker name =
38 Fiber.fork_daemon ~sw (fun () ->
39 try
40 run_worker env name;
41 `Stop_daemon
42 with Stdlib.Exit -> `Stop_daemon)
43 in
44 for i = 0 to n - 1 do
45 spawn_worker i
46 done;
47 ()