(* Forked from anil.recoil.org/ocaml-requests: a batteries-included HTTP/1.1 client in OCaml. *)
(*---------------------------------------------------------------------------
  Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
  SPDX-License-Identifier: ISC
 ---------------------------------------------------------------------------*)
5
(* Test conpool with 16 localhost servers, all on 127.0.0.1 but each on a different port *)
7
8open Eio.Std
9
(** [create_server ~sw ~net ipaddr port connections_ref] starts a line-echo
    server listening on [`Tcp (ipaddr, port)].

    The listening socket is attached to [sw]. Connections are accepted in a
    daemon fiber of [sw], so the server never keeps [sw] alive: once every
    non-daemon fiber of [sw] has finished, the accept loop and all connection
    handlers are cancelled and [sw] can complete.

    [connections_ref] is incremented when a connection is accepted and
    decremented when its handler exits, so it holds the number of connections
    currently being served. *)
let create_server ~sw ~net ipaddr port connections_ref =
  let socket =
    Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 (`Tcp (ipaddr, port))
  in
  (* Simple protocol: read lines and echo them back, until EOF.
     The flow is not closed here: [Eio.Net.accept_fork] closes it once the
     handler returns or raises. *)
  let handle_connection flow _addr =
    (* Track this connection *)
    Atomic.incr connections_ref;
    (* [Fun.protect] keeps the counter balanced on every exit path,
       including cancellation. *)
    Fun.protect ~finally:(fun () -> Atomic.decr connections_ref) @@ fun () ->
    try
      let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
      while true do
        let line = Eio.Buf_read.line buf in
        traceln "Server on %a:%d received: %s"
          Eio.Net.Ipaddr.pp ipaddr port line;
        Eio.Flow.copy_string (line ^ "\n") flow
      done
    with
    | End_of_file ->
      traceln "Server on %a:%d client disconnected"
        Eio.Net.Ipaddr.pp ipaddr port
    | Eio.Cancel.Cancelled _ as ex ->
      (* Cancellation must propagate; a catch-all would silently eat it. *)
      raise ex
    | ex ->
      traceln "Server on %a:%d error handling connection: %s"
        Eio.Net.Ipaddr.pp ipaddr port
        (Printexc.to_string ex)
  in
  (* A plain [Fiber.fork] would make [sw] wait forever on this endless accept
     loop. A daemon fiber is cancelled automatically when the rest of [sw] is
     done; the nested switch makes the connection handlers share that fate. *)
  Eio.Fiber.fork_daemon ~sw (fun () ->
    Eio.Switch.run (fun conn_sw ->
      while true do
        Eio.Net.accept_fork socket ~sw:conn_sw
          ~on_error:(fun ex ->
            traceln "Server %a error: %s"
              Eio.Net.Sockaddr.pp (`Tcp (ipaddr, port))
              (Printexc.to_string ex))
          handle_connection
      done);
    `Stop_daemon)
52
(** Describe the 16 test servers as [(host, ip, port)] triples. Every server
    shares the address 127.0.0.1; the ports are consecutive, starting at
    10000. *)
let generate_localhost_addresses () =
  let host = "127.0.0.1" in
  (* The same address in the raw 4-byte form taken by [Eio.Net.Ipaddr.of_raw]. *)
  let loopback = Eio.Net.Ipaddr.of_raw "\127\000\000\001" in
  List.init 16 (fun offset -> (host, loopback, 10000 + offset))
67
(* Stress-test entry point.

   Starts the 16 echo servers, creates a connection pool, pushes
   [num_requests] echo round-trips through it with at most 200 fibers in
   flight, then prints timing figures, pool statistics and the server-side
   connection counters. A request counts as successful only if the echoed line
   equals the line that was sent. *)
let () =
  (* Setup logging *)
  Logs.set_reporter (Logs_fmt.reporter ());
  Logs.set_level (Some Logs.Info);
  Logs.Src.set_level Conpool.src (Some Logs.Debug);

  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->

  traceln "=== Starting 16 localhost servers ===";

  (* Generate addresses *)
  let servers = generate_localhost_addresses () in

  (* Create connection counters for each server *)
  let connection_refs = List.map (fun _ -> Atomic.make 0) servers in

  (* Start all servers *)
  List.iter2 (fun (_addr_str, addr, port) conn_ref ->
    traceln "Starting server on %a:%d"
      Eio.Net.Ipaddr.pp addr port;
    create_server ~sw ~net:env#net addr port conn_ref
  ) servers connection_refs;

  (* Give servers time to start *)
  Eio.Time.sleep env#clock 0.5;

  traceln "\n=== Creating connection pool ===";

  (* Create connection pool *)
  let pool_config = Conpool.Config.make
    ~max_connections_per_endpoint:5
    ~max_idle_time:30.0
    ~max_connection_lifetime:60.0
    ()
  in

  let pool = Conpool.create
    ~sw
    ~net:env#net
    ~clock:env#clock
    ~config:pool_config
    ()
  in

  traceln "\n=== Stress testing with thousands of concurrent connections ===";

  (* Disable debug logging for stress test *)
  Logs.Src.set_level Conpool.src (Some Logs.Info);

  (* Create endpoints for all servers. They are kept in an array so that the
     round-robin lookup below is O(1) per task instead of walking a list. *)
  let endpoints =
    servers
    |> List.map (fun (addr_str, _addr, port) ->
         Conpool.Endpoint.make ~host:addr_str ~port)
    |> Array.of_list
  in
  let num_endpoints = Array.length endpoints in

  (* Stress test: thousands of concurrent requests across all 16 servers *)
  let num_requests = 50000 in

  traceln "Launching %d concurrent requests across %d endpoints..."
    num_requests num_endpoints;
  traceln "Pool config: max %d connections per endpoint"
    (Conpool.Config.max_connections_per_endpoint pool_config);

  let start_time = Unix.gettimeofday () in
  let success_count = Atomic.make 0 in
  let error_count = Atomic.make 0 in
  let last_progress = ref 0 in

  (* Count one success and print a progress line every 5000 successes. *)
  let record_success () =
    let count = Atomic.fetch_and_add success_count 1 + 1 in
    if count / 5000 > !last_progress then begin
      last_progress := count / 5000;
      traceln " Progress: %d/%d (%.1f%%)"
        count num_requests
        (100.0 *. float_of_int count /. float_of_int num_requests)
    end
  in

  (* Count one failure; [report] is only run for the first 10 failures so a
     systematic error does not flood the log. *)
  let record_error report =
    let seen = Atomic.fetch_and_add error_count 1 + 1 in
    if seen <= 10 then report ()
  in

  (* Generate list of (endpoint, request_id) pairs, assigned round-robin *)
  let tasks = List.init num_requests (fun i ->
    (endpoints.(i mod num_endpoints), i)
  ) in

  (* Run all requests concurrently with fiber limit *)
  Eio.Fiber.List.iter ~max_fibers:200 (fun (endpoint, req_id) ->
    try
      Conpool.with_connection pool endpoint (fun flow ->
        let test_msg = Printf.sprintf "Request %d" req_id in
        Eio.Flow.copy_string (test_msg ^ "\n") flow;

        let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
        let response = Eio.Buf_read.line buf in
        (* The servers echo each line back, so anything else means the
           request did not really succeed. *)
        if String.equal response test_msg then record_success ()
        else
          record_error (fun () ->
            traceln "Request %d to %a got unexpected response: %S"
              req_id Conpool.Endpoint.pp endpoint response)
      )
    with
    | Eio.Cancel.Cancelled _ as ex ->
      (* Never swallow cancellation: let it unwind this fiber. *)
      raise ex
    | e ->
      record_error (fun () ->
        traceln "Request %d to %a failed: %s"
          req_id Conpool.Endpoint.pp endpoint (Printexc.to_string e))
  ) tasks;

  let end_time = Unix.gettimeofday () in
  let duration = end_time -. start_time in
  let successful = Atomic.get success_count in
  let failed = Atomic.get error_count in

  traceln "\n=== Stress test results ===";
  traceln "Total requests: %d" num_requests;
  traceln "Successful: %d" successful;
  traceln "Failed: %d" failed;
  traceln "Duration: %.2fs" duration;
  (* Both figures divide by [duration] or [successful]; skip them rather than
     print inf/nan when either is zero. *)
  if successful > 0 && duration > 0.0 then begin
    traceln "Throughput: %.0f req/s" (float_of_int successful /. duration);
    traceln "Average latency: %.2fms"
      (duration *. 1000.0 /. float_of_int successful)
  end else
    traceln "Throughput and average latency: n/a (no successful requests)";

  traceln "\n=== Connection pool statistics ===";
  let all_stats = Conpool.all_stats pool in

  (* Calculate totals *)
  let sum_stat field =
    List.fold_left (fun acc (_, s) -> acc + field s) 0 all_stats
  in
  let total_created = sum_stat Conpool.Stats.total_created in
  let total_reused = sum_stat Conpool.Stats.total_reused in
  let total_closed = sum_stat Conpool.Stats.total_closed in
  let total_errors = sum_stat Conpool.Stats.errors in

  traceln "Total connections created: %d" total_created;
  traceln "Total connections reused: %d" total_reused;
  traceln "Total connections closed: %d" total_closed;
  traceln "Total errors: %d" total_errors;
  traceln "Connection reuse ratio: %.2fx (reused/created)"
    (if total_created > 0 then float_of_int total_reused /. float_of_int total_created else 0.0);
  traceln "Pool efficiency: %.1f%% (avoided creating %d connections)"
    (if successful > 0 then 100.0 *. float_of_int total_reused /. float_of_int successful else 0.0)
    total_reused;

  traceln "\nPer-endpoint breakdown:";
  List.iter (fun (endpoint, stats) ->
    traceln " %a: created=%d reused=%d active=%d idle=%d"
      Conpool.Endpoint.pp endpoint
      (Conpool.Stats.total_created stats)
      (Conpool.Stats.total_reused stats)
      (Conpool.Stats.active stats)
      (Conpool.Stats.idle stats)
  ) all_stats;

  traceln "\n=== Verifying server-side connection counts ===";
  List.iter2 (fun (addr_str, _addr, port) conn_ref ->
    let count = Atomic.get conn_ref in
    traceln "Server %s:%d - Active connections: %d" addr_str port count
  ) servers connection_refs;

  (* Only claim success when every request actually succeeded. *)
  if failed = 0 then
    traceln "\n=== Test completed successfully ==="
  else
    traceln "\n=== Test completed with %d failed requests ===" failed