this repo has no description
1open Swim.Types
2module Cluster = Swim.Cluster
3
(** [env_cast x] hands [x] back unchanged at run time while giving it whatever
    type the call site asks for. It is bound to the compiler's ["%identity"]
    primitive, so no conversion or check is performed: the caller alone is
    responsible for the source and target types being compatible. The entry
    point uses it to retype the environment supplied by [Eio_main.run]. *)
external env_cast : 'src -> 'dst = "%identity"
5
(** Summary of a single benchmark node's run; serialised by [result_to_json]. *)
type throughput_result = {
  port : int;  (** Port the reporting node was started with. *)
  broadcasts_sent : int;
      (** Number of send operations issued: one per direct send to a peer
          address, or one per gossip broadcast call. *)
  broadcasts_received : int;
      (** Number of incoming messages whose topic was ["bench"]. *)
  elapsed_sec : float;
      (** Wall-clock seconds between node start-up and result creation,
          including the fixed pauses before and after the send loop. *)
  msgs_per_sec : float;
      (** Receive rate: [broadcasts_received] divided by the elapsed time
          with the one second of fixed pauses subtracted. *)
}
13
(** Serialises a [throughput_result] as a one-line JSON object. The integer
    fields are written verbatim, [elapsed_sec] with three decimals and
    [msgs_per_sec] with one decimal. *)
let result_to_json
    { port; broadcasts_sent; broadcasts_received; elapsed_sec; msgs_per_sec } =
  Printf.sprintf
    {|{"port":%d,"broadcasts_sent":%d,"broadcasts_received":%d,"elapsed_sec":%.3f,"msgs_per_sec":%.1f}|}
    port broadcasts_sent broadcasts_received elapsed_sec msgs_per_sec
18
(** [make_config ~port ~name] derives the benchmark configuration from
    [default_config]: the node is named [name] and bound to [port] on the
    four-byte address 127.0.0.1, it uses a 0.1 s protocol interval, a 0.05 s
    probe timeout and a suspicion multiplier of 2, and it carries a fixed
    16-character key with encryption switched off and an empty cluster name. *)
let make_config ~port ~name =
  (* Raw byte form of 127.0.0.1. *)
  let loopback = "\127\000\000\001" in
  (* Fixed placeholder key: sixteen 'k' characters. *)
  let fixed_key = String.make 16 'k' in
  {
    default_config with
    node_name = Some name;
    bind_addr = loopback;
    bind_port = port;
    cluster_name = "";
    secret_key = fixed_key;
    encryption_enabled = false;
    protocol_interval = 0.1;
    probe_timeout = 0.05;
    suspicion_mult = 2;
  }
32
(** Runs one benchmark node and terminates the process when it is finished.

    The node creates and starts a cluster configured by [make_config] for
    [port], registers every port in [peers] other than its own as a member
    named ["node-<port>"] on the loopback address, pauses briefly, and then
    sends a 64-byte payload on topic ["bench"] in rounds until [duration_sec]
    seconds have passed since this function was entered (the initial pause
    therefore counts against [duration_sec]). Every round is followed by a
    sleep of [1 / msg_rate] seconds.

    - [use_direct = true]: a round sends one message to each peer address with
      [Cluster.send_to_addr].
    - [use_direct = false]: a round is a single [Cluster.broadcast].

    After a final pause a JSON result line is printed to stdout and the
    process exits with status 0 through [Unix._exit]. If [Cluster.create]
    returns [`Invalid_key], an error is written to stderr and the process
    exits with status 1.

    @raise Invalid_argument if [msg_rate] is not strictly positive. *)
let run_throughput_node ~env ~port ~peers ~duration_sec ~msg_rate ~use_direct =
  (* A zero rate would give an infinite sleep interval and a negative rate a
     negative one, so both are rejected before any work is done. *)
  if msg_rate <= 0 then
    invalid_arg "run_throughput_node: msg_rate must be positive";
  (* Fixed pauses before and after the send loop. They are part of
     [elapsed_sec] but are subtracted again when the rate is computed. *)
  let warmup_sec = 0.5 in
  let drain_sec = 0.5 in
  let start_time = Unix.gettimeofday () in
  let broadcasts_sent = ref 0 in
  let broadcasts_received = ref 0 in
  let msg_interval = 1.0 /. float_of_int msg_rate in

  Eio.Switch.run @@ fun sw ->
  let config = make_config ~port ~name:(Printf.sprintf "node-%d" port) in
  let env_wrap = { stdenv = env; sw } in

  match Cluster.create ~sw ~env:env_wrap ~config with
  | Error `Invalid_key ->
      (* [Unix._exit] does not flush channel buffers, hence the explicit
         [%!] flush in the format string. *)
      Printf.eprintf "Error: Cluster.create failed: invalid key\n%!";
      Unix._exit 1
  | Ok cluster ->
      (* Count only messages that belong to the benchmark topic. *)
      Cluster.on_message cluster (fun _sender topic _payload ->
          if String.equal topic "bench" then incr broadcasts_received);

      Cluster.start cluster;

      (* Raw byte form of 127.0.0.1, shared by every peer address. *)
      let loopback = Eio.Net.Ipaddr.of_raw "\127\000\000\001" in
      (* A node never lists itself as a peer. *)
      let other_ports =
        List.filter (fun peer_port -> peer_port <> port) peers
      in
      let peer_addrs =
        List.map (fun peer_port -> `Udp (loopback, peer_port)) other_ports
      in

      List.iter
        (fun peer_port ->
          let peer_id =
            node_id_of_string (Printf.sprintf "node-%d" peer_port)
          in
          let peer_addr = `Udp (loopback, peer_port) in
          let peer = make_node_info ~id:peer_id ~addr:peer_addr ~meta:"" in
          Cluster.add_member cluster peer)
        other_ports;

      Eio.Time.sleep env#clock warmup_sec;

      let payload = String.make 64 'x' in
      let end_time = start_time +. duration_sec in

      while Unix.gettimeofday () < end_time do
        if use_direct then
          List.iter
            (fun addr ->
              Cluster.send_to_addr cluster ~addr ~topic:"bench" ~payload;
              incr broadcasts_sent)
            peer_addrs
        else (
          Cluster.broadcast cluster ~topic:"bench" ~payload;
          incr broadcasts_sent);
        Eio.Time.sleep env#clock msg_interval
      done;

      (* Leave time for messages still in flight to be counted. *)
      Eio.Time.sleep env#clock drain_sec;

      let elapsed = Unix.gettimeofday () -. start_time in
      (* Time outside the two fixed pauses. A non-positive window yields a
         rate of 0 so the JSON never contains [inf] or [nan]. *)
      let window = elapsed -. (warmup_sec +. drain_sec) in
      let msgs_per_sec =
        if window > 0.0 then float_of_int !broadcasts_received /. window
        else 0.0
      in
      let result =
        {
          port;
          broadcasts_sent = !broadcasts_sent;
          broadcasts_received = !broadcasts_received;
          elapsed_sec = elapsed;
          msgs_per_sec;
        }
      in

      (* [print_endline] flushes stdout itself, which matters because
         [Unix._exit] skips the usual exit-time flushing. *)
      print_endline (result_to_json result);
      Unix._exit 0
107
(** [parse_peers s] parses a comma-separated list of ports such as
    ["8001,8002"] into a list of integers, preserving order. Whitespace
    around each entry is ignored and empty entries (for example from a
    trailing comma or an empty [s]) are skipped.

    @raise Failure if an entry is not a valid integer literal. *)
let parse_peers s =
  let parse_port token =
    match int_of_string_opt token with
    | Some port -> port
    | None -> failwith (Printf.sprintf "parse_peers: invalid port %S" token)
  in
  String.split_on_char ',' s
  |> List.filter_map (fun raw ->
         (* Trim first so that "8001, 8002" is accepted and blank entries
            such as " " are dropped rather than rejected. *)
         match String.trim raw with
         | "" -> None
         | token -> Some (parse_port token))
112
(** Program entry point: parses the command line, validates it, and runs a
    single benchmark node under the Eio main loop.

    [-port] is mandatory. [-peers], [-duration], [-rate] and the
    [-direct] / [-gossip] switches are optional and default to no peers,
    10 seconds, 100 and direct sending respectively. Invalid input is
    reported on stderr and ends the process with status 1. *)
let () =
  let port = ref 0 in
  let peers_str = ref "" in
  let duration_sec = ref 10.0 in
  let msg_rate = ref 100 in
  let use_direct = ref true in

  let specs =
    [
      ("-port", Arg.Set_int port, "Port to bind to (required)");
      ("-peers", Arg.Set_string peers_str, "Comma-separated peer ports");
      ("-duration", Arg.Set_float duration_sec, "Duration in seconds");
      ("-rate", Arg.Set_int msg_rate, "Messages per second to send");
      ("-direct", Arg.Set use_direct, "Use direct UDP send (default: true)");
      ("-gossip", Arg.Clear use_direct, "Use gossip broadcast instead of direct");
    ]
  in
  (* Positional (anonymous) arguments are accepted and ignored. *)
  Arg.parse specs (fun _ -> ()) "SWIM Throughput Benchmark";

  (* 0 is the initial value, so it means the flag was never supplied. *)
  if !port = 0 then (
    Printf.eprintf "Error: -port is required\n";
    exit 1);

  (* The sleep between sends is 1 / rate, which is only meaningful for a
     strictly positive rate. *)
  if !msg_rate <= 0 then (
    Printf.eprintf "Error: -rate must be a positive integer\n";
    exit 1);

  (* Report a malformed peer list like any other usage error rather than
     letting the parser's exception escape. *)
  let peers =
    match parse_peers !peers_str with
    | ports -> ports
    | exception Failure reason ->
        Printf.eprintf "Error: invalid -peers value %S (%s)\n" !peers_str
          reason;
        exit 1
  in

  Eio_main.run @@ fun env ->
  (* Retype the Eio environment to what [run_throughput_node] expects. *)
  let env = env_cast env in
  run_throughput_node ~env ~port:!port ~peers ~duration_sec:!duration_sec
    ~msg_rate:!msg_rate ~use_direct:!use_direct