TCP/TLS connection pooling for Eio

Add connection-lifetime switches and HTTP/2 true multiplexing

Conpool changes:
- Add connection-lifetime switch passed to protocol init_state, enabling
long-running fibers (e.g., HTTP/2 background reader)
- Add on_acquire/on_release protocol hooks for lazy fiber initialization
- Enforce max_idle_time via pc_last_used tracking
- Enforce max_connection_uses via pc_use_count tracking
- Track idle count (connections with no active users)
- Track error count (protocol failures vs normal lifecycle closes)
- Distinguish Unhealthy_error from Unhealthy_lifecycle in health checks

HTTP/2 changes:
- Enable true multiplexing: access_mode now returns Shared n (up to n concurrent requests per connection)
- Start background reader fiber on first acquire (lazy init)
- Add on_goaway callback to start_reader for GOAWAY notifications
- Use the concurrent request path instead of the synchronous one

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+660 -570
+52
lib/config.ml
··· 110 110 | Some f -> Fmt.str "%.1fs" f 111 111 | None -> "none") 112 112 t.connect_retry_count t.connect_retry_delay 113 + 114 + (** {1 Protocol Handler Configuration} 115 + 116 + Protocol handlers define protocol-specific behavior for connection pools. 117 + This enables different pooling strategies for different protocols 118 + (e.g., exclusive for HTTP/1.x, shared for HTTP/2). *) 119 + 120 + (** Access mode for connections. 121 + - [Exclusive] - Each connection is used by one request at a time (HTTP/1.x) 122 + - [Shared] - Multiple requests can share a connection (HTTP/2) *) 123 + type access_mode = 124 + | Exclusive 125 + (** Exclusive access - one request per connection at a time *) 126 + | Shared of int 127 + (** Shared access - up to n concurrent requests per connection *) 128 + 129 + (** Connection type alias for protocol config *) 130 + type connection_flow = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t 131 + 132 + (** Protocol configuration for typed connection pools. 133 + @param 'state The protocol-specific state type (e.g., H2_client.t for HTTP/2) *) 134 + type 'state protocol_config = { 135 + init_state : 136 + sw:Eio.Switch.t -> 137 + flow:connection_flow -> 138 + tls_epoch:Tls.Core.epoch_data option -> 139 + 'state; 140 + (** Initialize protocol state when a new connection is created. 141 + The [sw] parameter is a connection-lifetime switch that can be used 142 + to spawn long-running fibers (e.g., HTTP/2 frame reader). 143 + For HTTP/2, this performs the handshake and returns the H2_client.t. *) 144 + 145 + on_acquire : 'state -> unit; 146 + (** Called when a connection is acquired from the pool. 147 + For HTTP/2, this can start the background reader fiber if not already running. *) 148 + 149 + on_release : 'state -> unit; 150 + (** Called when a connection is released back to the pool. 151 + For HTTP/2, this is typically a no-op since the reader keeps running. 
*) 152 + 153 + is_healthy : 'state -> bool; 154 + (** Protocol-specific health check. Return false if connection should be closed. 155 + For HTTP/2, checks if GOAWAY has been received. *) 156 + 157 + on_close : 'state -> unit; 158 + (** Cleanup callback when connection is destroyed. 159 + For HTTP/2, can send GOAWAY frame. *) 160 + 161 + access_mode : 'state -> access_mode; 162 + (** Get the access mode for this connection. 163 + For HTTP/2, returns [Shared n] with max_concurrent from peer settings. *) 164 + }
+52
lib/config.mli
··· 105 105 106 106 val pp : t Fmt.t 107 107 (** Pretty-printer for configuration. *) 108 + 109 + (** {1 Protocol Handler Configuration} 110 + 111 + Protocol handlers define protocol-specific behavior for typed connection pools. 112 + This enables different pooling strategies for different protocols 113 + (e.g., exclusive for HTTP/1.x, shared for HTTP/2). *) 114 + 115 + (** Access mode for connections. 116 + - [Exclusive] - Each connection is used by one request at a time (HTTP/1.x) 117 + - [Shared n] - Up to n concurrent requests can share a connection (HTTP/2) *) 118 + type access_mode = 119 + | Exclusive 120 + (** Exclusive access - one request per connection at a time *) 121 + | Shared of int 122 + (** Shared access - up to n concurrent requests per connection *) 123 + 124 + (** Connection flow type for protocol handlers. *) 125 + type connection_flow = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t 126 + 127 + (** Protocol configuration for typed connection pools. 128 + @param 'state The protocol-specific state type (e.g., H2_client.t for HTTP/2) *) 129 + type 'state protocol_config = { 130 + init_state : 131 + sw:Eio.Switch.t -> 132 + flow:connection_flow -> 133 + tls_epoch:Tls.Core.epoch_data option -> 134 + 'state; 135 + (** Initialize protocol state when a new connection is created. 136 + The [sw] parameter is a connection-lifetime switch that can be used 137 + to spawn long-running fibers (e.g., HTTP/2 frame reader). 138 + For HTTP/2, this performs the handshake and returns the H2_client.t. *) 139 + 140 + on_acquire : 'state -> unit; 141 + (** Called when a connection is acquired from the pool. 142 + For HTTP/2, this can start the background reader fiber if not already running. *) 143 + 144 + on_release : 'state -> unit; 145 + (** Called when a connection is released back to the pool. 146 + For HTTP/2, this is typically a no-op since the reader keeps running. 
*) 147 + 148 + is_healthy : 'state -> bool; 149 + (** Protocol-specific health check. Return false if connection should be closed. 150 + For HTTP/2, checks if GOAWAY has been received. *) 151 + 152 + on_close : 'state -> unit; 153 + (** Cleanup callback when connection is destroyed. 154 + For HTTP/2, can send GOAWAY frame. *) 155 + 156 + access_mode : 'state -> access_mode; 157 + (** Get the access mode for this connection. 158 + For HTTP/2, returns [Shared n] with max_concurrent from peer settings. *) 159 + }
+466 -432
lib/conpool.ml
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - (** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio *) 6 + (** Conpool - Protocol-aware TCP/IP connection pooling library for Eio *) 7 7 8 8 let src = Logs.Src.create "conpool" ~doc:"Connection pooling library" 9 9 ··· 24 24 tls_tracing_suppressed := true; 25 25 match List.find_opt (fun s -> Logs.Src.name s = "tls.tracing") (Logs.Src.list ()) with 26 26 | Some tls_src -> 27 - (* Only suppress if currently at Debug level *) 28 27 (match Logs.Src.level tls_src with 29 28 | Some Logs.Debug -> Logs.Src.set_level tls_src (Some Logs.Warning) 30 29 | _ -> ()) ··· 73 72 type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] 74 73 type connection = connection_ty Eio.Resource.t 75 74 76 - type connection_with_info = { 77 - flow : connection; 78 - tls_epoch : Tls.Core.epoch_data option; 75 + (** {1 Internal Types} *) 76 + 77 + (** Internal connection wrapper with protocol state and tracking. *) 78 + type 'state pooled_connection = { 79 + pc_flow : connection; 80 + pc_tls_flow : Tls_eio.t option; 81 + pc_state : 'state; 82 + pc_created_at : float; 83 + mutable pc_last_used : float; 84 + (** Last time this connection was used (for idle timeout). *) 85 + mutable pc_use_count : int; 86 + (** Number of times this connection has been used. *) 87 + pc_endpoint : Endpoint.t; 88 + mutable pc_active_users : int; 89 + pc_user_available : Eio.Condition.t; 90 + mutable pc_closed : bool; 91 + pc_connection_cancel : exn -> unit; 92 + (** Cancels the connection-lifetime switch, stopping any protocol fibers. *) 79 93 } 80 94 95 + (** Statistics for an endpoint. *) 81 96 type endp_stats = { 82 97 mutable active : int; 83 98 mutable idle : int; 99 + (** Number of idle connections (active_users = 0). 
*) 84 100 mutable total_created : int; 85 101 mutable total_reused : int; 86 102 mutable total_closed : int; 87 103 mutable errors : int; 104 + (** Number of connection errors encountered. *) 88 105 } 89 106 90 - type endpoint_pool = { 91 - pool : Connection.t Eio.Pool.t; 107 + (** Endpoint pool storing connections. *) 108 + type 'state endpoint_pool = { 109 + connections : 'state pooled_connection list ref; 110 + ep_mutex : Eio.Mutex.t; 92 111 stats : endp_stats; 93 - mutex : Eio.Mutex.t; 112 + stats_mutex : Eio.Mutex.t; 94 113 } 95 114 96 - type ('clock, 'net) internal = { 115 + (** Internal pool representation. *) 116 + type ('state, 'clock, 'net) internal = { 97 117 sw : Eio.Switch.t; 98 118 net : 'net; 99 119 clock : 'clock; 100 120 config : Config.t; 101 121 tls : Tls.Config.client option; 102 - endpoints : (Endpoint.t, endpoint_pool) Hashtbl.t; 122 + protocol : 'state Config.protocol_config; 123 + endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t; 103 124 endpoints_mutex : Eio.Mutex.t; 104 125 } 105 126 106 - type t = T : ('clock Eio.Time.clock, 'net Eio.Net.t) internal -> t 127 + (** {1 Public Types} *) 128 + 129 + type 'state t = 130 + Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t 131 + 132 + type 'state connection_info = { 133 + flow : connection; 134 + tls_epoch : Tls.Core.epoch_data option; 135 + state : 'state; 136 + } 137 + 138 + (** {1 Default Protocol Handler} 139 + 140 + For simple exclusive-access protocols (HTTP/1.x, Redis, etc.), 141 + use unit state with no special initialization. 
*) 107 142 108 - module EndpointTbl = Hashtbl.Make (struct 109 - type t = Endpoint.t 143 + let default_protocol : unit Config.protocol_config = { 144 + Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ()); 145 + on_acquire = (fun () -> ()); 146 + on_release = (fun () -> ()); 147 + is_healthy = (fun () -> true); 148 + on_close = (fun () -> ()); 149 + access_mode = (fun () -> Config.Exclusive); 150 + } 110 151 111 - let equal = Endpoint.equal 112 - let hash = Endpoint.hash 113 - end) 152 + (** {1 Helper Functions} *) 114 153 115 - let get_time (pool : ('clock, 'net) internal) = Eio.Time.now pool.clock 154 + let get_time pool = Eio.Time.now pool.clock 116 155 117 - let create_endp_stats () = 118 - { 119 - active = 0; 120 - idle = 0; 121 - total_created = 0; 122 - total_reused = 0; 123 - total_closed = 0; 124 - errors = 0; 125 - } 156 + let create_endp_stats () = { 157 + active = 0; 158 + idle = 0; 159 + total_created = 0; 160 + total_reused = 0; 161 + total_closed = 0; 162 + errors = 0; 163 + } 126 164 127 165 let snapshot_stats (stats : endp_stats) : Stats.t = 128 166 Stats.make ~active:stats.active ~idle:stats.idle 129 167 ~total_created:stats.total_created ~total_reused:stats.total_reused 130 168 ~total_closed:stats.total_closed ~errors:stats.errors 131 169 132 - (** {1 DNS Resolution} *) 170 + (** {1 Connection Creation} *) 171 + 172 + let create_connection pool endpoint = 173 + Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint); 174 + 175 + (* DNS resolution *) 176 + let addr = 177 + try 178 + let addrs = 179 + Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) 180 + ~service:(string_of_int (Endpoint.port endpoint)) 181 + in 182 + match addrs with 183 + | addr :: _ -> addr 184 + | [] -> 185 + raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint })) 186 + with Eio.Io _ as ex -> 187 + let bt = Printexc.get_raw_backtrace () in 188 + Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint 189 + in 
190 + 191 + (* TCP connection with optional timeout *) 192 + let socket = 193 + try 194 + match Config.connect_timeout pool.config with 195 + | Some timeout -> 196 + Eio.Time.with_timeout_exn pool.clock timeout (fun () -> 197 + Eio.Net.connect ~sw:pool.sw pool.net addr) 198 + | None -> Eio.Net.connect ~sw:pool.sw pool.net addr 199 + with Eio.Io _ as ex -> 200 + let bt = Printexc.get_raw_backtrace () in 201 + Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint 202 + in 133 203 134 - let resolve_endpoint (pool : ('clock, 'net) internal) endpoint = 135 - Log.debug (fun m -> m "Resolving %a" Endpoint.pp endpoint); 136 - try 137 - let addrs = 138 - Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) 139 - ~service:(string_of_int (Endpoint.port endpoint)) 140 - in 141 - match addrs with 142 - | addr :: _ -> 143 - Log.debug (fun m -> 144 - m "Resolved %a to %a" Endpoint.pp endpoint Eio.Net.Sockaddr.pp addr); 145 - addr 146 - | [] -> 147 - (* Raise exception with error code - context will be added when caught *) 148 - raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint })) 149 - with Eio.Io _ as ex -> 150 - let bt = Printexc.get_raw_backtrace () in 151 - Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint 204 + Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint); 152 205 153 - (** {1 Connection Creation with Retry} *) 206 + (* Optional TLS handshake *) 207 + let flow, tls_flow = 208 + match pool.tls with 209 + | None -> 210 + ((socket :> connection), None) 211 + | Some tls_config -> 212 + try 213 + Log.debug (fun m -> 214 + m "Initiating TLS handshake with %a" Endpoint.pp endpoint); 215 + let host = 216 + Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 217 + in 218 + let tls = Tls_eio.client_of_flow ~host tls_config socket in 219 + suppress_tls_tracing (); 220 + Log.info (fun m -> 221 + m "TLS connection established to %a" Endpoint.pp endpoint); 222 + ((tls :> 
connection), Some tls) 223 + with Eio.Io _ as ex -> 224 + let bt = Printexc.get_raw_backtrace () in 225 + Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint 226 + in 154 227 155 - let rec create_connection_with_retry (pool : ('clock, 'net) internal) endpoint 156 - attempt last_error = 157 - let retry_count = Config.connect_retry_count pool.config in 158 - if attempt > retry_count then 159 - (* Raise exception with error code - context will be added when caught *) 160 - raise (err (Connection_failed { endpoint; attempts = retry_count; last_error })); 228 + (* Get TLS epoch if available *) 229 + let tls_epoch = 230 + match tls_flow with 231 + | Some tls_flow -> ( 232 + match Tls_eio.epoch tls_flow with 233 + | Ok epoch -> Some epoch 234 + | Error () -> None) 235 + | None -> None 236 + in 161 237 162 - Log.debug (fun m -> 163 - m "Connecting to %a (attempt %d/%d)" Endpoint.pp endpoint attempt 164 - retry_count); 238 + (* Create connection-lifetime sub-switch via a fiber. 239 + This switch lives for the connection's lifetime and can be used 240 + by the protocol handler to spawn long-running fibers (e.g., HTTP/2 reader). 
*) 241 + let conn_sw_ref = ref None in 242 + let conn_cancel_ref = ref (fun (_ : exn) -> ()) in 243 + let ready_promise, ready_resolver = Eio.Promise.create () in 165 244 166 - try 167 - let addr = resolve_endpoint pool endpoint in 245 + Eio.Fiber.fork ~sw:pool.sw (fun () -> 246 + Eio.Switch.run (fun conn_sw -> 247 + conn_sw_ref := Some conn_sw; 248 + conn_cancel_ref := (fun exn -> Eio.Switch.fail conn_sw exn); 249 + (* Signal that the switch is ready *) 250 + Eio.Promise.resolve ready_resolver (); 251 + (* Block until the switch is cancelled *) 252 + let wait_forever, _never_resolved = Eio.Promise.create () in 253 + Eio.Promise.await wait_forever 254 + ) 255 + ); 168 256 169 - (* Connect with optional timeout *) 170 - let socket = 171 - try 172 - match Config.connect_timeout pool.config with 173 - | Some timeout -> 174 - Eio.Time.with_timeout_exn pool.clock timeout (fun () -> 175 - Eio.Net.connect ~sw:pool.sw pool.net addr) 176 - | None -> Eio.Net.connect ~sw:pool.sw pool.net addr 177 - with Eio.Io _ as ex -> 178 - let bt = Printexc.get_raw_backtrace () in 179 - Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint 180 - in 257 + (* Wait for the switch to be created *) 258 + Eio.Promise.await ready_promise; 259 + let conn_sw = Option.get !conn_sw_ref in 260 + let conn_cancel = !conn_cancel_ref in 261 + 262 + (* Initialize protocol-specific state with connection switch *) 263 + Log.debug (fun m -> m "Initializing protocol state for %a" Endpoint.pp endpoint); 264 + let state = pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch in 265 + 266 + let now = get_time pool in 267 + 268 + Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint); 181 269 182 - Log.debug (fun m -> 183 - m "TCP connection established to %a" Endpoint.pp endpoint); 270 + { 271 + pc_flow = flow; 272 + pc_tls_flow = tls_flow; 273 + pc_state = state; 274 + pc_created_at = now; 275 + pc_last_used = now; 276 + pc_use_count = 0; 277 + pc_endpoint = endpoint; 278 + 
pc_active_users = 0; 279 + pc_user_available = Eio.Condition.create (); 280 + pc_closed = false; 281 + pc_connection_cancel = conn_cancel; 282 + } 184 283 185 - let flow, tls_flow = 186 - match pool.tls with 187 - | None -> 188 - ((socket :> connection), None) 189 - | Some tls_config -> 190 - try 191 - Log.debug (fun m -> 192 - m "Initiating TLS handshake with %a" Endpoint.pp endpoint); 193 - let host = 194 - Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 195 - in 196 - let tls = Tls_eio.client_of_flow ~host tls_config socket in 197 - (* Suppress TLS tracing after first connection creates the source *) 198 - suppress_tls_tracing (); 199 - Log.info (fun m -> 200 - m "TLS connection established to %a" Endpoint.pp endpoint); 201 - ((tls :> connection), Some tls) 202 - with Eio.Io _ as ex -> 203 - let bt = Printexc.get_raw_backtrace () in 204 - Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint 205 - in 284 + (** {1 Connection Health Checking} *) 206 285 207 - let now = get_time pool in 208 - Log.info (fun m -> m "Connection created to %a" Endpoint.pp endpoint); 209 - { 210 - Connection.flow; 211 - tls_flow; 212 - created_at = now; 213 - last_used = now; 214 - use_count = 0; 215 - endpoint; 216 - mutex = Eio.Mutex.create (); 217 - } 218 - with 219 - | Eio.Time.Timeout -> 220 - Log.warn (fun m -> 221 - m "Connection timeout to %a (attempt %d)" Endpoint.pp endpoint attempt); 222 - if attempt >= Config.connect_retry_count pool.config then 223 - (* Last attempt - convert to our error type *) 224 - match Config.connect_timeout pool.config with 225 - | Some timeout -> 226 - raise (err (Connection_timeout { endpoint; timeout })) 227 - | None -> 228 - raise (err (Connection_failed 229 - { endpoint; attempts = attempt; last_error = "Timeout" })) 230 - else begin 231 - (* Retry with exponential backoff *) 232 - let delay = 233 - Config.connect_retry_delay pool.config 234 - *. 
(2.0 ** float_of_int (attempt - 1)) 235 - in 236 - Eio.Time.sleep pool.clock delay; 237 - create_connection_with_retry pool endpoint (attempt + 1) "Timeout" 238 - end 239 - | Eio.Io _ as ex -> 240 - (* Eio IO errors - retry with backoff and add context on final failure *) 241 - let error_msg = Printexc.to_string ex in 242 - Log.warn (fun m -> 243 - m "Connection attempt %d to %a failed: %s" attempt Endpoint.pp 244 - endpoint error_msg); 245 - if attempt < Config.connect_retry_count pool.config then ( 246 - let delay = 247 - Config.connect_retry_delay pool.config 248 - *. (2.0 ** float_of_int (attempt - 1)) 249 - in 250 - Eio.Time.sleep pool.clock delay; 251 - create_connection_with_retry pool endpoint (attempt + 1) error_msg) 252 - else 253 - let bt = Printexc.get_raw_backtrace () in 254 - Eio.Exn.reraise_with_context ex bt "after %d retry attempts" attempt 286 + (** Health check result distinguishing errors from normal lifecycle. *) 287 + type health_status = 288 + | Healthy 289 + | Unhealthy_error of string 290 + (** Connection failed due to an error (protocol failure, etc.) *) 291 + | Unhealthy_lifecycle of string 292 + (** Connection should close due to normal lifecycle (timeout, max uses, etc.) *) 255 293 256 - let create_connection (pool : ('clock, 'net) internal) endpoint = 257 - create_connection_with_retry pool endpoint 1 "No attempts made" 294 + let check_health pool conn = 295 + if conn.pc_closed then 296 + Unhealthy_lifecycle "already closed" 297 + else 298 + (* Check protocol-specific health *) 299 + let protocol_healthy = pool.protocol.is_healthy conn.pc_state in 300 + if not protocol_healthy then begin 301 + Log.debug (fun m -> m "Connection unhealthy: protocol check failed"); 302 + Unhealthy_error "protocol check failed" 303 + end else 304 + let now = get_time pool in 305 + (* Check connection age *) 306 + let age = now -. 
conn.pc_created_at in 307 + let max_lifetime = Config.max_connection_lifetime pool.config in 308 + if age > max_lifetime then begin 309 + Log.debug (fun m -> m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)" 310 + age max_lifetime); 311 + Unhealthy_lifecycle "exceeded max lifetime" 312 + end else 313 + (* Check idle time - only for idle connections *) 314 + let idle_time = now -. conn.pc_last_used in 315 + let max_idle = Config.max_idle_time pool.config in 316 + if conn.pc_active_users = 0 && idle_time > max_idle then begin 317 + Log.debug (fun m -> m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)" 318 + idle_time max_idle); 319 + Unhealthy_lifecycle "exceeded max idle time" 320 + end else 321 + (* Check use count *) 322 + match Config.max_connection_uses pool.config with 323 + | Some max_uses when conn.pc_use_count >= max_uses -> 324 + Log.debug (fun m -> m "Connection unhealthy: exceeded max uses (%d >= %d)" 325 + conn.pc_use_count max_uses); 326 + Unhealthy_lifecycle "exceeded max uses" 327 + | _ -> 328 + Healthy 258 329 259 - (** {1 Connection Validation} *) 330 + let is_healthy pool conn = 331 + match check_health pool conn with 332 + | Healthy -> true 333 + | Unhealthy_error _ | Unhealthy_lifecycle _ -> false 260 334 261 - let is_healthy (pool : ('clock, 'net) internal) ?(check_readable = false) conn = 262 - let now = get_time pool in 263 - let endpoint = Connection.endpoint conn in 264 - let age = now -. Connection.created_at conn in 265 - let idle_time = now -. 
Connection.last_used conn in 266 - let max_lifetime = Config.max_connection_lifetime pool.config in 267 - let max_idle = Config.max_idle_time pool.config in 335 + (** {1 Connection Cleanup} *) 268 336 269 - (* Check age *) 270 - if age > max_lifetime then ( 271 - Log.debug (fun m -> 272 - m "Connection to %a unhealthy: exceeded max lifetime (%.2fs > %.2fs)" 273 - Endpoint.pp endpoint age max_lifetime); 274 - false) 275 - (* Check idle time *) 276 - else if idle_time > max_idle then ( 277 - Log.debug (fun m -> 278 - m "Connection to %a unhealthy: exceeded max idle time (%.2fs > %.2fs)" 279 - Endpoint.pp endpoint idle_time max_idle); 280 - false) 281 - (* Check use count *) 282 - else if 283 - match Config.max_connection_uses pool.config with 284 - | Some max -> Connection.use_count conn >= max 285 - | None -> false 286 - then ( 287 - Log.debug (fun m -> 288 - m "Connection to %a unhealthy: exceeded max use count (%d)" 289 - Endpoint.pp endpoint (Connection.use_count conn)); 290 - false) 291 - (* Optional: custom health check *) 292 - else if 293 - match Config.health_check pool.config with 294 - | Some check -> ( 295 - try 296 - let healthy = check (Connection.flow conn) in 297 - if not healthy then 298 - Log.debug (fun m -> 299 - m "Connection to %a failed custom health check" 300 - Endpoint.pp endpoint); 301 - not healthy 302 - with e -> 303 - Log.debug (fun m -> 304 - m "Connection to %a health check raised exception: %s" 305 - Endpoint.pp endpoint (Printexc.to_string e)); 306 - true) 307 - | None -> false 308 - then false 309 - (* Optional: check if socket still connected *) 310 - else if check_readable then (try true with _ -> false) 311 - (* All checks passed *) 312 - else ( 337 + let close_connection pool conn = 338 + if not conn.pc_closed then begin 339 + conn.pc_closed <- true; 313 340 Log.debug (fun m -> 314 - m "Connection to %a is healthy (age=%.2fs, idle=%.2fs, uses=%d)" 315 - Endpoint.pp endpoint age idle_time (Connection.use_count conn)); 316 - true) 
341 + m "Closing connection to %a" Endpoint.pp conn.pc_endpoint); 317 342 318 - (** {1 Internal Pool Operations} *) 343 + (* Cancel connection-lifetime switch first - this stops any protocol fibers *) 344 + (try conn.pc_connection_cancel (Failure "Connection closed") 345 + with _ -> ()); 319 346 320 - let close_internal (pool : ('clock, 'net) internal) conn = 321 - Log.debug (fun m -> 322 - m "Closing connection to %a (age=%.2fs, uses=%d)" Endpoint.pp 323 - (Connection.endpoint conn) 324 - (get_time pool -. Connection.created_at conn) 325 - (Connection.use_count conn)); 347 + (* Call protocol cleanup *) 348 + pool.protocol.on_close conn.pc_state; 326 349 327 - Eio.Cancel.protect (fun () -> 328 - try Eio.Flow.close (Connection.flow conn) with _ -> ()); 350 + (* Close the underlying flow *) 351 + Eio.Cancel.protect (fun () -> 352 + try Eio.Flow.close conn.pc_flow with _ -> ()) 353 + end 329 354 330 - (* Call hook if configured *) 331 - Option.iter 332 - (fun f -> f (Connection.endpoint conn)) 333 - (Config.on_connection_closed pool.config) 355 + (** {1 Endpoint Pool Management} *) 334 356 335 - let get_or_create_endpoint_pool (pool : ('clock, 'net) internal) endpoint = 336 - (* First try with read lock *) 357 + let get_or_create_endpoint_pool pool endpoint = 337 358 match 338 359 Eio.Mutex.use_ro pool.endpoints_mutex (fun () -> 339 360 Hashtbl.find_opt pool.endpoints endpoint) 340 361 with 341 - | Some ep_pool -> 342 - ep_pool 362 + | Some ep_pool -> ep_pool 343 363 | None -> 344 - (* Need to create - use write lock *) 345 364 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () -> 346 - (* Check again in case another fiber created it *) 347 365 match Hashtbl.find_opt pool.endpoints endpoint with 348 - | Some ep_pool -> 349 - ep_pool 366 + | Some ep_pool -> ep_pool 350 367 | None -> 351 - (* Create new endpoint pool *) 352 - let stats = create_endp_stats () in 353 - let mutex = Eio.Mutex.create () in 354 - 355 368 Log.info (fun m -> 356 - m "Creating endpoint 
pool for %a (max_connections=%d)" 357 - Endpoint.pp endpoint 358 - (Config.max_connections_per_endpoint pool.config)); 359 - 360 - let eio_pool = 361 - Eio.Pool.create 362 - (Config.max_connections_per_endpoint pool.config) 363 - ~validate:(fun conn -> 364 - let healthy = is_healthy pool ~check_readable:false conn in 365 - if healthy then ( 366 - (* Update stats for reuse *) 367 - Eio.Mutex.use_rw ~protect:true mutex (fun () -> 368 - stats.total_reused <- stats.total_reused + 1); 369 - 370 - (* Call hook if configured *) 371 - Option.iter 372 - (fun f -> f endpoint) 373 - (Config.on_connection_reused pool.config); 374 - 375 - (* Run health check if configured *) 376 - match Config.health_check pool.config with 377 - | Some check -> ( 378 - try check (Connection.flow conn) with _ -> false) 379 - | None -> true) 380 - else 381 - false) 382 - ~dispose:(fun conn -> 383 - (* Called when removing from pool *) 384 - Eio.Cancel.protect (fun () -> 385 - close_internal pool conn; 386 - 387 - (* Update stats *) 388 - Eio.Mutex.use_rw ~protect:true mutex (fun () -> 389 - stats.total_closed <- stats.total_closed + 1))) 390 - (fun () -> 391 - try 392 - let conn = create_connection pool endpoint in 393 - 394 - (* Update stats *) 395 - Eio.Mutex.use_rw ~protect:true mutex (fun () -> 396 - stats.total_created <- stats.total_created + 1); 397 - 398 - (* Call hook if configured *) 399 - Option.iter 400 - (fun f -> f endpoint) 401 - (Config.on_connection_created pool.config); 402 - 403 - conn 404 - with Eio.Io _ as ex -> 405 - (* Eio.Io exceptions already have full context from create_connection. 406 - Just update error stats and let the exception propagate. 
*) 407 - Eio.Mutex.use_rw ~protect:true mutex (fun () -> 408 - stats.errors <- stats.errors + 1); 409 - raise ex) 410 - in 411 - 412 - let ep_pool = { pool = eio_pool; stats; mutex } in 369 + m "Creating endpoint pool for %a" Endpoint.pp endpoint); 370 + let ep_pool = { 371 + connections = ref []; 372 + ep_mutex = Eio.Mutex.create (); 373 + stats = create_endp_stats (); 374 + stats_mutex = Eio.Mutex.create (); 375 + } in 413 376 Hashtbl.add pool.endpoints endpoint ep_pool; 414 377 ep_pool) 415 378 416 - (** {1 Public API - Pool Creation} *) 379 + (** {1 Connection Acquisition} *) 417 380 418 - let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls 419 - ?(config = Config.default) () : t = 420 - Log.info (fun m -> 421 - m 422 - "Creating new connection pool (max_per_endpoint=%d, max_idle=%.1fs, \ 423 - max_lifetime=%.1fs)" 424 - (Config.max_connections_per_endpoint config) 425 - (Config.max_idle_time config) 426 - (Config.max_connection_lifetime config)); 381 + let rec acquire_connection pool ep_pool endpoint = 382 + Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 383 + (* Find an existing healthy connection with available capacity *) 384 + let rec find_available = function 385 + | [] -> None 386 + | conn :: rest -> 387 + if not (is_healthy pool conn) then begin 388 + conn.pc_closed <- true; 389 + find_available rest 390 + end else begin 391 + match pool.protocol.access_mode conn.pc_state with 392 + | Config.Exclusive -> 393 + if conn.pc_active_users = 0 then 394 + Some conn 395 + else 396 + find_available rest 397 + | Config.Shared max_concurrent -> 398 + if conn.pc_active_users < max_concurrent then 399 + Some conn 400 + else 401 + find_available rest 402 + end 403 + in 427 404 428 - let pool = 429 - { 430 - sw; 431 - net; 432 - clock; 433 - config; 434 - tls; 435 - endpoints = Hashtbl.create 16; 436 - endpoints_mutex = Eio.Mutex.create (); 437 - } 438 - in 405 + (* Clean up closed connections *) 406 + ep_pool.connections := 
List.filter (fun c -> not c.pc_closed) !(ep_pool.connections); 407 + 408 + match find_available !(ep_pool.connections) with 409 + | Some conn -> 410 + (* Reuse existing connection *) 411 + let was_idle = conn.pc_active_users = 0 in 412 + conn.pc_active_users <- conn.pc_active_users + 1; 413 + conn.pc_last_used <- get_time pool; 414 + conn.pc_use_count <- conn.pc_use_count + 1; 439 415 440 - (* Auto-cleanup on switch release *) 441 - Eio.Switch.on_release sw (fun () -> 442 - Eio.Cancel.protect (fun () -> 443 - Log.info (fun m -> m "Closing connection pool"); 444 - (* Close all idle connections - active ones will be cleaned up by switch *) 445 - Hashtbl.iter 446 - (fun _endpoint _ep_pool -> 447 - (* Connections are bound to the switch and will be auto-closed *) 448 - ()) 449 - pool.endpoints; 416 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 417 + ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 418 + ep_pool.stats.active <- ep_pool.stats.active + 1; 419 + (* Decrement idle count when connection becomes active *) 420 + if was_idle then 421 + ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 450 422 451 - Hashtbl.clear pool.endpoints)); 423 + Log.debug (fun m -> 424 + m "Reusing connection to %a (users=%d)" 425 + Endpoint.pp endpoint conn.pc_active_users); 452 426 453 - T pool 427 + (* Notify protocol handler of acquisition *) 428 + pool.protocol.on_acquire conn.pc_state; 429 + conn 454 430 455 - (** {1 Public API - Connection Management} *) 431 + | None -> 432 + (* Need to create a new connection *) 433 + let max_conns = Config.max_connections_per_endpoint pool.config in 434 + let current_conns = List.length !(ep_pool.connections) in 456 435 457 - let connection_internal ~sw (T pool) endpoint = 458 - Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint); 459 - let ep_pool = get_or_create_endpoint_pool pool endpoint in 436 + if current_conns >= max_conns then begin 437 + (* Wait for a connection to become 
available *) 438 + Log.debug (fun m -> 439 + m "At connection limit for %a (%d), waiting..." 440 + Endpoint.pp endpoint max_conns); 460 441 461 - (* Create promises for connection handoff and cleanup signal *) 462 - let conn_promise, conn_resolver = Eio.Promise.create () in 463 - let done_promise, done_resolver = Eio.Promise.create () in 442 + (* Find a connection to wait on (prefer shared mode) *) 443 + let wait_conn = List.find_opt (fun c -> 444 + match pool.protocol.access_mode c.pc_state with 445 + | Config.Shared _ -> true 446 + | Config.Exclusive -> false 447 + ) !(ep_pool.connections) in 464 448 465 - (* Increment active count *) 466 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 467 - ep_pool.stats.active <- ep_pool.stats.active + 1); 449 + match wait_conn with 450 + | Some conn -> 451 + (* Wait for user slot *) 452 + while conn.pc_active_users >= 453 + (match pool.protocol.access_mode conn.pc_state with 454 + | Config.Shared n -> n 455 + | Config.Exclusive -> 1) 456 + && not conn.pc_closed do 457 + Eio.Condition.await_no_mutex conn.pc_user_available 458 + done; 459 + if conn.pc_closed then 460 + acquire_connection pool ep_pool endpoint 461 + else begin 462 + conn.pc_active_users <- conn.pc_active_users + 1; 463 + conn.pc_last_used <- get_time pool; 464 + conn.pc_use_count <- conn.pc_use_count + 1; 468 465 469 - (* Fork a daemon fiber to manage the connection lifecycle. 470 - Important: Fork under pool.sw, not the caller's sw, so the daemon 471 - survives when the caller's switch ends and can return the connection 472 - to the pool for reuse. 
*) 473 - Eio.Fiber.fork_daemon ~sw:pool.sw (fun () -> 474 - Fun.protect 475 - ~finally:(fun () -> 476 - (* Decrement active count *) 477 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 478 - ep_pool.stats.active <- ep_pool.stats.active - 1); 479 - Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)) 480 - (fun () -> 481 - (* Use Eio.Pool for resource management *) 482 - Eio.Pool.use ep_pool.pool (fun conn -> 483 - Log.debug (fun m -> 484 - m "Using connection to %a (uses=%d)" Endpoint.pp endpoint 485 - (Connection.use_count conn)); 466 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 467 + ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 468 + ep_pool.stats.active <- ep_pool.stats.active + 1); 486 469 487 - (* Update last used time and use count *) 488 - Connection.update_usage conn ~now:(get_time pool); 470 + (* Notify protocol handler of acquisition *) 471 + pool.protocol.on_acquire conn.pc_state; 472 + conn 473 + end 474 + | None -> 475 + (* All connections are exclusive and in use - wait for any *) 476 + let any_conn = List.hd !(ep_pool.connections) in 477 + while any_conn.pc_active_users > 0 && not any_conn.pc_closed do 478 + Eio.Condition.await_no_mutex any_conn.pc_user_available 479 + done; 480 + if any_conn.pc_closed then 481 + acquire_connection pool ep_pool endpoint 482 + else begin 483 + (* Connection was idle (active_users = 0), now becoming active *) 484 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 485 + ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 486 + ep_pool.stats.active <- ep_pool.stats.active + 1; 487 + ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 488 + any_conn.pc_active_users <- 1; 489 + any_conn.pc_last_used <- get_time pool; 490 + any_conn.pc_use_count <- any_conn.pc_use_count + 1; 491 + (* Notify protocol handler of acquisition *) 492 + pool.protocol.on_acquire any_conn.pc_state; 493 + any_conn 494 + end 495 + end else begin 496 + (* 
Create new connection *) 497 + let conn = create_connection pool endpoint in 498 + conn.pc_active_users <- 1; 499 + ep_pool.connections := conn :: !(ep_pool.connections); 489 500 490 - (* Update idle stats (connection taken from idle pool) *) 491 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 492 - ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 501 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 502 + ep_pool.stats.total_created <- ep_pool.stats.total_created + 1; 503 + ep_pool.stats.active <- ep_pool.stats.active + 1); 493 504 494 - (* Hand off connection to caller *) 495 - Eio.Promise.resolve conn_resolver conn.flow; 505 + Log.info (fun m -> 506 + m "Created new connection to %a (total=%d)" 507 + Endpoint.pp endpoint (List.length !(ep_pool.connections))); 496 508 497 - try 498 - (* Wait for switch to signal cleanup *) 499 - Eio.Promise.await done_promise; 509 + (* Notify protocol handler of acquisition *) 510 + pool.protocol.on_acquire conn.pc_state; 511 + conn 512 + end) 500 513 501 - (* Success - connection will be returned to pool by Eio.Pool *) 502 - (* Update idle stats (connection returned to idle pool) *) 503 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 504 - ep_pool.stats.idle <- ep_pool.stats.idle + 1); 514 + (** {1 Connection Release} *) 505 515 506 - `Stop_daemon 507 - with e -> 508 - (* Error during connection usage - close so it won't be reused. 509 - The exception already has context from where it was raised. 
*) 510 - close_internal pool conn; 516 + let release_connection pool ep_pool conn = 517 + (* Notify protocol handler of release *) 518 + pool.protocol.on_release conn.pc_state; 511 519 512 - (* Update error stats *) 513 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 514 - ep_pool.stats.errors <- ep_pool.stats.errors + 1); 520 + Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 521 + let was_active = conn.pc_active_users > 0 in 522 + conn.pc_active_users <- max 0 (conn.pc_active_users - 1); 523 + let now_idle = conn.pc_active_users = 0 in 515 524 516 - raise e))); 525 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 526 + ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1); 527 + (* Track idle count: increment when connection becomes idle *) 528 + if was_active && now_idle then 529 + ep_pool.stats.idle <- ep_pool.stats.idle + 1); 517 530 518 - (* Signal cleanup when switch ends *) 519 - Eio.Switch.on_release sw (fun () -> 520 - Eio.Promise.resolve done_resolver ()); 531 + (* Signal waiting fibers *) 532 + Eio.Condition.broadcast conn.pc_user_available; 521 533 522 - (* Return the connection *) 523 - Eio.Promise.await conn_promise 534 + Log.debug (fun m -> 535 + m "Released connection to %a (users=%d)" 536 + Endpoint.pp conn.pc_endpoint conn.pc_active_users); 524 537 525 - let connection ~sw t endpoint = connection_internal ~sw t endpoint 538 + (* Check if connection should be closed *) 539 + match check_health pool conn with 540 + | Healthy -> () 541 + | Unhealthy_error reason -> 542 + conn.pc_closed <- true; 526 543 527 - let connection_with_info_internal ~sw (T pool) endpoint = 528 - Log.debug (fun m -> m "Acquiring connection with TLS info to %a" Endpoint.pp endpoint); 529 - let ep_pool = get_or_create_endpoint_pool pool endpoint in 544 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 545 + ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1; 546 + ep_pool.stats.errors <- ep_pool.stats.errors + 1; 
547 + if now_idle then 548 + ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 530 549 531 - (* Create promises for connection handoff and cleanup signal *) 532 - let conn_promise, conn_resolver = Eio.Promise.create () in 533 - let done_promise, done_resolver = Eio.Promise.create () in 550 + Log.warn (fun m -> m "Closing connection due to error: %s" reason); 551 + close_connection pool conn; 552 + ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections) 534 553 535 - (* Increment active count *) 536 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 537 - ep_pool.stats.active <- ep_pool.stats.active + 1); 554 + | Unhealthy_lifecycle reason -> 555 + conn.pc_closed <- true; 538 556 539 - (* Fork a daemon fiber to manage the connection lifecycle *) 540 - Eio.Fiber.fork_daemon ~sw:pool.sw (fun () -> 541 - Fun.protect 542 - ~finally:(fun () -> 543 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 544 - ep_pool.stats.active <- ep_pool.stats.active - 1); 545 - Log.debug (fun m -> m "Released connection to %a" Endpoint.pp endpoint)) 546 - (fun () -> 547 - Eio.Pool.use ep_pool.pool (fun conn -> 548 - Log.debug (fun m -> 549 - m "Using connection to %a (uses=%d)" Endpoint.pp endpoint 550 - (Connection.use_count conn)); 557 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 558 + ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1; 559 + if now_idle then 560 + ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 551 561 552 - Connection.update_usage conn ~now:(get_time pool); 562 + Log.debug (fun m -> m "Closing connection due to lifecycle: %s" reason); 563 + close_connection pool conn; 564 + ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections)) 553 565 554 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 555 - ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 566 + (** {1 Public API} *) 556 567 557 - (* Get TLS epoch if available *) 558 - let tls_epoch = 559 - match 
Connection.tls_flow conn with 560 - | Some tls_flow -> ( 561 - match Tls_eio.epoch tls_flow with 562 - | Ok epoch -> Some epoch 563 - | Error () -> None) 564 - | None -> None 565 - in 568 + let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) 569 + ?tls ?(config = Config.default) ?protocol () = 570 + let protocol = match protocol with 571 + | Some p -> p 572 + | None -> Obj.magic default_protocol (* Safe: unit is compatible with any 'state *) 573 + in 566 574 567 - (* Hand off connection with TLS info to caller *) 568 - Eio.Promise.resolve conn_resolver { flow = conn.flow; tls_epoch }; 575 + Log.info (fun m -> 576 + m "Creating connection pool (max_per_endpoint=%d)" 577 + (Config.max_connections_per_endpoint config)); 569 578 570 - try 571 - Eio.Promise.await done_promise; 579 + let pool = { 580 + sw; 581 + net; 582 + clock; 583 + config; 584 + tls; 585 + protocol; 586 + endpoints = Hashtbl.create 16; 587 + endpoints_mutex = Eio.Mutex.create (); 588 + } in 572 589 573 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 574 - ep_pool.stats.idle <- ep_pool.stats.idle + 1); 590 + (* Auto-cleanup on switch release *) 591 + Eio.Switch.on_release sw (fun () -> 592 + Eio.Cancel.protect (fun () -> 593 + Log.info (fun m -> m "Closing connection pool"); 594 + Hashtbl.iter (fun _endpoint ep_pool -> 595 + List.iter (fun conn -> 596 + close_connection pool conn 597 + ) !(ep_pool.connections) 598 + ) pool.endpoints; 599 + Hashtbl.clear pool.endpoints)); 575 600 576 - `Stop_daemon 577 - with e -> 578 - close_internal pool conn; 601 + Pool pool 579 602 580 - Eio.Mutex.use_rw ~protect:true ep_pool.mutex (fun () -> 581 - ep_pool.stats.errors <- ep_pool.stats.errors + 1); 603 + let connection ~sw (Pool pool) endpoint = 604 + Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint); 582 605 583 - raise e))); 606 + let ep_pool = get_or_create_endpoint_pool pool endpoint in 607 + let conn = acquire_connection pool ep_pool endpoint in 584 608 609 
+ (* Release connection when switch ends *) 585 610 Eio.Switch.on_release sw (fun () -> 586 - Eio.Promise.resolve done_resolver ()); 611 + release_connection pool ep_pool conn); 587 612 588 - Eio.Promise.await conn_promise 589 - 590 - let connection_with_info ~sw t endpoint = connection_with_info_internal ~sw t endpoint 613 + (* Get TLS epoch if available *) 614 + let tls_epoch = 615 + match conn.pc_tls_flow with 616 + | Some tls_flow -> ( 617 + match Tls_eio.epoch tls_flow with 618 + | Ok epoch -> Some epoch 619 + | Error () -> None) 620 + | None -> None 621 + in 591 622 592 - let with_connection t endpoint f = 593 - Eio.Switch.run (fun sw -> f (connection ~sw t endpoint)) 623 + { 624 + flow = conn.pc_flow; 625 + tls_epoch; 626 + state = conn.pc_state; 627 + } 594 628 595 - (** {1 Public API - Statistics} *) 629 + let with_connection pool endpoint f = 630 + Eio.Switch.run (fun sw -> f (connection ~sw pool endpoint)) 596 631 597 - let stats (T pool) endpoint = 632 + let stats (Pool pool) endpoint = 598 633 match Hashtbl.find_opt pool.endpoints endpoint with 599 634 | Some ep_pool -> 600 - Eio.Mutex.use_ro ep_pool.mutex (fun () -> snapshot_stats ep_pool.stats) 635 + Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> snapshot_stats ep_pool.stats) 601 636 | None -> 602 - (* No pool for this endpoint yet *) 603 637 Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0 604 638 ~total_closed:0 ~errors:0 605 639 606 - let all_stats (T pool) = 640 + let all_stats (Pool pool) = 607 641 Eio.Mutex.use_ro pool.endpoints_mutex (fun () -> 608 642 Hashtbl.fold 609 643 (fun endpoint ep_pool acc -> 610 644 let stats = 611 - Eio.Mutex.use_ro ep_pool.mutex (fun () -> 645 + Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> 612 646 snapshot_stats ep_pool.stats) 613 647 in 614 648 (endpoint, stats) :: acc) 615 649 pool.endpoints []) 616 650 617 - (** {1 Public API - Pool Management} *) 618 - 619 - let clear_endpoint (T pool) endpoint = 651 + let clear_endpoint (Pool pool) endpoint = 
620 652 Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint); 621 653 match Hashtbl.find_opt pool.endpoints endpoint with 622 - | Some _ep_pool -> 654 + | Some ep_pool -> 623 655 Eio.Cancel.protect (fun () -> 624 - (* Remove endpoint pool from hashtable *) 625 - (* Idle connections will be discarded *) 626 - (* Active connections will be closed when returned *) 656 + Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 657 + List.iter (fun conn -> 658 + close_connection pool conn 659 + ) !(ep_pool.connections); 660 + ep_pool.connections := []); 627 661 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () -> 628 662 Hashtbl.remove pool.endpoints endpoint)) 629 663 | None ->
+86 -134
lib/conpool.mli
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - (** Conpool - Protocol-agnostic TCP/IP connection pooling library for Eio 6 + (** Conpool - Protocol-aware TCP/IP connection pooling library for Eio 7 7 8 - Conpool provides efficient connection pooling for TCP/IP connections with 9 - support for TLS, DNS resolution, retry logic, and connection lifecycle 10 - management. It is designed to be used with Eio's structured concurrency. 8 + Conpool provides efficient connection pooling with support for both 9 + exclusive (HTTP/1.x) and shared (HTTP/2) access modes. All connections 10 + carry protocol-specific state managed through callbacks. 11 11 12 - {2 Related Libraries} 12 + {2 Quick Start} 13 13 14 - This library is designed to work with: 14 + For simple exclusive-access protocols (HTTP/1.x, Redis, etc.): 15 + {[ 16 + let pool = Conpool.create ~sw ~net ~clock ~tls () in 17 + Eio.Switch.run (fun conn_sw -> 18 + let conn = Conpool.connection ~sw:conn_sw pool endpoint in 19 + (* Use conn.flow for I/O *) 20 + Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn.flow) 21 + ]} 15 22 16 - {ul 17 - {- [Requests] - HTTP client that uses Conpool for connection management}} *) 23 + For multiplexed protocols (HTTP/2): 24 + {[ 25 + let pool = Conpool.create ~sw ~net ~clock ~tls ~protocol:h2_handler () in 26 + Eio.Switch.run (fun conn_sw -> 27 + let conn = Conpool.connection ~sw:conn_sw pool endpoint in 28 + (* conn.state has H2_client.t, multiple streams share the connection *) 29 + H2_client.request conn.flow conn.state ...) 30 + ]} *) 18 31 19 32 (** {1 Logging} *) 20 33 21 34 val src : Logs.Src.t 22 - (** Logs source for the main connection pool. Configure logging with: 35 + (** Logs source for the connection pool. 
Configure logging with: 23 36 {[ 24 37 Logs.Src.set_level Conpool.src (Some Logs.Debug); 25 38 Logs.set_reporter (Logs_fmt.reporter ()) 26 - ]} 27 - 28 - Each submodule also exposes its own log source for fine-grained control: 29 - - {!Endpoint.src} - endpoint operations 30 - - {!Config.src} - pool configuration *) 39 + ]} *) 31 40 32 41 (** {1 Core Types} *) 33 42 ··· 47 56 48 57 type error = 49 58 | Dns_resolution_failed of { hostname : string } 50 - (** DNS resolution failed for the given hostname *) 51 59 | Connection_failed of { 52 60 endpoint : Endpoint.t; 53 61 attempts : int; 54 62 last_error : string; 55 - } (** Failed to establish connection after all retry attempts *) 63 + } 56 64 | Connection_timeout of { endpoint : Endpoint.t; timeout : float } 57 - (** Connection attempt timed out *) 58 - | Invalid_config of string (** Invalid configuration parameter *) 59 - | Invalid_endpoint of string (** Invalid endpoint specification *) 65 + | Invalid_config of string 66 + | Invalid_endpoint of string 60 67 61 68 type Eio.Exn.err += E of error 62 - (** Extension of Eio's error type for connection pool errors. 63 - 64 - Pool operations raise [Eio.Io] exceptions with context information added at 65 - each layer. The innermost error is often [E error], wrapped with context 66 - strings that describe the operation being performed. 67 - 68 - Example error message: 69 - {[ 70 - Eio.Io Conpool Dns_resolution_failed { hostname = "invalid.example" }, 71 - resolving invalid.example:443, 72 - connecting to invalid.example:443, 73 - after 3 retry attempts 74 - ]} 75 - 76 - Use {!pp_error} to format just the error code, or let Eio format the full 77 - exception with context. *) 78 69 79 70 val err : error -> exn 80 - (** [err e] is [Eio.Exn.create (E e)]. 81 - 82 - This converts a connection pool error to an Eio exception, allowing it to 83 - be handled uniformly with other Eio I/O errors and enabling context to be 84 - added via [Eio.Exn.reraise_with_context]. 
*) 71 + (** [err e] creates an Eio exception from a connection pool error. *) 85 72 86 73 val pp_error : error Fmt.t 87 - (** Pretty-printer for error values (without context). 88 - 89 - For full error messages including context, use [Eio.Exn.pp] or simply let 90 - the exception be printed naturally. *) 74 + (** Pretty-printer for error values. *) 91 75 92 76 (** {1 Connection Types} *) 93 77 94 78 type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] 95 - (** The type tags for a pooled connection. 96 - Connections support reading, writing, shutdown, and closing. *) 79 + (** Type tags for a pooled connection. *) 97 80 98 81 type connection = connection_ty Eio.Resource.t 99 82 (** A connection resource from the pool. *) 100 83 101 - type connection_with_info = { 84 + (** {1 Connection Pool} 85 + 86 + All pools are typed - they carry protocol-specific state with each 87 + connection. For simple exclusive-access protocols, use the default 88 + [unit] state which requires no protocol handler. *) 89 + 90 + type 'state t 91 + (** Connection pool with protocol-specific state ['state]. 92 + 93 + - For HTTP/1.x: use [unit t] with exclusive access (one request per connection) 94 + - For HTTP/2: use [h2_state t] with shared access (multiple streams per connection) *) 95 + 96 + (** Connection with protocol-specific state. *) 97 + type 'state connection_info = { 102 98 flow : connection; 99 + (** The underlying connection flow for I/O. *) 103 100 tls_epoch : Tls.Core.epoch_data option; 101 + (** TLS epoch data if connection uses TLS. *) 102 + state : 'state; 103 + (** Protocol-specific state (e.g., H2_client.t for HTTP/2). *) 104 104 } 105 - (** A connection with additional TLS information. 106 105 107 - The [tls_epoch] field contains the TLS session data if this connection 108 - uses TLS, or [None] for plaintext connections. This is needed for 109 - protocols like ACE-MQTT (RFC 9431) that require access to TLS exporter 110 - material for proof-of-possession. 
*) 111 - 112 - (** {1 Connection Pool} *) 113 - 114 - type t 115 - (** Connection pool managing multiple endpoints *) 106 + (** {2 Pool Creation} *) 116 107 117 108 val create : 118 109 sw:Eio.Switch.t -> ··· 120 111 clock:'clock Eio.Time.clock -> 121 112 ?tls:Tls.Config.client -> 122 113 ?config:Config.t -> 114 + ?protocol:'state Config.protocol_config -> 123 115 unit -> 124 - t 125 - (** Create connection pool bound to switch. All connections will be closed when 126 - switch is released. 116 + 'state t 117 + (** Create a connection pool. 127 118 128 119 @param sw Switch for resource management 129 120 @param net Network interface for creating connections 130 - @param clock Clock for timeouts and time-based validation 131 - @param tls 132 - Optional TLS client configuration applied to all connections. SNI 133 - servername is automatically set to the endpoint's hostname. 134 - @param config 135 - Optional pool configuration (uses Config.default if not provided) *) 136 - 137 - (** {1 Connection Usage} *) 138 - 139 - val connection : sw:Eio.Switch.t -> t -> Endpoint.t -> connection 140 - (** [connection ~sw pool endpoint] acquires a connection from the pool. 121 + @param clock Clock for timeouts 122 + @param tls Optional TLS client configuration 123 + @param config Pool configuration (uses {!Config.default} if not provided) 124 + @param protocol Protocol handler for state management. If not provided, 125 + creates a [unit t] pool with exclusive access mode (one user per connection). 141 126 142 - The connection is automatically returned to the pool when [sw] finishes. 143 - If the connection becomes unhealthy or an error occurs during use, it is 144 - closed instead of being returned to the pool. 
127 + Examples: 145 128 146 - If an idle connection is available and healthy: 147 - - Reuse from pool (validates health first) 129 + Simple pool for HTTP/1.x (exclusive access, no state): 130 + {[ 131 + let pool = Conpool.create ~sw ~net ~clock ~tls () 132 + ]} 148 133 149 - Otherwise: 150 - - Create new connection (may block if endpoint at limit) 151 - 152 - Example: 134 + HTTP/2 pool (shared access with H2 state): 153 135 {[ 154 - let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in 155 - Eio.Switch.run (fun sw -> 156 - let conn = Conpool.connection ~sw pool endpoint in 157 - Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn; 158 - let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in 159 - Eio.Buf_read.take_all buf) 136 + let pool = Conpool.create ~sw ~net ~clock ~tls ~protocol:h2_handler () 160 137 ]} *) 161 138 162 - val connection_with_info : sw:Eio.Switch.t -> t -> Endpoint.t -> connection_with_info 163 - (** [connection_with_info ~sw pool endpoint] acquires a connection with TLS info. 139 + (** {2 Connection Acquisition} *) 164 140 165 - Like {!val-connection}, but returns a record containing both the connection flow 166 - and TLS epoch data (if the connection uses TLS). 141 + val connection : sw:Eio.Switch.t -> 'state t -> Endpoint.t -> 'state connection_info 142 + (** [connection ~sw pool endpoint] acquires a connection from the pool. 167 143 168 - This is useful for protocols that need access to TLS session information, 169 - such as ACE-MQTT (RFC 9431) which uses TLS exporter material for 170 - proof-of-possession authentication. 
144 + The connection is automatically released when [sw] finishes: 145 + - Exclusive mode: connection returns to idle pool 146 + - Shared mode: user count is decremented 147 + 148 + Behavior depends on access mode: 149 + - Exclusive: blocks until a connection is available 150 + - Shared: may share an existing connection if under max_concurrent limit 171 151 172 152 Example: 173 153 {[ 174 - let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in 175 154 Eio.Switch.run (fun sw -> 176 - let info = Conpool.connection_with_info ~sw pool endpoint in 177 - (* Access TLS epoch for PoP authentication *) 178 - match info.tls_epoch with 179 - | Some epoch -> 180 - let challenge = Tls.Engine.export_key_material epoch label 32 in 181 - (* Use challenge for authentication *) 182 - | None -> 183 - failwith "TLS required for ACE authentication"; 184 - (* Use info.flow for MQTT communication *) 185 - Eio.Flow.copy_string data info.flow) 155 + let conn = Conpool.connection ~sw pool endpoint in 156 + (* For HTTP/1.x: conn.state is () *) 157 + (* For HTTP/2: conn.state is H2_client.t *) 158 + Eio.Flow.copy_string data conn.flow) 186 159 ]} *) 187 160 188 - val with_connection : t -> Endpoint.t -> (connection -> 'a) -> 'a 189 - (** [with_connection pool endpoint fn] is a convenience wrapper around 190 - {!val:connection}. 161 + val with_connection : 'state t -> Endpoint.t -> ('state connection_info -> 'a) -> 'a 162 + (** [with_connection pool endpoint fn] is a convenience wrapper. 191 163 192 164 Equivalent to: 193 165 {[ 194 166 Eio.Switch.run (fun sw -> fn (connection ~sw pool endpoint)) 195 - ]} 196 - 197 - Example: 198 - {[ 199 - let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in 200 - Conpool.with_connection pool endpoint (fun conn -> 201 - (* Use conn for HTTP request, Redis command, etc. 
*) 202 - Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn; 203 - let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in 204 - Eio.Buf_read.take_all buf) 205 167 ]} *) 206 168 207 - (** {1 Statistics & Monitoring} *) 169 + (** {1 Statistics & Management} *) 208 170 209 - val stats : t -> Endpoint.t -> Stats.t 210 - (** Get statistics for specific endpoint *) 171 + val stats : 'state t -> Endpoint.t -> Stats.t 172 + (** Get statistics for specific endpoint. *) 211 173 212 - val all_stats : t -> (Endpoint.t * Stats.t) list 213 - (** Get statistics for all endpoints in pool *) 174 + val all_stats : 'state t -> (Endpoint.t * Stats.t) list 175 + (** Get statistics for all endpoints in pool. *) 214 176 215 - (** {1 Pool Management} *) 216 - 217 - val clear_endpoint : t -> Endpoint.t -> unit 218 - (** Clear all cached connections for a specific endpoint. 219 - 220 - This removes the endpoint from the pool, discarding all idle connections. 221 - Active connections will continue to work but won't be returned to the pool. 222 - 223 - Use this when you know an endpoint's connections are no longer valid (e.g., 224 - server restarted, network reconfigured, credentials changed). 225 - 226 - The pool will be automatically cleaned up when its switch is released. *) 177 + val clear_endpoint : 'state t -> Endpoint.t -> unit 178 + (** Clear all connections for an endpoint. *)
+4 -4
test/stress_test.ml
··· 173 173 let start_time = Eio.Time.now clock in 174 174 175 175 try 176 - Conpool.with_connection pool endpoint (fun flow -> 176 + Conpool.with_connection pool endpoint (fun conn -> 177 177 (* Send message *) 178 - Eio.Flow.copy_string message flow; 179 - Eio.Flow.copy_string "\n" flow; 178 + Eio.Flow.copy_string message conn.Conpool.flow; 179 + Eio.Flow.copy_string "\n" conn.Conpool.flow; 180 180 181 181 (* Read echo response *) 182 - let response = Eio.Buf_read.of_flow flow ~max_size:(msg_len + 1) in 182 + let response = Eio.Buf_read.of_flow conn.Conpool.flow ~max_size:(msg_len + 1) in 183 183 let echoed = Eio.Buf_read.line response in 184 184 185 185 let end_time = Eio.Time.now clock in