this repo has no description
1package main
2
3import (
4 "encoding/json"
5 "flag"
6 "fmt"
7 "log"
8 "net"
9 "os"
10 "runtime"
11 "sync"
12 "sync/atomic"
13 "time"
14
15 "github.com/hashicorp/memberlist"
16)
17
// BenchmarkResult is the summary record produced by one benchmark run. It is
// either printed field by field or encoded as JSON using the keys in the
// struct tags; the two time.Duration fields are emitted under "*_ns" keys.
type BenchmarkResult struct {
	// Implementation labels the library that was benchmarked.
	Implementation string `json:"implementation"`
	// NumNodes is the number of nodes requested for the run.
	NumNodes int `json:"num_nodes"`
	// Duration is the requested length of the message-sending phase.
	Duration time.Duration `json:"duration_ns"`
	// MessagesReceived is the total of all per-node receive counters.
	MessagesReceived int64 `json:"messages_received"`
	// MessagesSent is the total of all per-node send counters.
	MessagesSent int64 `json:"messages_sent"`
	// ConvergenceTime is the time spent joining and waiting for membership.
	ConvergenceTime time.Duration `json:"convergence_time_ns"`
	// MemoryUsedBytes is the heap growth observed across the run.
	MemoryUsedBytes uint64 `json:"memory_used_bytes"`
	// GoroutinesUsed is the change in goroutine count across the run.
	GoroutinesUsed int `json:"goroutines_used"`
	// CPUCores is the number of logical CPUs reported by the runtime.
	CPUCores int `json:"cpu_cores"`
}
29
// benchDelegate is the per-node delegate used by the benchmark. It counts
// messages handed to NotifyMsg, exposes a send counter for the benchmark's
// send loop, and serves a fixed metadata blob from NodeMeta.
type benchDelegate struct {
	received atomic.Int64 // incremented once per NotifyMsg call
	sent     atomic.Int64 // incremented by the code that sends on this node's behalf
	meta     []byte       // metadata returned (possibly truncated) by NodeMeta
}

// NodeMeta returns the node's metadata, truncated so that it never exceeds
// limit bytes. A negative limit is treated as zero.
//
// The caller-supplied limit is a hard cap: returning more bytes than allowed
// is rejected by memberlist, so oversized metadata is clipped here rather
// than passed through.
func (d *benchDelegate) NodeMeta(limit int) []byte {
	if limit < 0 {
		limit = 0
	}
	if len(d.meta) > limit {
		// Cap the capacity too so a caller's append cannot write into the
		// bytes that were clipped off.
		return d.meta[:limit:limit]
	}
	return d.meta
}

// NotifyMsg records that one message was delivered; the payload itself is
// ignored.
func (d *benchDelegate) NotifyMsg(msg []byte) {
	d.received.Add(1)
}

// GetBroadcasts always returns nil: this delegate never queues broadcasts.
func (d *benchDelegate) GetBroadcasts(overhead, limit int) [][]byte {
	return nil
}

// LocalState always returns nil: this delegate carries no state to exchange.
func (d *benchDelegate) LocalState(join bool) []byte {
	return nil
}

// MergeRemoteState discards the remote state it is given.
func (d *benchDelegate) MergeRemoteState(buf []byte, join bool) {
}
54
// benchEventDelegate tracks which node names have been reported as joined.
// The joined set is guarded by mu; joinCh receives each newly seen name on a
// best-effort basis.
type benchEventDelegate struct {
	joinCh chan string     // buffered feed of first-time join names
	mu     sync.Mutex      // guards joined
	joined map[string]bool // names that have already been reported
}

// newBenchEventDelegate builds an event delegate with an empty joined set and
// a join channel that can buffer 1000 names.
func newBenchEventDelegate() *benchEventDelegate {
	const joinBacklog = 1000

	d := new(benchEventDelegate)
	d.joined = map[string]bool{}
	d.joinCh = make(chan string, joinBacklog)
	return d
}
67
68func (e *benchEventDelegate) NotifyJoin(node *memberlist.Node) {
69 e.mu.Lock()
70 if !e.joined[node.Name] {
71 e.joined[node.Name] = true
72 select {
73 case e.joinCh <- node.Name:
74 default:
75 }
76 }
77 e.mu.Unlock()
78}
79
80func (e *benchEventDelegate) NotifyLeave(node *memberlist.Node) {}
81func (e *benchEventDelegate) NotifyUpdate(node *memberlist.Node) {}
82
83func (e *benchEventDelegate) waitForNodes(n int, timeout time.Duration) bool {
84 deadline := time.Now().Add(timeout)
85 for {
86 e.mu.Lock()
87 count := len(e.joined)
88 e.mu.Unlock()
89 if count >= n {
90 return true
91 }
92 if time.Now().After(deadline) {
93 return false
94 }
95 time.Sleep(10 * time.Millisecond)
96 }
97}
98
99func createMemberlistNode(name string, port int, delegate *benchDelegate, events *benchEventDelegate) (*memberlist.Memberlist, error) {
100 cfg := memberlist.DefaultLANConfig()
101 cfg.Name = name
102 cfg.BindAddr = "127.0.0.1"
103 cfg.BindPort = port
104 cfg.AdvertisePort = port
105 cfg.Delegate = delegate
106 cfg.Events = events
107 cfg.LogOutput = os.Stderr
108 cfg.GossipInterval = 100 * time.Millisecond
109 cfg.ProbeInterval = 500 * time.Millisecond
110 cfg.PushPullInterval = 15 * time.Second
111 cfg.GossipNodes = 3
112
113 return memberlist.Create(cfg)
114}
115
116func runMemberlistBenchmark(numNodes int, duration time.Duration) (*BenchmarkResult, error) {
117 var memBefore runtime.MemStats
118 runtime.GC()
119 runtime.ReadMemStats(&memBefore)
120 goroutinesBefore := runtime.NumGoroutine()
121
122 nodes := make([]*memberlist.Memberlist, numNodes)
123 delegates := make([]*benchDelegate, numNodes)
124 eventDelegates := make([]*benchEventDelegate, numNodes)
125
126 basePort := 17946
127
128 for i := 0; i < numNodes; i++ {
129 delegates[i] = &benchDelegate{meta: []byte(fmt.Sprintf("node-%d", i))}
130 eventDelegates[i] = newBenchEventDelegate()
131
132 var err error
133 nodes[i], err = createMemberlistNode(
134 fmt.Sprintf("node-%d", i),
135 basePort+i,
136 delegates[i],
137 eventDelegates[i],
138 )
139 if err != nil {
140 for j := 0; j < i; j++ {
141 nodes[j].Shutdown()
142 }
143 return nil, fmt.Errorf("failed to create node %d: %w", i, err)
144 }
145 }
146
147 convergenceStart := time.Now()
148
149 for i := 1; i < numNodes; i++ {
150 addr := fmt.Sprintf("127.0.0.1:%d", basePort)
151 _, err := nodes[i].Join([]string{addr})
152 if err != nil {
153 log.Printf("Warning: node %d failed to join: %v", i, err)
154 }
155 }
156
157 allConverged := true
158 for i := 0; i < numNodes; i++ {
159 if !eventDelegates[i].waitForNodes(numNodes, 30*time.Second) {
160 allConverged = false
161 log.Printf("Node %d did not see all %d nodes", i, numNodes)
162 }
163 }
164
165 convergenceTime := time.Since(convergenceStart)
166 if !allConverged {
167 log.Printf("Warning: not all nodes converged within timeout")
168 }
169
170 stopBroadcast := make(chan struct{})
171 var wg sync.WaitGroup
172 wg.Add(1)
173 go func() {
174 defer wg.Done()
175 ticker := time.NewTicker(100 * time.Millisecond)
176 defer ticker.Stop()
177 msg := []byte("benchmark-message")
178 for {
179 select {
180 case <-ticker.C:
181 for i, n := range nodes {
182 n.SendBestEffort(n.LocalNode(), msg)
183 delegates[i].sent.Add(1)
184 }
185 case <-stopBroadcast:
186 return
187 }
188 }
189 }()
190
191 time.Sleep(duration)
192 close(stopBroadcast)
193 wg.Wait()
194
195 var memAfter runtime.MemStats
196 runtime.ReadMemStats(&memAfter)
197 goroutinesAfter := runtime.NumGoroutine()
198
199 var totalReceived, totalSent int64
200 for _, d := range delegates {
201 totalReceived += d.received.Load()
202 totalSent += d.sent.Load()
203 }
204
205 for _, n := range nodes {
206 n.Shutdown()
207 }
208
209 return &BenchmarkResult{
210 Implementation: "memberlist",
211 NumNodes: numNodes,
212 Duration: duration,
213 MessagesReceived: totalReceived,
214 MessagesSent: totalSent,
215 ConvergenceTime: convergenceTime,
216 MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc,
217 GoroutinesUsed: goroutinesAfter - goroutinesBefore,
218 CPUCores: runtime.NumCPU(),
219 }, nil
220}
221
// getFreePort asks the OS for an unused TCP port on 127.0.0.1 by listening on
// port 0, reads back the port that was assigned, and closes the listener
// before returning. Any listen error is returned with a zero port.
func getFreePort() (int, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer ln.Close()

	bound := ln.Addr().(*net.TCPAddr)
	return bound.Port, nil
}
234
235func main() {
236 numNodes := flag.Int("nodes", 5, "number of nodes")
237 durationSec := flag.Int("duration", 10, "benchmark duration in seconds")
238 outputJSON := flag.Bool("json", false, "output as JSON")
239 flag.Parse()
240
241 result, err := runMemberlistBenchmark(*numNodes, time.Duration(*durationSec)*time.Second)
242 if err != nil {
243 log.Fatalf("Benchmark failed: %v", err)
244 }
245
246 if *outputJSON {
247 enc := json.NewEncoder(os.Stdout)
248 enc.SetIndent("", " ")
249 enc.Encode(result)
250 } else {
251 fmt.Printf("=== Memberlist Benchmark Results ===\n")
252 fmt.Printf("Nodes: %d\n", result.NumNodes)
253 fmt.Printf("Duration: %s\n", result.Duration)
254 fmt.Printf("Convergence: %s\n", result.ConvergenceTime)
255 fmt.Printf("Messages Recv: %d\n", result.MessagesReceived)
256 fmt.Printf("Messages Sent: %d\n", result.MessagesSent)
257 fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024)
258 fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed)
259 fmt.Printf("CPU Cores: %d\n", result.CPUCores)
260 }
261}