A batteries included HTTP/1.1 client in OCaml
at main 227 lines 8.1 kB view raw
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
 ---------------------------------------------------------------------------*)

(* Stress-test conpool against 16 echo servers that all listen on 127.0.0.1,
   each on its own port. *)

open Eio.Std

(** [server ~sw ~net ipaddr port connections_ref] listens on [ipaddr:port] and
    forks a fiber on [sw] that accepts connections forever.

    Each connection is served by a line-echo handler: every line read from the
    client is logged and written back followed by ["\n"], until the client
    disconnects or an error occurs. [connections_ref] is incremented when a
    connection is accepted and decremented when its handler exits, so it holds
    the number of connections currently being served.

    NOTE(review): the accept loop is started with [Fiber.fork], so [sw] cannot
    finish while the loop is running. Confirm how the test process is expected
    to terminate after a successful run. *)
let server ~sw ~net ipaddr port connections_ref =
  let socket =
    Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 (`Tcp (ipaddr, port))
  in
  Eio.Fiber.fork ~sw (fun () ->
      try
        while true do
          Eio.Net.accept_fork socket ~sw
            ~on_error:(fun ex ->
              traceln "Server %a error: %s" Eio.Net.Sockaddr.pp
                (`Tcp (ipaddr, port))
                (Printexc.to_string ex))
            (fun flow _addr ->
              (* Track this connection for as long as the handler runs. The
                 flow itself is closed by [accept_fork] once we return. *)
              Atomic.incr connections_ref;
              Fun.protect
                ~finally:(fun () -> Atomic.decr connections_ref)
                (fun () ->
                  (* Simple protocol: read lines and echo them back until
                     EOF. *)
                  try
                    let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
                    while true do
                      let line = Eio.Buf_read.line buf in
                      traceln "Server on %a:%d received: %s" Eio.Net.Ipaddr.pp
                        ipaddr port line;
                      Eio.Flow.copy_string (line ^ "\n") flow
                    done
                  with
                  | End_of_file ->
                      traceln "Server on %a:%d client disconnected"
                        Eio.Net.Ipaddr.pp ipaddr port
                  | Eio.Cancel.Cancelled _ as ex ->
                      (* Cancellation is not a connection error: let it
                         propagate instead of logging and swallowing it. *)
                      raise ex
                  | ex ->
                      traceln "Server on %a:%d error handling connection: %s"
                        Eio.Net.Ipaddr.pp ipaddr port (Printexc.to_string ex)))
        done
      with Eio.Cancel.Cancelled _ -> ())

(** [generate_localhost_addresses ()] returns 16 [(host, addr, port)] triples.
    Every triple uses the IPv4 loopback address (["127.0.0.1"] as a string and
    as an [Eio.Net.Ipaddr.t]); the ports run from 10000 to 10015. *)
let generate_localhost_addresses () =
  List.init 16 (fun i -> ("127.0.0.1", Eio.Net.Ipaddr.V4.loopback, 10000 + i))

(* Entry point: start the servers, hammer them through a connection pool, then
   print client-side, pool-side and server-side statistics. *)
let () =
  Vlog.setup_test ~level:Logs.Info ();
  Logs.Src.set_level Conpool.src (Some Logs.Debug);
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  traceln "=== Starting 16 localhost servers ===";

  (* Generate addresses *)
  let servers = generate_localhost_addresses () in

  (* One live-connection counter per server, in the same order as [servers] *)
  let connection_refs = List.map (fun _ -> Atomic.make 0) servers in

  (* Start all servers *)
  List.iter2
    (fun (_addr_str, addr, port) conn_ref ->
      traceln "Starting server on %a:%d" Eio.Net.Ipaddr.pp addr port;
      server ~sw ~net:env#net addr port conn_ref)
    servers connection_refs;

  (* Give servers time to start *)
  Eio.Time.sleep env#clock 0.5;

  traceln "\n=== Creating connection pool ===";

  (* Create connection pool *)
  let pool_config =
    Conpool.Config.v ~max_connections_per_endpoint:5 ~max_idle_time:30.0
      ~max_connection_lifetime:60.0 ()
  in

  let pool =
    Conpool.basic ~sw ~net:env#net ~clock:env#clock ~config:pool_config ()
  in

  traceln "\n=== Stress testing with thousands of concurrent connections ===";

  (* Disable debug logging for stress test *)
  Logs.Src.set_level Conpool.src (Some Logs.Info);

  (* Endpoints for all servers, kept in an array so that picking one per
     request is a constant-time index rather than a list walk. *)
  let endpoints =
    servers
    |> List.map (fun (addr_str, _addr, port) ->
           Conpool.Endpoint.v ~host:addr_str ~port)
    |> Array.of_list
  in
  let num_endpoints = Array.length endpoints in

  (* Stress test: thousands of concurrent requests across all 16 servers *)
  let num_requests = 50000 in

  traceln "Launching %d concurrent requests across %d endpoints..."
    num_requests num_endpoints;
  traceln "Pool config: max %d connections per endpoint"
    (Conpool.Config.max_connections_per_endpoint pool_config);

  let start_time = Unix.gettimeofday () in
  let success_count = Atomic.make 0 in
  let error_count = Atomic.make 0 in
  (* Index of the last 5000-request milestone that was reported *)
  let last_progress = ref 0 in

  (* (endpoint, request_id) pairs, assigned to endpoints round-robin *)
  let tasks =
    List.init num_requests (fun i -> (endpoints.(i mod num_endpoints), i))
  in

  (* Run all requests concurrently with fiber limit *)
  Eio.Fiber.List.iter ~max_fibers:200
    (fun (endpoint, req_id) ->
      try
        Conpool.with_connection pool endpoint (fun conn ->
            let test_msg = Fmt.str "Request %d" req_id in
            Eio.Flow.copy_string (test_msg ^ "\n") conn.Conpool.flow;

            let buf = Eio.Buf_read.of_flow conn.Conpool.flow ~max_size:1024 in
            let _response = Eio.Buf_read.line buf in
            let count = Atomic.fetch_and_add success_count 1 + 1 in

            (* Progress indicator every 5000 requests *)
            if count / 5000 > !last_progress then begin
              last_progress := count / 5000;
              traceln " Progress: %d/%d (%.1f%%)" count num_requests
                (100.0 *. float_of_int count /. float_of_int num_requests)
            end)
      with
      | Eio.Cancel.Cancelled _ as ex ->
          (* Being cancelled is not a request failure; propagate it. *)
          raise ex
      | e ->
          let errors = Atomic.fetch_and_add error_count 1 + 1 in
          (* Only the first 10 failures are logged, to keep output readable *)
          if errors <= 10 then
            traceln "Request %d to %a failed: %s" req_id Conpool.Endpoint.pp
              endpoint (Printexc.to_string e))
    tasks;

  let end_time = Unix.gettimeofday () in
  let duration = end_time -. start_time in
  let successful = Atomic.get success_count in
  let failed = Atomic.get error_count in

  traceln "\n=== Stress test results ===";
  traceln "Total requests: %d" num_requests;
  traceln "Successful: %d" successful;
  traceln "Failed: %d" failed;
  traceln "Duration: %.2fs" duration;
  (* Both ratios are guarded so that a run with no successes (or a zero
     duration) prints 0 rather than inf/nan. *)
  traceln "Throughput: %.0f req/s"
    (if duration > 0.0 then float_of_int successful /. duration else 0.0);
  traceln "Average latency: %.2fms"
    (if successful > 0 then duration *. 1000.0 /. float_of_int successful
     else 0.0);

  traceln "\n=== Connection pool statistics ===";
  let all_stats = Conpool.all_stats pool in

  (* Sum one per-endpoint statistic over every endpoint *)
  let total_of stat =
    List.fold_left (fun acc (_, s) -> acc + stat s) 0 all_stats
  in
  let total_created = total_of Conpool.Stats.total_created in
  let total_reused = total_of Conpool.Stats.total_reused in
  let total_closed = total_of Conpool.Stats.total_closed in
  let total_errors = total_of Conpool.Stats.errors in

  traceln "Total connections created: %d" total_created;
  traceln "Total connections reused: %d" total_reused;
  traceln "Total connections closed: %d" total_closed;
  traceln "Total errors: %d" total_errors;
  traceln "Connection reuse ratio: %.2fx (reused/created)"
    (if total_created > 0 then
       float_of_int total_reused /. float_of_int total_created
     else 0.0);
  traceln "Pool efficiency: %.1f%% (avoided creating %d connections)"
    (if successful > 0 then
       100.0 *. float_of_int total_reused /. float_of_int successful
     else 0.0)
    total_reused;

  traceln "\nPer-endpoint breakdown:";
  List.iter
    (fun (endpoint, stats) ->
      traceln " %a: created=%d reused=%d active=%d idle=%d" Conpool.Endpoint.pp
        endpoint
        (Conpool.Stats.total_created stats)
        (Conpool.Stats.total_reused stats)
        (Conpool.Stats.active stats)
        (Conpool.Stats.idle stats))
    all_stats;

  traceln "\n=== Verifying server-side connection counts ===";
  List.iter2
    (fun (addr_str, _addr, port) conn_ref ->
      let count = Atomic.get conn_ref in
      traceln "Server %s:%d - Active connections: %d" addr_str port count)
    servers connection_refs;

  (* A run with failed requests must not be reported as a success. *)
  if failed > 0 then
    failwith
      (Fmt.str "Stress test failed: %d of %d requests failed" failed
         num_requests);

  traceln "\n=== Test completed successfully ==="