My aggregated monorepo of OCaml code, automaintained
at doc-fixes 334 lines 14 kB view raw
(*---------------------------------------------------------------------------
   Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
   SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

(** Low-level HTTP/1.1 client over raw TCP connections for connection pooling

    This module orchestrates [Http_write] for request serialization and
    [Http_read] for response parsing, leveraging Eio's Buf_write and Buf_read
    for efficient I/O.

    Types are imported from domain-specific modules ({!Response_limits},
    {!Expect_continue}) and re-exported for API convenience. *)

let src = Logs.Src.create "requests.http_client" ~doc:"Low-level HTTP client"
module Log = (val Logs.src_log src : Logs.LOG)

(** {1 Types}

    Re-exported from domain-specific modules for API convenience. *)

type limits = Response_limits.t
let default_limits = Response_limits.default

type expect_100_config = Expect_continue.t
let default_expect_100_config = Expect_continue.default

(** {1 Decompression Support} *)

(** Generic decompression helper that handles common setup and result handling.
    The [uncompress] function receives refill/flush callbacks and input/output
    buffers, following the calling convention of the [decompress] library's
    [Higher] interfaces. Returns [Some decompressed] on success, [None] on
    failure (the failure is logged, never raised). *)
let decompress_with ~name ~uncompress data =
  Log.debug (fun m -> m "Decompressing %s data (%d bytes)" name (String.length data));
  let i = De.bigstring_create De.io_buffer_size in
  let o = De.bigstring_create De.io_buffer_size in
  (* Heuristic initial capacity: compressed payloads commonly expand ~2x. *)
  let r = Buffer.create (String.length data * 2) in
  let p = ref 0 in
  (* Feed successive slices of [data] into the decompressor's input buffer. *)
  let refill buf =
    let len = min (String.length data - !p) De.io_buffer_size in
    Bigstringaf.blit_from_string data ~src_off:!p buf ~dst_off:0 ~len;
    p := !p + len;
    len
  in
  (* Accumulate decompressed output into [r]. *)
  let flush buf len =
    Buffer.add_string r (Bigstringaf.substring buf ~off:0 ~len)
  in
  match uncompress ~refill ~flush i o with
  | Ok _ ->
    let result = Buffer.contents r in
    Log.debug (fun m -> m "%s decompression succeeded: %d -> %d bytes"
      name (String.length data) (String.length result));
    Some result
  | Error (`Msg e) ->
    Log.warn (fun m -> m "%s decompression failed: %s" name e);
    None

(** Decompress gzip-encoded data. Returns [Some decompressed] on success, [None] on failure. *)
let decompress_gzip data =
  decompress_with ~name:"gzip" data
    ~uncompress:(fun ~refill ~flush i o -> Gz.Higher.uncompress ~refill ~flush i o)

(** Decompress deflate-encoded data (raw DEFLATE, RFC 1951). Returns [Some decompressed] on success, [None] on failure. *)
let decompress_deflate data =
  let w = De.make_window ~bits:15 in
  decompress_with ~name:"deflate" data
    ~uncompress:(fun ~refill ~flush i o -> De.Higher.uncompress ~w ~refill ~flush i o)

(** Decompress zlib-encoded data (DEFLATE with zlib header, RFC 1950). Returns [Some decompressed] on success, [None] on failure. *)
let decompress_zlib data =
  let allocate bits = De.make_window ~bits in
  decompress_with ~name:"zlib" data
    ~uncompress:(fun ~refill ~flush i o -> Zl.Higher.uncompress ~allocate ~refill ~flush i o)

(** {1 Decompression Bomb Prevention}

    Per Recommendation #25: Check decompressed size and ratio limits *)

(** Validate [decompressed] against the configured absolute-size and
    compression-ratio limits, returning it unchanged when within bounds.
    @raise Error.Decompression_bomb (wrapped via [Error.err]) when a limit
    is exceeded. *)
let check_decompression_limits ~limits ~compressed_size decompressed =
  let decompressed_size = Int64.of_int (String.length decompressed) in
  let compressed_size_i64 = Int64.of_int compressed_size in
  let max_decompressed = Response_limits.max_decompressed_size limits in

  (* Check absolute size *)
  if decompressed_size > max_decompressed then begin
    let ratio = Int64.to_float decompressed_size /. Int64.to_float compressed_size_i64 in
    raise (Error.err (Error.Decompression_bomb {
      limit = max_decompressed;
      ratio
    }))
  end;

  (* Check ratio - only if compressed size is > 0 to avoid division by zero *)
  if compressed_size > 0 then begin
    let ratio = Int64.to_float decompressed_size /. Int64.to_float compressed_size_i64 in
    if ratio > Response_limits.max_compression_ratio limits then
      raise (Error.err (Error.Decompression_bomb {
        limit = max_decompressed;
        ratio
      }))
  end;

  decompressed

(** Decompress body based on Content-Encoding header with limits.
    Unknown encodings and decompression failures fall back to the raw body. *)
let decompress_body ~limits ~content_encoding body =
  let encoding = String.lowercase_ascii (String.trim content_encoding) in
  let compressed_size = String.length body in
  match encoding with
  | "gzip" | "x-gzip" ->
    (match decompress_gzip body with
     | Some decompressed -> check_decompression_limits ~limits ~compressed_size decompressed
     | None -> body) (* Fall back to raw body on error *)
  | "deflate" ->
    (* "deflate" in HTTP can mean either raw DEFLATE or zlib-wrapped.
       Many servers send zlib-wrapped data despite the spec. Try zlib first,
       then fall back to raw deflate. *)
    (match decompress_zlib body with
     | Some decompressed -> check_decompression_limits ~limits ~compressed_size decompressed
     | None ->
       match decompress_deflate body with
       | Some decompressed -> check_decompression_limits ~limits ~compressed_size decompressed
       | None -> body)
  | "identity" | "" -> body
  | other ->
    Log.warn (fun m -> m "Unknown Content-Encoding '%s', returning raw body" other);
    body

(** {1 Request Execution} *)

(** Write request body to flow, handling empty, chunked, and fixed-length bodies *)
let write_body_to_flow ~sw flow body =
  Http_write.write_and_flush flow (fun w ->
    if Body.Private.is_empty body then
      ()
    else if Body.Private.is_chunked body then
      Body.Private.write_chunked ~sw w body
    else
      Body.Private.write ~sw w body
  )

(** Apply auto-decompression to response if enabled. On decompression the
    Content-Encoding header is removed since the body no longer matches it. *)
let maybe_decompress ~limits ~auto_decompress (status, resp_headers, body_str) =
  match auto_decompress, Headers.get `Content_encoding resp_headers with
  | true, Some encoding ->
    let body_str = decompress_body ~limits ~content_encoding:encoding body_str in
    let resp_headers = Headers.remove `Content_encoding resp_headers in
    (status, resp_headers, body_str)
  | _ ->
    (status, resp_headers, body_str)

(** Make HTTP request over a pooled connection using Buf_write/Buf_read.
    Returns [(status, headers, body)]. *)
let make_request ?(limits=default_limits) ~sw ~method_ ~uri ~headers ~body flow =
  Log.debug (fun m -> m "Making %s request to %s" (Method.to_string method_) (Uri.to_string uri));

  (* Write request using Buf_write - use write_and_flush to avoid nested switch issues *)
  Http_write.write_and_flush flow (fun w ->
    Http_write.request w ~sw ~method_ ~uri ~headers ~body
  );

  (* Read response using Buf_read *)
  let buf_read = Http_read.of_flow flow ~max_size:max_int in
  let (_version, status, headers, body) = Http_read.response ~limits ~method_ buf_read in

  (status, headers, body)

(** Make HTTP request with optional auto-decompression *)
let make_request_decompress ?(limits=default_limits) ~sw ~method_ ~uri ~headers ~body ~auto_decompress flow =
  make_request ~limits ~sw ~method_ ~uri ~headers ~body flow
  |> maybe_decompress ~limits ~auto_decompress

(** {1 HTTP 100-Continue Protocol Implementation}

    Per Recommendation #7: HTTP 100-Continue Support for Large Uploads.
    RFC 9110 Section 10.1.1 (Expect) and Section 15.2.1 (100 Continue)

    The 100-continue protocol allows:
    1. Client sends headers with [Expect: 100-continue]
    2. Server responds with 100 Continue (proceed) or error (4xx/5xx)
    3. Client sends body only if 100 Continue received
    4. Server sends final response

    This saves bandwidth when server would reject based on headers alone. *)

(** Result of waiting for 100-continue response *)
type expect_100_result =
  | Continue (** Server sent 100 Continue - proceed with body *)
  | Rejected of int * Headers.t * string (** Server rejected - status, headers, body *)
  | Timeout (** Timeout expired - proceed with body anyway *)

(** Wait for 100 Continue or error response with timeout.
    Returns Continue, Rejected, or Timeout. The actual timeout enforcement
    happens in the caller via [Eio.Time.with_timeout_exn]; the [~timeout]
    argument is currently unused here and kept for interface stability. *)
let wait_for_100_continue ~limits ~timeout:_ flow =
  Log.debug (fun m -> m "Waiting for 100 Continue response");

  let buf_read = Http_read.of_flow flow ~max_size:max_int in

  try
    let (_version, status) = Http_read.status_line buf_read in

    Log.debug (fun m -> m "Received response status %d while waiting for 100 Continue" status);

    if status = 100 then begin
      (* 100 Continue - read any headers (usually none) and return Continue *)
      let _ = Http_read.headers ~limits buf_read in
      Log.info (fun m -> m "Received 100 Continue, proceeding with body");
      Continue
    end else begin
      (* Error response - server rejected based on headers *)
      Log.info (fun m -> m "Server rejected request with status %d before body sent" status);
      let resp_headers = Http_read.headers ~limits buf_read in
      let transfer_encoding = Headers.get `Transfer_encoding resp_headers in
      (* Use [Int64.of_string_opt] so a malformed Content-Length from the
         server is treated as "no declared length" (empty body) instead of
         raising [Failure] and tearing down the request. *)
      let content_length =
        Option.bind (Headers.get `Content_length resp_headers) Int64.of_string_opt
      in
      let body_str = match transfer_encoding, content_length with
        | Some te, _ when String.lowercase_ascii te |> String.trim = "chunked" ->
          Http_read.chunked_body ~limits buf_read
        | _, Some len -> Http_read.fixed_body ~limits ~length:len buf_read
        | _ -> ""
      in
      Rejected (status, resp_headers, body_str)
    end
  with
  | Eio.Buf_read.Buffer_limit_exceeded ->
    Log.warn (fun m -> m "Buffer limit exceeded waiting for 100 Continue");
    Timeout
  | End_of_file ->
    Log.warn (fun m -> m "Connection closed waiting for 100 Continue");
    Timeout

(** Make HTTP request with 100-continue support for large bodies.

    If the body exceeds the threshold and 100-continue is enabled:
    1. Sends headers with Expect: 100-continue
    2. Waits for server response (100 Continue or error)
    3. Sends body only if 100 Continue received
    4. Otherwise returns the error response without sending body

    Per RFC 9110:
    - If timeout expires, client should send body anyway
    - 417 Expectation Failed means server doesn't support Expect header
    - Any error response (4xx/5xx) should be returned without sending body *)
let make_request_100_continue
    ?(limits=default_limits)
    ?(expect_100=default_expect_100_config)
    ~clock
    ~sw
    ~method_
    ~uri
    ~headers
    ~body
    flow =
  let body_len = Body.content_length body |> Option.value ~default:0L in

  (* Determine if we should use 100-continue *)
  let use_100_continue =
    Expect_continue.enabled expect_100 &&
    body_len >= Expect_continue.threshold expect_100 &&
    body_len > 0L &&
    not (Headers.mem `Expect headers) (* Don't override explicit Expect header *)
  in

  if not use_100_continue then begin
    (* Standard request without 100-continue *)
    Log.debug (fun m -> m "100-continue not used (body_len=%Ld, threshold=%Ld, enabled=%b)"
      body_len (Expect_continue.threshold expect_100) (Expect_continue.enabled expect_100));
    make_request ~limits ~sw ~method_ ~uri ~headers ~body flow
  end else begin
    Log.info (fun m -> m "Using 100-continue for large body (%Ld bytes)" body_len);

    (* Add Expect: 100-continue header and Content-Type if present *)
    let headers_with_expect = Headers.expect_100_continue headers in
    let headers_with_expect = match Body.content_type body with
      | Some mime -> Headers.add `Content_type (Mime.to_string mime) headers_with_expect
      | None -> headers_with_expect
    in

    (* Send headers only using Buf_write *)
    Http_write.write_and_flush flow (fun w ->
      Http_write.request_headers_only w ~method_ ~uri
        ~headers:headers_with_expect ~content_length:(Some body_len)
    );

    (* Wait for 100 Continue or error response with timeout *)
    let result =
      try
        Eio.Time.with_timeout_exn clock (Expect_continue.timeout expect_100) (fun () ->
          wait_for_100_continue ~limits ~timeout:(Expect_continue.timeout expect_100) flow
        )
      with Eio.Time.Timeout ->
        Log.debug (fun m -> m "100-continue timeout expired, sending body anyway");
        Timeout
    in

    match result with
    | Continue ->
      (* Server said continue - send body and read final response *)
      Log.debug (fun m -> m "Sending body after 100 Continue");
      write_body_to_flow ~sw flow body;

      (* Read final response *)
      let buf_read = Http_read.of_flow flow ~max_size:max_int in
      let (_version, status, headers, body) = Http_read.response ~limits ~method_ buf_read in
      (status, headers, body)

    | Rejected (status, resp_headers, resp_body_str) ->
      (* RFC 9110 Section 10.1.1: If we receive 417 Expectation Failed, retry
         without the 100-continue expectation *)
      if status = 417 then begin
        Log.info (fun m -> m "Received 417 Expectation Failed, retrying without Expect header");
        (* Make a fresh request without Expect: 100-continue *)
        make_request ~limits ~sw ~method_ ~uri ~headers ~body flow
      end else begin
        (* Server rejected with non-417 error - return error response without sending body *)
        Log.info (fun m -> m "Request rejected with status %d, body not sent (saved %Ld bytes)"
          status body_len);
        (status, resp_headers, resp_body_str)
      end

    | Timeout ->
      (* Timeout expired - send body anyway per RFC 9110 *)
      Log.debug (fun m -> m "Sending body after timeout");
      write_body_to_flow ~sw flow body;

      (* Read response *)
      let buf_read = Http_read.of_flow flow ~max_size:max_int in
      let (_version, status, headers, body) = Http_read.response ~limits ~method_ buf_read in
      (status, headers, body)
  end

(** Make HTTP request with 100-continue support and optional auto-decompression *)
let make_request_100_continue_decompress
    ?(limits=default_limits)
    ?(expect_100=default_expect_100_config)
    ~clock ~sw ~method_ ~uri ~headers ~body
    ~auto_decompress flow =
  make_request_100_continue ~limits ~expect_100 ~clock ~sw ~method_ ~uri ~headers ~body flow
  |> maybe_decompress ~limits ~auto_decompress