A batteries included HTTP/1.1 client in OCaml
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** HTTP/2 Protocol Handler for Conpool.
7
8 This module provides a protocol handler that enables HTTP/2 connection
9 multiplexing through Conpool's typed pool API. It manages H2_client
10 instances and supports concurrent stream multiplexing.
11
12 Key features:
13 - Shared connection mode (multiple streams per connection)
14 - GOAWAY handling for graceful degradation
15 - Stream slot management respecting peer's max_concurrent_streams *)
16
(* Dedicated log source so this handler's output can be filtered or have its
   level set independently of other sources. *)
let src =
  Logs.Src.create ~doc:"HTTP/2 Connection Pool Handler" "requests.h2_conpool"

(* Logging functions bound to [src]; used by every function in this module. *)
module Log = (val Logs.src_log src : Logs.LOG)
19
20(** {1 State Type} *)
21
(** Per-connection HTTP/2 state handed to Conpool.
    Bundles the [H2_client.t] with the flow and switch it runs on, plus the
    mutable bookkeeping this module maintains (reader start-up, GOAWAY
    tracking and the cached stream limit). *)
type h2_state = {
  client : H2_client.t;
      (** HTTP/2 client that performed the handshake on [flow]. *)
  flow : Conpool.Config.connection_flow;
      (** Underlying connection flow; passed to every [H2_client] call. *)
  sw : Eio.Switch.t;
      (** Connection-lifetime switch; handed to [H2_client.start_reader] and
          [H2_client.request]. *)
  mutable reader_started : bool;
      (** [false] until {!on_acquire} has started the background reader;
          guards against starting it a second time. *)
  mutable goaway_received : bool;
      (** Set once the peer's GOAWAY has been observed (by the reader callback
          or after a request). A [true] value makes {!is_healthy} return
          [false] and makes {!request} refuse new requests. *)
  mutable last_goaway_stream : int32;
      (** [last_stream_id] reported by the peer's GOAWAY. Initialised to
          [Int32.max_int] until a GOAWAY arrives. *)
  mutable max_concurrent_streams : int;
      (** Cached copy of the peer's [max_concurrent_streams] setting;
          reported to Conpool through {!access_mode} and refreshed after each
          successful request. *)
}
40
41(** {1 Protocol Configuration} *)
42
(** [init_state ~sw ~flow ~tls_epoch] builds the {!h2_state} for a freshly
    opened connection.

    Creates an [H2_client.t], runs the HTTP/2 handshake over [flow], and
    caches the peer's [max_concurrent_streams] setting. The background reader
    is {e not} started here; that happens on the first {!on_acquire}.

    @param sw connection-lifetime switch, stored for later use by the reader
      and by requests
    @param flow the connection flow to speak HTTP/2 on
    @param tls_epoch ignored
    @raise Failure if the handshake returns an error; the message is
      ["HTTP/2 handshake failed: "] followed by the handshake error. *)
let init_state ~sw ~flow ~tls_epoch:_ =
  Log.info (fun m -> m "Initializing HTTP/2 connection state");
  let client = H2_client.create () in
  match H2_client.handshake flow client with
  | Error msg ->
    Log.err (fun m -> m "HTTP/2 handshake failed: %s" msg);
    failwith ("HTTP/2 handshake failed: " ^ msg)
  | Ok () ->
    Log.info (fun m -> m "HTTP/2 handshake complete");
    let settings = H2_connection.peer_settings (H2_client.connection client) in
    (* Fallback used when the peer has not advertised a limit. RFC 9113
       section 6.5.2 imposes no initial limit and only recommends that peers
       advertise at least 100; 100 is used here as a conservative cap. *)
    let max_concurrent_streams =
      Option.value settings.H2_connection.max_concurrent_streams ~default:100
    in
    Log.debug (fun m ->
        m "Peer max_concurrent_streams: %d" max_concurrent_streams);
    {
      client;
      flow;
      sw;
      reader_started = false;
      goaway_received = false;
      last_goaway_stream = Int32.max_int;
      max_concurrent_streams;
    }
81
(** Pool hook run each time the connection is handed out.

    On the first call it starts the [H2_client] background reader on the
    connection-lifetime switch and marks it as started; later calls do
    nothing. The reader is given a GOAWAY callback that records the event in
    [state]. *)
let on_acquire state =
  (* Invoked by the reader when the peer sends GOAWAY: remember that it
     happened and which stream ID the peer reported. The error code is not
     used. *)
  let handle_goaway ~last_stream_id ~error_code:_ ~debug =
    Log.info (fun m ->
        m "GOAWAY received: last_stream_id=%ld, debug=%s" last_stream_id debug);
    state.goaway_received <- true;
    state.last_goaway_stream <- last_stream_id
  in
  if state.reader_started then ()
  else begin
    Log.info (fun m -> m "Starting HTTP/2 background reader fiber");
    H2_client.start_reader ~sw:state.sw state.flow state.client
      ~on_goaway:handle_goaway;
    (* Only flagged after [start_reader] returns, so an exception raised
       while starting leaves the flag unset. *)
    state.reader_started <- true
  end
95
(** Pool hook run when the connection is returned to the pool.
    Deliberately does nothing: no per-release cleanup is performed and the
    state is left untouched. *)
let on_release _conn = ()
99
(** [is_healthy state] tells the pool whether the connection may still be
    used.

    Returns [false] (and logs the reason at debug level) if a GOAWAY has been
    recorded or if [H2_client.is_open] reports the client as no longer open;
    returns [true] otherwise. The GOAWAY flag is checked first, so the client
    is not queried once a GOAWAY has been seen. *)
let is_healthy state =
  if state.goaway_received then begin
    Log.debug (fun m -> m "HTTP/2 connection unhealthy: GOAWAY received");
    false
  end
  else if H2_client.is_open state.client then true
  else begin
    Log.debug (fun m -> m "HTTP/2 connection unhealthy: connection closed");
    false
  end
110
(** Pool hook run when the connection is being destroyed.
    Closes the HTTP/2 client on its flow, but only if the client still
    reports itself as open. *)
let on_close { client; flow; _ } =
  Log.debug (fun m -> m "Closing HTTP/2 connection");
  (* NOTE(review): [H2_client.close] presumably sends GOAWAY to the peer -
     confirm against H2_client. It is skipped for an already-closed client. *)
  if H2_client.is_open client then H2_client.close flow client
118
(** [access_mode state] reports how the pool may hand out this connection:
    always [Conpool.Config.Shared], carrying the currently cached
    [max_concurrent_streams] value so that several streams can be multiplexed
    over the one connection. *)
let access_mode { max_concurrent_streams; _ } =
  Conpool.Config.Shared max_concurrent_streams
123
(** Conpool protocol configuration for HTTP/2.
    Wires each pool lifecycle hook to the function of the same name defined
    in this module. *)
let h2_protocol : h2_state Conpool.Config.protocol_config =
  {
    Conpool.Config.init_state = init_state;
    on_acquire = on_acquire;
    on_release = on_release;
    is_healthy = is_healthy;
    on_close = on_close;
    access_mode = access_mode;
  }
133
134(** {1 Request Functions} *)
135
(** [request ~state ~uri ~headers ?body ~method_ ~auto_decompress ()] issues
    one HTTP/2 request over the pooled connection described by [state].

    Steps, in order:
    - validate [headers] against the HTTP/2 user-header rules;
    - refuse to send if a GOAWAY has been recorded or the client is closed;
    - send the request through [H2_client.request];
    - on success, refresh the cached GOAWAY flag and
      [max_concurrent_streams] from the connection before returning.

    The reader is not started here; the comments in this module assume
    {!on_acquire} has already started it.

    @param state the HTTP/2 state obtained from Conpool
    @param uri request URI
    @param headers request headers (validated before sending)
    @param body optional request body, converted with
      [H2_adapter.body_to_string_opt]
    @param method_ HTTP method
    @param auto_decompress forwarded to [H2_adapter.make_response]
    @return [Ok response], or [Error msg] for invalid headers, an unusable
      connection, or a failure reported by [H2_client.request] *)
let request
    ~(state : h2_state)
    ~(uri : Uri.t)
    ~(headers : Headers.t)
    ?(body : Body.t option)
    ~(method_ : Method.t)
    ~auto_decompress
    ()
  : (H2_adapter.response, string) result =
  (* Bring the cached connection state up to date after a successful
     exchange: note a connection that has started closing, and pick up any
     change to the peer's stream limit. *)
  let refresh_cached_state () =
    let conn = H2_client.connection state.client in
    if H2_connection.is_closing conn then begin
      Log.info (fun m -> m "GOAWAY received during request");
      state.goaway_received <- true
    end;
    let settings = H2_connection.peer_settings conn in
    match settings.H2_connection.max_concurrent_streams with
    | Some limit when limit <> state.max_concurrent_streams ->
      Log.debug (fun m ->
          m "Peer max_concurrent_streams changed: %d -> %d"
            state.max_concurrent_streams limit);
      state.max_concurrent_streams <- limit
    | Some _ | None -> ()
  in
  (* Convert the arguments to their H2 form and perform the exchange on the
     shared connection. *)
  let send () =
    let h2_headers = Headers.to_list headers in
    let h2_body = Option.bind body H2_adapter.body_to_string_opt in
    let meth = Method.to_string method_ in
    Log.debug (fun m ->
        m "Making HTTP/2 request on pooled connection: %s %s"
          meth (Uri.to_string uri));
    let h2_request =
      { H2_protocol.meth; uri; headers = h2_headers; body = h2_body }
    in
    match
      H2_client.request ~sw:state.sw state.flow state.client h2_request
    with
    | Error msg ->
      Log.warn (fun m -> m "HTTP/2 request failed: %s" msg);
      Error msg
    | Ok resp ->
      refresh_cached_state ();
      Ok (H2_adapter.make_response ~auto_decompress resp)
  in
  (* Guards run top to bottom: header validation first, then the GOAWAY
     flag, then the open check. *)
  match Headers.validate_h2_user_headers headers with
  | Error e ->
    Error
      (Format.asprintf "Invalid HTTP/2 request headers: %a"
         Headers.pp_h2_validation_error e)
  | Ok () when state.goaway_received -> Error "Connection received GOAWAY"
  | Ok () when not (H2_client.is_open state.client) ->
    Error "Connection is closed"
  | Ok () -> send ()