this repo has no description
1(* Worker pool *)
2open Eio
3
4type request = {
5 description : string;
6 request : Bos.Cmd.t;
7 output_file : Fpath.t option;
8}
9
10type response = (Run.t, exn) result
11type resolver = response Eio.Promise.u
12
13type t = (request * resolver) Eio.Stream.t
14
15let stream : t = Eio.Stream.create 0
16
17let handle_job env request output_file = Run.run env request output_file
18
19exception Worker_failure of Run.t
20
21let rec run_worker env id : unit =
22 let { request; output_file; description }, reply = Eio.Stream.take stream in
23 Atomic.incr Stats.stats.processes;
24 Atomic.set Stats.stats.process_activity.(id) description;
25 (try
26 let result = handle_job env request output_file in
27 match result.status with
28 | `Exited 0 ->
29 Atomic.decr Stats.stats.processes;
30 Atomic.set Stats.stats.process_activity.(id) "idle";
31 Promise.resolve reply (Ok result)
32 | _ -> Promise.resolve_error reply (Worker_failure result)
33 with e -> Promise.resolve_error reply e);
34 run_worker env id
35
36let submit description request output_file =
37 let reply, resolve_reply = Promise.create () in
38 Eio.Stream.add stream ({ description; request; output_file }, resolve_reply);
39 Promise.await reply
40
41let start_workers env sw n =
42 let spawn_worker name =
43 Fiber.fork_daemon ~sw (fun () ->
44 try
45 run_worker env name;
46 `Stop_daemon
47 with Stdlib.Exit -> `Stop_daemon)
48 in
49 for i = 0 to n - 1 do
50 spawn_worker i
51 done;
52 ()