package main import ( "encoding/json" "flag" "fmt" "io" "log" "os" "runtime" "sync" "sync/atomic" "time" "github.com/hashicorp/serf/serf" ) type SerfBenchmarkResult struct { Implementation string `json:"implementation"` NumNodes int `json:"num_nodes"` Duration time.Duration `json:"duration_ns"` EventsReceived int64 `json:"events_received"` QueriesProcessed int64 `json:"queries_processed"` ConvergenceTime time.Duration `json:"convergence_time_ns"` MemoryUsedBytes uint64 `json:"memory_used_bytes"` GoroutinesUsed int `json:"goroutines_used"` CPUCores int `json:"cpu_cores"` } type serfEventHandler struct { events atomic.Int64 queries atomic.Int64 memberCh chan serf.MemberEvent } func (h *serfEventHandler) HandleEvent(e serf.Event) { h.events.Add(1) switch evt := e.(type) { case serf.MemberEvent: select { case h.memberCh <- evt: default: } case *serf.Query: h.queries.Add(1) evt.Respond([]byte("ok")) } } func createSerfNode(name string, bindPort, rpcPort int, handler *serfEventHandler) (*serf.Serf, error) { cfg := serf.DefaultConfig() cfg.NodeName = name cfg.MemberlistConfig.BindAddr = "127.0.0.1" cfg.MemberlistConfig.BindPort = bindPort cfg.MemberlistConfig.AdvertisePort = bindPort cfg.MemberlistConfig.GossipInterval = 100 * time.Millisecond cfg.MemberlistConfig.ProbeInterval = 500 * time.Millisecond cfg.MemberlistConfig.PushPullInterval = 15 * time.Second cfg.MemberlistConfig.GossipNodes = 3 cfg.LogOutput = io.Discard eventCh := make(chan serf.Event, 256) cfg.EventCh = eventCh s, err := serf.Create(cfg) if err != nil { return nil, err } go func() { for e := range eventCh { handler.HandleEvent(e) } }() return s, nil } func waitForSerfConvergence(nodes []*serf.Serf, handlers []*serfEventHandler, expected int, timeout time.Duration) bool { deadline := time.Now().Add(timeout) for { allConverged := true for _, n := range nodes { if n.NumNodes() < expected { allConverged = false break } } if allConverged { return true } if time.Now().After(deadline) { return false } time.Sleep(50 * time.Millisecond) } } func runSerfBenchmark(numNodes int, duration time.Duration) (*SerfBenchmarkResult, error) { var memBefore runtime.MemStats runtime.GC() runtime.ReadMemStats(&memBefore) goroutinesBefore := runtime.NumGoroutine() nodes := make([]*serf.Serf, numNodes) handlers := make([]*serfEventHandler, numNodes) basePort := 27946 var wg sync.WaitGroup var createErr error var createMu sync.Mutex for i := 0; i < numNodes; i++ { handlers[i] = &serfEventHandler{ memberCh: make(chan serf.MemberEvent, 100), } var err error nodes[i], err = createSerfNode( fmt.Sprintf("serf-node-%d", i), basePort+i, basePort+1000+i, handlers[i], ) if err != nil { createMu.Lock() if createErr == nil { createErr = fmt.Errorf("failed to create serf node %d: %w", i, err) } createMu.Unlock() for j := 0; j < i; j++ { nodes[j].Shutdown() } return nil, createErr } } convergenceStart := time.Now() for i := 1; i < numNodes; i++ { addr := fmt.Sprintf("127.0.0.1:%d", basePort) _, err := nodes[i].Join([]string{addr}, false) if err != nil { log.Printf("Warning: serf node %d failed to join: %v", i, err) } } allConverged := waitForSerfConvergence(nodes, handlers, numNodes, 30*time.Second) convergenceTime := time.Since(convergenceStart) if !allConverged { log.Printf("Warning: not all serf nodes converged within timeout") } wg.Add(1) go func() { defer wg.Done() ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() timeout := time.After(duration) for { select { case <-ticker.C: for _, n := range nodes { n.Query("ping", []byte("test"), nil) } case <-timeout: return } } }() time.Sleep(duration) wg.Wait() var memAfter runtime.MemStats runtime.ReadMemStats(&memAfter) goroutinesAfter := runtime.NumGoroutine() var totalEvents, totalQueries int64 for _, h := range handlers { totalEvents += h.events.Load() totalQueries += h.queries.Load() } for _, n := range nodes { n.Shutdown() } return &SerfBenchmarkResult{ Implementation: "serf", NumNodes: numNodes, Duration: duration, EventsReceived: totalEvents, QueriesProcessed: totalQueries, ConvergenceTime: convergenceTime, MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc, GoroutinesUsed: goroutinesAfter - goroutinesBefore, CPUCores: runtime.NumCPU(), }, nil } func main() { numNodes := flag.Int("nodes", 5, "number of nodes") durationSec := flag.Int("duration", 10, "benchmark duration in seconds") outputJSON := flag.Bool("json", false, "output as JSON") flag.Parse() result, err := runSerfBenchmark(*numNodes, time.Duration(*durationSec)*time.Second) if err != nil { log.Fatalf("Serf benchmark failed: %v", err) } if *outputJSON { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") enc.Encode(result) } else { fmt.Printf("=== Serf Benchmark Results ===\n") fmt.Printf("Nodes: %d\n", result.NumNodes) fmt.Printf("Duration: %s\n", result.Duration) fmt.Printf("Convergence: %s\n", result.ConvergenceTime) fmt.Printf("Events: %d\n", result.EventsReceived) fmt.Printf("Queries: %d\n", result.QueriesProcessed) fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024) fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed) fmt.Printf("CPU Cores: %d\n", result.CPUCores) } }