this repo has no description
at main 6.5 kB view raw
1package main 2 3import ( 4 "encoding/json" 5 "flag" 6 "fmt" 7 "log" 8 "net" 9 "os" 10 "runtime" 11 "sync" 12 "sync/atomic" 13 "time" 14 15 "github.com/hashicorp/memberlist" 16) 17 18type BenchmarkResult struct { 19 Implementation string `json:"implementation"` 20 NumNodes int `json:"num_nodes"` 21 Duration time.Duration `json:"duration_ns"` 22 MessagesReceived int64 `json:"messages_received"` 23 MessagesSent int64 `json:"messages_sent"` 24 ConvergenceTime time.Duration `json:"convergence_time_ns"` 25 MemoryUsedBytes uint64 `json:"memory_used_bytes"` 26 GoroutinesUsed int `json:"goroutines_used"` 27 CPUCores int `json:"cpu_cores"` 28} 29 30type benchDelegate struct { 31 received atomic.Int64 32 sent atomic.Int64 33 meta []byte 34} 35 36func (d *benchDelegate) NodeMeta(limit int) []byte { 37 return d.meta 38} 39 40func (d *benchDelegate) NotifyMsg(msg []byte) { 41 d.received.Add(1) 42} 43 44func (d *benchDelegate) GetBroadcasts(overhead, limit int) [][]byte { 45 return nil 46} 47 48func (d *benchDelegate) LocalState(join bool) []byte { 49 return nil 50} 51 52func (d *benchDelegate) MergeRemoteState(buf []byte, join bool) { 53} 54 55type benchEventDelegate struct { 56 joinCh chan string 57 mu sync.Mutex 58 joined map[string]bool 59} 60 61func newBenchEventDelegate() *benchEventDelegate { 62 return &benchEventDelegate{ 63 joinCh: make(chan string, 1000), 64 joined: make(map[string]bool), 65 } 66} 67 68func (e *benchEventDelegate) NotifyJoin(node *memberlist.Node) { 69 e.mu.Lock() 70 if !e.joined[node.Name] { 71 e.joined[node.Name] = true 72 select { 73 case e.joinCh <- node.Name: 74 default: 75 } 76 } 77 e.mu.Unlock() 78} 79 80func (e *benchEventDelegate) NotifyLeave(node *memberlist.Node) {} 81func (e *benchEventDelegate) NotifyUpdate(node *memberlist.Node) {} 82 83func (e *benchEventDelegate) waitForNodes(n int, timeout time.Duration) bool { 84 deadline := time.Now().Add(timeout) 85 for { 86 e.mu.Lock() 87 count := len(e.joined) 88 e.mu.Unlock() 89 if count >= n { 90 return true 91 } 92 if time.Now().After(deadline) { 93 return false 94 } 95 time.Sleep(10 * time.Millisecond) 96 } 97} 98 99func createMemberlistNode(name string, port int, delegate *benchDelegate, events *benchEventDelegate) (*memberlist.Memberlist, error) { 100 cfg := memberlist.DefaultLANConfig() 101 cfg.Name = name 102 cfg.BindAddr = "127.0.0.1" 103 cfg.BindPort = port 104 cfg.AdvertisePort = port 105 cfg.Delegate = delegate 106 cfg.Events = events 107 cfg.LogOutput = os.Stderr 108 cfg.GossipInterval = 100 * time.Millisecond 109 cfg.ProbeInterval = 500 * time.Millisecond 110 cfg.PushPullInterval = 15 * time.Second 111 cfg.GossipNodes = 3 112 113 return memberlist.Create(cfg) 114} 115 116func runMemberlistBenchmark(numNodes int, duration time.Duration) (*BenchmarkResult, error) { 117 var memBefore runtime.MemStats 118 runtime.GC() 119 runtime.ReadMemStats(&memBefore) 120 goroutinesBefore := runtime.NumGoroutine() 121 122 nodes := make([]*memberlist.Memberlist, numNodes) 123 delegates := make([]*benchDelegate, numNodes) 124 eventDelegates := make([]*benchEventDelegate, numNodes) 125 126 basePort := 17946 127 128 for i := 0; i < numNodes; i++ { 129 delegates[i] = &benchDelegate{meta: []byte(fmt.Sprintf("node-%d", i))} 130 eventDelegates[i] = newBenchEventDelegate() 131 132 var err error 133 nodes[i], err = createMemberlistNode( 134 fmt.Sprintf("node-%d", i), 135 basePort+i, 136 delegates[i], 137 eventDelegates[i], 138 ) 139 if err != nil { 140 for j := 0; j < i; j++ { 141 nodes[j].Shutdown() 142 } 143 return nil, fmt.Errorf("failed to create node %d: %w", i, err) 144 } 145 } 146 147 convergenceStart := time.Now() 148 149 for i := 1; i < numNodes; i++ { 150 addr := fmt.Sprintf("127.0.0.1:%d", basePort) 151 _, err := nodes[i].Join([]string{addr}) 152 if err != nil { 153 log.Printf("Warning: node %d failed to join: %v", i, err) 154 } 155 } 156 157 allConverged := true 158 for i := 0; i < numNodes; i++ { 159 if !eventDelegates[i].waitForNodes(numNodes, 30*time.Second) { 160 allConverged = false 161 log.Printf("Node %d did not see all %d nodes", i, numNodes) 162 } 163 } 164 165 convergenceTime := time.Since(convergenceStart) 166 if !allConverged { 167 log.Printf("Warning: not all nodes converged within timeout") 168 } 169 170 stopBroadcast := make(chan struct{}) 171 var wg sync.WaitGroup 172 wg.Add(1) 173 go func() { 174 defer wg.Done() 175 ticker := time.NewTicker(100 * time.Millisecond) 176 defer ticker.Stop() 177 msg := []byte("benchmark-message") 178 for { 179 select { 180 case <-ticker.C: 181 for i, n := range nodes { 182 n.SendBestEffort(n.LocalNode(), msg) 183 delegates[i].sent.Add(1) 184 } 185 case <-stopBroadcast: 186 return 187 } 188 } 189 }() 190 191 time.Sleep(duration) 192 close(stopBroadcast) 193 wg.Wait() 194 195 var memAfter runtime.MemStats 196 runtime.ReadMemStats(&memAfter) 197 goroutinesAfter := runtime.NumGoroutine() 198 199 var totalReceived, totalSent int64 200 for _, d := range delegates { 201 totalReceived += d.received.Load() 202 totalSent += d.sent.Load() 203 } 204 205 for _, n := range nodes { 206 n.Shutdown() 207 } 208 209 return &BenchmarkResult{ 210 Implementation: "memberlist", 211 NumNodes: numNodes, 212 Duration: duration, 213 MessagesReceived: totalReceived, 214 MessagesSent: totalSent, 215 ConvergenceTime: convergenceTime, 216 MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc, 217 GoroutinesUsed: goroutinesAfter - goroutinesBefore, 218 CPUCores: runtime.NumCPU(), 219 }, nil 220} 221 222func getFreePort() (int, error) { 223 addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") 224 if err != nil { 225 return 0, err 226 } 227 l, err := net.ListenTCP("tcp", addr) 228 if err != nil { 229 return 0, err 230 } 231 defer l.Close() 232 return l.Addr().(*net.TCPAddr).Port, nil 233} 234 235func main() { 236 numNodes := flag.Int("nodes", 5, "number of nodes") 237 durationSec := flag.Int("duration", 10, "benchmark duration in seconds") 238 outputJSON := flag.Bool("json", false, "output as JSON") 239 flag.Parse() 240 241 result, err := runMemberlistBenchmark(*numNodes, time.Duration(*durationSec)*time.Second) 242 if err != nil { 243 log.Fatalf("Benchmark failed: %v", err) 244 } 245 246 if *outputJSON { 247 enc := json.NewEncoder(os.Stdout) 248 enc.SetIndent("", " ") 249 enc.Encode(result) 250 } else { 251 fmt.Printf("=== Memberlist Benchmark Results ===\n") 252 fmt.Printf("Nodes: %d\n", result.NumNodes) 253 fmt.Printf("Duration: %s\n", result.Duration) 254 fmt.Printf("Convergence: %s\n", result.ConvergenceTime) 255 fmt.Printf("Messages Recv: %d\n", result.MessagesReceived) 256 fmt.Printf("Messages Sent: %d\n", result.MessagesSent) 257 fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024) 258 fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed) 259 fmt.Printf("CPU Cores: %d\n", result.CPUCores) 260 } 261}