TCP/TLS connection pooling for Eio
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio *)
7
(* Log source under which this library's messages are reported. *)
let src = Logs.Src.create ~doc:"Connection pooling library" "conpool"
9
(* Logging functions bound to [src]; used as [Log.debug], [Log.info] and
   [Log.warn] throughout this file. *)
module Log = (val Logs.src_log src : Logs.LOG)
11
(* Re-export submodules so that they are reachable through this module. *)
module Endpoint = Endpoint
module Config = Config
module Stats = Stats
module Cmd = Cmd
17
(* Set to [true] by the first call to [suppress_tls_tracing]; later calls see
   the flag and do nothing. *)
let tls_tracing_suppressed = ref false
20
(* Quieten the "tls.tracing" log source (hexdumps) the first time this is
   called: if that source exists and is currently at [Debug], it is lowered to
   [Warning]. Any other level is left alone, and every call after the first is
   a no-op. *)
let suppress_tls_tracing () =
  if !tls_tracing_suppressed then ()
  else begin
    tls_tracing_suppressed := true;
    let is_tls_tracing source = Logs.Src.name source = "tls.tracing" in
    Logs.Src.list ()
    |> List.find_opt is_tls_tracing
    |> Option.iter (fun source ->
           match Logs.Src.level source with
           | Some Logs.Debug -> Logs.Src.set_level source (Some Logs.Warning)
           | Some _ | None -> ())
  end
32
33(** {1 Error Types} *)
34
(** Errors specific to the connection pool. They are wrapped into an [Eio.Io]
    exception by [err] and rendered by [pp_error]. *)
type error =
  | Dns_resolution_failed of { hostname : string }
      (** Resolving [hostname] produced no address. *)
  | Connection_failed of {
      endpoint : Endpoint.t;
      attempts : int;
      last_error : string;
    }
      (** Connecting to [endpoint] failed after [attempts] attempts;
          [last_error] describes the final failure. *)
  | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
      (** Connecting to [endpoint] timed out; [timeout] is printed in seconds
          by [pp_error]. *)
  | Invalid_config of string
      (** The configuration is invalid; the payload is the explanation. *)
  | Invalid_endpoint of string
      (** The endpoint is invalid; the payload is the explanation. *)
45
(** [pp_error ppf e] prints a one-line, human-readable description of [e]. *)
let pp_error ppf error =
  match error with
  | Invalid_config reason -> Fmt.pf ppf "Invalid configuration: %s" reason
  | Invalid_endpoint reason -> Fmt.pf ppf "Invalid endpoint: %s" reason
  | Dns_resolution_failed { hostname } ->
      Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
  | Connection_timeout { endpoint; timeout } ->
      Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
        timeout
  | Connection_failed { endpoint; attempts; last_error } ->
      Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
        endpoint attempts last_error
57
(** Extends Eio's error type so that pool errors can be carried by an
    [Eio.Io] exception (see [err]). *)
type Eio.Exn.err += E of error
59
(** [err e] wraps the pool error [e] in an Eio exception, ready to be raised. *)
let err e = E e |> Eio.Exn.create
61
(* Teach Eio how to print pool errors: they are rendered as "Conpool "
   followed by the [pp_error] text. Errors from other libraries are declined
   by returning [false]. *)
let () =
  let pp_pool_error ppf = function
    | E e ->
        Fmt.string ppf "Conpool ";
        pp_error ppf e;
        true
    | _ -> false
  in
  Eio.Exn.register_pp pp_pool_error
69
70(** {1 Connection Types} *)
71
(** Capabilities of a pooled connection: it can be closed and it is a two-way
    flow. *)
type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]

(** A pooled connection resource; both plain sockets and TLS flows are
    coerced to this type when a connection is created. *)
type connection = connection_ty Eio.Resource.t
74
75(** {1 Internal Types} *)
76
(** Internal connection wrapper with protocol state and tracking. *)
type 'state pooled_connection = {
  pc_flow : connection;
      (** Flow handed out to users and closed by [close_connection]. *)
  pc_tls_flow : Tls_eio.t option;
      (** The TLS flow when the pool has a TLS configuration, else [None]. *)
  pc_state : 'state;
      (** Protocol state returned by the protocol's [init_state]. *)
  pc_created_at : float;
      (** Pool-clock time at which the connection was created. *)
  mutable pc_last_used : float;
      (** Last time this connection was used (for idle timeout). *)
  mutable pc_use_count : int;
      (** Number of times this connection has been used. *)
  pc_endpoint : Endpoint.t;
      (** Endpoint this connection was opened to. *)
  mutable pc_active_users : int;
      (** Number of users currently holding the connection. *)
  pc_user_available : Eio.Condition.t;
      (** Broadcast when a user releases the connection. *)
  mutable pc_closed : bool;
      (** Set once the connection has been closed or marked for removal. *)
  pc_connection_cancel : exn -> unit;
      (** Cancels the connection-lifetime switch, stopping any protocol fibers. *)
}
94
(** Statistics for an endpoint. Updated while holding the owning endpoint
    pool's [stats_mutex]. *)
type endp_stats = {
  mutable active : int;
      (** Incremented on every acquisition and decremented on every release. *)
  mutable idle : int;
      (** Number of idle connections (active_users = 0). *)
  mutable total_created : int;
      (** Number of connections created for this endpoint. *)
  mutable total_reused : int;
      (** Number of acquisitions served by an existing connection. *)
  mutable total_closed : int;
      (** Number of connections retired after a failed health check. *)
  mutable errors : int;
      (** Number of connection errors encountered. *)
}
106
(** Endpoint pool storing connections. *)
type 'state endpoint_pool = {
  connections : 'state pooled_connection list ref;
      (** Connections currently tracked for the endpoint. *)
  ep_mutex : Eio.Mutex.t;
      (** Held while acquiring, releasing or clearing [connections]. *)
  stats : endp_stats;
      (** Counters for this endpoint. *)
  stats_mutex : Eio.Mutex.t;
      (** Held while reading or updating [stats]. *)
}
114
(** Internal pool representation. *)
type ('state, 'clock, 'net) internal = {
  sw : Eio.Switch.t;
      (** Switch that owns the sockets and the per-connection daemon fibers. *)
  net : 'net;
      (** Network used for name resolution and connecting. *)
  clock : 'clock;
      (** Clock used for timestamps and the connect timeout. *)
  config : Config.t;
      (** Pool limits and timeouts. *)
  tls : Tls.Config.client option;
      (** When present, every new connection performs a TLS handshake. *)
  protocol : 'state Config.protocol_config;
      (** Protocol hooks (init, acquire, release, health, close, access mode). *)
  endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t;
      (** Per-endpoint pools, created on first use. *)
  endpoints_mutex : Eio.Mutex.t;
      (** Guards [endpoints] in the functions that take it. *)
}
126
127(** {1 Public Types} *)
128
(** A connection pool whose connections carry protocol state ['state]. The
    constructor hides the concrete clock and network types of the underlying
    [internal] record. *)
type 'state t =
  Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t
131
(** What a caller receives when it acquires a connection. *)
type 'state connection_info = {
  flow : connection;
      (** Flow to read from and write to. *)
  tls_epoch : Tls.Core.epoch_data option;
      (** TLS epoch data when the connection uses TLS and it is available. *)
  state : 'state;
      (** Protocol state attached to the connection. *)
}
137
138(** {1 Default Protocol Handler}
139
140 For simple exclusive-access protocols (HTTP/1.x, Redis, etc.),
141 use unit state with no special initialization. *)
142
(* Protocol configuration with [unit] state: every hook does nothing, the
   connection is always reported healthy, and access is exclusive. *)
let default_protocol : unit Config.protocol_config =
  let no_op () = () in
  let always_healthy () = true in
  let exclusive () = Config.Exclusive in
  {
    Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ());
    on_acquire = no_op;
    on_release = no_op;
    is_healthy = always_healthy;
    on_close = no_op;
    access_mode = exclusive;
  }
151
152(** {1 Helper Functions} *)
153
(* Current time according to the pool's clock. *)
let get_time pool = pool.clock |> Eio.Time.now
155
(* Fresh statistics record with every counter at zero. *)
let create_endp_stats () : endp_stats =
  let zero = 0 in
  {
    active = zero;
    idle = zero;
    total_created = zero;
    total_reused = zero;
    total_closed = zero;
    errors = zero;
  }
164
(* Copy the mutable counters into an immutable [Stats.t]. *)
let snapshot_stats (stats : endp_stats) : Stats.t =
  let { active; idle; total_created; total_reused; total_closed; errors } =
    stats
  in
  Stats.make ~active ~idle ~total_created ~total_reused ~total_closed ~errors
169
170(** {1 Connection Creation} *)
171
(** [create_connection pool endpoint] opens a new connection to [endpoint] and
    wraps it in a [pooled_connection] with no active users.

    Steps: resolve the host (the first address returned is used), connect over
    TCP (honouring [Config.connect_timeout] when set), perform the TLS
    handshake when the pool has a TLS configuration, start a daemon fiber that
    owns a connection-lifetime switch, and finally run the protocol's
    [init_state] with that switch.

    The socket is attached to the pool's switch, which may live much longer
    than a failed attempt. Therefore, if the TLS handshake or [init_state]
    fails, the socket is closed (and the connection-lifetime switch cancelled)
    before the exception is re-raised.

    @raise Eio.Io on resolution, connection or handshake failures, with
      context describing the step; [Dns_resolution_failed] is raised when the
      resolver returns no address.
    Any exception raised by the timeout, the TLS layer or [init_state] is
    propagated unchanged. *)
let create_connection pool endpoint =
  Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint);

  (* DNS resolution *)
  let addr =
    try
      let addrs =
        Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
          ~service:(string_of_int (Endpoint.port endpoint))
      in
      match addrs with
      | addr :: _ -> addr
      | [] ->
          raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
    with Eio.Io _ as ex ->
      let bt = Printexc.get_raw_backtrace () in
      Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
  in

  (* TCP connection with optional timeout *)
  let socket =
    try
      match Config.connect_timeout pool.config with
      | Some timeout ->
          Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
              Eio.Net.connect ~sw:pool.sw pool.net addr)
      | None -> Eio.Net.connect ~sw:pool.sw pool.net addr
    with Eio.Io _ as ex ->
      let bt = Printexc.get_raw_backtrace () in
      Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint
  in

  Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);

  (* Best-effort close for the failure paths below. Errors from the close
     itself are deliberately dropped: the original failure is what the caller
     needs to see. *)
  let close_socket () =
    Eio.Cancel.protect (fun () -> try Eio.Flow.close socket with _ -> ())
  in

  (* Optional TLS handshake *)
  let flow, tls_flow =
    match pool.tls with
    | None -> ((socket :> connection), None)
    | Some tls_config -> (
        try
          Log.debug (fun m ->
              m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
          let host =
            Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
          in
          let tls = Tls_eio.client_of_flow ~host tls_config socket in
          suppress_tls_tracing ();
          Log.info (fun m ->
              m "TLS connection established to %a" Endpoint.pp endpoint);
          ((tls :> connection), Some tls)
        with ex -> (
          (* Capture the backtrace before doing anything else, then release
             the socket whatever the kind of failure. *)
          let bt = Printexc.get_raw_backtrace () in
          close_socket ();
          match ex with
          | Eio.Io _ ->
              Eio.Exn.reraise_with_context ex bt "TLS handshake with %a"
                Endpoint.pp endpoint
          | _ -> Printexc.raise_with_backtrace ex bt))
  in

  (* Get TLS epoch if available *)
  let tls_epoch =
    match tls_flow with
    | Some tls_flow -> (
        match Tls_eio.epoch tls_flow with
        | Ok epoch -> Some epoch
        | Error () -> None)
    | None -> None
  in

  (* Create the connection-lifetime switch inside a daemon fiber. The switch
     lives until it is cancelled and can be used by the protocol handler to
     spawn long-running fibers (e.g., an HTTP/2 reader). The switch and its
     cancel function are handed back through the promise. *)
  let ready_promise, ready_resolver = Eio.Promise.create () in

  (* fork_daemon is used so that connection fibers do not prevent the pool's
     switch from completing. *)
  Eio.Fiber.fork_daemon ~sw:pool.sw (fun () ->
      (try
         Eio.Switch.run (fun conn_sw ->
             (* Signal that the switch is ready *)
             Eio.Promise.resolve ready_resolver
               (conn_sw, fun exn -> Eio.Switch.fail conn_sw exn);
             (* Block until the switch is cancelled *)
             Eio.Fiber.await_cancel ())
       with
       | Eio.Cancel.Cancelled _ -> ()
       | exn ->
           Log.warn (fun m ->
               m "Connection fiber caught exception: %s" (Printexc.to_string exn)));
      `Stop_daemon);

  (* Wait for the switch to be created *)
  let conn_sw, conn_cancel = Eio.Promise.await ready_promise in

  (* Initialize protocol-specific state with connection switch *)
  Log.debug (fun m -> m "Initializing protocol state for %a" Endpoint.pp endpoint);
  let state =
    try pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch
    with ex ->
      let bt = Printexc.get_raw_backtrace () in
      (* The connection never made it into the pool: stop its daemon fiber
         and release the socket before reporting the failure. *)
      (try conn_cancel (Failure "Connection closed") with _ -> ());
      close_socket ();
      Printexc.raise_with_backtrace ex bt
  in

  let now = get_time pool in

  Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint);

  {
    pc_flow = flow;
    pc_tls_flow = tls_flow;
    pc_state = state;
    pc_created_at = now;
    pc_last_used = now;
    pc_use_count = 0;
    pc_endpoint = endpoint;
    pc_active_users = 0;
    pc_user_available = Eio.Condition.create ();
    pc_closed = false;
    pc_connection_cancel = conn_cancel;
  }
291
292(** {1 Connection Health Checking} *)
293
(** Health check result distinguishing errors from normal lifecycle. The
    string payload is a short reason used in log messages. *)
type health_status =
  | Healthy
      (** The connection passed every check and may be used. *)
  | Unhealthy_error of string
      (** Connection failed due to an error (protocol failure, etc.) *)
  | Unhealthy_lifecycle of string
      (** Connection should close due to normal lifecycle (timeout, max uses, etc.) *)
301
(** [check_health pool conn] classifies [conn]. Checks run in this order and
    the first failing one decides the result: already closed, protocol health
    check, maximum lifetime, maximum idle time (only when the connection has
    no active users), maximum number of uses. *)
let check_health pool conn =
  if conn.pc_closed then Unhealthy_lifecycle "already closed"
  else if not (pool.protocol.is_healthy conn.pc_state) then begin
    Log.debug (fun m -> m "Connection unhealthy: protocol check failed");
    Unhealthy_error "protocol check failed"
  end
  else
    let now = get_time pool in
    let age = now -. conn.pc_created_at in
    let max_lifetime = Config.max_connection_lifetime pool.config in
    if age > max_lifetime then begin
      Log.debug (fun m ->
          m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)"
            age max_lifetime);
      Unhealthy_lifecycle "exceeded max lifetime"
    end
    else
      let idle_time = now -. conn.pc_last_used in
      let max_idle = Config.max_idle_time pool.config in
      let unused = conn.pc_active_users = 0 in
      if unused && idle_time > max_idle then begin
        Log.debug (fun m ->
            m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)"
              idle_time max_idle);
        Unhealthy_lifecycle "exceeded max idle time"
      end
      else
        match Config.max_connection_uses pool.config with
        | None -> Healthy
        | Some max_uses ->
            if conn.pc_use_count < max_uses then Healthy
            else begin
              Log.debug (fun m ->
                  m "Connection unhealthy: exceeded max uses (%d >= %d)"
                    conn.pc_use_count max_uses);
              Unhealthy_lifecycle "exceeded max uses"
            end
337
(** [is_healthy pool conn] is [true] exactly when [check_health] reports
    [Healthy]. *)
let is_healthy pool conn =
  let usable = function
    | Healthy -> true
    | Unhealthy_error _ | Unhealthy_lifecycle _ -> false
  in
  usable (check_health pool conn)
342
343(** {1 Connection Cleanup} *)
344
(** [close_connection pool conn] closes [conn] once; calling it on a
    connection whose [pc_closed] flag is already set does nothing.

    In order: the flag is set, the connection-lifetime switch is cancelled
    (stopping protocol fibers), the protocol's [on_close] hook runs, the flow
    is closed, and fibers waiting on [pc_user_available] are woken so they can
    notice the closure.

    The flow is closed and waiters are woken even if [on_close] raises; the
    hook's exception is then propagated to the caller. *)
let close_connection pool conn =
  if not conn.pc_closed then begin
    conn.pc_closed <- true;
    Log.debug (fun m ->
        m "Closing connection to %a" Endpoint.pp conn.pc_endpoint);

    (* Cancel connection-lifetime switch first - this stops any protocol
       fibers. Best effort: a failure here must not prevent the close. *)
    (try conn.pc_connection_cancel (Failure "Connection closed")
     with _ -> ());

    let release_resources () =
      (* Close the underlying flow; errors while closing are not actionable. *)
      Eio.Cancel.protect (fun () ->
          try Eio.Flow.close conn.pc_flow with _ -> ());
      (* Wake anyone waiting for a slot on this connection. *)
      Eio.Condition.broadcast conn.pc_user_available
    in

    (* Call protocol cleanup, guaranteeing the flow is released afterwards. *)
    Fun.protect ~finally:release_resources (fun () ->
        pool.protocol.on_close conn.pc_state)
  end
362
363(** {1 Endpoint Pool Management} *)
364
(** [get_or_create_endpoint_pool pool endpoint] returns the pool registered
    for [endpoint], creating and registering an empty one on first use. The
    table is first consulted under a read-only lock; on a miss it is consulted
    again under a read-write lock before a new entry is inserted. *)
let get_or_create_endpoint_pool pool endpoint =
  let lookup () = Hashtbl.find_opt pool.endpoints endpoint in
  let register () =
    Log.info (fun m ->
        m "Creating endpoint pool for %a" Endpoint.pp endpoint);
    let fresh =
      {
        connections = ref [];
        ep_mutex = Eio.Mutex.create ();
        stats = create_endp_stats ();
        stats_mutex = Eio.Mutex.create ();
      }
    in
    Hashtbl.add pool.endpoints endpoint fresh;
    fresh
  in
  match Eio.Mutex.use_ro pool.endpoints_mutex lookup with
  | Some existing -> existing
  | None ->
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          match lookup () with
          | Some existing -> existing
          | None -> register ())
386
387(** {1 Connection Acquisition} *)
388
(** [acquire_connection pool ep_pool endpoint] returns a connection to
    [endpoint] with one more active user, blocking if necessary.

    While holding [ep_pool.ep_mutex] it repeatedly:
    - drops connections already closed elsewhere and actually closes idle
      connections that fail [check_health] (unhealthy connections that still
      have users are kept and never handed out);
    - reuses the first healthy connection with spare capacity ([Exclusive]
      allows one user, [Shared n] allows [n]);
    - otherwise creates a connection if the endpoint is below
      [Config.max_connections_per_endpoint];
    - otherwise waits on a connection's [pc_user_available] condition (a
      shared-mode connection is preferred, else the first one) and starts
      over once woken.

    Waiting uses [Eio.Condition.await] with [ep_mutex], so the mutex is
    released while blocked and releasers can make progress. Exceptions raised
    inside the critical section are carried out as values and re-raised after
    the mutex is unlocked, so a failed connection attempt does not poison the
    mutex for later callers.

    The protocol's [on_acquire] hook runs before the user is counted; if it
    raises for a freshly created connection, that connection is closed.

    @raise Eio.Io with [Invalid_config] if the per-endpoint limit is not
      positive; any exception from [create_connection], [close_connection] or
      the protocol hooks is propagated. *)
let acquire_connection pool ep_pool endpoint =
  let with_stats update =
    Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex update
  in
  (* Maximum number of simultaneous users the protocol allows on [conn]. *)
  let capacity conn =
    match pool.protocol.access_mode conn.pc_state with
    | Config.Exclusive -> 1
    | Config.Shared max_concurrent -> max_concurrent
  in
  let is_shared conn =
    match pool.protocol.access_mode conn.pc_state with
    | Config.Shared _ -> true
    | Config.Exclusive -> false
  in
  (* Close an idle connection that failed its health check and account for
     it. [close_connection] sets [pc_closed] itself; setting the flag here
     first would turn the close into a no-op. *)
  let retire conn status =
    (match status with
     | Unhealthy_error reason ->
         Log.warn (fun m -> m "Closing connection due to error: %s" reason)
     | Unhealthy_lifecycle reason ->
         Log.debug (fun m -> m "Closing connection due to lifecycle: %s" reason)
     | Healthy -> ());
    with_stats (fun () ->
        ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
        ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1);
        match status with
        | Unhealthy_error _ ->
            ep_pool.stats.errors <- ep_pool.stats.errors + 1
        | Healthy | Unhealthy_lifecycle _ -> ());
    close_connection pool conn
  in
  (* Register one more user on an existing connection. The hook runs first so
     that a raising handler leaves the counters untouched. *)
  let claim conn =
    pool.protocol.on_acquire conn.pc_state;
    let was_idle = conn.pc_active_users = 0 in
    conn.pc_active_users <- conn.pc_active_users + 1;
    conn.pc_last_used <- get_time pool;
    conn.pc_use_count <- conn.pc_use_count + 1;
    with_stats (fun () ->
        ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
        ep_pool.stats.active <- ep_pool.stats.active + 1;
        (* Decrement idle count when connection becomes active *)
        if was_idle then
          ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
    Log.debug (fun m ->
        m "Reusing connection to %a (users=%d)"
          Endpoint.pp endpoint conn.pc_active_users)
  in
  (* Open a connection and hand it to the caller as its first user. *)
  let open_new () =
    let conn = create_connection pool endpoint in
    (try pool.protocol.on_acquire conn.pc_state
     with ex ->
       let bt = Printexc.get_raw_backtrace () in
       (* Not yet tracked by the pool: close it rather than leak it. *)
       close_connection pool conn;
       Printexc.raise_with_backtrace ex bt);
    conn.pc_active_users <- 1;
    ep_pool.connections := conn :: !(ep_pool.connections);
    with_stats (fun () ->
        ep_pool.stats.total_created <- ep_pool.stats.total_created + 1;
        ep_pool.stats.active <- ep_pool.stats.active + 1);
    Log.info (fun m ->
        m "Created new connection to %a (total=%d)"
          Endpoint.pp endpoint (List.length !(ep_pool.connections)));
    conn
  in
  (* Must be called with [ep_mutex] held. *)
  let rec select () =
    (* One pass over the list: forget closed connections, retire idle
       unhealthy ones, and remember the first healthy one with capacity. *)
    let scan (candidate, kept) conn =
      if conn.pc_closed then (candidate, kept)
      else
        match check_health pool conn with
        | Healthy ->
            let candidate =
              match candidate with
              | None when conn.pc_active_users < capacity conn -> Some conn
              | Some _ | None -> candidate
            in
            (candidate, conn :: kept)
        | (Unhealthy_error _ | Unhealthy_lifecycle _) as status ->
            if conn.pc_active_users > 0 then
              (* Still in use: keep it so its users can release it. *)
              (candidate, conn :: kept)
            else begin
              retire conn status;
              (candidate, kept)
            end
    in
    let candidate, rev_kept =
      List.fold_left scan (None, []) !(ep_pool.connections)
    in
    let live = List.rev rev_kept in
    ep_pool.connections := live;
    match candidate with
    | Some conn ->
        claim conn;
        conn
    | None ->
        let max_conns = Config.max_connections_per_endpoint pool.config in
        if List.length live < max_conns then open_new ()
        else begin
          Log.debug (fun m ->
              m "At connection limit for %a (%d), waiting..."
                Endpoint.pp endpoint max_conns);
          match live with
          | [] ->
              (* Nothing to wait on: the limit itself forbids any connection. *)
              raise
                (err
                   (Invalid_config
                      "max_connections_per_endpoint must be at least 1"))
          | first :: _ ->
              (* Prefer waiting on a shared-mode connection. *)
              let target =
                Option.value (List.find_opt is_shared live) ~default:first
              in
              (* Releases [ep_mutex] while blocked and re-locks it before
                 returning, so the state is re-examined from scratch. *)
              Eio.Condition.await target.pc_user_available ep_pool.ep_mutex;
              select ()
        end
  in
  let outcome =
    Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
        match select () with
        | conn -> Ok conn
        | exception ex -> Error (ex, Printexc.get_raw_backtrace ()))
  in
  match outcome with
  | Ok conn -> conn
  | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
521
522(** {1 Connection Release} *)
523
(** [release_connection pool ep_pool conn] gives back one user slot on [conn].

    The protocol's [on_release] hook runs first. Then, under
    [ep_pool.ep_mutex], the user count and statistics are updated, waiters on
    [pc_user_available] are woken, and the connection is health-checked:
    - [Unhealthy_error]: the connection is removed from the pool and closed;
    - [Unhealthy_lifecycle]: it is removed and closed only once it has no
      remaining users, so other users of a shared connection are not cut off;
    - [Healthy]: it stays in the pool.

    [close_connection] is responsible for setting [pc_closed]; the flag is not
    set here beforehand, since that would make the close a no-op and leak the
    flow.

    The slot is returned even when [on_release] raises. Exceptions (from the
    hook first, otherwise from the bookkeeping or close) are re-raised after
    [ep_mutex] has been unlocked, which keeps the mutex from being poisoned. *)
let release_connection pool ep_pool conn =
  (* Notify protocol handler of release, remembering any failure. *)
  let hook_failure =
    match pool.protocol.on_release conn.pc_state with
    | () -> None
    | exception ex -> Some (ex, Printexc.get_raw_backtrace ())
  in
  let with_stats update =
    Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex update
  in
  (* Must be called with [ep_mutex] held. *)
  let return_slot () =
    let was_active = conn.pc_active_users > 0 in
    conn.pc_active_users <- max 0 (conn.pc_active_users - 1);
    let now_idle = conn.pc_active_users = 0 in

    with_stats (fun () ->
        ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1);
        (* Track idle count: increment when connection becomes idle *)
        if was_active && now_idle then
          ep_pool.stats.idle <- ep_pool.stats.idle + 1);

    (* Signal waiting fibers *)
    Eio.Condition.broadcast conn.pc_user_available;

    Log.debug (fun m ->
        m "Released connection to %a (users=%d)"
          Endpoint.pp conn.pc_endpoint conn.pc_active_users);

    (* Account for the closure, forget the connection, then close it. The
       list is updated before closing so that a raising [on_close] cannot
       leave a closed connection in the pool. *)
    let retire ~is_error =
      with_stats (fun () ->
          ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
          if is_error then
            ep_pool.stats.errors <- ep_pool.stats.errors + 1;
          if now_idle then
            ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
      ep_pool.connections :=
        List.filter (fun c -> c != conn) !(ep_pool.connections);
      close_connection pool conn
    in

    (* Check if connection should be closed *)
    match check_health pool conn with
    | Healthy -> ()
    | Unhealthy_error reason ->
        Log.warn (fun m -> m "Closing connection due to error: %s" reason);
        retire ~is_error:true
    | Unhealthy_lifecycle reason ->
        if now_idle then begin
          Log.debug (fun m ->
              m "Closing connection due to lifecycle: %s" reason);
          retire ~is_error:false
        end
  in
  let bookkeeping_failure =
    Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
        match return_slot () with
        | () -> None
        | exception ex -> Some (ex, Printexc.get_raw_backtrace ()))
  in
  match (hook_failure, bookkeeping_failure) with
  | Some (ex, bt), _ | None, Some (ex, bt) ->
      Printexc.raise_with_backtrace ex bt
  | None, None -> ()
573
574(** {1 Public API} *)
575
(** [create ~sw ~net ~clock ?tls ?config ~protocol ()] builds an empty pool.

    @param sw switch that owns the pool; when it is released every pooled
      connection is closed.
    @param net network used to resolve and connect.
    @param clock clock used for timestamps and timeouts.
    @param tls when given, connections perform a TLS handshake.
    @param config pool settings (defaults to [Config.default]).
    @param protocol protocol hooks applied to every connection.

    During shutdown every connection is closed even if closing one of them
    raises; each failure is logged and the first one is re-raised once all
    connections have been processed. *)
let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock)
    ?tls ?(config = Config.default) ~protocol () =

  Log.info (fun m ->
      m "Creating connection pool (max_per_endpoint=%d)"
        (Config.max_connections_per_endpoint config));

  let pool = {
    sw;
    net;
    clock;
    config;
    tls;
    protocol;
    endpoints = Hashtbl.create 16;
    endpoints_mutex = Eio.Mutex.create ();
  } in

  (* Auto-cleanup on switch release *)
  Eio.Switch.on_release sw (fun () ->
      Eio.Cancel.protect (fun () ->
          Log.info (fun m -> m "Closing connection pool");
          let first_failure = ref None in
          (* Keep going after a failure so one bad connection cannot prevent
             the others from being closed. *)
          let close_one conn =
            try close_connection pool conn
            with ex ->
              let bt = Printexc.get_raw_backtrace () in
              Log.warn (fun m ->
                  m "Error while closing connection to %a: %s"
                    Endpoint.pp conn.pc_endpoint (Printexc.to_string ex));
              if Option.is_none !first_failure then
                first_failure := Some (ex, bt)
          in
          Hashtbl.iter
            (fun _endpoint ep_pool ->
              List.iter close_one !(ep_pool.connections))
            pool.endpoints;
          Hashtbl.clear pool.endpoints;
          Option.iter
            (fun (ex, bt) -> Printexc.raise_with_backtrace ex bt)
            !first_failure));

  Pool pool
606
(** [create_basic] is [create] with [default_protocol], i.e. a pool whose
    connections carry no protocol state. *)
let create_basic ~sw ~net ~clock ?tls ?config () =
  let protocol = default_protocol in
  create ~protocol ~sw ~net ~clock ?tls ?config ()
609
(** [connection ~sw pool endpoint] acquires a connection to [endpoint] and
    registers its release with [sw], so the connection is given back to the
    pool when [sw] is released. Returns the flow, the TLS epoch data when
    available, and the protocol state. *)
let connection ~sw (Pool pool) endpoint =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);

  let ep_pool = get_or_create_endpoint_pool pool endpoint in
  let conn = acquire_connection pool ep_pool endpoint in

  (* Hand the connection back once the caller's switch finishes. *)
  let give_back () = release_connection pool ep_pool conn in
  Eio.Switch.on_release sw give_back;

  (* Epoch data exists only for TLS connections, and only if TLS reports it. *)
  let tls_epoch =
    Option.bind conn.pc_tls_flow (fun tls_flow ->
        Result.to_option (Tls_eio.epoch tls_flow))
  in

  { flow = conn.pc_flow; tls_epoch; state = conn.pc_state }
635
(** [with_connection pool endpoint f] runs [f] on a connection acquired inside
    a fresh switch; the connection is released when that switch finishes. *)
let with_connection pool endpoint f =
  Eio.Switch.run @@ fun sw ->
  let info = connection ~sw pool endpoint in
  f info
638
(** [stats pool endpoint] is a snapshot of the counters for [endpoint], or
    all-zero statistics when no pool exists for that endpoint. *)
let stats (Pool pool) endpoint =
  let snapshot ep_pool =
    Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
        snapshot_stats ep_pool.stats)
  in
  match Hashtbl.find_opt pool.endpoints endpoint with
  | None ->
      Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
        ~total_closed:0 ~errors:0
  | Some ep_pool -> snapshot ep_pool
646
(** [all_stats pool] is the list of [(endpoint, snapshot)] pairs for every
    endpoint known to the pool, gathered under the endpoints lock. *)
let all_stats (Pool pool) =
  let collect endpoint ep_pool acc =
    let snapshot =
      Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
          snapshot_stats ep_pool.stats)
    in
    (endpoint, snapshot) :: acc
  in
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      Hashtbl.fold collect pool.endpoints [])
657
(** [clear_endpoint pool endpoint] closes every connection held for [endpoint]
    and removes the endpoint from the pool. It only logs a debug message when
    the endpoint is unknown. The whole operation is protected from
    cancellation. *)
let clear_endpoint (Pool pool) endpoint =
  Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
  let drain ep_pool () =
    List.iter (fun conn -> close_connection pool conn) !(ep_pool.connections);
    ep_pool.connections := []
  in
  let forget () = Hashtbl.remove pool.endpoints endpoint in
  match Hashtbl.find_opt pool.endpoints endpoint with
  | None ->
      Log.debug (fun m ->
          m "No endpoint pool found for %a" Endpoint.pp endpoint)
  | Some ep_pool ->
      Eio.Cancel.protect (fun () ->
          Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (drain ep_pool);
          Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex forget)