A batteries-included HTTP/1.1 client in OCaml
at claude-test 216 lines 7.9 kB view raw
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
 ---------------------------------------------------------------------------*)

(* Stress-test conpool against 16 echo servers, all listening on 127.0.0.1
   and distinguished only by their port number. *)

open Eio.Std

(** [create_server ~sw ~net ipaddr port connections_ref] listens on TCP
    [ipaddr]:[port] and serves a line-echo protocol: every line read from a
    client is logged and written back followed by a newline, until EOF.

    [connections_ref] is incremented when a connection is accepted and
    decremented when its handler ends, so it holds the number of connections
    currently being served.

    The accept loop and the per-connection handlers run as daemon fibers:
    they do not keep [sw] alive, and are cancelled once the non-daemon
    fibers of [sw] have finished. *)
let create_server ~sw ~net ipaddr port connections_ref =
  let socket =
    Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 (`Tcp (ipaddr, port))
  in
  (* Serve a single client. Cleanup lives in [finally] so the counter and the
     socket are released on every exit path, including cancellation. *)
  let handle_connection flow =
    Atomic.incr connections_ref;
    Fun.protect
      ~finally:(fun () ->
        Atomic.decr connections_ref;
        Eio.Flow.close flow)
      (fun () ->
        let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
        try
          while true do
            let line = Eio.Buf_read.line buf in
            traceln "Server on %a:%d received: %s"
              Eio.Net.Ipaddr.pp ipaddr port line;
            Eio.Flow.copy_string (line ^ "\n") flow
          done
        with
        | End_of_file ->
          traceln "Server on %a:%d client disconnected"
            Eio.Net.Ipaddr.pp ipaddr port
        (* Cancellation must never be swallowed: let it propagate. *)
        | Eio.Cancel.Cancelled _ as ex -> raise ex
        | ex ->
          traceln "Server on %a:%d error handling connection: %s"
            Eio.Net.Ipaddr.pp ipaddr port
            (Printexc.to_string ex))
  in
  Eio.Fiber.fork_daemon ~sw (fun () ->
    (* Never returns normally; it ends only by cancellation or by an
       exception raised from [accept], which then fails [sw]. *)
    let rec accept_loop () =
      let flow, _addr = Eio.Net.accept ~sw socket in
      Eio.Fiber.fork_daemon ~sw (fun () ->
        handle_connection flow;
        `Stop_daemon);
      accept_loop ()
    in
    accept_loop ())

(** [generate_localhost_addresses ()] returns 16 [(host, addr, port)] triples.
    Every triple uses the loopback address 127.0.0.1 (as the string [host]
    and as the Eio address [addr]); ports run from 10000 to 10015. *)
let generate_localhost_addresses () =
  let addr : Eio.Net.Ipaddr.v4v6 = Eio.Net.Ipaddr.V4.loopback in
  List.init 16 (fun i -> ("127.0.0.1", addr, 10000 + i))

(* Test entry point: start the servers, push [num_requests] echo requests
   through a shared connection pool, then report client- and server-side
   statistics.
   Raises [Failure] if any request failed. *)
let () =
  (* Setup logging *)
  Logs.set_reporter (Logs_fmt.reporter ());
  Logs.set_level (Some Logs.Info);
  Logs.Src.set_level Conpool.src (Some Logs.Debug);

  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->

  traceln "=== Starting 16 localhost servers ===";

  (* Generate addresses *)
  let servers = generate_localhost_addresses () in

  (* One live-connection counter per server, in the same order as [servers] *)
  let connection_refs = List.map (fun _ -> Atomic.make 0) servers in

  (* Start all servers *)
  List.iter2 (fun (_addr_str, addr, port) conn_ref ->
    traceln "Starting server on %a:%d"
      Eio.Net.Ipaddr.pp addr port;
    create_server ~sw ~net:env#net addr port conn_ref
  ) servers connection_refs;

  (* Give servers time to start *)
  Eio.Time.sleep env#clock 0.5;

  traceln "\n=== Creating connection pool ===";

  (* Create connection pool *)
  let pool_config = Conpool.Config.make
    ~max_connections_per_endpoint:5
    ~max_idle_time:30.0
    ~max_connection_lifetime:60.0
    ()
  in

  let pool = Conpool.create
    ~sw
    ~net:env#net
    ~clock:env#clock
    ~config:pool_config
    ()
  in

  traceln "\n=== Stress testing with thousands of concurrent connections ===";

  (* Disable debug logging for stress test *)
  Logs.Src.set_level Conpool.src (Some Logs.Info);

  (* Create endpoints for all servers. An array gives constant-time lookup
     when the requests are spread round-robin below. *)
  let endpoints =
    servers
    |> List.map (fun (addr_str, _addr, port) ->
         Conpool.Endpoint.make ~host:addr_str ~port)
    |> Array.of_list
  in
  let num_endpoints = Array.length endpoints in

  (* Stress test: thousands of concurrent requests across all 16 servers *)
  let num_requests = 50000 in

  traceln "Launching %d concurrent requests across %d endpoints..."
    num_requests num_endpoints;
  traceln "Pool config: max %d connections per endpoint"
    (Conpool.Config.max_connections_per_endpoint pool_config);

  let start_time = Unix.gettimeofday () in
  let success_count = Atomic.make 0 in
  let error_count = Atomic.make 0 in
  let last_progress = ref 0 in

  (* Generate list of (endpoint, request_id) pairs, assigned round-robin *)
  let tasks = List.init num_requests (fun i ->
    (endpoints.(i mod num_endpoints), i)
  ) in

  (* Run all requests concurrently with fiber limit *)
  Eio.Fiber.List.iter ~max_fibers:200 (fun (endpoint, req_id) ->
    try
      Conpool.with_connection pool endpoint (fun flow ->
        let test_msg = Printf.sprintf "Request %d" req_id in
        Eio.Flow.copy_string (test_msg ^ "\n") flow;

        let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
        let _response = Eio.Buf_read.line buf in
        let count = Atomic.fetch_and_add success_count 1 + 1 in

        (* Progress indicator every 5000 requests *)
        if count / 5000 > !last_progress then begin
          last_progress := count / 5000;
          traceln "  Progress: %d/%d (%.1f%%)"
            count num_requests
            (100.0 *. float_of_int count /. float_of_int num_requests)
        end
      )
    with
    (* A cancelled request is not a failed request: propagate it. *)
    | Eio.Cancel.Cancelled _ as ex -> raise ex
    | e ->
      let errors_so_far = Atomic.fetch_and_add error_count 1 + 1 in
      (* Only report the first few failures to keep the output readable *)
      if errors_so_far <= 10 then
        traceln "Request %d to %a failed: %s"
          req_id Conpool.Endpoint.pp endpoint (Printexc.to_string e)
  ) tasks;

  let end_time = Unix.gettimeofday () in
  let duration = end_time -. start_time in
  let successful = Atomic.get success_count in
  let failed = Atomic.get error_count in

  (* [ratio num den] is [num / den] as a float, or 0 when [den] is not
     positive, so the report never prints inf or nan. *)
  let ratio num den =
    if den > 0 then float_of_int num /. float_of_int den else 0.0
  in

  traceln "\n=== Stress test results ===";
  traceln "Total requests: %d" num_requests;
  traceln "Successful: %d" successful;
  traceln "Failed: %d" failed;
  traceln "Duration: %.2fs" duration;
  traceln "Throughput: %.0f req/s"
    (if duration > 0.0 then float_of_int successful /. duration else 0.0);
  (* Wall-clock time divided by the number of successful requests *)
  traceln "Average latency: %.2fms"
    (if successful > 0 then duration *. 1000.0 /. float_of_int successful
     else 0.0);

  traceln "\n=== Connection pool statistics ===";
  let all_stats = Conpool.all_stats pool in

  (* Sum one per-endpoint counter over every endpoint *)
  let sum_stat field =
    List.fold_left (fun acc (_, s) -> acc + field s) 0 all_stats
  in
  let total_created = sum_stat Conpool.Stats.total_created in
  let total_reused = sum_stat Conpool.Stats.total_reused in
  let total_closed = sum_stat Conpool.Stats.total_closed in
  let total_errors = sum_stat Conpool.Stats.errors in

  traceln "Total connections created: %d" total_created;
  traceln "Total connections reused: %d" total_reused;
  traceln "Total connections closed: %d" total_closed;
  traceln "Total errors: %d" total_errors;
  traceln "Connection reuse ratio: %.2fx (reused/created)"
    (ratio total_reused total_created);
  traceln "Pool efficiency: %.1f%% (avoided creating %d connections)"
    (100.0 *. ratio total_reused successful)
    total_reused;

  traceln "\nPer-endpoint breakdown:";
  List.iter (fun (endpoint, stats) ->
    traceln "  %a: created=%d reused=%d active=%d idle=%d"
      Conpool.Endpoint.pp endpoint
      (Conpool.Stats.total_created stats)
      (Conpool.Stats.total_reused stats)
      (Conpool.Stats.active stats)
      (Conpool.Stats.idle stats)
  ) all_stats;

  traceln "\n=== Verifying server-side connection counts ===";
  List.iter2 (fun (addr_str, _addr, port) conn_ref ->
    let count = Atomic.get conn_ref in
    traceln "Server %s:%d - Active connections: %d" addr_str port count
  ) servers connection_refs;

  (* Do not report success when requests were lost *)
  if failed > 0 then
    failwith
      (Printf.sprintf "stress test: %d of %d requests failed"
         failed num_requests);

  traceln "\n=== Test completed successfully ==="