(* Forked from anil.recoil.org/ocaml-requests:
   a batteries-included HTTP/1.1 client in OCaml. *)
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
(* Test conpool with 16 localhost servers, all on 127.0.0.1 but each on a
   different port *)
7
8open Eio.Std
9
(** [server ~sw ~net ipaddr port connections_ref] starts a line-echo TCP
    server listening on [ipaddr]:[port].

    The listening socket is attached to [sw]. The accept loop runs as a
    daemon fiber of [sw], and every accepted connection is served in its own
    fiber under a switch nested inside that daemon. As a result neither the
    accept loop nor idle client connections prevent [sw] from finishing: they
    are cancelled once all non-daemon fibers of [sw] have completed.

    [connections_ref] is incremented when a connection is accepted and
    decremented when its handler exits for any reason, so it holds the number
    of connections currently being served. *)
let server ~sw ~net ipaddr port connections_ref =
  let socket =
    Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 (`Tcp (ipaddr, port))
  in
  (* Called by [accept_fork] for handler failures other than cancellation. *)
  let on_error ex =
    traceln "Server %a error: %s" Eio.Net.Sockaddr.pp
      (`Tcp (ipaddr, port))
      (Printexc.to_string ex)
  in
  (* Simple protocol: read lines and echo them back, until EOF. The flow is
     not closed here because [accept_fork] closes it when the handler ends. *)
  let handle_connection flow _addr =
    (* Track this connection; [finally] guarantees the matching decrement on
       every exit path, including cancellation. *)
    Atomic.incr connections_ref;
    Fun.protect
      ~finally:(fun () -> Atomic.decr connections_ref)
      (fun () ->
        try
          let buf = Eio.Buf_read.of_flow flow ~max_size:1024 in
          while true do
            let line = Eio.Buf_read.line buf in
            traceln "Server on %a:%d received: %s" Eio.Net.Ipaddr.pp ipaddr
              port line;
            Eio.Flow.copy_string (line ^ "\n") flow
          done
        with
        | End_of_file ->
            traceln "Server on %a:%d client disconnected" Eio.Net.Ipaddr.pp
              ipaddr port
        (* Cancellation must propagate, never be reported as an error. *)
        | Eio.Cancel.Cancelled _ as ex -> raise ex
        | ex ->
            traceln "Server on %a:%d error handling connection: %s"
              Eio.Net.Ipaddr.pp ipaddr port (Printexc.to_string ex))
  in
  Eio.Fiber.fork_daemon ~sw (fun () ->
      (* Handlers live under [conn_sw] so that cancelling the daemon also
         cancels connections still waiting for input. *)
      Switch.run (fun conn_sw ->
          while true do
            Eio.Net.accept_fork socket ~sw:conn_sw ~on_error handle_connection
          done);
      `Stop_daemon)
51
(** Build the [(host, address, port)] triples for the 16 test servers. Every
    entry uses 127.0.0.1; the ports run from 10000 to 10015. *)
let generate_localhost_addresses () =
  let host = "127.0.0.1" in
  (* Raw IPv4 form: the four octets 127, 0, 0, 1. *)
  let loopback = Eio.Net.Ipaddr.of_raw "\127\000\000\001" in
  List.init 16 (fun offset -> (host, loopback, 10000 + offset))
65
(* Entry point of the stress test.

   Starts the 16 echo servers, creates a connection pool, pushes 50000
   echo requests through the pool (at most 200 fibers at a time), then prints
   timing figures, pool statistics and the server-side connection counters.
   Raises [Failure] at the end if any request failed, so that a failing run
   is not reported as a success. *)
let () =
  Vlog.setup_test ~level:Logs.Info ();
  Logs.Src.set_level Conpool.src (Some Logs.Debug);
  Eio_main.run @@ fun env ->
  Switch.run @@ fun sw ->
  traceln "=== Starting 16 localhost servers ===";

  (* Generate addresses *)
  let servers = generate_localhost_addresses () in

  (* Create connection counters for each server *)
  let connection_refs = List.map (fun _ -> Atomic.make 0) servers in

  (* Start all servers *)
  List.iter2
    (fun (_addr_str, addr, port) conn_ref ->
      traceln "Starting server on %a:%d" Eio.Net.Ipaddr.pp addr port;
      server ~sw ~net:env#net addr port conn_ref)
    servers connection_refs;

  (* Give servers time to start *)
  Eio.Time.sleep env#clock 0.5;

  traceln "\n=== Creating connection pool ===";

  (* Create connection pool *)
  let pool_config =
    Conpool.Config.v ~max_connections_per_endpoint:5 ~max_idle_time:30.0
      ~max_connection_lifetime:60.0 ()
  in

  let pool =
    Conpool.basic ~sw ~net:env#net ~clock:env#clock ~config:pool_config ()
  in

  traceln "\n=== Stress testing with thousands of concurrent connections ===";

  (* Disable debug logging for stress test *)
  Logs.Src.set_level Conpool.src (Some Logs.Info);

  (* Create endpoints for all servers; kept in an array so that picking the
     endpoint for a request is a constant-time lookup. *)
  let endpoints =
    servers
    |> List.map (fun (addr_str, _addr, port) ->
           Conpool.Endpoint.v ~host:addr_str ~port)
    |> Array.of_list
  in
  let num_endpoints = Array.length endpoints in

  (* Stress test: thousands of concurrent requests across all 16 servers *)
  let num_requests = 50000 in

  traceln "Launching %d concurrent requests across %d endpoints..." num_requests
    num_endpoints;
  traceln "Pool config: max %d connections per endpoint"
    (Conpool.Config.max_connections_per_endpoint pool_config);

  let start_time = Unix.gettimeofday () in
  let success_count = Atomic.make 0 in
  let error_count = Atomic.make 0 in
  let last_progress = ref 0 in

  (* Generate list of (endpoint, request_id) pairs, assigning endpoints
     round-robin. *)
  let tasks =
    List.init num_requests (fun i -> (endpoints.(i mod num_endpoints), i))
  in

  (* Run all requests concurrently with fiber limit *)
  Eio.Fiber.List.iter ~max_fibers:200
    (fun (endpoint, req_id) ->
      try
        Conpool.with_connection pool endpoint (fun conn ->
            let test_msg = Fmt.str "Request %d" req_id in
            Eio.Flow.copy_string (test_msg ^ "\n") conn.Conpool.flow;

            let buf = Eio.Buf_read.of_flow conn.Conpool.flow ~max_size:1024 in
            let _response = Eio.Buf_read.line buf in
            let count = Atomic.fetch_and_add success_count 1 + 1 in

            (* Progress indicator every 5000 requests *)
            if count / 5000 > !last_progress then begin
              last_progress := count / 5000;
              traceln " Progress: %d/%d (%.1f%%)" count num_requests
                (100.0 *. float_of_int count /. float_of_int num_requests)
            end)
      with
      (* Cancellation is not a request failure: let it propagate. *)
      | Eio.Cancel.Cancelled _ as ex -> raise ex
      | e ->
          Atomic.incr error_count;
          (* Only report the first few failures to keep the log readable. *)
          if Atomic.get error_count <= 10 then
            traceln "Request %d to %a failed: %s" req_id Conpool.Endpoint.pp
              endpoint (Printexc.to_string e))
    tasks;

  let end_time = Unix.gettimeofday () in
  let duration = end_time -. start_time in
  let successful = Atomic.get success_count in
  let failed = Atomic.get error_count in

  traceln "\n=== Stress test results ===";
  traceln "Total requests: %d" num_requests;
  traceln "Successful: %d" successful;
  traceln "Failed: %d" failed;
  traceln "Duration: %.2fs" duration;
  (* Both ratios are guarded so that a run with no successes (or a zero
     duration) prints 0 instead of inf/nan. *)
  traceln "Throughput: %.0f req/s"
    (if duration > 0.0 then float_of_int successful /. duration else 0.0);
  traceln "Average latency: %.2fms"
    (if successful > 0 then duration *. 1000.0 /. float_of_int successful
     else 0.0);

  traceln "\n=== Connection pool statistics ===";
  let all_stats = Conpool.all_stats pool in

  (* Calculate totals: sum one counter over every endpoint's statistics. *)
  let sum_stat counter =
    List.fold_left (fun acc (_, s) -> acc + counter s) 0 all_stats
  in
  let total_created = sum_stat Conpool.Stats.total_created in
  let total_reused = sum_stat Conpool.Stats.total_reused in
  let total_closed = sum_stat Conpool.Stats.total_closed in
  let total_errors = sum_stat Conpool.Stats.errors in

  traceln "Total connections created: %d" total_created;
  traceln "Total connections reused: %d" total_reused;
  traceln "Total connections closed: %d" total_closed;
  traceln "Total errors: %d" total_errors;
  traceln "Connection reuse ratio: %.2fx (reused/created)"
    (if total_created > 0 then
       float_of_int total_reused /. float_of_int total_created
     else 0.0);
  traceln "Pool efficiency: %.1f%% (avoided creating %d connections)"
    (if successful > 0 then
       100.0 *. float_of_int total_reused /. float_of_int successful
     else 0.0)
    total_reused;

  traceln "\nPer-endpoint breakdown:";
  List.iter
    (fun (endpoint, stats) ->
      traceln " %a: created=%d reused=%d active=%d idle=%d" Conpool.Endpoint.pp
        endpoint
        (Conpool.Stats.total_created stats)
        (Conpool.Stats.total_reused stats)
        (Conpool.Stats.active stats)
        (Conpool.Stats.idle stats))
    all_stats;

  traceln "\n=== Verifying server-side connection counts ===";
  List.iter2
    (fun (addr_str, _addr, port) conn_ref ->
      let count = Atomic.get conn_ref in
      traceln "Server %s:%d - Active connections: %d" addr_str port count)
    servers connection_refs;

  (* A run with failed requests must not be reported as a success. *)
  if failed > 0 then
    failwith
      (Printf.sprintf "Stress test failed: %d of %d requests failed" failed
         num_requests);

  traceln "\n=== Test completed successfully ==="