this repo has no description
at main 4.3 kB view raw
1open Swim.Types 2module Cluster = Swim.Cluster 3 4external env_cast : 'a -> 'b = "%identity" 5 6type throughput_result = { 7 port : int; 8 broadcasts_sent : int; 9 broadcasts_received : int; 10 elapsed_sec : float; 11 msgs_per_sec : float; 12} 13 14let result_to_json r = 15 Printf.sprintf 16 {|{"port":%d,"broadcasts_sent":%d,"broadcasts_received":%d,"elapsed_sec":%.3f,"msgs_per_sec":%.1f}|} 17 r.port r.broadcasts_sent r.broadcasts_received r.elapsed_sec r.msgs_per_sec 18 19let make_config ~port ~name = 20 { 21 default_config with 22 bind_addr = "\127\000\000\001"; 23 bind_port = port; 24 node_name = Some name; 25 protocol_interval = 0.1; 26 probe_timeout = 0.05; 27 suspicion_mult = 2; 28 secret_key = String.make 16 'k'; 29 cluster_name = ""; 30 encryption_enabled = false; 31 } 32 33let run_throughput_node ~env ~port ~peers ~duration_sec ~msg_rate ~use_direct = 34 let start_time = Unix.gettimeofday () in 35 let broadcasts_sent = ref 0 in 36 let broadcasts_received = ref 0 in 37 let msg_interval = 1.0 /. float_of_int msg_rate in 38 39 Eio.Switch.run @@ fun sw -> 40 let config = make_config ~port ~name:(Printf.sprintf "node-%d" port) in 41 let env_wrap = { stdenv = env; sw } in 42 43 match Cluster.create ~sw ~env:env_wrap ~config with 44 | Error `Invalid_key -> Unix._exit 1 45 | Ok cluster -> 46 Cluster.on_message cluster (fun _sender topic _payload -> 47 if topic = "bench" then incr broadcasts_received); 48 49 Cluster.start cluster; 50 51 let peer_addrs = 52 List.filter_map 53 (fun peer_port -> 54 if peer_port <> port then 55 Some (`Udp (Eio.Net.Ipaddr.of_raw "\127\000\000\001", peer_port)) 56 else None) 57 peers 58 in 59 60 List.iter 61 (fun peer_port -> 62 if peer_port <> port then 63 let peer_id = 64 node_id_of_string (Printf.sprintf "node-%d" peer_port) 65 in 66 let peer_addr = 67 `Udp (Eio.Net.Ipaddr.of_raw "\127\000\000\001", peer_port) 68 in 69 let peer = make_node_info ~id:peer_id ~addr:peer_addr ~meta:"" in 70 Cluster.add_member cluster peer) 71 peers; 72 73 Eio.Time.sleep env#clock 0.5; 74 75 let payload = String.make 64 'x' in 76 let end_time = start_time +. duration_sec in 77 78 while Unix.gettimeofday () < end_time do 79 if use_direct then 80 List.iter 81 (fun addr -> 82 Cluster.send_to_addr cluster ~addr ~topic:"bench" ~payload; 83 incr broadcasts_sent) 84 peer_addrs 85 else ( 86 Cluster.broadcast cluster ~topic:"bench" ~payload; 87 incr broadcasts_sent); 88 Eio.Time.sleep env#clock msg_interval 89 done; 90 91 Eio.Time.sleep env#clock 0.5; 92 93 let elapsed = Unix.gettimeofday () -. start_time in 94 let result = 95 { 96 port; 97 broadcasts_sent = !broadcasts_sent; 98 broadcasts_received = !broadcasts_received; 99 elapsed_sec = elapsed; 100 msgs_per_sec = float_of_int !broadcasts_received /. (elapsed -. 1.0); 101 } 102 in 103 104 print_endline (result_to_json result); 105 flush stdout; 106 Unix._exit 0 107 108let parse_peers s = 109 String.split_on_char ',' s 110 |> List.filter (fun s -> String.length s > 0) 111 |> List.map int_of_string 112 113let () = 114 let port = ref 0 in 115 let peers_str = ref "" in 116 let duration_sec = ref 10.0 in 117 let msg_rate = ref 100 in 118 let use_direct = ref true in 119 120 let specs = 121 [ 122 ("-port", Arg.Set_int port, "Port to bind to (required)"); 123 ("-peers", Arg.Set_string peers_str, "Comma-separated peer ports"); 124 ("-duration", Arg.Set_float duration_sec, "Duration in seconds"); 125 ("-rate", Arg.Set_int msg_rate, "Messages per second to send"); 126 ("-direct", Arg.Set use_direct, "Use direct UDP send (default: true)"); 127 ("-gossip", Arg.Clear use_direct, "Use gossip broadcast instead of direct"); 128 ] 129 in 130 Arg.parse specs (fun _ -> ()) "SWIM Throughput Benchmark"; 131 132 if !port = 0 then ( 133 Printf.eprintf "Error: -port is required\n"; 134 exit 1); 135 136 let peers = parse_peers !peers_str in 137 138 Eio_main.run @@ fun env -> 139 let env = env_cast env in 140 run_throughput_node ~env ~port:!port ~peers ~duration_sec:!duration_sec 141 ~msg_rate:!msg_rate ~use_direct:!use_direct