TCP/TLS connection pooling for Eio
at main 672 lines 24 kB view raw
1(*--------------------------------------------------------------------------- 2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved. 3 SPDX-License-Identifier: ISC 4 ---------------------------------------------------------------------------*) 5 6(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio *) 7 8let src = Logs.Src.create "conpool" ~doc:"Connection pooling library" 9 10module Log = (val Logs.src_log src : Logs.LOG) 11 12(* Re-export submodules *) 13module Endpoint = Endpoint 14module Config = Config 15module Stats = Stats 16module Cmd = Cmd 17 18(* Track whether TLS tracing has been suppressed *) 19let tls_tracing_suppressed = ref false 20 21(* Suppress TLS tracing debug output (hexdumps) unless explicitly enabled *) 22let suppress_tls_tracing () = 23 if not !tls_tracing_suppressed then begin 24 tls_tracing_suppressed := true; 25 match List.find_opt (fun s -> Logs.Src.name s = "tls.tracing") (Logs.Src.list ()) with 26 | Some tls_src -> 27 (match Logs.Src.level tls_src with 28 | Some Logs.Debug -> Logs.Src.set_level tls_src (Some Logs.Warning) 29 | _ -> ()) 30 | None -> () 31 end 32 33(** {1 Error Types} *) 34 35type error = 36 | Dns_resolution_failed of { hostname : string } 37 | Connection_failed of { 38 endpoint : Endpoint.t; 39 attempts : int; 40 last_error : string; 41 } 42 | Connection_timeout of { endpoint : Endpoint.t; timeout : float } 43 | Invalid_config of string 44 | Invalid_endpoint of string 45 46let pp_error ppf = function 47 | Dns_resolution_failed { hostname } -> 48 Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname 49 | Connection_failed { endpoint; attempts; last_error } -> 50 Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp 51 endpoint attempts last_error 52 | Connection_timeout { endpoint; timeout } -> 53 Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint 54 timeout 55 | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg 56 | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg 57 58type Eio.Exn.err += E of error 59 60let err e = Eio.Exn.create (E e) 61 62let () = 63 Eio.Exn.register_pp (fun f -> function 64 | E e -> 65 Fmt.string f "Conpool "; 66 pp_error f e; 67 true 68 | _ -> false) 69 70(** {1 Connection Types} *) 71 72type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] 73type connection = connection_ty Eio.Resource.t 74 75(** {1 Internal Types} *) 76 77(** Internal connection wrapper with protocol state and tracking. *) 78type 'state pooled_connection = { 79 pc_flow : connection; 80 pc_tls_flow : Tls_eio.t option; 81 pc_state : 'state; 82 pc_created_at : float; 83 mutable pc_last_used : float; 84 (** Last time this connection was used (for idle timeout). *) 85 mutable pc_use_count : int; 86 (** Number of times this connection has been used. *) 87 pc_endpoint : Endpoint.t; 88 mutable pc_active_users : int; 89 pc_user_available : Eio.Condition.t; 90 mutable pc_closed : bool; 91 pc_connection_cancel : exn -> unit; 92 (** Cancels the connection-lifetime switch, stopping any protocol fibers. *) 93} 94 95(** Statistics for an endpoint. *) 96type endp_stats = { 97 mutable active : int; 98 mutable idle : int; 99 (** Number of idle connections (active_users = 0). *) 100 mutable total_created : int; 101 mutable total_reused : int; 102 mutable total_closed : int; 103 mutable errors : int; 104 (** Number of connection errors encountered. *) 105} 106 107(** Endpoint pool storing connections. *) 108type 'state endpoint_pool = { 109 connections : 'state pooled_connection list ref; 110 ep_mutex : Eio.Mutex.t; 111 stats : endp_stats; 112 stats_mutex : Eio.Mutex.t; 113} 114 115(** Internal pool representation. *) 116type ('state, 'clock, 'net) internal = { 117 sw : Eio.Switch.t; 118 net : 'net; 119 clock : 'clock; 120 config : Config.t; 121 tls : Tls.Config.client option; 122 protocol : 'state Config.protocol_config; 123 endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t; 124 endpoints_mutex : Eio.Mutex.t; 125} 126 127(** {1 Public Types} *) 128 129type 'state t = 130 Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t 131 132type 'state connection_info = { 133 flow : connection; 134 tls_epoch : Tls.Core.epoch_data option; 135 state : 'state; 136} 137 138(** {1 Default Protocol Handler} 139 140 For simple exclusive-access protocols (HTTP/1.x, Redis, etc.), 141 use unit state with no special initialization. *) 142 143let default_protocol : unit Config.protocol_config = { 144 Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ()); 145 on_acquire = (fun () -> ()); 146 on_release = (fun () -> ()); 147 is_healthy = (fun () -> true); 148 on_close = (fun () -> ()); 149 access_mode = (fun () -> Config.Exclusive); 150} 151 152(** {1 Helper Functions} *) 153 154let get_time pool = Eio.Time.now pool.clock 155 156let create_endp_stats () = { 157 active = 0; 158 idle = 0; 159 total_created = 0; 160 total_reused = 0; 161 total_closed = 0; 162 errors = 0; 163} 164 165let snapshot_stats (stats : endp_stats) : Stats.t = 166 Stats.make ~active:stats.active ~idle:stats.idle 167 ~total_created:stats.total_created ~total_reused:stats.total_reused 168 ~total_closed:stats.total_closed ~errors:stats.errors 169 170(** {1 Connection Creation} *) 171 172let create_connection pool endpoint = 173 Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint); 174 175 (* DNS resolution *) 176 let addr = 177 try 178 let addrs = 179 Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) 180 ~service:(string_of_int (Endpoint.port endpoint)) 181 in 182 match addrs with 183 | addr :: _ -> addr 184 | [] -> 185 raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint })) 186 with Eio.Io _ as ex -> 187 let bt = Printexc.get_raw_backtrace () in 188 Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint 189 in 190 191 (* TCP connection with optional timeout *) 192 let socket = 193 try 194 match Config.connect_timeout pool.config with 195 | Some timeout -> 196 Eio.Time.with_timeout_exn pool.clock timeout (fun () -> 197 Eio.Net.connect ~sw:pool.sw pool.net addr) 198 | None -> Eio.Net.connect ~sw:pool.sw pool.net addr 199 with Eio.Io _ as ex -> 200 let bt = Printexc.get_raw_backtrace () in 201 Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint 202 in 203 204 Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint); 205 206 (* Optional TLS handshake *) 207 let flow, tls_flow = 208 match pool.tls with 209 | None -> 210 ((socket :> connection), None) 211 | Some tls_config -> 212 try 213 Log.debug (fun m -> 214 m "Initiating TLS handshake with %a" Endpoint.pp endpoint); 215 let host = 216 Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 217 in 218 let tls = Tls_eio.client_of_flow ~host tls_config socket in 219 suppress_tls_tracing (); 220 Log.info (fun m -> 221 m "TLS connection established to %a" Endpoint.pp endpoint); 222 ((tls :> connection), Some tls) 223 with Eio.Io _ as ex -> 224 let bt = Printexc.get_raw_backtrace () in 225 Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint 226 in 227 228 (* Get TLS epoch if available *) 229 let tls_epoch = 230 match tls_flow with 231 | Some tls_flow -> ( 232 match Tls_eio.epoch tls_flow with 233 | Ok epoch -> Some epoch 234 | Error () -> None) 235 | None -> None 236 in 237 238 (* Create connection-lifetime sub-switch via a fiber. 239 This switch lives for the connection's lifetime and can be used 240 by the protocol handler to spawn long-running fibers (e.g., HTTP/2 reader). *) 241 let conn_sw_ref = ref None in 242 let conn_cancel_ref = ref (fun (_ : exn) -> ()) in 243 let ready_promise, ready_resolver = Eio.Promise.create () in 244 245 (* Use fork_daemon so connection fibers don't prevent parent switch from completing. 246 When the parent switch completes, all connection daemon fibers are cancelled, 247 which triggers cleanup of their inner switches and connection resources. *) 248 Eio.Fiber.fork_daemon ~sw:pool.sw (fun () -> 249 (try 250 Eio.Switch.run (fun conn_sw -> 251 conn_sw_ref := Some conn_sw; 252 conn_cancel_ref := (fun exn -> Eio.Switch.fail conn_sw exn); 253 (* Signal that the switch is ready *) 254 Eio.Promise.resolve ready_resolver (); 255 (* Block until the switch is cancelled *) 256 Eio.Fiber.await_cancel () 257 ) 258 with 259 | Eio.Cancel.Cancelled _ -> () 260 | exn -> 261 Log.warn (fun m -> m "Connection fiber caught exception: %s" (Printexc.to_string exn))); 262 `Stop_daemon 263 ); 264 265 (* Wait for the switch to be created *) 266 Eio.Promise.await ready_promise; 267 let conn_sw = Option.get !conn_sw_ref in 268 let conn_cancel = !conn_cancel_ref in 269 270 (* Initialize protocol-specific state with connection switch *) 271 Log.debug (fun m -> m "Initializing protocol state for %a" Endpoint.pp endpoint); 272 let state = pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch in 273 274 let now = get_time pool in 275 276 Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint); 277 278 { 279 pc_flow = flow; 280 pc_tls_flow = tls_flow; 281 pc_state = state; 282 pc_created_at = now; 283 pc_last_used = now; 284 pc_use_count = 0; 285 pc_endpoint = endpoint; 286 pc_active_users = 0; 287 pc_user_available = Eio.Condition.create (); 288 pc_closed = false; 289 pc_connection_cancel = conn_cancel; 290 } 291 292(** {1 Connection Health Checking} *) 293 294(** Health check result distinguishing errors from normal lifecycle. *) 295type health_status = 296 | Healthy 297 | Unhealthy_error of string 298 (** Connection failed due to an error (protocol failure, etc.) *) 299 | Unhealthy_lifecycle of string 300 (** Connection should close due to normal lifecycle (timeout, max uses, etc.) *) 301 302let check_health pool conn = 303 if conn.pc_closed then 304 Unhealthy_lifecycle "already closed" 305 else 306 (* Check protocol-specific health *) 307 let protocol_healthy = pool.protocol.is_healthy conn.pc_state in 308 if not protocol_healthy then begin 309 Log.debug (fun m -> m "Connection unhealthy: protocol check failed"); 310 Unhealthy_error "protocol check failed" 311 end else 312 let now = get_time pool in 313 (* Check connection age *) 314 let age = now -. conn.pc_created_at in 315 let max_lifetime = Config.max_connection_lifetime pool.config in 316 if age > max_lifetime then begin 317 Log.debug (fun m -> m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)" 318 age max_lifetime); 319 Unhealthy_lifecycle "exceeded max lifetime" 320 end else 321 (* Check idle time - only for idle connections *) 322 let idle_time = now -. conn.pc_last_used in 323 let max_idle = Config.max_idle_time pool.config in 324 if conn.pc_active_users = 0 && idle_time > max_idle then begin 325 Log.debug (fun m -> m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)" 326 idle_time max_idle); 327 Unhealthy_lifecycle "exceeded max idle time" 328 end else 329 (* Check use count *) 330 match Config.max_connection_uses pool.config with 331 | Some max_uses when conn.pc_use_count >= max_uses -> 332 Log.debug (fun m -> m "Connection unhealthy: exceeded max uses (%d >= %d)" 333 conn.pc_use_count max_uses); 334 Unhealthy_lifecycle "exceeded max uses" 335 | _ -> 336 Healthy 337 338let is_healthy pool conn = 339 match check_health pool conn with 340 | Healthy -> true 341 | Unhealthy_error _ | Unhealthy_lifecycle _ -> false 342 343(** {1 Connection Cleanup} *) 344 345let close_connection pool conn = 346 if not conn.pc_closed then begin 347 conn.pc_closed <- true; 348 Log.debug (fun m -> 349 m "Closing connection to %a" Endpoint.pp conn.pc_endpoint); 350 351 (* Cancel connection-lifetime switch first - this stops any protocol fibers *) 352 (try conn.pc_connection_cancel (Failure "Connection closed") 353 with _ -> ()); 354 355 (* Call protocol cleanup *) 356 pool.protocol.on_close conn.pc_state; 357 358 (* Close the underlying flow *) 359 Eio.Cancel.protect (fun () -> 360 try Eio.Flow.close conn.pc_flow with _ -> ()) 361 end 362 363(** {1 Endpoint Pool Management} *) 364 365let get_or_create_endpoint_pool pool endpoint = 366 match 367 Eio.Mutex.use_ro pool.endpoints_mutex (fun () -> 368 Hashtbl.find_opt pool.endpoints endpoint) 369 with 370 | Some ep_pool -> ep_pool 371 | None -> 372 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () -> 373 match Hashtbl.find_opt pool.endpoints endpoint with 374 | Some ep_pool -> ep_pool 375 | None -> 376 Log.info (fun m -> 377 m "Creating endpoint pool for %a" Endpoint.pp endpoint); 378 let ep_pool = { 379 connections = ref []; 380 ep_mutex = Eio.Mutex.create (); 381 stats = create_endp_stats (); 382 stats_mutex = Eio.Mutex.create (); 383 } in 384 Hashtbl.add pool.endpoints endpoint ep_pool; 385 ep_pool) 386 387(** {1 Connection Acquisition} *) 388 389let rec acquire_connection pool ep_pool endpoint = 390 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 391 (* Find an existing healthy connection with available capacity *) 392 let rec find_available = function 393 | [] -> None 394 | conn :: rest -> 395 if not (is_healthy pool conn) then begin 396 conn.pc_closed <- true; 397 find_available rest 398 end else begin 399 match pool.protocol.access_mode conn.pc_state with 400 | Config.Exclusive -> 401 if conn.pc_active_users = 0 then 402 Some conn 403 else 404 find_available rest 405 | Config.Shared max_concurrent -> 406 if conn.pc_active_users < max_concurrent then 407 Some conn 408 else 409 find_available rest 410 end 411 in 412 413 (* Clean up closed connections *) 414 ep_pool.connections := List.filter (fun c -> not c.pc_closed) !(ep_pool.connections); 415 416 match find_available !(ep_pool.connections) with 417 | Some conn -> 418 (* Reuse existing connection *) 419 let was_idle = conn.pc_active_users = 0 in 420 conn.pc_active_users <- conn.pc_active_users + 1; 421 conn.pc_last_used <- get_time pool; 422 conn.pc_use_count <- conn.pc_use_count + 1; 423 424 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 425 ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 426 ep_pool.stats.active <- ep_pool.stats.active + 1; 427 (* Decrement idle count when connection becomes active *) 428 if was_idle then 429 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 430 431 Log.debug (fun m -> 432 m "Reusing connection to %a (users=%d)" 433 Endpoint.pp endpoint conn.pc_active_users); 434 435 (* Notify protocol handler of acquisition *) 436 pool.protocol.on_acquire conn.pc_state; 437 conn 438 439 | None -> 440 (* Need to create a new connection *) 441 let max_conns = Config.max_connections_per_endpoint pool.config in 442 let current_conns = List.length !(ep_pool.connections) in 443 444 if current_conns >= max_conns then begin 445 (* Wait for a connection to become available *) 446 Log.debug (fun m -> 447 m "At connection limit for %a (%d), waiting..." 448 Endpoint.pp endpoint max_conns); 449 450 (* Find a connection to wait on (prefer shared mode) *) 451 let wait_conn = List.find_opt (fun c -> 452 match pool.protocol.access_mode c.pc_state with 453 | Config.Shared _ -> true 454 | Config.Exclusive -> false 455 ) !(ep_pool.connections) in 456 457 match wait_conn with 458 | Some conn -> 459 (* Wait for user slot *) 460 while conn.pc_active_users >= 461 (match pool.protocol.access_mode conn.pc_state with 462 | Config.Shared n -> n 463 | Config.Exclusive -> 1) 464 && not conn.pc_closed do 465 Eio.Condition.await_no_mutex conn.pc_user_available 466 done; 467 if conn.pc_closed then 468 acquire_connection pool ep_pool endpoint 469 else begin 470 conn.pc_active_users <- conn.pc_active_users + 1; 471 conn.pc_last_used <- get_time pool; 472 conn.pc_use_count <- conn.pc_use_count + 1; 473 474 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 475 ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 476 ep_pool.stats.active <- ep_pool.stats.active + 1); 477 478 (* Notify protocol handler of acquisition *) 479 pool.protocol.on_acquire conn.pc_state; 480 conn 481 end 482 | None -> 483 (* All connections are exclusive and in use - wait for any *) 484 let any_conn = List.hd !(ep_pool.connections) in 485 while any_conn.pc_active_users > 0 && not any_conn.pc_closed do 486 Eio.Condition.await_no_mutex any_conn.pc_user_available 487 done; 488 if any_conn.pc_closed then 489 acquire_connection pool ep_pool endpoint 490 else begin 491 (* Connection was idle (active_users = 0), now becoming active *) 492 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 493 ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 494 ep_pool.stats.active <- ep_pool.stats.active + 1; 495 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 496 any_conn.pc_active_users <- 1; 497 any_conn.pc_last_used <- get_time pool; 498 any_conn.pc_use_count <- any_conn.pc_use_count + 1; 499 (* Notify protocol handler of acquisition *) 500 pool.protocol.on_acquire any_conn.pc_state; 501 any_conn 502 end 503 end else begin 504 (* Create new connection *) 505 let conn = create_connection pool endpoint in 506 conn.pc_active_users <- 1; 507 ep_pool.connections := conn :: !(ep_pool.connections); 508 509 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 510 ep_pool.stats.total_created <- ep_pool.stats.total_created + 1; 511 ep_pool.stats.active <- ep_pool.stats.active + 1); 512 513 Log.info (fun m -> 514 m "Created new connection to %a (total=%d)" 515 Endpoint.pp endpoint (List.length !(ep_pool.connections))); 516 517 (* Notify protocol handler of acquisition *) 518 pool.protocol.on_acquire conn.pc_state; 519 conn 520 end) 521 522(** {1 Connection Release} *) 523 524let release_connection pool ep_pool conn = 525 (* Notify protocol handler of release *) 526 pool.protocol.on_release conn.pc_state; 527 528 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 529 let was_active = conn.pc_active_users > 0 in 530 conn.pc_active_users <- max 0 (conn.pc_active_users - 1); 531 let now_idle = conn.pc_active_users = 0 in 532 533 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 534 ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1); 535 (* Track idle count: increment when connection becomes idle *) 536 if was_active && now_idle then 537 ep_pool.stats.idle <- ep_pool.stats.idle + 1); 538 539 (* Signal waiting fibers *) 540 Eio.Condition.broadcast conn.pc_user_available; 541 542 Log.debug (fun m -> 543 m "Released connection to %a (users=%d)" 544 Endpoint.pp conn.pc_endpoint conn.pc_active_users); 545 546 (* Check if connection should be closed *) 547 match check_health pool conn with 548 | Healthy -> () 549 | Unhealthy_error reason -> 550 conn.pc_closed <- true; 551 552 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 553 ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1; 554 ep_pool.stats.errors <- ep_pool.stats.errors + 1; 555 if now_idle then 556 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 557 558 Log.warn (fun m -> m "Closing connection due to error: %s" reason); 559 close_connection pool conn; 560 ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections) 561 562 | Unhealthy_lifecycle reason -> 563 conn.pc_closed <- true; 564 565 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 566 ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1; 567 if now_idle then 568 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 569 570 Log.debug (fun m -> m "Closing connection due to lifecycle: %s" reason); 571 close_connection pool conn; 572 ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections)) 573 574(** {1 Public API} *) 575 576let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) 577 ?tls ?(config = Config.default) ~protocol () = 578 579 Log.info (fun m -> 580 m "Creating connection pool (max_per_endpoint=%d)" 581 (Config.max_connections_per_endpoint config)); 582 583 let pool = { 584 sw; 585 net; 586 clock; 587 config; 588 tls; 589 protocol; 590 endpoints = Hashtbl.create 16; 591 endpoints_mutex = Eio.Mutex.create (); 592 } in 593 594 (* Auto-cleanup on switch release *) 595 Eio.Switch.on_release sw (fun () -> 596 Eio.Cancel.protect (fun () -> 597 Log.info (fun m -> m "Closing connection pool"); 598 Hashtbl.iter (fun _endpoint ep_pool -> 599 List.iter (fun conn -> 600 close_connection pool conn 601 ) !(ep_pool.connections) 602 ) pool.endpoints; 603 Hashtbl.clear pool.endpoints)); 604 605 Pool pool 606 607let create_basic ~sw ~net ~clock ?tls ?config () = 608 create ~sw ~net ~clock ?tls ?config ~protocol:default_protocol () 609 610let connection ~sw (Pool pool) endpoint = 611 Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint); 612 613 let ep_pool = get_or_create_endpoint_pool pool endpoint in 614 let conn = acquire_connection pool ep_pool endpoint in 615 616 (* Release connection when switch ends *) 617 Eio.Switch.on_release sw (fun () -> 618 release_connection pool ep_pool conn); 619 620 (* Get TLS epoch if available *) 621 let tls_epoch = 622 match conn.pc_tls_flow with 623 | Some tls_flow -> ( 624 match Tls_eio.epoch tls_flow with 625 | Ok epoch -> Some epoch 626 | Error () -> None) 627 | None -> None 628 in 629 630 { 631 flow = conn.pc_flow; 632 tls_epoch; 633 state = conn.pc_state; 634 } 635 636let with_connection pool endpoint f = 637 Eio.Switch.run (fun sw -> f (connection ~sw pool endpoint)) 638 639let stats (Pool pool) endpoint = 640 match Hashtbl.find_opt pool.endpoints endpoint with 641 | Some ep_pool -> 642 Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> snapshot_stats ep_pool.stats) 643 | None -> 644 Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0 645 ~total_closed:0 ~errors:0 646 647let all_stats (Pool pool) = 648 Eio.Mutex.use_ro pool.endpoints_mutex (fun () -> 649 Hashtbl.fold 650 (fun endpoint ep_pool acc -> 651 let stats = 652 Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> 653 snapshot_stats ep_pool.stats) 654 in 655 (endpoint, stats) :: acc) 656 pool.endpoints []) 657 658let clear_endpoint (Pool pool) endpoint = 659 Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint); 660 match Hashtbl.find_opt pool.endpoints endpoint with 661 | Some ep_pool -> 662 Eio.Cancel.protect (fun () -> 663 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 664 List.iter (fun conn -> 665 close_connection pool conn 666 ) !(ep_pool.connections); 667 ep_pool.connections := []); 668 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () -> 669 Hashtbl.remove pool.endpoints endpoint)) 670 | None -> 671 Log.debug (fun m -> 672 m "No endpoint pool found for %a" Endpoint.pp endpoint)