this repo has no description
1package main
2
3import (
4 "encoding/json"
5 "flag"
6 "fmt"
7 "io"
8 "log"
9 "os"
10 "runtime"
11 "sync"
12 "sync/atomic"
13 "time"
14
15 "github.com/hashicorp/serf/serf"
16)
17
18type SerfBenchmarkResult struct {
19 Implementation string `json:"implementation"`
20 NumNodes int `json:"num_nodes"`
21 Duration time.Duration `json:"duration_ns"`
22 EventsReceived int64 `json:"events_received"`
23 QueriesProcessed int64 `json:"queries_processed"`
24 ConvergenceTime time.Duration `json:"convergence_time_ns"`
25 MemoryUsedBytes uint64 `json:"memory_used_bytes"`
26 GoroutinesUsed int `json:"goroutines_used"`
27 CPUCores int `json:"cpu_cores"`
28}
29
30type serfEventHandler struct {
31 events atomic.Int64
32 queries atomic.Int64
33 memberCh chan serf.MemberEvent
34}
35
36func (h *serfEventHandler) HandleEvent(e serf.Event) {
37 h.events.Add(1)
38 switch evt := e.(type) {
39 case serf.MemberEvent:
40 select {
41 case h.memberCh <- evt:
42 default:
43 }
44 case *serf.Query:
45 h.queries.Add(1)
46 evt.Respond([]byte("ok"))
47 }
48}
49
50func createSerfNode(name string, bindPort, rpcPort int, handler *serfEventHandler) (*serf.Serf, error) {
51 cfg := serf.DefaultConfig()
52 cfg.NodeName = name
53 cfg.MemberlistConfig.BindAddr = "127.0.0.1"
54 cfg.MemberlistConfig.BindPort = bindPort
55 cfg.MemberlistConfig.AdvertisePort = bindPort
56 cfg.MemberlistConfig.GossipInterval = 100 * time.Millisecond
57 cfg.MemberlistConfig.ProbeInterval = 500 * time.Millisecond
58 cfg.MemberlistConfig.PushPullInterval = 15 * time.Second
59 cfg.MemberlistConfig.GossipNodes = 3
60 cfg.LogOutput = io.Discard
61
62 eventCh := make(chan serf.Event, 256)
63 cfg.EventCh = eventCh
64
65 s, err := serf.Create(cfg)
66 if err != nil {
67 return nil, err
68 }
69
70 go func() {
71 for e := range eventCh {
72 handler.HandleEvent(e)
73 }
74 }()
75
76 return s, nil
77}
78
79func waitForSerfConvergence(nodes []*serf.Serf, handlers []*serfEventHandler, expected int, timeout time.Duration) bool {
80 deadline := time.Now().Add(timeout)
81 for {
82 allConverged := true
83 for _, n := range nodes {
84 if n.NumNodes() < expected {
85 allConverged = false
86 break
87 }
88 }
89 if allConverged {
90 return true
91 }
92 if time.Now().After(deadline) {
93 return false
94 }
95 time.Sleep(50 * time.Millisecond)
96 }
97}
98
99func runSerfBenchmark(numNodes int, duration time.Duration) (*SerfBenchmarkResult, error) {
100 var memBefore runtime.MemStats
101 runtime.GC()
102 runtime.ReadMemStats(&memBefore)
103 goroutinesBefore := runtime.NumGoroutine()
104
105 nodes := make([]*serf.Serf, numNodes)
106 handlers := make([]*serfEventHandler, numNodes)
107
108 basePort := 27946
109
110 var wg sync.WaitGroup
111 var createErr error
112 var createMu sync.Mutex
113
114 for i := 0; i < numNodes; i++ {
115 handlers[i] = &serfEventHandler{
116 memberCh: make(chan serf.MemberEvent, 100),
117 }
118
119 var err error
120 nodes[i], err = createSerfNode(
121 fmt.Sprintf("serf-node-%d", i),
122 basePort+i,
123 basePort+1000+i,
124 handlers[i],
125 )
126 if err != nil {
127 createMu.Lock()
128 if createErr == nil {
129 createErr = fmt.Errorf("failed to create serf node %d: %w", i, err)
130 }
131 createMu.Unlock()
132 for j := 0; j < i; j++ {
133 nodes[j].Shutdown()
134 }
135 return nil, createErr
136 }
137 }
138
139 convergenceStart := time.Now()
140
141 for i := 1; i < numNodes; i++ {
142 addr := fmt.Sprintf("127.0.0.1:%d", basePort)
143 _, err := nodes[i].Join([]string{addr}, false)
144 if err != nil {
145 log.Printf("Warning: serf node %d failed to join: %v", i, err)
146 }
147 }
148
149 allConverged := waitForSerfConvergence(nodes, handlers, numNodes, 30*time.Second)
150 convergenceTime := time.Since(convergenceStart)
151
152 if !allConverged {
153 log.Printf("Warning: not all serf nodes converged within timeout")
154 }
155
156 wg.Add(1)
157 go func() {
158 defer wg.Done()
159 ticker := time.NewTicker(500 * time.Millisecond)
160 defer ticker.Stop()
161 timeout := time.After(duration)
162 for {
163 select {
164 case <-ticker.C:
165 for _, n := range nodes {
166 n.Query("ping", []byte("test"), nil)
167 }
168 case <-timeout:
169 return
170 }
171 }
172 }()
173
174 time.Sleep(duration)
175 wg.Wait()
176
177 var memAfter runtime.MemStats
178 runtime.ReadMemStats(&memAfter)
179 goroutinesAfter := runtime.NumGoroutine()
180
181 var totalEvents, totalQueries int64
182 for _, h := range handlers {
183 totalEvents += h.events.Load()
184 totalQueries += h.queries.Load()
185 }
186
187 for _, n := range nodes {
188 n.Shutdown()
189 }
190
191 return &SerfBenchmarkResult{
192 Implementation: "serf",
193 NumNodes: numNodes,
194 Duration: duration,
195 EventsReceived: totalEvents,
196 QueriesProcessed: totalQueries,
197 ConvergenceTime: convergenceTime,
198 MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc,
199 GoroutinesUsed: goroutinesAfter - goroutinesBefore,
200 CPUCores: runtime.NumCPU(),
201 }, nil
202}
203
204func main() {
205 numNodes := flag.Int("nodes", 5, "number of nodes")
206 durationSec := flag.Int("duration", 10, "benchmark duration in seconds")
207 outputJSON := flag.Bool("json", false, "output as JSON")
208 flag.Parse()
209
210 result, err := runSerfBenchmark(*numNodes, time.Duration(*durationSec)*time.Second)
211 if err != nil {
212 log.Fatalf("Serf benchmark failed: %v", err)
213 }
214
215 if *outputJSON {
216 enc := json.NewEncoder(os.Stdout)
217 enc.SetIndent("", " ")
218 enc.Encode(result)
219 } else {
220 fmt.Printf("=== Serf Benchmark Results ===\n")
221 fmt.Printf("Nodes: %d\n", result.NumNodes)
222 fmt.Printf("Duration: %s\n", result.Duration)
223 fmt.Printf("Convergence: %s\n", result.ConvergenceTime)
224 fmt.Printf("Events: %d\n", result.EventsReceived)
225 fmt.Printf("Queries: %d\n", result.QueriesProcessed)
226 fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024)
227 fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed)
228 fmt.Printf("CPU Cores: %d\n", result.CPUCores)
229 }
230}