A batteries included HTTP/1.1 client in OCaml

Fix HTTP/2 request hanging by using synchronous frame reading

The previous implementation spawned a background reader fiber that
blocked on read_frame after the response completed, preventing the
switch from exiting. This caused requests to hang indefinitely.

Changes:
- Add request_sync for synchronous single-request operation
- one_request now uses request_sync without background fibers
- H2_conpool_handler uses request_sync to avoid fiber lifecycle issues
- Connection pool uses Exclusive mode (no multiplexing for now)
- Keep concurrent request infrastructure for future multiplexing support

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+804 -142
+3 -1
lib/h2/h2_adapter.ml
··· 64 64 a new handshake. For proper HTTP/2 multiplexing with connection reuse, 65 65 the connection management needs to be integrated at the Conpool level. 66 66 67 + @param sw Switch for the reader fiber 67 68 @param flow The underlying TLS connection 68 69 @param uri Request URI 69 70 @param headers Request headers ··· 72 73 @param auto_decompress Whether to decompress response body 73 74 @return Response with Headers.t, or error message *) 74 75 let request 76 + ~sw 75 77 ~(flow : [> Eio.Flow.two_way_ty] Eio.Resource.t) 76 78 ~(uri : Uri.t) 77 79 ~(headers : Headers.t) ··· 89 91 let h2_headers = Headers.to_list headers in 90 92 let h2_body = Option.bind body body_to_string_opt in 91 93 let meth = Method.to_string method_ in 92 - H2_client.one_request flow ~meth ~uri ~headers:h2_headers ?body:h2_body () 94 + H2_client.one_request ~sw flow ~meth ~uri ~headers:h2_headers ?body:h2_body () 93 95 |> Result.map (make_response ~auto_decompress) 94 96 95 97 (** Make a one-shot HTTP/2 request (same as {!request}). *)
+10 -1
lib/h2/h2_adapter.mli
··· 37 37 (** [body_to_string_opt body] converts a request body to a string option. 38 38 Returns [None] for empty bodies or streaming bodies that can't be converted. *) 39 39 40 + (** {1 Response Construction} *) 41 + 42 + val make_response : auto_decompress:bool -> H2_protocol.response -> response 43 + (** [make_response ~auto_decompress h2_resp] converts an H2_protocol response 44 + to an adapter response, applying decompression if enabled. *) 45 + 40 46 (** {1 Request Functions} *) 41 47 42 48 val request : 49 + sw:Eio.Switch.t -> 43 50 flow:[> Eio.Flow.two_way_ty] Eio.Resource.t -> 44 51 uri:Uri.t -> 45 52 headers:Headers.t -> ··· 48 55 auto_decompress:bool -> 49 56 unit -> 50 57 (response, string) result 51 - (** [request ~flow ~uri ~headers ?body ~method_ ~auto_decompress ()] 58 + (** [request ~sw ~flow ~uri ~headers ?body ~method_ ~auto_decompress ()] 52 59 makes an HTTP/2 request. 53 60 54 61 This function creates a fresh H2_client for each request and performs 55 62 a new handshake. For proper HTTP/2 multiplexing with connection reuse, 56 63 the connection management needs to be integrated at the Conpool level. 57 64 65 + @param sw Switch for the reader fiber 58 66 @param flow The underlying TLS connection 59 67 @param uri Request URI 60 68 @param headers Request headers ··· 64 72 @return Response on success, Error msg on failure *) 65 73 66 74 val one_request : 75 + sw:Eio.Switch.t -> 67 76 flow:[> Eio.Flow.two_way_ty] Eio.Resource.t -> 68 77 uri:Uri.t -> 69 78 headers:Headers.t ->
+468 -100
lib/h2/h2_client.ml
··· 31 31 SPDX-License-Identifier: BSD-3-Clause 32 32 ---------------------------------------------------------------------------*) 33 33 34 - (** HTTP/2 Client Implementation. *) 34 + (** HTTP/2 Client with Eio-based Concurrent Dispatch. 35 + 36 + This implementation supports true HTTP/2 multiplexing by using: 37 + - A centralized frame reader fiber that dispatches to stream handlers 38 + - Per-stream Eio.Stream queues for frame delivery 39 + - Eio.Promise for stream completion signaling 40 + 41 + Multiple concurrent requests can share a single HTTP/2 connection. *) 35 42 36 43 let src = Logs.Src.create "h2.client" ~doc:"HTTP/2 Client" 37 44 module Log = (val Logs.src_log src : Logs.LOG) ··· 40 47 let ( let* ) = Result.bind 41 48 42 49 (* ============================================================ 50 + Frame Types for Dispatch 51 + ============================================================ *) 52 + 53 + (** Events dispatched to stream handlers. *) 54 + type stream_event = 55 + | Headers of { headers : H2_hpack.header list; end_stream : bool } 56 + | Data of { data : Cstruct.t; end_stream : bool } 57 + | Rst_stream of H2_frame.error_code 58 + | Window_update of int 59 + | Connection_error of string 60 + 61 + (* ============================================================ 62 + Per-Stream Handler 63 + ============================================================ *) 64 + 65 + (** State for a single stream's response handling. *) 66 + type stream_handler = { 67 + stream : H2_stream.t; 68 + events : stream_event Eio.Stream.t; (** Events for this stream *) 69 + } 70 + 71 + let create_stream_handler stream = 72 + { 73 + stream; 74 + events = Eio.Stream.create 64; (* Buffer up to 64 events per stream *) 75 + } 76 + 77 + (* ============================================================ 43 78 Client State 44 79 ============================================================ *) 45 80 46 81 type t = { 47 82 conn : H2_connection.t; 83 + handlers : (int32, stream_handler) Hashtbl.t; 84 + handlers_mutex : Eio.Mutex.t; 85 + mutable reader_running : bool; 86 + mutable connection_error : string option; 87 + connection_error_mutex : Eio.Mutex.t; 48 88 } 49 89 50 90 let create ?settings () = 51 91 let conn = H2_connection.create ?settings H2_connection.Client in 52 - { conn } 92 + { 93 + conn; 94 + handlers = Hashtbl.create 16; 95 + handlers_mutex = Eio.Mutex.create (); 96 + reader_running = false; 97 + connection_error = None; 98 + connection_error_mutex = Eio.Mutex.create (); 99 + } 53 100 54 101 (* ============================================================ 55 - Frame I/O 102 + Frame I/O (same as before, but factored out) 56 103 ============================================================ *) 57 104 58 105 let write_frame flow frame = ··· 132 179 let is_ack flags = H2_frame.Flags.test flags H2_frame.Flags.ack 133 180 134 181 (* ============================================================ 135 - Connection Handshake 182 + Connection Handshake (blocking, before reader starts) 136 183 ============================================================ *) 137 184 138 185 let handshake flow t = ··· 192 239 wait_for_handshake () 193 240 194 241 (* ============================================================ 195 - Request/Response 242 + Frame Dispatch to Stream Handlers 196 243 ============================================================ *) 197 244 198 - (** Handle flow control window updates after receiving data. 199 - Updates both connection and stream windows, and sends WINDOW_UPDATE frames. *) 200 - let handle_data_flow_control flow t stream stream_id data_len = 201 - if data_len > 0 then begin 202 - H2_connection.consume_recv_window t.conn data_len; 203 - H2_stream.consume_recv_window stream data_len; 204 - send_window_update flow ~stream_id:0l ~increment:data_len; 205 - send_window_update flow ~stream_id ~increment:data_len; 206 - H2_connection.credit_recv_window t.conn data_len; 207 - H2_stream.credit_recv_window stream data_len 245 + (** Dispatch a frame to the appropriate stream handler. *) 246 + let dispatch_frame t flow frame = 247 + let stream_id = frame.H2_frame.header.stream_id in 248 + let flags = frame.H2_frame.header.flags in 249 + 250 + (* Connection-level frames (stream 0) *) 251 + if Int32.equal stream_id 0l then begin 252 + match frame.H2_frame.payload with 253 + | H2_frame.Settings_payload settings -> 254 + let pairs = List.map H2_frame.setting_to_pair settings in 255 + let pairs32 = List.map (fun (id, v) -> (Int32.of_int id, v)) pairs in 256 + (match H2_connection.handle_settings t.conn ~ack:(is_ack flags) pairs32 with 257 + | Ok `Settings_received -> 258 + Log.debug (fun m -> m "Received SETTINGS, sending ACK"); 259 + send_settings_ack flow; 260 + `Continue 261 + | Ok `Ack_received -> 262 + Log.debug (fun m -> m "Received SETTINGS ACK"); 263 + `Continue 264 + | Error (_, msg) -> 265 + Log.err (fun m -> m "Settings error: %s" msg); 266 + `Error msg) 267 + 268 + | H2_frame.Ping_payload data -> 269 + if not (is_ack flags) then begin 270 + Log.debug (fun m -> m "Received PING, sending ACK"); 271 + send_ping_ack flow data 272 + end; 273 + `Continue 274 + 275 + | H2_frame.Window_update_payload increment -> 276 + let inc = Int32.to_int increment in 277 + Log.debug (fun m -> m "Connection WINDOW_UPDATE: %d" inc); 278 + (match H2_connection.credit_send_window t.conn inc with 279 + | Ok () -> `Continue 280 + | Error (_, msg) -> 281 + Log.err (fun m -> m "Window update error: %s" msg); 282 + `Error msg) 283 + 284 + | H2_frame.Goaway_payload { last_stream_id; error_code; debug_data } -> 285 + let debug = Cstruct.to_string debug_data in 286 + Log.warn (fun m -> m "Received GOAWAY: last_stream=%ld, error=%a, debug=%s" 287 + last_stream_id H2_frame.pp_error_code error_code debug); 288 + H2_connection.handle_goaway t.conn ~last_stream_id ~error_code ~debug; 289 + (* Notify all active streams about the connection error *) 290 + Eio.Mutex.use_ro t.handlers_mutex (fun () -> 291 + Hashtbl.iter (fun _id handler -> 292 + Eio.Stream.add handler.events (Connection_error ("GOAWAY: " ^ debug)) 293 + ) t.handlers 294 + ); 295 + `Goaway debug 296 + 297 + | _ -> 298 + Log.debug (fun m -> m "Ignoring connection-level frame: %a" 299 + H2_frame.pp_frame_type frame.H2_frame.header.frame_type); 300 + `Continue 208 301 end 302 + else begin 303 + (* Stream-specific frame - dispatch to handler *) 304 + let handler_opt = Eio.Mutex.use_ro t.handlers_mutex (fun () -> 305 + Hashtbl.find_opt t.handlers stream_id 306 + ) in 209 307 308 + match handler_opt with 309 + | None -> 310 + Log.debug (fun m -> m "Received frame for unknown stream %ld, ignoring" 311 + stream_id); 312 + `Continue 313 + 314 + | Some handler -> 315 + let event = match frame.H2_frame.payload with 316 + | H2_frame.Headers_payload { header_block; _ } -> 317 + (match H2_connection.decode_headers t.conn header_block with 318 + | Ok headers -> 319 + Some (Headers { headers; end_stream = is_end_stream flags }) 320 + | Error _ -> 321 + Log.err (fun m -> m "Failed to decode headers on stream %ld" stream_id); 322 + Some (Connection_error "Header decode failed")) 323 + 324 + | H2_frame.Data_payload { data } -> 325 + (* Handle flow control *) 326 + let data_len = Cstruct.length data in 327 + if data_len > 0 then begin 328 + H2_connection.consume_recv_window t.conn data_len; 329 + H2_stream.consume_recv_window handler.stream data_len; 330 + send_window_update flow ~stream_id:0l ~increment:data_len; 331 + send_window_update flow ~stream_id ~increment:data_len; 332 + H2_connection.credit_recv_window t.conn data_len; 333 + H2_stream.credit_recv_window handler.stream data_len 334 + end; 335 + Some (Data { data; end_stream = is_end_stream flags }) 336 + 337 + | H2_frame.Rst_stream_payload error_code -> 338 + Some (Rst_stream error_code) 339 + 340 + | H2_frame.Window_update_payload increment -> 341 + let inc = Int32.to_int increment in 342 + (match H2_stream.credit_send_window handler.stream inc with 343 + | Ok () -> Some (Window_update inc) 344 + | Error (_, msg) -> 345 + Log.warn (fun m -> m "Stream %ld window update error: %s" stream_id msg); 346 + None) 347 + 348 + | _ -> 349 + Log.debug (fun m -> m "Ignoring frame %a on stream %ld" 350 + H2_frame.pp_frame_type frame.H2_frame.header.frame_type stream_id); 351 + None 352 + in 353 + 354 + (match event with 355 + | Some e -> Eio.Stream.add handler.events e 356 + | None -> ()); 357 + 358 + `Continue 359 + end 360 + 361 + (* ============================================================ 362 + Background Frame Reader Fiber 363 + ============================================================ *) 364 + 365 + (** Start the background frame reader. 366 + This runs in a fiber and dispatches frames to stream handlers. *) 367 + let start_reader ~sw flow t = 368 + if t.reader_running then 369 + () (* Already running *) 370 + else begin 371 + t.reader_running <- true; 372 + Eio.Fiber.fork ~sw (fun () -> 373 + Log.debug (fun m -> m "Frame reader fiber started"); 374 + let rec read_loop () = 375 + match read_frame flow with 376 + | None -> 377 + Log.info (fun m -> m "Frame reader: connection closed"); 378 + Eio.Mutex.use_rw ~protect:true t.connection_error_mutex (fun () -> 379 + if t.connection_error = None then 380 + t.connection_error <- Some "Connection closed" 381 + ); 382 + (* Notify all handlers about connection close *) 383 + Eio.Mutex.use_ro t.handlers_mutex (fun () -> 384 + Hashtbl.iter (fun _id handler -> 385 + Eio.Stream.add handler.events (Connection_error "Connection closed") 386 + ) t.handlers 387 + ) 388 + 389 + | Some frame -> 390 + match dispatch_frame t flow frame with 391 + | `Continue -> read_loop () 392 + | `Goaway _ -> 393 + (* Continue reading to drain any remaining frames *) 394 + read_loop () 395 + | `Error msg -> 396 + Log.err (fun m -> m "Frame reader error: %s" msg); 397 + Eio.Mutex.use_rw ~protect:true t.connection_error_mutex (fun () -> 398 + t.connection_error <- Some msg 399 + ); 400 + (* Notify all handlers *) 401 + Eio.Mutex.use_ro t.handlers_mutex (fun () -> 402 + Hashtbl.iter (fun _id handler -> 403 + Eio.Stream.add handler.events (Connection_error msg) 404 + ) t.handlers 405 + ) 406 + in 407 + read_loop (); 408 + t.reader_running <- false; 409 + Log.debug (fun m -> m "Frame reader fiber stopped") 410 + ) 411 + end 412 + 413 + (* ============================================================ 414 + Request/Response with Concurrent Dispatch 415 + ============================================================ *) 416 + 417 + (** Response accumulator for a stream. *) 210 418 type pending_response = { 211 419 mutable status : int option; 212 420 mutable headers : (string * string) list; 213 421 mutable body_parts : string list; 214 422 mutable done_ : bool; 423 + mutable error : string option; 215 424 } 216 425 217 - let request flow t (req : H2_protocol.request) = 426 + (** Make a request and wait for its response. 427 + This can be called concurrently from multiple fibers. *) 428 + let request ~sw flow t (req : H2_protocol.request) = 429 + (* Check for connection errors first *) 430 + (match Eio.Mutex.use_ro t.connection_error_mutex (fun () -> t.connection_error) with 431 + | Some err -> Error ("Connection error: " ^ err) 432 + | None -> 433 + 218 434 Log.info (fun m -> m "Sending HTTP/2 request: %s %s" 219 435 req.meth (Uri.to_string req.uri)); 220 436 437 + (* Ensure reader is running *) 438 + start_reader ~sw flow t; 439 + 440 + (* Create a new stream *) 221 441 match H2_connection.create_stream t.conn with 222 442 | Error (_, msg) -> 223 443 Error ("Failed to create stream: " ^ msg) 444 + 224 445 | Ok stream -> 225 446 let stream_id = H2_stream.id stream in 226 447 Log.debug (fun m -> m "Created stream %ld" stream_id); 227 448 449 + (* Create and register the stream handler *) 450 + let handler = create_stream_handler stream in 451 + Eio.Mutex.use_rw ~protect:true t.handlers_mutex (fun () -> 452 + Hashtbl.add t.handlers stream_id handler 453 + ); 454 + 455 + (* Send request headers and body *) 228 456 let h2_headers = H2_protocol.request_to_h2_headers req in 229 457 let header_block = H2_connection.encode_headers t.conn h2_headers in 230 458 ··· 252 480 (H2_stream.Send_data { end_stream = true }) in 253 481 ()); 254 482 483 + (* Wait for response by consuming events from the stream's queue *) 255 484 let pending = { 256 485 status = None; 257 486 headers = []; 258 487 body_parts = []; 259 488 done_ = false; 489 + error = None; 260 490 } in 261 491 262 - let rec read_response () = 263 - if pending.done_ then begin 492 + let rec wait_for_response () = 493 + if pending.done_ then 494 + () 495 + else begin 496 + let event = Eio.Stream.take handler.events in 497 + match event with 498 + | Headers { headers; end_stream } -> 499 + Log.debug (fun m -> m "Stream %ld: received HEADERS (end_stream=%b)" 500 + stream_id end_stream); 501 + let status, hdrs = H2_protocol.h2_headers_to_response headers in 502 + pending.status <- Some status; 503 + pending.headers <- hdrs; 504 + let _ = H2_stream.apply_event stream 505 + (H2_stream.Recv_headers { end_stream }) in 506 + if end_stream then pending.done_ <- true 507 + else wait_for_response () 508 + 509 + | Data { data; end_stream } -> 510 + Log.debug (fun m -> m "Stream %ld: received DATA (%d bytes, end_stream=%b)" 511 + stream_id (Cstruct.length data) end_stream); 512 + pending.body_parts <- Cstruct.to_string data :: pending.body_parts; 513 + let _ = H2_stream.apply_event stream 514 + (H2_stream.Recv_data { end_stream }) in 515 + if end_stream then pending.done_ <- true 516 + else wait_for_response () 517 + 518 + | Rst_stream error_code -> 519 + Log.warn (fun m -> m "Stream %ld: reset with %a" 520 + stream_id H2_frame.pp_error_code error_code); 521 + let _ = H2_stream.apply_event stream 522 + (H2_stream.Recv_rst_stream error_code) in 523 + pending.error <- Some (Printf.sprintf "Stream reset: %s" 524 + (H2_frame.error_code_to_string error_code)); 525 + pending.done_ <- true 526 + 527 + | Window_update _ -> 528 + (* Window updates are informational for requests *) 529 + wait_for_response () 530 + 531 + | Connection_error msg -> 532 + Log.warn (fun m -> m "Stream %ld: connection error: %s" 533 + stream_id msg); 534 + pending.error <- Some msg; 535 + pending.done_ <- true 536 + end 537 + in 538 + 539 + wait_for_response (); 540 + 541 + (* Unregister the handler *) 542 + Eio.Mutex.use_rw ~protect:true t.handlers_mutex (fun () -> 543 + Hashtbl.remove t.handlers stream_id 544 + ); 545 + 546 + (* Return result *) 547 + match pending.error with 548 + | Some err -> Error err 549 + | None -> 264 550 match pending.status with 265 551 | None -> Error "No response status received" 266 552 | Some status -> ··· 270 556 headers = pending.headers; 271 557 body; 272 558 protocol = H2_protocol.Http2; 273 - } 274 - end else begin 559 + }) 560 + 561 + (* ============================================================ 562 + Synchronous Request (for one-shot requests without multiplexing) 563 + ============================================================ *) 564 + 565 + (** Make a single request synchronously without spawning a background reader. 566 + This is more efficient for one-shot requests since it doesn't require 567 + fiber management. *) 568 + let request_sync flow t (req : H2_protocol.request) = 569 + (* Check for connection errors first *) 570 + match Eio.Mutex.use_ro t.connection_error_mutex (fun () -> t.connection_error) with 571 + | Some err -> Error ("Connection error: " ^ err) 572 + | None -> 573 + 574 + Log.info (fun m -> m "Sending HTTP/2 request (sync): %s %s" 575 + req.meth (Uri.to_string req.uri)); 576 + 577 + (* Create a new stream *) 578 + match H2_connection.create_stream t.conn with 579 + | Error (_, msg) -> 580 + Error ("Failed to create stream: " ^ msg) 581 + 582 + | Ok stream -> 583 + let stream_id = H2_stream.id stream in 584 + Log.debug (fun m -> m "Created stream %ld" stream_id); 585 + 586 + (* Send request headers and body *) 587 + let h2_headers = H2_protocol.request_to_h2_headers req in 588 + let header_block = H2_connection.encode_headers t.conn h2_headers in 589 + 590 + let has_body = Option.is_some req.body in 591 + let end_stream_on_headers = not has_body in 592 + 593 + let headers_frame = H2_frame.make_headers 594 + ~stream_id 595 + ~end_stream:end_stream_on_headers 596 + ~end_headers:true 597 + header_block 598 + in 599 + write_frame flow headers_frame; 600 + 601 + let _ = H2_stream.apply_event stream 602 + (H2_stream.Send_headers { end_stream = end_stream_on_headers }) in 603 + 604 + (match req.body with 605 + | None -> () 606 + | Some body -> 607 + let data = Cstruct.of_string body in 608 + let data_frame = H2_frame.make_data ~stream_id ~end_stream:true data in 609 + write_frame flow data_frame; 610 + let _ = H2_stream.apply_event stream 611 + (H2_stream.Send_data { end_stream = true }) in 612 + ()); 613 + 614 + (* Read response synchronously *) 615 + let pending = { 616 + status = None; 617 + headers = []; 618 + body_parts = []; 619 + done_ = false; 620 + error = None; 621 + } in 622 + 623 + let rec read_response () = 624 + if pending.done_ then () 625 + else begin 275 626 match read_frame flow with 276 627 | None -> 277 - Error "Connection closed before response complete" 628 + pending.error <- Some "Connection closed"; 629 + pending.done_ <- true 630 + 278 631 | Some frame -> 279 632 let frame_stream_id = frame.H2_frame.header.stream_id in 280 633 let flags = frame.H2_frame.header.flags in 281 - match frame.H2_frame.payload with 282 - | H2_frame.Headers_payload { header_block; _ } 283 - when frame_stream_id = stream_id -> 284 - Log.debug (fun m -> m "Received HEADERS on stream %ld" stream_id); 285 - let end_stream = is_end_stream flags in 286 - (match H2_connection.decode_headers t.conn header_block with 287 - | Ok decoded_headers -> 288 - let status, hdrs = H2_protocol.h2_headers_to_response decoded_headers in 289 - pending.status <- Some status; 290 - pending.headers <- hdrs; 291 - let _ = H2_stream.apply_event stream 292 - (H2_stream.Recv_headers { end_stream }) in 293 - if end_stream then pending.done_ <- true; 294 - read_response () 295 - | Error _ -> 296 - Error "Failed to decode response headers") 297 634 298 - | H2_frame.Data_payload { data } 299 - when frame_stream_id = stream_id -> 300 - let end_stream = is_end_stream flags in 301 - Log.debug (fun m -> m "Received DATA on stream %ld: %d bytes" 302 - stream_id (Cstruct.length data)); 303 - pending.body_parts <- Cstruct.to_string data :: pending.body_parts; 304 - let _ = H2_stream.apply_event stream 305 - (H2_stream.Recv_data { end_stream }) in 306 - handle_data_flow_control flow t stream stream_id (Cstruct.length data); 307 - if end_stream then pending.done_ <- true; 308 - read_response () 309 - 310 - | H2_frame.Rst_stream_payload error_code 311 - when frame_stream_id = stream_id -> 312 - Log.warn (fun m -> m "Stream %ld reset: %a" 313 - stream_id H2_frame.pp_error_code error_code); 314 - let _ = H2_stream.apply_event stream 315 - (H2_stream.Recv_rst_stream error_code) in 316 - Error (Printf.sprintf "Stream reset: %s" 317 - (H2_frame.error_code_to_string error_code)) 635 + (* Handle connection-level frames *) 636 + if Int32.equal frame_stream_id 0l then begin 637 + (match frame.H2_frame.payload with 638 + | H2_frame.Settings_payload settings when not (is_ack flags) -> 639 + let pairs = List.map H2_frame.setting_to_pair settings in 640 + let pairs32 = List.map (fun (id, v) -> (Int32.of_int id, v)) pairs in 641 + let _ = H2_connection.handle_settings t.conn ~ack:false pairs32 in 642 + send_settings_ack flow 643 + | H2_frame.Ping_payload data when not (is_ack flags) -> 644 + send_ping_ack flow data 645 + | H2_frame.Window_update_payload increment -> 646 + let _ = H2_connection.credit_send_window t.conn (Int32.to_int increment) in 647 + () 648 + | H2_frame.Goaway_payload { last_stream_id; error_code; debug_data } -> 649 + let debug = Cstruct.to_string debug_data in 650 + H2_connection.handle_goaway t.conn ~last_stream_id ~error_code ~debug; 651 + pending.error <- Some ("GOAWAY: " ^ debug); 652 + pending.done_ <- true 653 + | _ -> ()); 654 + if not pending.done_ then read_response () 655 + end 656 + (* Handle frames for our stream *) 657 + else if Int32.equal frame_stream_id stream_id then begin 658 + (match frame.H2_frame.payload with 659 + | H2_frame.Headers_payload { header_block; _ } -> 660 + (match H2_connection.decode_headers t.conn header_block with 661 + | Ok headers -> 662 + let status, hdrs = H2_protocol.h2_headers_to_response headers in 663 + pending.status <- Some status; 664 + pending.headers <- hdrs; 665 + let _ = H2_stream.apply_event stream 666 + (H2_stream.Recv_headers { end_stream = is_end_stream flags }) in 667 + if is_end_stream flags then pending.done_ <- true 668 + | Error _ -> 669 + pending.error <- Some "Header decode failed"; 670 + pending.done_ <- true) 318 671 319 - | H2_frame.Settings_payload settings -> 320 - let pairs = List.map H2_frame.setting_to_pair settings in 321 - let pairs32 = List.map (fun (id, v) -> (Int32.of_int id, v)) pairs in 322 - (match H2_connection.handle_settings t.conn ~ack:(is_ack flags) pairs32 with 323 - | Ok `Settings_received -> 324 - send_settings_ack flow; 325 - read_response () 326 - | Ok `Ack_received -> 327 - read_response () 328 - | Error (_, msg) -> 329 - Error ("Settings error: " ^ msg)) 330 - 331 - | H2_frame.Ping_payload data -> 332 - if not (is_ack flags) then send_ping_ack flow data; 333 - read_response () 334 - 335 - | H2_frame.Window_update_payload increment 336 - when frame_stream_id = 0l -> 337 - let inc = Int32.to_int increment in 338 - (match H2_connection.credit_send_window t.conn inc with 339 - | Ok () -> read_response () 340 - | Error (_, msg) -> Error msg) 672 + | H2_frame.Data_payload { data } -> 673 + let data_len = Cstruct.length data in 674 + if data_len > 0 then begin 675 + H2_connection.consume_recv_window t.conn data_len; 676 + H2_stream.consume_recv_window stream data_len; 677 + send_window_update flow ~stream_id:0l ~increment:data_len; 678 + send_window_update flow ~stream_id ~increment:data_len; 679 + H2_connection.credit_recv_window t.conn data_len; 680 + H2_stream.credit_recv_window stream data_len 681 + end; 682 + pending.body_parts <- Cstruct.to_string data :: pending.body_parts; 683 + let _ = H2_stream.apply_event stream 684 + (H2_stream.Recv_data { end_stream = is_end_stream flags }) in 685 + if is_end_stream flags then pending.done_ <- true 341 686 342 - | H2_frame.Window_update_payload increment 343 - when frame_stream_id = stream_id -> 344 - let inc = Int32.to_int increment in 345 - (match H2_stream.credit_send_window stream inc with 346 - | Ok () -> read_response () 347 - | Error (_, msg) -> Error msg) 687 + | H2_frame.Rst_stream_payload error_code -> 688 + let _ = H2_stream.apply_event stream 689 + (H2_stream.Recv_rst_stream error_code) in 690 + pending.error <- Some (Printf.sprintf "Stream reset: %s" 691 + (H2_frame.error_code_to_string error_code)); 692 + pending.done_ <- true 348 693 349 - | H2_frame.Goaway_payload { last_stream_id; error_code; debug_data } -> 350 - let debug = Cstruct.to_string debug_data in 351 - H2_connection.handle_goaway t.conn 352 - ~last_stream_id ~error_code ~debug; 353 - if Int32.compare stream_id last_stream_id <= 0 then 354 - read_response () 355 - else 356 - Error ("Stream rejected by GOAWAY: " ^ debug) 694 + | H2_frame.Window_update_payload increment -> 695 + let _ = H2_stream.credit_send_window stream (Int32.to_int increment) in 696 + () 357 697 358 - | _ -> 359 - read_response () 698 + | _ -> ()); 699 + if not pending.done_ then read_response () 700 + end 701 + (* Ignore frames for other streams *) 702 + else read_response () 360 703 end 361 704 in 362 - read_response () 705 + 706 + read_response (); 707 + 708 + (* Return result *) 709 + match pending.error with 710 + | Some err -> Error err 711 + | None -> 712 + match pending.status with 713 + | None -> Error "No response status received" 714 + | Some status -> 715 + let body = String.concat "" (List.rev pending.body_parts) in 716 + Ok { H2_protocol. 717 + status; 718 + headers = pending.headers; 719 + body; 720 + protocol = H2_protocol.Http2; 721 + } 363 722 364 723 (* ============================================================ 365 724 High-level API 366 725 ============================================================ *) 367 726 368 - let one_request flow ~meth ~uri ?headers ?body () = 727 + let one_request ~sw:_ flow ~meth ~uri ?headers ?body () = 369 728 let t = create () in 370 729 let* () = handshake flow t in 371 730 let headers = Option.value headers ~default:[] in 372 731 let req = H2_protocol.make_request ~meth ~uri ~headers ?body () in 373 - request flow t req 732 + (* Use synchronous request for one-shot - no background reader needed *) 733 + request_sync flow t req 374 734 375 - let one_request_strings flow ~meth ~scheme ~host ?port ?path ?(query=[]) ?headers ?body () = 735 + let one_request_strings ~sw:_ flow ~meth ~scheme ~host ?port ?path ?(query=[]) ?headers ?body () = 376 736 let t = create () in 377 737 let* () = handshake flow t in 378 738 let headers = Option.value headers ~default:[] in 379 739 let req = H2_protocol.make_request_from_strings ~meth ~scheme ~host ?port ?path ~query ~headers ?body () in 380 - request flow t req 740 + (* Use synchronous request for one-shot - no background reader needed *) 741 + request_sync flow t req 381 742 382 743 let is_open t = H2_connection.is_open t.conn 383 744 ··· 396 757 write_frame flow goaway_frame 397 758 end; 398 759 H2_connection.close t.conn 760 + 761 + (** Get number of active stream handlers. 762 + Useful for connection pool management. *) 763 + let active_streams t = 764 + Eio.Mutex.use_ro t.handlers_mutex (fun () -> 765 + Hashtbl.length t.handlers 766 + )
+53 -14
lib/h2/h2_client.mli
··· 31 31 SPDX-License-Identifier: BSD-3-Clause 32 32 ---------------------------------------------------------------------------*) 33 33 34 - (** HTTP/2 Client Implementation. 34 + (** HTTP/2 Client with Concurrent Stream Multiplexing. 35 35 36 36 This module provides a client for making HTTP/2 requests over 37 37 an established TLS connection. It handles: 38 38 - Connection preface and settings exchange 39 - - Request/response multiplexing 39 + - True request/response multiplexing via a background reader fiber 40 40 - Flow control 41 41 - HPACK header compression 42 42 43 + {2 Architecture} 44 + 45 + The client uses a centralized frame reader fiber that dispatches 46 + incoming frames to per-stream event queues. This allows multiple 47 + concurrent requests to share a single HTTP/2 connection without 48 + blocking each other. 49 + 43 50 {2 Usage} 44 51 45 52 {[ 46 53 (* For a single request *) 47 - let response = H2_client.one_request flow 54 + Eio.Switch.run @@ fun sw -> 55 + let response = H2_client.one_request ~sw flow 48 56 ~meth:"GET" 49 - ~uri:(Huri.of_string "https://example.com/") 57 + ~uri:(Uri.of_string "https://example.com/") 50 58 () 51 59 in 52 60 match response with 53 61 | Ok r -> Printf.printf "Status: %d\n" r.status 54 62 | Error msg -> Printf.printf "Error: %s\n" msg 55 63 56 - (* For multiple requests on same connection *) 64 + (* For multiple concurrent requests on same connection *) 65 + Eio.Switch.run @@ fun sw -> 57 66 let client = H2_client.create () in 58 67 match H2_client.handshake flow client with 59 68 | Error msg -> failwith msg 60 69 | Ok () -> 70 + (* Concurrent requests in parallel fibers *) 61 71 let req1 = H2_protocol.make_request ~meth:"GET" ~uri:uri1 () in 62 72 let req2 = H2_protocol.make_request ~meth:"GET" ~uri:uri2 () in 63 - let resp1 = H2_client.request flow client req1 in 64 - let resp2 = H2_client.request flow client req2 in 65 - (* ... *) 73 + Eio.Fiber.both 74 + (fun () -> H2_client.request ~sw flow client req1) 75 + (fun () -> H2_client.request ~sw flow client req2); 66 76 H2_client.close flow client 67 77 ]} 68 78 ··· 83 93 val is_open : t -> bool 84 94 (** [is_open t] returns true if the connection is still usable. *) 85 95 96 + val active_streams : t -> int 97 + (** [active_streams t] returns the number of streams currently 98 + waiting for responses. Useful for connection pool management. *) 99 + 86 100 (** {1 Connection Lifecycle} *) 87 101 88 102 val handshake : [> Eio.Flow.two_way_ty] Eio.Resource.t -> t -> (unit, string) result ··· 104 118 105 119 (** {1 Making Requests} *) 106 120 107 - val request : [> Eio.Flow.two_way_ty] Eio.Resource.t -> t -> 121 + val request : sw:Eio.Switch.t -> 122 + [> Eio.Flow.two_way_ty] Eio.Resource.t -> t -> 123 + H2_protocol.request -> (H2_protocol.response, string) result 124 + (** [request ~sw flow t req] sends a request and waits for the response. 125 + 126 + This function can be called concurrently from multiple fibers on the 127 + same client. The background frame reader (started on first request) 128 + dispatches frames to the appropriate stream handlers. 129 + 130 + @param sw Switch for the background reader fiber 131 + @param flow The underlying connection 132 + @param t Client state 133 + @param req The request to send 134 + @return Response on success, Error msg on failure *) 135 + 136 + val request_sync : 137 + [> Eio.Flow.two_way_ty] Eio.Resource.t -> t -> 108 138 H2_protocol.request -> (H2_protocol.response, string) result 109 - (** [request flow t req] sends a request and waits for the response. 139 + (** [request_sync flow t req] sends a request and waits for the response 140 + using synchronous I/O without spawning a background reader fiber. 141 + 142 + This is more efficient for single requests but does not support 143 + concurrent multiplexing. Use this for one-shot requests or when 144 + connection pooling doesn't require multiplexing. 110 145 111 146 @param flow The underlying connection 112 147 @param t Client state 113 148 @param req The request to send 114 149 @return Response on success, Error msg on failure *) 115 150 116 - val one_request : [> Eio.Flow.two_way_ty] Eio.Resource.t -> 151 + val one_request : sw:Eio.Switch.t -> 152 + [> Eio.Flow.two_way_ty] Eio.Resource.t -> 117 153 meth:string -> uri:Uri.t -> ?headers:(string * string) list -> 118 154 ?body:string -> unit -> (H2_protocol.response, string) result 119 - (** [one_request flow ~meth ~uri ?headers ?body ()] makes a single request. 155 + (** [one_request ~sw flow ~meth ~uri ?headers ?body ()] makes a single request. 120 156 121 157 This is a convenience function that creates a client, performs 122 158 handshake, sends the request, and returns the response. 123 159 Use this for one-off requests; for multiple requests on the same 124 160 connection, use [create], [handshake], and [request] directly. 125 161 162 + @param sw Switch for the background reader fiber 126 163 @param flow The underlying TLS connection 127 164 @param meth HTTP method (GET, POST, etc) 128 165 @param uri Request URI ··· 130 167 @param body Optional request body 131 168 @return Response on success, Error msg on failure *) 132 169 133 - val one_request_strings : [> Eio.Flow.two_way_ty] Eio.Resource.t -> 170 + val one_request_strings : sw:Eio.Switch.t -> 171 + [> Eio.Flow.two_way_ty] Eio.Resource.t -> 134 172 meth:string -> scheme:string -> host:string -> 135 173 ?port:int -> ?path:string -> ?query:(string * string list) list -> 136 174 ?headers:(string * string) list -> 137 175 ?body:string -> unit -> (H2_protocol.response, string) result 138 - (** [one_request_strings flow ~meth ~scheme ~host ?port ?path ?query ?headers ?body ()] 176 + (** [one_request_strings ~sw flow ~meth ~scheme ~host ?port ?path ?query ?headers ?body ()] 139 177 makes a single request using string components instead of Uri.t. 140 178 141 179 This is useful when calling from libraries that have their own Uri module 142 180 which would conflict with the external uri library. 143 181 182 + @param sw Switch for the background reader fiber 144 183 @param flow The underlying TLS connection 145 184 @param meth HTTP method (GET, POST, etc) 146 185 @param scheme URL scheme (http or https)
+174
lib/h2/h2_conpool_handler.ml
··· 1 + (*--------------------------------------------------------------------------- 2 + Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved. 3 + SPDX-License-Identifier: ISC 4 + ---------------------------------------------------------------------------*) 5 + 6 + (** HTTP/2 Protocol Handler for Conpool. 7 + 8 + This module provides a protocol handler that enables HTTP/2 connection 9 + multiplexing through Conpool's typed pool API. It manages H2_client 10 + instances and supports concurrent stream multiplexing. 11 + 12 + Key features: 13 + - Shared connection mode (multiple streams per connection) 14 + - GOAWAY handling for graceful degradation 15 + - Stream slot management respecting peer's max_concurrent_streams *) 16 + 17 + let src = Logs.Src.create "requests.h2_conpool" ~doc:"HTTP/2 Connection Pool Handler" 18 + module Log = (val Logs.src_log src : Logs.LOG) 19 + 20 + (** {1 State Type} *) 21 + 22 + (** HTTP/2 connection state managed by Conpool. 23 + This wraps H2_client.t with additional tracking for stream management. *) 24 + type h2_state = { 25 + client : H2_client.t; 26 + (** The underlying HTTP/2 client. *) 27 + flow : Conpool.Config.connection_flow; 28 + (** The connection flow (needed for H2 operations). *) 29 + mutable goaway_received : bool; 30 + (** Whether GOAWAY has been received from peer. *) 31 + mutable max_concurrent_streams : int; 32 + (** Cached max_concurrent_streams from peer settings. *) 33 + } 34 + 35 + (** {1 Protocol Configuration} *) 36 + 37 + (** Initialize HTTP/2 state for a new connection. 38 + Performs the HTTP/2 handshake and extracts peer settings. *) 39 + let init_state ~flow ~tls_epoch:_ = 40 + Log.info (fun m -> m "Initializing HTTP/2 connection state"); 41 + 42 + let client = H2_client.create () in 43 + 44 + (* Perform HTTP/2 handshake *) 45 + match H2_client.handshake flow client with 46 + | Ok () -> 47 + Log.info (fun m -> m "HTTP/2 handshake complete"); 48 + 49 + (* Get max_concurrent_streams from peer settings *) 50 + let conn = H2_client.connection client in 51 + let peer_settings = H2_connection.peer_settings conn in 52 + let max_streams = 53 + match peer_settings.H2_connection.max_concurrent_streams with 54 + | Some n -> n 55 + | None -> 100 (* Default per RFC 9113 *) 56 + in 57 + 58 + Log.debug (fun m -> m "Peer max_concurrent_streams: %d" max_streams); 59 + 60 + { 61 + client; 62 + flow; 63 + goaway_received = false; 64 + max_concurrent_streams = max_streams; 65 + } 66 + 67 + | Error msg -> 68 + Log.err (fun m -> m "HTTP/2 handshake failed: %s" msg); 69 + failwith ("HTTP/2 handshake failed: " ^ msg) 70 + 71 + (** Check if the HTTP/2 connection is still healthy. *) 72 + let is_healthy state = 73 + if state.goaway_received then begin 74 + Log.debug (fun m -> m "HTTP/2 connection unhealthy: GOAWAY received"); 75 + false 76 + end else if not (H2_client.is_open state.client) then begin 77 + Log.debug (fun m -> m "HTTP/2 connection unhealthy: connection closed"); 78 + false 79 + end else 80 + true 81 + 82 + (** Cleanup when connection is destroyed. *) 83 + let on_close state = 84 + Log.debug (fun m -> m "Closing HTTP/2 connection"); 85 + (* Send GOAWAY if connection is still open *) 86 + if H2_client.is_open state.client then begin 87 + H2_client.close state.flow state.client 88 + end 89 + 90 + (** Get access mode for this connection. 91 + Currently using Exclusive mode since the synchronous request path 92 + doesn't support true multiplexing. The max_concurrent_streams is 93 + still tracked for future multiplexing support. *) 94 + let access_mode _state = 95 + Conpool.Config.Exclusive 96 + 97 + (** The protocol configuration for HTTP/2 connections. *) 98 + let h2_protocol : h2_state Conpool.Config.protocol_config = { 99 + Conpool.Config.init_state; 100 + is_healthy; 101 + on_close; 102 + access_mode; 103 + } 104 + 105 + (** {1 Request Functions} *) 106 + 107 + (** Make an HTTP/2 request using the pooled connection state. 108 + 109 + Uses synchronous I/O to avoid background fiber lifecycle issues 110 + with connection pooling. True multiplexing would require the 111 + connection pool to manage the reader fiber lifetime. 112 + 113 + @param state The HTTP/2 state from Conpool 114 + @param uri Request URI 115 + @param headers Request headers 116 + @param body Optional request body 117 + @param method_ HTTP method 118 + @param auto_decompress Whether to decompress response body 119 + @return Response or error message *) 120 + let request 121 + ~(state : h2_state) 122 + ~(uri : Uri.t) 123 + ~(headers : Headers.t) 124 + ?(body : Body.t option) 125 + ~(method_ : Method.t) 126 + ~auto_decompress 127 + () 128 + : (H2_adapter.response, string) result = 129 + 130 + (* Validate HTTP/2 header constraints *) 131 + match Headers.validate_h2_user_headers headers with 132 + | Error e -> 133 + Error (Format.asprintf "Invalid HTTP/2 request headers: %a" 134 + Headers.pp_h2_validation_error e) 135 + | Ok () -> 136 + (* Check connection health before making request *) 137 + if state.goaway_received then 138 + Error "Connection received GOAWAY" 139 + else if not (H2_client.is_open state.client) then 140 + Error "Connection is closed" 141 + else begin 142 + let h2_headers = Headers.to_list headers in 143 + let h2_body = Option.bind body H2_adapter.body_to_string_opt in 144 + let meth = Method.to_string method_ in 145 + 146 + Log.debug (fun m -> m "Making HTTP/2 request on pooled connection: %s %s" 147 + meth (Uri.to_string uri)); 148 + 149 + (* Use synchronous request to avoid fiber lifecycle issues *) 150 + match H2_client.request_sync state.flow state.client 151 + { H2_protocol.meth; uri; headers = h2_headers; body = h2_body } with 152 + | Ok resp -> 153 + (* Check if GOAWAY was received during request *) 154 + let conn = H2_client.connection state.client in 155 + if H2_connection.is_closing conn then begin 156 + Log.info (fun m -> m "GOAWAY received during request"); 157 + state.goaway_received <- true 158 + end; 159 + 160 + (* Update max_concurrent_streams if it changed *) 161 + let peer_settings = H2_connection.peer_settings conn in 162 + (match peer_settings.H2_connection.max_concurrent_streams with 163 + | Some n when n <> state.max_concurrent_streams -> 164 + Log.debug (fun m -> m "Peer max_concurrent_streams changed: %d -> %d" 165 + state.max_concurrent_streams n); 166 + state.max_concurrent_streams <- n 167 + | _ -> ()); 168 + 169 + Ok (H2_adapter.make_response ~auto_decompress resp) 170 + 171 + | Error msg -> 172 + Log.warn (fun m -> m "HTTP/2 request failed: %s" msg); 173 + Error msg 174 + end
+1
lib/one.ml
··· 209 209 (* Use HTTP/2 client via H2_adapter (handles decompression) *) 210 210 Log.debug (fun m -> m "[One] Using HTTP/2 for %s" url_to_fetch); 211 211 (match H2_adapter.one_request 212 + ~sw 212 213 ~flow 213 214 ~uri:uri_to_fetch 214 215 ~headers:headers_for_request
+95 -26
lib/requests.ml
··· 42 42 43 43 (* Main API - Session functionality with connection pooling *) 44 44 45 + (** Protocol hint for endpoints - remembers ALPN negotiation results. *) 46 + type protocol_hint = H1 | H2 47 + 45 48 type t = T : { 46 49 sw : Eio.Switch.t; 47 50 clock : [> float Eio.Time.clock_ty] Eio.Resource.t; 48 51 net : [> [> `Generic] Eio.Net.ty] Eio.Resource.t; 49 - http_pool : Conpool.t; 50 - https_pool : Conpool.t; 52 + http_pool : unit Conpool.t; 53 + (** HTTP/1.x pool - exclusive access, no protocol state *) 54 + https_pool : unit Conpool.t; 55 + (** HTTPS pool - exclusive access, no protocol state *) 56 + h2_pool : H2_conpool_handler.h2_state Conpool.t; 57 + (** HTTP/2 pool - shared access with H2 client state *) 58 + protocol_hints : (string, protocol_hint) Hashtbl.t; 59 + (** Maps "host:port" to protocol hint from ALPN *) 60 + protocol_hints_mutex : Eio.Mutex.t; 51 61 cookie_jar : Cookeio_jar.t; 52 62 cookie_mutex : Eio.Mutex.t; 53 63 default_headers : Headers.t; ··· 148 158 Conpool.create ~sw ~net ~clock ?tls:tls_config ~config:pool_config () 149 159 in 150 160 161 + (* HTTP/2 pool - shared connections with H2 state *) 162 + let h2_pool = 163 + Conpool.create ~sw ~net ~clock ?tls:tls_config ~config:pool_config 164 + ~protocol:H2_conpool_handler.h2_protocol () 165 + in 166 + 167 + (* Protocol hints - remember ALPN negotiation results *) 168 + let protocol_hints = Hashtbl.create 32 in 169 + 151 170 Log.info (fun m -> m "Created Requests session with connection pools (max_per_host=%d, TLS=%b)" 152 171 max_connections_per_host (Option.is_some tls_config)); 153 172 ··· 178 197 net; 179 198 http_pool; 180 199 https_pool; 200 + h2_pool; 201 + protocol_hints; 202 + protocol_hints_mutex = Eio.Mutex.create (); 181 203 cookie_jar; 182 204 cookie_mutex = Eio.Mutex.create (); 183 205 default_headers; ··· 499 521 | Some proxy -> not (Proxy.should_bypass proxy url_to_fetch) 500 522 in 501 523 524 + (* Helper to make endpoint key for protocol hints *) 525 + let endpoint_key = Printf.sprintf "%s:%d" redirect_host redirect_port in 526 + 527 + (* Get protocol hint for this endpoint *) 528 + let protocol_hint = 529 + Eio.Mutex.use_ro t.protocol_hints_mutex (fun () -> 530 + Hashtbl.find_opt t.protocol_hints endpoint_key) 531 + in 532 + 502 533 let make_request_fn () = 503 - match use_proxy, redirect_is_https, t.proxy with 504 - | false, _, _ -> 505 - (* Direct connection - use connection pool with ALPN detection *) 534 + match use_proxy, redirect_is_https, t.proxy, protocol_hint with 535 + | false, true, _, Some H2 -> 536 + (* Known HTTP/2 - use h2_pool with shared connections *) 537 + Log.debug (fun m -> m "Using HTTP/2 for %s (from protocol hint)" url_to_fetch); 506 538 Eio.Switch.run (fun conn_sw -> 507 - let conn_info = Conpool.connection_with_info ~sw:conn_sw redirect_pool redirect_endpoint in 539 + let h2_conn = Conpool.connection ~sw:conn_sw t.h2_pool redirect_endpoint in 540 + match H2_conpool_handler.request 541 + ~state:h2_conn.Conpool.state 542 + ~uri:uri_to_fetch 543 + ~headers:headers_with_cookies 544 + ~body 545 + ~method_ 546 + ~auto_decompress:t.auto_decompress 547 + () 548 + with 549 + | Ok resp -> (resp.H2_adapter.status, resp.H2_adapter.headers, resp.H2_adapter.body) 550 + | Error msg -> raise (Error.err (Error.Invalid_request { reason = "HTTP/2 error: " ^ msg })) 551 + ) 552 + 553 + | false, true, _, Some H1 -> 554 + (* Known HTTP/1.x - use https_pool *) 555 + Log.debug (fun m -> m "Using HTTP/1.1 for %s (from protocol hint)" url_to_fetch); 556 + Eio.Switch.run (fun conn_sw -> 557 + let conn_info = Conpool.connection ~sw:conn_sw redirect_pool redirect_endpoint in 558 + Http_client.make_request_100_continue_decompress 559 + ~expect_100:t.expect_100_continue 560 + ~clock:t.clock 561 + ~sw:t.sw 562 + ~method_ ~uri:uri_to_fetch 563 + ~headers:headers_with_cookies ~body 564 + ~auto_decompress:t.auto_decompress conn_info.Conpool.flow 565 + ) 566 + 567 + | false, _, _, _ -> 568 + (* Unknown protocol or non-HTTPS - use ALPN detection *) 569 + Eio.Switch.run (fun conn_sw -> 570 + let conn_info = Conpool.connection ~sw:conn_sw redirect_pool redirect_endpoint in 508 571 (* Check ALPN negotiated protocol *) 509 - let is_h2 = match conn_info.tls_epoch with 572 + let is_h2 = match conn_info.Conpool.tls_epoch with 510 573 | Some epoch -> epoch.Tls.Core.alpn_protocol = Some "h2" 511 574 | None -> false 512 575 in 576 + 577 + (* Update protocol hint for future requests *) 578 + if redirect_is_https then begin 579 + let hint = if is_h2 then H2 else H1 in 580 + Eio.Mutex.use_rw ~protect:true t.protocol_hints_mutex (fun () -> 581 + Hashtbl.replace t.protocol_hints endpoint_key hint); 582 + Log.debug (fun m -> m "Learned protocol for %s: %s" 583 + endpoint_key (if is_h2 then "H2" else "H1")) 584 + end; 585 + 513 586 if is_h2 then begin 514 - (* Use HTTP/2 client via H2_adapter (handles caching and decompression) *) 587 + (* First H2 connection - use H2_adapter for this request 588 + (subsequent requests will use h2_pool) *) 515 589 Log.debug (fun m -> m "Using HTTP/2 for %s (ALPN negotiated)" url_to_fetch); 516 - let result = H2_adapter.request 517 - ~flow:conn_info.flow 590 + match H2_adapter.request 591 + ~sw:conn_sw 592 + ~flow:conn_info.Conpool.flow 518 593 ~uri:uri_to_fetch 519 594 ~headers:headers_with_cookies 520 595 ~body 521 596 ~method_ 522 597 ~auto_decompress:t.auto_decompress 523 598 () 524 - in 525 - (* Close the connection after HTTP/2 use to prevent Conpool from reusing it. 526 - HTTP/2 connection state (HPACK tables, flow control windows, stream IDs) 527 - is tied to the TCP connection, and we're not maintaining it across requests. 528 - Proper HTTP/2 multiplexing would require architectural changes to Conpool. *) 529 - Eio.Flow.close conn_info.flow; 530 - match result with 599 + with 531 600 | Ok resp -> (resp.H2_adapter.status, resp.H2_adapter.headers, resp.H2_adapter.body) 532 601 | Error msg -> raise (Error.err (Error.Invalid_request { reason = "HTTP/2 error: " ^ msg })) 533 602 end else begin ··· 538 607 ~sw:t.sw 539 608 ~method_ ~uri:uri_to_fetch 540 609 ~headers:headers_with_cookies ~body 541 - ~auto_decompress:t.auto_decompress conn_info.flow 610 + ~auto_decompress:t.auto_decompress conn_info.Conpool.flow 542 611 end 543 612 ) 544 613 545 - | true, false, Some proxy -> 614 + | true, false, Some proxy, _ -> 546 615 (* HTTP via proxy - connect to proxy and use absolute-URI form *) 547 616 Log.debug (fun m -> m "Routing HTTP request via proxy %s:%d" 548 617 proxy.Proxy.host proxy.Proxy.port); ··· 555 624 Headers.get `Authorization auth_headers 556 625 | None -> None 557 626 in 558 - Conpool.with_connection t.http_pool proxy_endpoint (fun flow -> 627 + Conpool.with_connection t.http_pool proxy_endpoint (fun conn -> 559 628 (* Write request using absolute-URI form *) 560 - Http_write.write_and_flush flow (fun w -> 629 + Http_write.write_and_flush conn.Conpool.flow (fun w -> 561 630 Http_write.request_via_proxy w ~sw:t.sw ~method_ ~uri:uri_to_fetch 562 631 ~headers:headers_with_cookies ~body 563 632 ~proxy_auth 564 633 ); 565 634 (* Read response *) 566 635 let limits = Response_limits.default in 567 - let buf_read = Http_read.of_flow ~max_size:65536 flow in 636 + let buf_read = Http_read.of_flow ~max_size:65536 conn.Conpool.flow in 568 637 let _version, status, resp_headers, body_str = 569 638 Http_read.response ~limits ~method_ buf_read in 570 639 (* Handle decompression if enabled *) ··· 576 645 (status, resp_headers, body_str) 577 646 ) 578 647 579 - | true, true, Some proxy -> 648 + | true, true, Some proxy, _ -> 580 649 (* HTTPS via proxy - establish CONNECT tunnel then TLS *) 581 650 Log.debug (fun m -> m "Routing HTTPS request via proxy %s:%d (CONNECT tunnel)" 582 651 proxy.Proxy.host proxy.Proxy.port); ··· 598 667 ~headers:headers_with_cookies ~body 599 668 ~auto_decompress:t.auto_decompress tunnel_flow 600 669 601 - | true, _, None -> 670 + | true, _, None, _ -> 602 671 (* Should not happen due to use_proxy check *) 603 - Conpool.with_connection redirect_pool redirect_endpoint (fun flow -> 672 + Conpool.with_connection redirect_pool redirect_endpoint (fun conn -> 604 673 Http_client.make_request_100_continue_decompress 605 674 ~expect_100:t.expect_100_continue 606 675 ~clock:t.clock 607 676 ~sw:t.sw 608 677 ~method_ ~uri:uri_to_fetch 609 678 ~headers:headers_with_cookies ~body 610 - ~auto_decompress:t.auto_decompress flow 679 + ~auto_decompress:t.auto_decompress conn.Conpool.flow 611 680 ) 612 681 in 613 682