this repo has no description
at main 5.7 kB view raw
1package main 2 3import ( 4 "encoding/json" 5 "flag" 6 "fmt" 7 "io" 8 "log" 9 "os" 10 "runtime" 11 "sync" 12 "sync/atomic" 13 "time" 14 15 "github.com/hashicorp/serf/serf" 16) 17 18type SerfBenchmarkResult struct { 19 Implementation string `json:"implementation"` 20 NumNodes int `json:"num_nodes"` 21 Duration time.Duration `json:"duration_ns"` 22 EventsReceived int64 `json:"events_received"` 23 QueriesProcessed int64 `json:"queries_processed"` 24 ConvergenceTime time.Duration `json:"convergence_time_ns"` 25 MemoryUsedBytes uint64 `json:"memory_used_bytes"` 26 GoroutinesUsed int `json:"goroutines_used"` 27 CPUCores int `json:"cpu_cores"` 28} 29 30type serfEventHandler struct { 31 events atomic.Int64 32 queries atomic.Int64 33 memberCh chan serf.MemberEvent 34} 35 36func (h *serfEventHandler) HandleEvent(e serf.Event) { 37 h.events.Add(1) 38 switch evt := e.(type) { 39 case serf.MemberEvent: 40 select { 41 case h.memberCh <- evt: 42 default: 43 } 44 case *serf.Query: 45 h.queries.Add(1) 46 evt.Respond([]byte("ok")) 47 } 48} 49 50func createSerfNode(name string, bindPort, rpcPort int, handler *serfEventHandler) (*serf.Serf, error) { 51 cfg := serf.DefaultConfig() 52 cfg.NodeName = name 53 cfg.MemberlistConfig.BindAddr = "127.0.0.1" 54 cfg.MemberlistConfig.BindPort = bindPort 55 cfg.MemberlistConfig.AdvertisePort = bindPort 56 cfg.MemberlistConfig.GossipInterval = 100 * time.Millisecond 57 cfg.MemberlistConfig.ProbeInterval = 500 * time.Millisecond 58 cfg.MemberlistConfig.PushPullInterval = 15 * time.Second 59 cfg.MemberlistConfig.GossipNodes = 3 60 cfg.LogOutput = io.Discard 61 62 eventCh := make(chan serf.Event, 256) 63 cfg.EventCh = eventCh 64 65 s, err := serf.Create(cfg) 66 if err != nil { 67 return nil, err 68 } 69 70 go func() { 71 for e := range eventCh { 72 handler.HandleEvent(e) 73 } 74 }() 75 76 return s, nil 77} 78 79func waitForSerfConvergence(nodes []*serf.Serf, handlers []*serfEventHandler, expected int, timeout time.Duration) bool { 80 deadline := time.Now().Add(timeout) 81 for { 82 allConverged := true 83 for _, n := range nodes { 84 if n.NumNodes() < expected { 85 allConverged = false 86 break 87 } 88 } 89 if allConverged { 90 return true 91 } 92 if time.Now().After(deadline) { 93 return false 94 } 95 time.Sleep(50 * time.Millisecond) 96 } 97} 98 99func runSerfBenchmark(numNodes int, duration time.Duration) (*SerfBenchmarkResult, error) { 100 var memBefore runtime.MemStats 101 runtime.GC() 102 runtime.ReadMemStats(&memBefore) 103 goroutinesBefore := runtime.NumGoroutine() 104 105 nodes := make([]*serf.Serf, numNodes) 106 handlers := make([]*serfEventHandler, numNodes) 107 108 basePort := 27946 109 110 var wg sync.WaitGroup 111 var createErr error 112 var createMu sync.Mutex 113 114 for i := 0; i < numNodes; i++ { 115 handlers[i] = &serfEventHandler{ 116 memberCh: make(chan serf.MemberEvent, 100), 117 } 118 119 var err error 120 nodes[i], err = createSerfNode( 121 fmt.Sprintf("serf-node-%d", i), 122 basePort+i, 123 basePort+1000+i, 124 handlers[i], 125 ) 126 if err != nil { 127 createMu.Lock() 128 if createErr == nil { 129 createErr = fmt.Errorf("failed to create serf node %d: %w", i, err) 130 } 131 createMu.Unlock() 132 for j := 0; j < i; j++ { 133 nodes[j].Shutdown() 134 } 135 return nil, createErr 136 } 137 } 138 139 convergenceStart := time.Now() 140 141 for i := 1; i < numNodes; i++ { 142 addr := fmt.Sprintf("127.0.0.1:%d", basePort) 143 _, err := nodes[i].Join([]string{addr}, false) 144 if err != nil { 145 log.Printf("Warning: serf node %d failed to join: %v", i, err) 146 } 147 } 148 149 allConverged := waitForSerfConvergence(nodes, handlers, numNodes, 30*time.Second) 150 convergenceTime := time.Since(convergenceStart) 151 152 if !allConverged { 153 log.Printf("Warning: not all serf nodes converged within timeout") 154 } 155 156 wg.Add(1) 157 go func() { 158 defer wg.Done() 159 ticker := time.NewTicker(500 * time.Millisecond) 160 defer ticker.Stop() 161 timeout := time.After(duration) 162 for { 163 select { 164 case <-ticker.C: 165 for _, n := range nodes { 166 n.Query("ping", []byte("test"), nil) 167 } 168 case <-timeout: 169 return 170 } 171 } 172 }() 173 174 time.Sleep(duration) 175 wg.Wait() 176 177 var memAfter runtime.MemStats 178 runtime.ReadMemStats(&memAfter) 179 goroutinesAfter := runtime.NumGoroutine() 180 181 var totalEvents, totalQueries int64 182 for _, h := range handlers { 183 totalEvents += h.events.Load() 184 totalQueries += h.queries.Load() 185 } 186 187 for _, n := range nodes { 188 n.Shutdown() 189 } 190 191 return &SerfBenchmarkResult{ 192 Implementation: "serf", 193 NumNodes: numNodes, 194 Duration: duration, 195 EventsReceived: totalEvents, 196 QueriesProcessed: totalQueries, 197 ConvergenceTime: convergenceTime, 198 MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc, 199 GoroutinesUsed: goroutinesAfter - goroutinesBefore, 200 CPUCores: runtime.NumCPU(), 201 }, nil 202} 203 204func main() { 205 numNodes := flag.Int("nodes", 5, "number of nodes") 206 durationSec := flag.Int("duration", 10, "benchmark duration in seconds") 207 outputJSON := flag.Bool("json", false, "output as JSON") 208 flag.Parse() 209 210 result, err := runSerfBenchmark(*numNodes, time.Duration(*durationSec)*time.Second) 211 if err != nil { 212 log.Fatalf("Serf benchmark failed: %v", err) 213 } 214 215 if *outputJSON { 216 enc := json.NewEncoder(os.Stdout) 217 enc.SetIndent("", " ") 218 enc.Encode(result) 219 } else { 220 fmt.Printf("=== Serf Benchmark Results ===\n") 221 fmt.Printf("Nodes: %d\n", result.NumNodes) 222 fmt.Printf("Duration: %s\n", result.Duration) 223 fmt.Printf("Convergence: %s\n", result.ConvergenceTime) 224 fmt.Printf("Events: %d\n", result.EventsReceived) 225 fmt.Printf("Queries: %d\n", result.QueriesProcessed) 226 fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024) 227 fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed) 228 fmt.Printf("CPU Cores: %d\n", result.CPUCores) 229 } 230}