package main import ( "encoding/json" "flag" "fmt" "log" "net" "os" "runtime" "sync" "sync/atomic" "time" "github.com/hashicorp/memberlist" ) type BenchmarkResult struct { Implementation string `json:"implementation"` NumNodes int `json:"num_nodes"` Duration time.Duration `json:"duration_ns"` MessagesReceived int64 `json:"messages_received"` MessagesSent int64 `json:"messages_sent"` ConvergenceTime time.Duration `json:"convergence_time_ns"` MemoryUsedBytes uint64 `json:"memory_used_bytes"` GoroutinesUsed int `json:"goroutines_used"` CPUCores int `json:"cpu_cores"` } type benchDelegate struct { received atomic.Int64 sent atomic.Int64 meta []byte } func (d *benchDelegate) NodeMeta(limit int) []byte { return d.meta } func (d *benchDelegate) NotifyMsg(msg []byte) { d.received.Add(1) } func (d *benchDelegate) GetBroadcasts(overhead, limit int) [][]byte { return nil } func (d *benchDelegate) LocalState(join bool) []byte { return nil } func (d *benchDelegate) MergeRemoteState(buf []byte, join bool) { } type benchEventDelegate struct { joinCh chan string mu sync.Mutex joined map[string]bool } func newBenchEventDelegate() *benchEventDelegate { return &benchEventDelegate{ joinCh: make(chan string, 1000), joined: make(map[string]bool), } } func (e *benchEventDelegate) NotifyJoin(node *memberlist.Node) { e.mu.Lock() if !e.joined[node.Name] { e.joined[node.Name] = true select { case e.joinCh <- node.Name: default: } } e.mu.Unlock() } func (e *benchEventDelegate) NotifyLeave(node *memberlist.Node) {} func (e *benchEventDelegate) NotifyUpdate(node *memberlist.Node) {} func (e *benchEventDelegate) waitForNodes(n int, timeout time.Duration) bool { deadline := time.Now().Add(timeout) for { e.mu.Lock() count := len(e.joined) e.mu.Unlock() if count >= n { return true } if time.Now().After(deadline) { return false } time.Sleep(10 * time.Millisecond) } } func createMemberlistNode(name string, port int, delegate *benchDelegate, events *benchEventDelegate) (*memberlist.Memberlist, error) { cfg := memberlist.DefaultLANConfig() cfg.Name = name cfg.BindAddr = "127.0.0.1" cfg.BindPort = port cfg.AdvertisePort = port cfg.Delegate = delegate cfg.Events = events cfg.LogOutput = os.Stderr cfg.GossipInterval = 100 * time.Millisecond cfg.ProbeInterval = 500 * time.Millisecond cfg.PushPullInterval = 15 * time.Second cfg.GossipNodes = 3 return memberlist.Create(cfg) } func runMemberlistBenchmark(numNodes int, duration time.Duration) (*BenchmarkResult, error) { var memBefore runtime.MemStats runtime.GC() runtime.ReadMemStats(&memBefore) goroutinesBefore := runtime.NumGoroutine() nodes := make([]*memberlist.Memberlist, numNodes) delegates := make([]*benchDelegate, numNodes) eventDelegates := make([]*benchEventDelegate, numNodes) basePort := 17946 for i := 0; i < numNodes; i++ { delegates[i] = &benchDelegate{meta: []byte(fmt.Sprintf("node-%d", i))} eventDelegates[i] = newBenchEventDelegate() var err error nodes[i], err = createMemberlistNode( fmt.Sprintf("node-%d", i), basePort+i, delegates[i], eventDelegates[i], ) if err != nil { for j := 0; j < i; j++ { nodes[j].Shutdown() } return nil, fmt.Errorf("failed to create node %d: %w", i, err) } } convergenceStart := time.Now() for i := 1; i < numNodes; i++ { addr := fmt.Sprintf("127.0.0.1:%d", basePort) _, err := nodes[i].Join([]string{addr}) if err != nil { log.Printf("Warning: node %d failed to join: %v", i, err) } } allConverged := true for i := 0; i < numNodes; i++ { if !eventDelegates[i].waitForNodes(numNodes, 30*time.Second) { allConverged = false log.Printf("Node %d did not see all %d nodes", i, numNodes) } } convergenceTime := time.Since(convergenceStart) if !allConverged { log.Printf("Warning: not all nodes converged within timeout") } time.Sleep(duration) var memAfter runtime.MemStats runtime.ReadMemStats(&memAfter) goroutinesAfter := runtime.NumGoroutine() var totalReceived, totalSent int64 for _, d := range delegates { totalReceived += d.received.Load() totalSent += d.sent.Load() } for _, n := range nodes { n.Shutdown() } return &BenchmarkResult{ Implementation: "memberlist", NumNodes: numNodes, Duration: duration, MessagesReceived: totalReceived, MessagesSent: totalSent, ConvergenceTime: convergenceTime, MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc, GoroutinesUsed: goroutinesAfter - goroutinesBefore, CPUCores: runtime.NumCPU(), }, nil } func getFreePort() (int, error) { addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") if err != nil { return 0, err } l, err := net.ListenTCP("tcp", addr) if err != nil { return 0, err } defer l.Close() return l.Addr().(*net.TCPAddr).Port, nil } func main() { numNodes := flag.Int("nodes", 5, "number of nodes") durationSec := flag.Int("duration", 10, "benchmark duration in seconds") outputJSON := flag.Bool("json", false, "output as JSON") flag.Parse() result, err := runMemberlistBenchmark(*numNodes, time.Duration(*durationSec)*time.Second) if err != nil { log.Fatalf("Benchmark failed: %v", err) } if *outputJSON { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") enc.Encode(result) } else { fmt.Printf("=== Memberlist Benchmark Results ===\n") fmt.Printf("Nodes: %d\n", result.NumNodes) fmt.Printf("Duration: %s\n", result.Duration) fmt.Printf("Convergence: %s\n", result.ConvergenceTime) fmt.Printf("Messages Recv: %d\n", result.MessagesReceived) fmt.Printf("Messages Sent: %d\n", result.MessagesSent) fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024) fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed) fmt.Printf("CPU Cores: %d\n", result.CPUCores) } }