A batteries included HTTP/1.1 client in OCaml
at main 202 lines 7.6 kB view raw
1(*--------------------------------------------------------------------------- 2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved. 3 SPDX-License-Identifier: ISC 4 ---------------------------------------------------------------------------*) 5 6(** HTTP/2 Protocol Handler for Conpool. 7 8 This module provides a protocol handler that enables HTTP/2 connection 9 multiplexing through Conpool's typed pool API. It manages H2_client 10 instances and supports concurrent stream multiplexing. 11 12 Key features: 13 - Shared connection mode (multiple streams per connection) 14 - GOAWAY handling for graceful degradation 15 - Stream slot management respecting peer's max_concurrent_streams *) 16 17let src = Logs.Src.create "requests.h2_conpool" ~doc:"HTTP/2 Connection Pool Handler" 18module Log = (val Logs.src_log src : Logs.LOG) 19 20(** {1 State Type} *) 21 22(** HTTP/2 connection state managed by Conpool. 23 This wraps H2_client.t with additional tracking for stream management. *) 24type h2_state = { 25 client : H2_client.t; 26 (** The underlying HTTP/2 client. *) 27 flow : Conpool.Config.connection_flow; 28 (** The connection flow (needed for H2 operations). *) 29 sw : Eio.Switch.t; 30 (** Connection-lifetime switch for the reader fiber. *) 31 mutable reader_started : bool; 32 (** Whether the background reader fiber has been started. *) 33 mutable goaway_received : bool; 34 (** Whether GOAWAY has been received from peer. *) 35 mutable last_goaway_stream : int32; 36 (** Last stream ID from GOAWAY (streams > this may be retried). *) 37 mutable max_concurrent_streams : int; 38 (** Cached max_concurrent_streams from peer settings. *) 39} 40 41(** {1 Protocol Configuration} *) 42 43(** Initialize HTTP/2 state for a new connection. 44 Performs the HTTP/2 handshake and extracts peer settings. 45 The [sw] parameter is a connection-lifetime switch that will be used 46 to spawn the background reader fiber when on_acquire is called. *) 47let init_state ~sw ~flow ~tls_epoch:_ = 48 Log.info (fun m -> m "Initializing HTTP/2 connection state"); 49 50 let client = H2_client.create () in 51 52 (* Perform HTTP/2 handshake *) 53 match H2_client.handshake flow client with 54 | Ok () -> 55 Log.info (fun m -> m "HTTP/2 handshake complete"); 56 57 (* Get max_concurrent_streams from peer settings *) 58 let conn = H2_client.connection client in 59 let peer_settings = H2_connection.peer_settings conn in 60 let max_streams = 61 match peer_settings.H2_connection.max_concurrent_streams with 62 | Some n -> n 63 | None -> 100 (* Default per RFC 9113 *) 64 in 65 66 Log.debug (fun m -> m "Peer max_concurrent_streams: %d" max_streams); 67 68 { 69 client; 70 flow; 71 sw; 72 reader_started = false; 73 goaway_received = false; 74 last_goaway_stream = Int32.max_int; 75 max_concurrent_streams = max_streams; 76 } 77 78 | Error msg -> 79 Log.err (fun m -> m "HTTP/2 handshake failed: %s" msg); 80 failwith ("HTTP/2 handshake failed: " ^ msg) 81 82(** Called when a connection is acquired from the pool. 83 Starts the background reader fiber on first acquisition. *) 84let on_acquire state = 85 if not state.reader_started then begin 86 Log.info (fun m -> m "Starting HTTP/2 background reader fiber"); 87 H2_client.start_reader ~sw:state.sw state.flow state.client 88 ~on_goaway:(fun ~last_stream_id ~error_code:_ ~debug -> 89 Log.info (fun m -> m "GOAWAY received: last_stream_id=%ld, debug=%s" 90 last_stream_id debug); 91 state.goaway_received <- true; 92 state.last_goaway_stream <- last_stream_id); 93 state.reader_started <- true 94 end 95 96(** Called when a connection is released back to the pool. 97 For HTTP/2, this is a no-op since the reader keeps running. *) 98let on_release _state = () 99 100(** Check if the HTTP/2 connection is still healthy. *) 101let is_healthy state = 102 if state.goaway_received then begin 103 Log.debug (fun m -> m "HTTP/2 connection unhealthy: GOAWAY received"); 104 false 105 end else if not (H2_client.is_open state.client) then begin 106 Log.debug (fun m -> m "HTTP/2 connection unhealthy: connection closed"); 107 false 108 end else 109 true 110 111(** Cleanup when connection is destroyed. *) 112let on_close state = 113 Log.debug (fun m -> m "Closing HTTP/2 connection"); 114 (* Send GOAWAY if connection is still open *) 115 if H2_client.is_open state.client then begin 116 H2_client.close state.flow state.client 117 end 118 119(** Get access mode for this connection. 120 HTTP/2 supports multiplexing: multiple concurrent streams per connection. *) 121let access_mode state = 122 Conpool.Config.Shared state.max_concurrent_streams 123 124(** The protocol configuration for HTTP/2 connections. *) 125let h2_protocol : h2_state Conpool.Config.protocol_config = { 126 Conpool.Config.init_state; 127 on_acquire; 128 on_release; 129 is_healthy; 130 on_close; 131 access_mode; 132} 133 134(** {1 Request Functions} *) 135 136(** Make an HTTP/2 request using the pooled connection state. 137 138 Uses the concurrent request path with the connection-lifetime reader fiber. 139 Multiple requests can be made concurrently on the same connection. 140 141 @param state The HTTP/2 state from Conpool 142 @param uri Request URI 143 @param headers Request headers 144 @param body Optional request body 145 @param method_ HTTP method 146 @param auto_decompress Whether to decompress response body 147 @return Response or error message *) 148let request 149 ~(state : h2_state) 150 ~(uri : Uri.t) 151 ~(headers : Headers.t) 152 ?(body : Body.t option) 153 ~(method_ : Method.t) 154 ~auto_decompress 155 () 156 : (H2_adapter.response, string) result = 157 158 (* Validate HTTP/2 header constraints *) 159 match Headers.validate_h2_user_headers headers with 160 | Error e -> 161 Error (Format.asprintf "Invalid HTTP/2 request headers: %a" 162 Headers.pp_h2_validation_error e) 163 | Ok () -> 164 (* Check connection health before making request *) 165 if state.goaway_received then 166 Error "Connection received GOAWAY" 167 else if not (H2_client.is_open state.client) then 168 Error "Connection is closed" 169 else begin 170 let h2_headers = Headers.to_list headers in 171 let h2_body = Option.bind body H2_adapter.body_to_string_opt in 172 let meth = Method.to_string method_ in 173 174 Log.debug (fun m -> m "Making HTTP/2 request on pooled connection: %s %s" 175 meth (Uri.to_string uri)); 176 177 (* Use concurrent request path - reader fiber is already running *) 178 match H2_client.request ~sw:state.sw state.flow state.client 179 { H2_protocol.meth; uri; headers = h2_headers; body = h2_body } with 180 | Ok resp -> 181 (* Check if GOAWAY was received during request *) 182 let conn = H2_client.connection state.client in 183 if H2_connection.is_closing conn then begin 184 Log.info (fun m -> m "GOAWAY received during request"); 185 state.goaway_received <- true 186 end; 187 188 (* Update max_concurrent_streams if it changed *) 189 let peer_settings = H2_connection.peer_settings conn in 190 (match peer_settings.H2_connection.max_concurrent_streams with 191 | Some n when n <> state.max_concurrent_streams -> 192 Log.debug (fun m -> m "Peer max_concurrent_streams changed: %d -> %d" 193 state.max_concurrent_streams n); 194 state.max_concurrent_streams <- n 195 | _ -> ()); 196 197 Ok (H2_adapter.make_response ~auto_decompress resp) 198 199 | Error msg -> 200 Log.warn (fun m -> m "HTTP/2 request failed: %s" msg); 201 Error msg 202 end