(* SWIM throughput benchmark node.

   Starts one cluster node bound to the loopback address, sends "bench"
   messages to its peers at a fixed rate for a fixed duration, counts the
   "bench" messages it receives, and prints a one-line JSON summary on
   stdout before exiting. *)

open Swim.Types
module Cluster = Swim.Cluster

(* Unchecked identity cast used to coerce the environment handed out by
   [Eio_main.run] into the type expected by [run_throughput_node].
   NOTE(review): this is equivalent to [Obj.magic]; it is only sound if the
   two environment types really are compatible -- confirm against
   Swim.Types. *)
external env_cast : 'a -> 'b = "%identity"

(** Summary emitted by a node at the end of a benchmark run. *)
type throughput_result = {
  port : int;
  broadcasts_sent : int;
  broadcasts_received : int;
  elapsed_sec : float;
  msgs_per_sec : float;
}

(* Raw 4-byte encoding of 127.0.0.1, used both as the bind address and as the
   address of every peer. *)
let loopback_raw = "\127\000\000\001"

(* Length, in seconds, of the pause before the send loop starts and of the
   pause after it ends. *)
let settle_sec = 0.5

(** [result_to_json r] renders [r] as a single-line JSON object. *)
let result_to_json r =
  Printf.sprintf
    {|{"port":%d,"broadcasts_sent":%d,"broadcasts_received":%d,"elapsed_sec":%.3f,"msgs_per_sec":%.1f}|}
    r.port r.broadcasts_sent r.broadcasts_received r.elapsed_sec
    r.msgs_per_sec

(** [make_config ~port ~name] is [default_config] overridden for a loopback
    benchmark node named [name] listening on [port], with encryption
    disabled. *)
let make_config ~port ~name =
  {
    default_config with
    bind_addr = loopback_raw;
    bind_port = port;
    node_name = Some name;
    protocol_interval = 0.1;
    probe_timeout = 0.05;
    suspicion_mult = 2;
    secret_key = String.make 16 'k';
    cluster_name = "";
    encryption_enabled = false;
  }

(** [run_throughput_node ~env ~port ~peers ~duration_sec ~msg_rate ~use_direct]
    runs one benchmark node and never returns: it terminates the process with
    status 0 after printing the JSON result, or with status 1 if the cluster
    cannot be created.

    - [port]: port this node binds to; also used to derive its node name.
    - [peers]: ports of all nodes; the entry equal to [port] is skipped.
    - [duration_sec]: deadline for the send loop, measured from function
      entry (so it includes cluster setup and the initial settle pause).
    - [msg_rate]: send-loop iterations per second.
    - [use_direct]: when [true], each iteration sends one message to every
      peer address; otherwise each iteration performs one broadcast.

    @raise Invalid_argument if [msg_rate <= 0]. *)
let run_throughput_node ~env ~port ~peers ~duration_sec ~msg_rate ~use_direct =
  (* A non-positive rate would yield an infinite or negative sleep interval. *)
  if msg_rate <= 0 then
    invalid_arg "run_throughput_node: msg_rate must be positive";
  let start_time = Unix.gettimeofday () in
  let broadcasts_sent = ref 0 in
  let broadcasts_received = ref 0 in
  let msg_interval = 1.0 /. float_of_int msg_rate in
  Eio.Switch.run @@ fun sw ->
  let config = make_config ~port ~name:(Printf.sprintf "node-%d" port) in
  let env_wrap = { stdenv = env; sw } in
  match Cluster.create ~sw ~env:env_wrap ~config with
  | Error `Invalid_key ->
      (* [Unix._exit] skips [at_exit] flushing; [prerr_endline] flushes
         stderr itself, so the diagnostic is not lost. *)
      prerr_endline "Error: cluster creation failed (invalid secret key)";
      Unix._exit 1
  | Ok cluster ->
      Cluster.on_message cluster (fun _sender topic _payload ->
          if topic = "bench" then incr broadcasts_received);
      Cluster.start cluster;
      let remote_ports = List.filter (fun p -> p <> port) peers in
      let addr_of_port p = `Udp (Eio.Net.Ipaddr.of_raw loopback_raw, p) in
      let peer_addrs = List.map addr_of_port remote_ports in
      (* Seed the membership list with every peer. *)
      List.iter
        (fun peer_port ->
          let peer_id =
            node_id_of_string (Printf.sprintf "node-%d" peer_port)
          in
          let peer =
            make_node_info ~id:peer_id ~addr:(addr_of_port peer_port) ~meta:""
          in
          Cluster.add_member cluster peer)
        remote_ports;
      (* Settle pause before sending starts. *)
      Eio.Time.sleep env#clock settle_sec;
      let payload = String.make 64 'x' in
      let end_time = start_time +. duration_sec in
      while Unix.gettimeofday () < end_time do
        if use_direct then
          List.iter
            (fun addr ->
              Cluster.send_to_addr cluster ~addr ~topic:"bench" ~payload;
              incr broadcasts_sent)
            peer_addrs
        else (
          Cluster.broadcast cluster ~topic:"bench" ~payload;
          incr broadcasts_sent);
        Eio.Time.sleep env#clock msg_interval
      done;
      (* Settle pause after sending stops, before the counters are read. *)
      Eio.Time.sleep env#clock settle_sec;
      let elapsed = Unix.gettimeofday () -. start_time in
      (* The rate excludes the two settle pauses. For very short (or
         negative) durations that window is not positive; report 0.0 rather
         than inf/nan/negative, which would also make the JSON invalid. *)
      let active_sec = elapsed -. (2.0 *. settle_sec) in
      let msgs_per_sec =
        if active_sec > 0.0 then
          float_of_int !broadcasts_received /. active_sec
        else 0.0
      in
      let result =
        {
          port;
          broadcasts_sent = !broadcasts_sent;
          broadcasts_received = !broadcasts_received;
          elapsed_sec = elapsed;
          msgs_per_sec;
        }
      in
      print_endline (result_to_json result);
      flush stdout;
      (* Hard exit without unwinding the switch.
         NOTE(review): presumably because cluster fibers would otherwise keep
         [Eio.Switch.run] from returning -- confirm. *)
      Unix._exit 0

(** [parse_peers s] parses a comma-separated list of ports. Surrounding
    whitespace is ignored and empty fields are skipped.

    @raise Failure if a non-empty field is not a valid integer. *)
let parse_peers s =
  String.split_on_char ',' s
  |> List.filter_map (fun field ->
         match String.trim field with
         | "" -> None
         | tok -> (
             match int_of_string_opt tok with
             | Some p -> Some p
             | None -> failwith (Printf.sprintf "invalid peer port %S" tok)))

(* Command-line entry point: parse the flags, validate them, then run the
   node under the Eio main loop. *)
let () =
  let port = ref 0 in
  let peers_str = ref "" in
  let duration_sec = ref 10.0 in
  let msg_rate = ref 100 in
  let use_direct = ref true in
  let specs =
    [
      ("-port", Arg.Set_int port, "Port to bind to (required)");
      ("-peers", Arg.Set_string peers_str, "Comma-separated peer ports");
      ("-duration", Arg.Set_float duration_sec, "Duration in seconds");
      ("-rate", Arg.Set_int msg_rate, "Messages per second to send");
      ("-direct", Arg.Set use_direct, "Use direct UDP send (default: true)");
      ( "-gossip",
        Arg.Clear use_direct,
        "Use gossip broadcast instead of direct" );
    ]
  in
  Arg.parse specs (fun _ -> ()) "SWIM Throughput Benchmark";
  (* Report a usage error on stderr and terminate with status 1. *)
  let die msg =
    prerr_endline ("Error: " ^ msg);
    exit 1
  in
  if !port = 0 then die "-port is required";
  if !msg_rate <= 0 then die "-rate must be a positive integer";
  let peers =
    try parse_peers !peers_str with Failure msg -> die ("-peers: " ^ msg)
  in
  Eio_main.run @@ fun env ->
  let env = env_cast env in
  run_throughput_node ~env ~port:!port ~peers ~duration_sec:!duration_sec
    ~msg_rate:!msg_rate ~use_direct:!use_direct