this repo has no description
at main 6.1 kB view raw
1package main 2 3import ( 4 "encoding/json" 5 "flag" 6 "fmt" 7 "log" 8 "os" 9 "runtime" 10 "sync" 11 "sync/atomic" 12 "time" 13 14 "github.com/hashicorp/memberlist" 15) 16 17type ThroughputResult struct { 18 Implementation string `json:"implementation"` 19 NumNodes int `json:"num_nodes"` 20 DurationNs int64 `json:"duration_ns"` 21 MsgRate int `json:"msg_rate"` 22 BroadcastsSent int64 `json:"broadcasts_sent"` 23 BroadcastsReceived int64 `json:"broadcasts_received"` 24 MsgsPerSec float64 `json:"msgs_per_sec"` 25 CPUCores int `json:"cpu_cores"` 26} 27 28type throughputDelegate struct { 29 received atomic.Int64 30 meta []byte 31} 32 33func (d *throughputDelegate) NodeMeta(limit int) []byte { 34 return d.meta 35} 36 37func (d *throughputDelegate) NotifyMsg(msg []byte) { 38 if len(msg) > 0 && msg[0] == 'B' { 39 d.received.Add(1) 40 } 41} 42 43func (d *throughputDelegate) GetBroadcasts(overhead, limit int) [][]byte { 44 return nil 45} 46 47func (d *throughputDelegate) LocalState(join bool) []byte { 48 return nil 49} 50 51func (d *throughputDelegate) MergeRemoteState(buf []byte, join bool) { 52} 53 54type throughputEventDelegate struct { 55 joinCh chan string 56 mu sync.Mutex 57 joined map[string]bool 58} 59 60func newThroughputEventDelegate() *throughputEventDelegate { 61 return &throughputEventDelegate{ 62 joinCh: make(chan string, 1000), 63 joined: make(map[string]bool), 64 } 65} 66 67func (e *throughputEventDelegate) NotifyJoin(node *memberlist.Node) { 68 e.mu.Lock() 69 if !e.joined[node.Name] { 70 e.joined[node.Name] = true 71 select { 72 case e.joinCh <- node.Name: 73 default: 74 } 75 } 76 e.mu.Unlock() 77} 78 79func (e *throughputEventDelegate) NotifyLeave(node *memberlist.Node) {} 80func (e *throughputEventDelegate) NotifyUpdate(node *memberlist.Node) {} 81 82func (e *throughputEventDelegate) waitForNodes(n int, timeout time.Duration) bool { 83 deadline := time.Now().Add(timeout) 84 for { 85 e.mu.Lock() 86 count := len(e.joined) 87 e.mu.Unlock() 88 if count >= n { 89 return true 90 } 91 if time.Now().After(deadline) { 92 return false 93 } 94 time.Sleep(10 * time.Millisecond) 95 } 96} 97 98func createNode(name string, port int, delegate *throughputDelegate, events *throughputEventDelegate) (*memberlist.Memberlist, error) { 99 cfg := memberlist.DefaultLANConfig() 100 cfg.Name = name 101 cfg.BindAddr = "127.0.0.1" 102 cfg.BindPort = port 103 cfg.AdvertisePort = port 104 cfg.Delegate = delegate 105 cfg.Events = events 106 cfg.LogOutput = os.Stderr 107 cfg.GossipInterval = 50 * time.Millisecond 108 cfg.ProbeInterval = 200 * time.Millisecond 109 cfg.PushPullInterval = 30 * time.Second 110 cfg.GossipNodes = 3 111 112 return memberlist.Create(cfg) 113} 114 115func runThroughputBenchmark(numNodes int, duration time.Duration, msgRate int) (*ThroughputResult, error) { 116 nodes := make([]*memberlist.Memberlist, numNodes) 117 delegates := make([]*throughputDelegate, numNodes) 118 eventDelegates := make([]*throughputEventDelegate, numNodes) 119 120 basePort := 18946 121 122 for i := 0; i < numNodes; i++ { 123 delegates[i] = &throughputDelegate{meta: []byte(fmt.Sprintf("node-%d", i))} 124 eventDelegates[i] = newThroughputEventDelegate() 125 126 var err error 127 nodes[i], err = createNode( 128 fmt.Sprintf("node-%d", i), 129 basePort+i, 130 delegates[i], 131 eventDelegates[i], 132 ) 133 if err != nil { 134 for j := 0; j < i; j++ { 135 nodes[j].Shutdown() 136 } 137 return nil, fmt.Errorf("failed to create node %d: %w", i, err) 138 } 139 } 140 141 for i := 1; i < numNodes; i++ { 142 addr := fmt.Sprintf("127.0.0.1:%d", basePort) 143 _, err := nodes[i].Join([]string{addr}) 144 if err != nil { 145 log.Printf("Warning: node %d failed to join: %v", i, err) 146 } 147 } 148 149 for i := 0; i < numNodes; i++ { 150 if !eventDelegates[i].waitForNodes(numNodes, 10*time.Second) { 151 log.Printf("Warning: Node %d did not see all %d nodes", i, numNodes) 152 } 153 } 154 155 time.Sleep(500 * time.Millisecond) 156 157 var totalSent atomic.Int64 158 stopCh := make(chan struct{}) 159 var wg sync.WaitGroup 160 161 msgInterval := time.Duration(float64(time.Second) / float64(msgRate)) 162 msg := make([]byte, 65) 163 msg[0] = 'B' 164 for i := 1; i < 65; i++ { 165 msg[i] = 'x' 166 } 167 168 startTime := time.Now() 169 170 for i, n := range nodes { 171 wg.Add(1) 172 go func(node *memberlist.Memberlist, idx int) { 173 defer wg.Done() 174 ticker := time.NewTicker(msgInterval) 175 defer ticker.Stop() 176 for { 177 select { 178 case <-ticker.C: 179 for _, member := range node.Members() { 180 if member.Name != node.LocalNode().Name { 181 node.SendBestEffort(member, msg) 182 totalSent.Add(1) 183 } 184 } 185 case <-stopCh: 186 return 187 } 188 } 189 }(n, i) 190 } 191 192 time.Sleep(duration) 193 close(stopCh) 194 wg.Wait() 195 196 elapsed := time.Since(startTime) 197 198 var totalReceived int64 199 for _, d := range delegates { 200 totalReceived += d.received.Load() 201 } 202 203 for _, n := range nodes { 204 n.Shutdown() 205 } 206 207 msgsPerSec := float64(totalReceived) / elapsed.Seconds() 208 209 return &ThroughputResult{ 210 Implementation: "memberlist", 211 NumNodes: numNodes, 212 DurationNs: duration.Nanoseconds(), 213 MsgRate: msgRate, 214 BroadcastsSent: totalSent.Load(), 215 BroadcastsReceived: totalReceived, 216 MsgsPerSec: msgsPerSec, 217 CPUCores: runtime.NumCPU(), 218 }, nil 219} 220 221func main() { 222 numNodes := flag.Int("nodes", 5, "number of nodes") 223 durationSec := flag.Int("duration", 10, "benchmark duration in seconds") 224 msgRate := flag.Int("rate", 100, "messages per second per node") 225 outputJSON := flag.Bool("json", false, "output as JSON") 226 flag.Parse() 227 228 result, err := runThroughputBenchmark(*numNodes, time.Duration(*durationSec)*time.Second, *msgRate) 229 if err != nil { 230 log.Fatalf("Benchmark failed: %v", err) 231 } 232 233 if *outputJSON { 234 enc := json.NewEncoder(os.Stdout) 235 enc.SetIndent("", " ") 236 enc.Encode(result) 237 } else { 238 fmt.Printf("=== Memberlist Throughput Results ===\n") 239 fmt.Printf("Nodes: %d\n", result.NumNodes) 240 fmt.Printf("Duration: %s\n", time.Duration(result.DurationNs)) 241 fmt.Printf("Target Rate: %d msg/s per node\n", result.MsgRate) 242 fmt.Printf("Broadcasts Sent: %d\n", result.BroadcastsSent) 243 fmt.Printf("Broadcasts Recv: %d\n", result.BroadcastsReceived) 244 fmt.Printf("Throughput: %.1f msg/s\n", result.MsgsPerSec) 245 fmt.Printf("CPU Cores: %d\n", result.CPUCores) 246 } 247}