this repo has no description
1package main
2
3import (
4 "encoding/json"
5 "flag"
6 "fmt"
7 "log"
8 "os"
9 "runtime"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "github.com/hashicorp/memberlist"
15)
16
17type ThroughputResult struct {
18 Implementation string `json:"implementation"`
19 NumNodes int `json:"num_nodes"`
20 DurationNs int64 `json:"duration_ns"`
21 MsgRate int `json:"msg_rate"`
22 BroadcastsSent int64 `json:"broadcasts_sent"`
23 BroadcastsReceived int64 `json:"broadcasts_received"`
24 MsgsPerSec float64 `json:"msgs_per_sec"`
25 CPUCores int `json:"cpu_cores"`
26}
27
28type throughputDelegate struct {
29 received atomic.Int64
30 meta []byte
31}
32
33func (d *throughputDelegate) NodeMeta(limit int) []byte {
34 return d.meta
35}
36
37func (d *throughputDelegate) NotifyMsg(msg []byte) {
38 if len(msg) > 0 && msg[0] == 'B' {
39 d.received.Add(1)
40 }
41}
42
43func (d *throughputDelegate) GetBroadcasts(overhead, limit int) [][]byte {
44 return nil
45}
46
47func (d *throughputDelegate) LocalState(join bool) []byte {
48 return nil
49}
50
51func (d *throughputDelegate) MergeRemoteState(buf []byte, join bool) {
52}
53
54type throughputEventDelegate struct {
55 joinCh chan string
56 mu sync.Mutex
57 joined map[string]bool
58}
59
60func newThroughputEventDelegate() *throughputEventDelegate {
61 return &throughputEventDelegate{
62 joinCh: make(chan string, 1000),
63 joined: make(map[string]bool),
64 }
65}
66
67func (e *throughputEventDelegate) NotifyJoin(node *memberlist.Node) {
68 e.mu.Lock()
69 if !e.joined[node.Name] {
70 e.joined[node.Name] = true
71 select {
72 case e.joinCh <- node.Name:
73 default:
74 }
75 }
76 e.mu.Unlock()
77}
78
79func (e *throughputEventDelegate) NotifyLeave(node *memberlist.Node) {}
80func (e *throughputEventDelegate) NotifyUpdate(node *memberlist.Node) {}
81
82func (e *throughputEventDelegate) waitForNodes(n int, timeout time.Duration) bool {
83 deadline := time.Now().Add(timeout)
84 for {
85 e.mu.Lock()
86 count := len(e.joined)
87 e.mu.Unlock()
88 if count >= n {
89 return true
90 }
91 if time.Now().After(deadline) {
92 return false
93 }
94 time.Sleep(10 * time.Millisecond)
95 }
96}
97
98func createNode(name string, port int, delegate *throughputDelegate, events *throughputEventDelegate) (*memberlist.Memberlist, error) {
99 cfg := memberlist.DefaultLANConfig()
100 cfg.Name = name
101 cfg.BindAddr = "127.0.0.1"
102 cfg.BindPort = port
103 cfg.AdvertisePort = port
104 cfg.Delegate = delegate
105 cfg.Events = events
106 cfg.LogOutput = os.Stderr
107 cfg.GossipInterval = 50 * time.Millisecond
108 cfg.ProbeInterval = 200 * time.Millisecond
109 cfg.PushPullInterval = 30 * time.Second
110 cfg.GossipNodes = 3
111
112 return memberlist.Create(cfg)
113}
114
115func runThroughputBenchmark(numNodes int, duration time.Duration, msgRate int) (*ThroughputResult, error) {
116 nodes := make([]*memberlist.Memberlist, numNodes)
117 delegates := make([]*throughputDelegate, numNodes)
118 eventDelegates := make([]*throughputEventDelegate, numNodes)
119
120 basePort := 18946
121
122 for i := 0; i < numNodes; i++ {
123 delegates[i] = &throughputDelegate{meta: []byte(fmt.Sprintf("node-%d", i))}
124 eventDelegates[i] = newThroughputEventDelegate()
125
126 var err error
127 nodes[i], err = createNode(
128 fmt.Sprintf("node-%d", i),
129 basePort+i,
130 delegates[i],
131 eventDelegates[i],
132 )
133 if err != nil {
134 for j := 0; j < i; j++ {
135 nodes[j].Shutdown()
136 }
137 return nil, fmt.Errorf("failed to create node %d: %w", i, err)
138 }
139 }
140
141 for i := 1; i < numNodes; i++ {
142 addr := fmt.Sprintf("127.0.0.1:%d", basePort)
143 _, err := nodes[i].Join([]string{addr})
144 if err != nil {
145 log.Printf("Warning: node %d failed to join: %v", i, err)
146 }
147 }
148
149 for i := 0; i < numNodes; i++ {
150 if !eventDelegates[i].waitForNodes(numNodes, 10*time.Second) {
151 log.Printf("Warning: Node %d did not see all %d nodes", i, numNodes)
152 }
153 }
154
155 time.Sleep(500 * time.Millisecond)
156
157 var totalSent atomic.Int64
158 stopCh := make(chan struct{})
159 var wg sync.WaitGroup
160
161 msgInterval := time.Duration(float64(time.Second) / float64(msgRate))
162 msg := make([]byte, 65)
163 msg[0] = 'B'
164 for i := 1; i < 65; i++ {
165 msg[i] = 'x'
166 }
167
168 startTime := time.Now()
169
170 for i, n := range nodes {
171 wg.Add(1)
172 go func(node *memberlist.Memberlist, idx int) {
173 defer wg.Done()
174 ticker := time.NewTicker(msgInterval)
175 defer ticker.Stop()
176 for {
177 select {
178 case <-ticker.C:
179 for _, member := range node.Members() {
180 if member.Name != node.LocalNode().Name {
181 node.SendBestEffort(member, msg)
182 totalSent.Add(1)
183 }
184 }
185 case <-stopCh:
186 return
187 }
188 }
189 }(n, i)
190 }
191
192 time.Sleep(duration)
193 close(stopCh)
194 wg.Wait()
195
196 elapsed := time.Since(startTime)
197
198 var totalReceived int64
199 for _, d := range delegates {
200 totalReceived += d.received.Load()
201 }
202
203 for _, n := range nodes {
204 n.Shutdown()
205 }
206
207 msgsPerSec := float64(totalReceived) / elapsed.Seconds()
208
209 return &ThroughputResult{
210 Implementation: "memberlist",
211 NumNodes: numNodes,
212 DurationNs: duration.Nanoseconds(),
213 MsgRate: msgRate,
214 BroadcastsSent: totalSent.Load(),
215 BroadcastsReceived: totalReceived,
216 MsgsPerSec: msgsPerSec,
217 CPUCores: runtime.NumCPU(),
218 }, nil
219}
220
221func main() {
222 numNodes := flag.Int("nodes", 5, "number of nodes")
223 durationSec := flag.Int("duration", 10, "benchmark duration in seconds")
224 msgRate := flag.Int("rate", 100, "messages per second per node")
225 outputJSON := flag.Bool("json", false, "output as JSON")
226 flag.Parse()
227
228 result, err := runThroughputBenchmark(*numNodes, time.Duration(*durationSec)*time.Second, *msgRate)
229 if err != nil {
230 log.Fatalf("Benchmark failed: %v", err)
231 }
232
233 if *outputJSON {
234 enc := json.NewEncoder(os.Stdout)
235 enc.SetIndent("", " ")
236 enc.Encode(result)
237 } else {
238 fmt.Printf("=== Memberlist Throughput Results ===\n")
239 fmt.Printf("Nodes: %d\n", result.NumNodes)
240 fmt.Printf("Duration: %s\n", time.Duration(result.DurationNs))
241 fmt.Printf("Target Rate: %d msg/s per node\n", result.MsgRate)
242 fmt.Printf("Broadcasts Sent: %d\n", result.BroadcastsSent)
243 fmt.Printf("Broadcasts Recv: %d\n", result.BroadcastsReceived)
244 fmt.Printf("Throughput: %.1f msg/s\n", result.MsgsPerSec)
245 fmt.Printf("CPU Cores: %d\n", result.CPUCores)
246 }
247}