forked from
anil.recoil.org/monopam-myspace
My aggregated monorepo of OCaml code, automatically maintained
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** Low-level HTTP/1.1 client over raw TCP connections for connection pooling
7
8 This module orchestrates [Http_write] for request serialization and
9 [Http_read] for response parsing, leveraging Eio's Buf_write and Buf_read
10 for efficient I/O.
11
12 Types are imported from domain-specific modules ({!Response_limits},
13 {!Expect_continue}) and re-exported for API convenience. *)
14
(* Dedicated log source for this module so verbosity can be tuned
   independently of the rest of the library (e.g. via [Logs.Src.set_level]). *)
let src = Logs.Src.create "requests.http_client" ~doc:"Low-level HTTP client"
module Log = (val Logs.src_log src : Logs.LOG)
17
18(** {1 Types}
19
20 Re-exported from domain-specific modules for API convenience. *)
21
(* Alias of {!Response_limits.t}: size and compression-ratio caps enforced
   when reading and decompressing response bodies. *)
type limits = Response_limits.t
let default_limits = Response_limits.default

(* Alias of {!Expect_continue.t}: enablement flag, body-size threshold and
   timeout governing the 100-continue handshake. *)
type expect_100_config = Expect_continue.t
let default_expect_100_config = Expect_continue.default
27
28(** {1 Decompression Support} *)
29
(** Shared streaming driver for the decompression backends.

    Feeds [data] to [uncompress] through refill/flush callbacks over a pair
    of scratch bigstrings, accumulating decoded output in a [Buffer].
    Returns [Some output] on success, or [None] (after logging a warning)
    when the codec reports an error. [name] is used only for log messages. *)
let decompress_with ~name ~uncompress data =
  Log.debug (fun m -> m "Decompressing %s data (%d bytes)" name (String.length data));
  let input_buf = De.bigstring_create De.io_buffer_size in
  let output_buf = De.bigstring_create De.io_buffer_size in
  (* Heuristic initial capacity: assume roughly 2x expansion. *)
  let out = Buffer.create (String.length data * 2) in
  let pos = ref 0 in
  (* Copy the next slice of [data] into the codec's input buffer. *)
  let refill dst =
    let remaining = String.length data - !pos in
    let len = min remaining De.io_buffer_size in
    Bigstringaf.blit_from_string data ~src_off:!pos dst ~dst_off:0 ~len;
    pos := !pos + len;
    len
  in
  (* Append each decoded chunk to the accumulator. *)
  let flush buf len = Buffer.add_string out (Bigstringaf.substring buf ~off:0 ~len) in
  match uncompress ~refill ~flush input_buf output_buf with
  | Error (`Msg e) ->
    Log.warn (fun m -> m "%s decompression failed: %s" name e);
    None
  | Ok _ ->
    let decoded = Buffer.contents out in
    Log.debug (fun m -> m "%s decompression succeeded: %d -> %d bytes"
      name (String.length data) (String.length decoded));
    Some decoded
56
(** Decompress gzip-encoded data. Returns [Some decompressed] on success, [None] on failure. *)
let decompress_gzip data =
  let uncompress ~refill ~flush i o = Gz.Higher.uncompress ~refill ~flush i o in
  decompress_with ~name:"gzip" ~uncompress data
61
(** Decompress deflate-encoded data (raw DEFLATE, RFC 1951). Returns [Some decompressed] on success, [None] on failure. *)
let decompress_deflate data =
  (* 32 KiB sliding window, the maximum DEFLATE window size. *)
  let window = De.make_window ~bits:15 in
  let uncompress ~refill ~flush i o =
    De.Higher.uncompress ~w:window ~refill ~flush i o
  in
  decompress_with ~name:"deflate" ~uncompress data
67
(** Decompress zlib-encoded data (DEFLATE with zlib header, RFC 1950). Returns [Some decompressed] on success, [None] on failure. *)
let decompress_zlib data =
  (* Zl allocates its window on demand using the header-advertised size. *)
  let allocate bits = De.make_window ~bits in
  let uncompress ~refill ~flush i o =
    Zl.Higher.uncompress ~allocate ~refill ~flush i o
  in
  decompress_with ~name:"zlib" ~uncompress data
73
74(** {1 Decompression Bomb Prevention}
75
76 Per Recommendation #25: Check decompressed size and ratio limits *)
77
(** Enforce decompression-bomb limits on a decompressed body.

    [compressed_size] is the on-the-wire byte count; [decompressed] is the
    inflated body. Returns [decompressed] unchanged when both the absolute
    size cap and the expansion-ratio cap from [limits] are respected.

    @raise Error.err [Error.Decompression_bomb] when either limit is
    exceeded. The reported [ratio] is infinite when [compressed_size] is 0
    but the absolute cap is still tripped.

    Fixes vs. previous version: uses [Int64.compare] instead of polymorphic
    structural [>] on [Int64.t], and computes the expansion ratio once with
    an explicit zero-divisor guard instead of twice inline. *)
let check_decompression_limits ~limits ~compressed_size decompressed =
  let decompressed_size = Int64.of_int (String.length decompressed) in
  let max_decompressed = Response_limits.max_decompressed_size limits in
  (* Expansion ratio, computed once. Infinite for empty compressed input so
     the absolute-size check below still reports something meaningful. *)
  let ratio =
    if compressed_size = 0 then Float.infinity
    else Int64.to_float decompressed_size /. float_of_int compressed_size
  in
  let bomb () =
    raise (Error.err (Error.Decompression_bomb {
      limit = max_decompressed;
      ratio
    }))
  in
  (* Check absolute size. *)
  if Int64.compare decompressed_size max_decompressed > 0 then bomb ();
  (* Check ratio - only meaningful when there was compressed input. *)
  if compressed_size > 0
     && ratio > Response_limits.max_compression_ratio limits
  then bomb ();
  decompressed
103
(** Decode [body] according to the [Content-Encoding] header value,
    enforcing the decompression limits in [limits]. Unknown encodings and
    codec failures fall back to returning the raw body unchanged. *)
let decompress_body ~limits ~content_encoding body =
  let compressed_size = String.length body in
  (* Partially applied limit check shared by all successful decode paths. *)
  let checked = check_decompression_limits ~limits ~compressed_size in
  match String.lowercase_ascii (String.trim content_encoding) with
  | "gzip" | "x-gzip" -> (
      match decompress_gzip body with
      | Some decompressed -> checked decompressed
      | None -> body (* Fall back to raw body on error *))
  | "deflate" -> (
      (* HTTP "deflate" is ambiguous: the spec means zlib-wrapped (RFC 1950)
         but many servers send raw DEFLATE (RFC 1951). Try zlib first, then
         fall back to raw deflate. *)
      match decompress_zlib body with
      | Some decompressed -> checked decompressed
      | None -> (
          match decompress_deflate body with
          | Some decompressed -> checked decompressed
          | None -> body))
  | "identity" | "" -> body
  | other ->
    Log.warn (fun m -> m "Unknown Content-Encoding '%s', returning raw body" other);
    body
127
128(** {1 Request Execution} *)
129
(** Write the request body to [flow], dispatching on the body kind:
    nothing for an empty body, chunked framing for a chunked body, and a
    plain write for a fixed-length body. Flushes when done. *)
let write_body_to_flow ~sw flow body =
  Http_write.write_and_flush flow (fun w ->
    match Body.Private.is_empty body, Body.Private.is_chunked body with
    | true, _ -> ()
    | false, true -> Body.Private.write_chunked ~sw w body
    | false, false -> Body.Private.write ~sw w body)
140
(** Apply auto-decompression to a [(status, headers, body)] response triple
    when [auto_decompress] is set and the response carries a
    [Content-Encoding] header; the header is dropped from the decoded
    response. Otherwise the triple is returned untouched. *)
let maybe_decompress ~limits ~auto_decompress (status, resp_headers, body_str) =
  let encoding =
    if auto_decompress then Headers.get `Content_encoding resp_headers else None
  in
  match encoding with
  | None -> (status, resp_headers, body_str)
  | Some content_encoding ->
    let decoded = decompress_body ~limits ~content_encoding body_str in
    (status, Headers.remove `Content_encoding resp_headers, decoded)
150
(** Issue one HTTP request over an already-connected (pooled) [flow]:
    serialize with [Http_write], then parse the response with [Http_read].
    Returns [(status, headers, body)]; the HTTP version is discarded. *)
let make_request ?(limits = default_limits) ~sw ~method_ ~uri ~headers ~body flow =
  Log.debug (fun m -> m "Making %s request to %s" (Method.to_string method_) (Uri.to_string uri));
  (* Serialize and flush the request; write_and_flush avoids nested-switch
     issues with Buf_write. *)
  let () =
    Http_write.write_and_flush flow (fun w ->
      Http_write.request w ~sw ~method_ ~uri ~headers ~body)
  in
  (* Parse the response with Buf_read. *)
  let reader = Http_read.of_flow flow ~max_size:max_int in
  let _version, status, resp_headers, resp_body =
    Http_read.response ~limits ~method_ reader
  in
  (status, resp_headers, resp_body)
164
(** Like {!make_request}, additionally applying {!maybe_decompress} to the
    response when [auto_decompress] is set. *)
let make_request_decompress ?(limits = default_limits) ~sw ~method_ ~uri ~headers ~body ~auto_decompress flow =
  let response = make_request ~limits ~sw ~method_ ~uri ~headers ~body flow in
  maybe_decompress ~limits ~auto_decompress response
169
170(** {1 HTTP 100-Continue Protocol Implementation}
171
172 Per Recommendation #7: HTTP 100-Continue Support for Large Uploads.
173 RFC 9110 Section 10.1.1 (Expect) and Section 15.2.1 (100 Continue)
174
175 The 100-continue protocol allows:
176 1. Client sends headers with [Expect: 100-continue]
177 2. Server responds with 100 Continue (proceed) or error (4xx/5xx)
178 3. Client sends body only if 100 Continue received
179 4. Server sends final response
180
181 This saves bandwidth when server would reject based on headers alone. *)
182
(** Outcome of waiting for the interim response after sending
    [Expect: 100-continue] headers. *)
type expect_100_result =
  | Continue (** Server sent 100 Continue - proceed with body *)
  | Rejected of int * Headers.t * string (** Server rejected before the body was sent: final status, response headers, response body *)
  | Timeout (** Timeout expired - proceed with body anyway *)
188
(** Wait for 100 Continue or an error response on [flow].
    Returns [Continue], [Rejected] or [Timeout].

    The [timeout] argument is currently ignored here; the caller enforces
    the deadline with [Eio.Time.with_timeout_exn].

    Fix vs. previous version: a malformed [Content-Length] header no longer
    raises [Failure] out of this function ([Int64.of_string] replaced by
    [Int64.of_string_opt]); the surrounding [try] only catches read errors,
    so such a header from a broken server previously crashed the request.
    It was also parsed eagerly even on the chunked path. *)
let wait_for_100_continue ~limits ~timeout:_ flow =
  Log.debug (fun m -> m "Waiting for 100 Continue response");

  let buf_read = Http_read.of_flow flow ~max_size:max_int in

  try
    let (_version, status) = Http_read.status_line buf_read in

    Log.debug (fun m -> m "Received response status %d while waiting for 100 Continue" status);

    if status = 100 then begin
      (* 100 Continue - read any headers (usually none) and return Continue *)
      let _ = Http_read.headers ~limits buf_read in
      Log.info (fun m -> m "Received 100 Continue, proceeding with body");
      Continue
    end else begin
      (* Error response - server rejected based on headers. Drain the body
         so the connection remains usable. *)
      Log.info (fun m -> m "Server rejected request with status %d before body sent" status);
      let resp_headers = Http_read.headers ~limits buf_read in
      (* Explicit comparison instead of the precedence-fragile
         [lowercase te |> String.trim = "chunked"]. *)
      let is_chunked =
        match Headers.get `Transfer_encoding resp_headers with
        | Some te -> String.equal (String.trim (String.lowercase_ascii te)) "chunked"
        | None -> false
      in
      (* Malformed Content-Length is treated as absent rather than raising. *)
      let content_length =
        Option.bind (Headers.get `Content_length resp_headers) Int64.of_string_opt
      in
      let body_str =
        if is_chunked then Http_read.chunked_body ~limits buf_read
        else
          match content_length with
          | Some len -> Http_read.fixed_body ~limits ~length:len buf_read
          | None -> ""
      in
      Rejected (status, resp_headers, body_str)
    end
  with
  | Eio.Buf_read.Buffer_limit_exceeded ->
    Log.warn (fun m -> m "Buffer limit exceeded waiting for 100 Continue");
    Timeout
  | End_of_file ->
    Log.warn (fun m -> m "Connection closed waiting for 100 Continue");
    Timeout
227
(** Make HTTP request with 100-continue support for large bodies.

    If the body exceeds the configured threshold and 100-continue is enabled:
    1. Sends headers with Expect: 100-continue
    2. Waits for server response (100 Continue or error)
    3. Sends body only if 100 Continue received
    4. Otherwise returns the error response without sending body

    Per RFC 9110:
    - If timeout expires, client should send body anyway
    - 417 Expectation Failed means server doesn't support Expect header
    - Any error response (4xx/5xx) should be returned without sending body

    The handshake is skipped (falling back to {!make_request}) when
    100-continue is disabled, the body is empty or below the threshold, or
    the caller already supplied an [Expect] header. Returns
    [(status, headers, body)] like {!make_request}. *)
let make_request_100_continue
    ?(limits=default_limits)
    ?(expect_100=default_expect_100_config)
    ~clock
    ~sw
    ~method_
    ~uri
    ~headers
    ~body
    flow =
  (* A body with unknown length counts as 0 here, so streaming bodies of
     unknown size never trigger the handshake. *)
  let body_len = Body.content_length body |> Option.value ~default:0L in

  (* Determine if we should use 100-continue *)
  let use_100_continue =
    Expect_continue.enabled expect_100 &&
    body_len >= Expect_continue.threshold expect_100 &&
    body_len > 0L &&
    not (Headers.mem `Expect headers) (* Don't override explicit Expect header *)
  in

  if not use_100_continue then begin
    (* Standard request without 100-continue *)
    Log.debug (fun m -> m "100-continue not used (body_len=%Ld, threshold=%Ld, enabled=%b)"
      body_len (Expect_continue.threshold expect_100) (Expect_continue.enabled expect_100));
    make_request ~limits ~sw ~method_ ~uri ~headers ~body flow
  end else begin
    Log.info (fun m -> m "Using 100-continue for large body (%Ld bytes)" body_len);

    (* Add Expect: 100-continue header and Content-Type if present *)
    let headers_with_expect = Headers.expect_100_continue headers in
    let headers_with_expect = match Body.content_type body with
      | Some mime -> Headers.add `Content_type (Mime.to_string mime) headers_with_expect
      | None -> headers_with_expect
    in

    (* Send headers only using Buf_write; the body is deliberately withheld
       until the server answers (or the timeout fires). *)
    Http_write.write_and_flush flow (fun w ->
      Http_write.request_headers_only w ~method_ ~uri
        ~headers:headers_with_expect ~content_length:(Some body_len)
    );

    (* Wait for 100 Continue or error response with timeout *)
    let result =
      try
        Eio.Time.with_timeout_exn clock (Expect_continue.timeout expect_100) (fun () ->
          wait_for_100_continue ~limits ~timeout:(Expect_continue.timeout expect_100) flow
        )
      with Eio.Time.Timeout ->
        Log.debug (fun m -> m "100-continue timeout expired, sending body anyway");
        Timeout
    in

    match result with
    | Continue ->
      (* Server said continue - send body and read final response *)
      Log.debug (fun m -> m "Sending body after 100 Continue");
      write_body_to_flow ~sw flow body;

      (* Read final response. [headers]/[body] shadow the request's values
         from here on. *)
      let buf_read = Http_read.of_flow flow ~max_size:max_int in
      let (_version, status, headers, body) = Http_read.response ~limits ~method_ buf_read in
      (status, headers, body)

    | Rejected (status, resp_headers, resp_body_str) ->
      (* RFC 9110 Section 10.1.1: If we receive 417 Expectation Failed, retry
         without the 100-continue expectation *)
      if status = 417 then begin
        Log.info (fun m -> m "Received 417 Expectation Failed, retrying without Expect header");
        (* Make a fresh request without Expect: 100-continue. NOTE(review):
           this reuses the same flow - assumes the 417 response left the
           connection in a reusable state. *)
        make_request ~limits ~sw ~method_ ~uri ~headers ~body flow
      end else begin
        (* Server rejected with non-417 error - return error response without sending body *)
        Log.info (fun m -> m "Request rejected with status %d, body not sent (saved %Ld bytes)"
          status body_len);
        (status, resp_headers, resp_body_str)
      end

    | Timeout ->
      (* Timeout expired - send body anyway per RFC 9110 *)
      Log.debug (fun m -> m "Sending body after timeout");
      write_body_to_flow ~sw flow body;

      (* Read response *)
      let buf_read = Http_read.of_flow flow ~max_size:max_int in
      let (_version, status, headers, body) = Http_read.response ~limits ~method_ buf_read in
      (status, headers, body)
  end
327
(** Like {!make_request_100_continue}, additionally applying
    {!maybe_decompress} to the response when [auto_decompress] is set. *)
let make_request_100_continue_decompress
    ?(limits = default_limits)
    ?(expect_100 = default_expect_100_config)
    ~clock ~sw ~method_ ~uri ~headers ~body ~auto_decompress flow =
  let response =
    make_request_100_continue ~limits ~expect_100 ~clock ~sw ~method_ ~uri
      ~headers ~body flow
  in
  maybe_decompress ~limits ~auto_decompress response