package main import ( "encoding/json" "flag" "fmt" "log" "os" "runtime" "sync" "sync/atomic" "time" "github.com/hashicorp/memberlist" ) type ThroughputResult struct { Implementation string `json:"implementation"` NumNodes int `json:"num_nodes"` DurationNs int64 `json:"duration_ns"` MsgRate int `json:"msg_rate"` BroadcastsSent int64 `json:"broadcasts_sent"` BroadcastsReceived int64 `json:"broadcasts_received"` MsgsPerSec float64 `json:"msgs_per_sec"` CPUCores int `json:"cpu_cores"` } type throughputDelegate struct { received atomic.Int64 meta []byte } func (d *throughputDelegate) NodeMeta(limit int) []byte { return d.meta } func (d *throughputDelegate) NotifyMsg(msg []byte) { if len(msg) > 0 && msg[0] == 'B' { d.received.Add(1) } } func (d *throughputDelegate) GetBroadcasts(overhead, limit int) [][]byte { return nil } func (d *throughputDelegate) LocalState(join bool) []byte { return nil } func (d *throughputDelegate) MergeRemoteState(buf []byte, join bool) { } type throughputEventDelegate struct { joinCh chan string mu sync.Mutex joined map[string]bool } func newThroughputEventDelegate() *throughputEventDelegate { return &throughputEventDelegate{ joinCh: make(chan string, 1000), joined: make(map[string]bool), } } func (e *throughputEventDelegate) NotifyJoin(node *memberlist.Node) { e.mu.Lock() if !e.joined[node.Name] { e.joined[node.Name] = true select { case e.joinCh <- node.Name: default: } } e.mu.Unlock() } func (e *throughputEventDelegate) NotifyLeave(node *memberlist.Node) {} func (e *throughputEventDelegate) NotifyUpdate(node *memberlist.Node) {} func (e *throughputEventDelegate) waitForNodes(n int, timeout time.Duration) bool { deadline := time.Now().Add(timeout) for { e.mu.Lock() count := len(e.joined) e.mu.Unlock() if count >= n { return true } if time.Now().After(deadline) { return false } time.Sleep(10 * time.Millisecond) } } func createNode(name string, port int, delegate *throughputDelegate, events *throughputEventDelegate) (*memberlist.Memberlist, error) { cfg := memberlist.DefaultLANConfig() cfg.Name = name cfg.BindAddr = "127.0.0.1" cfg.BindPort = port cfg.AdvertisePort = port cfg.Delegate = delegate cfg.Events = events cfg.LogOutput = os.Stderr cfg.GossipInterval = 50 * time.Millisecond cfg.ProbeInterval = 200 * time.Millisecond cfg.PushPullInterval = 30 * time.Second cfg.GossipNodes = 3 return memberlist.Create(cfg) } func runThroughputBenchmark(numNodes int, duration time.Duration, msgRate int) (*ThroughputResult, error) { nodes := make([]*memberlist.Memberlist, numNodes) delegates := make([]*throughputDelegate, numNodes) eventDelegates := make([]*throughputEventDelegate, numNodes) basePort := 18946 for i := 0; i < numNodes; i++ { delegates[i] = &throughputDelegate{meta: []byte(fmt.Sprintf("node-%d", i))} eventDelegates[i] = newThroughputEventDelegate() var err error nodes[i], err = createNode( fmt.Sprintf("node-%d", i), basePort+i, delegates[i], eventDelegates[i], ) if err != nil { for j := 0; j < i; j++ { nodes[j].Shutdown() } return nil, fmt.Errorf("failed to create node %d: %w", i, err) } } for i := 1; i < numNodes; i++ { addr := fmt.Sprintf("127.0.0.1:%d", basePort) _, err := nodes[i].Join([]string{addr}) if err != nil { log.Printf("Warning: node %d failed to join: %v", i, err) } } for i := 0; i < numNodes; i++ { if !eventDelegates[i].waitForNodes(numNodes, 10*time.Second) { log.Printf("Warning: Node %d did not see all %d nodes", i, numNodes) } } time.Sleep(500 * time.Millisecond) var totalSent atomic.Int64 stopCh := make(chan struct{}) var wg sync.WaitGroup msgInterval := time.Duration(float64(time.Second) / float64(msgRate)) msg := make([]byte, 65) msg[0] = 'B' for i := 1; i < 65; i++ { msg[i] = 'x' } startTime := time.Now() for i, n := range nodes { wg.Add(1) go func(node *memberlist.Memberlist, idx int) { defer wg.Done() ticker := time.NewTicker(msgInterval) defer ticker.Stop() for { select { case <-ticker.C: for _, member := range node.Members() { if member.Name != node.LocalNode().Name { node.SendBestEffort(member, msg) totalSent.Add(1) } } case <-stopCh: return } } }(n, i) } time.Sleep(duration) close(stopCh) wg.Wait() elapsed := time.Since(startTime) var totalReceived int64 for _, d := range delegates { totalReceived += d.received.Load() } for _, n := range nodes { n.Shutdown() } msgsPerSec := float64(totalReceived) / elapsed.Seconds() return &ThroughputResult{ Implementation: "memberlist", NumNodes: numNodes, DurationNs: duration.Nanoseconds(), MsgRate: msgRate, BroadcastsSent: totalSent.Load(), BroadcastsReceived: totalReceived, MsgsPerSec: msgsPerSec, CPUCores: runtime.NumCPU(), }, nil } func main() { numNodes := flag.Int("nodes", 5, "number of nodes") durationSec := flag.Int("duration", 10, "benchmark duration in seconds") msgRate := flag.Int("rate", 100, "messages per second per node") outputJSON := flag.Bool("json", false, "output as JSON") flag.Parse() result, err := runThroughputBenchmark(*numNodes, time.Duration(*durationSec)*time.Second, *msgRate) if err != nil { log.Fatalf("Benchmark failed: %v", err) } if *outputJSON { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") enc.Encode(result) } else { fmt.Printf("=== Memberlist Throughput Results ===\n") fmt.Printf("Nodes: %d\n", result.NumNodes) fmt.Printf("Duration: %s\n", time.Duration(result.DurationNs)) fmt.Printf("Target Rate: %d msg/s per node\n", result.MsgRate) fmt.Printf("Broadcasts Sent: %d\n", result.BroadcastsSent) fmt.Printf("Broadcasts Recv: %d\n", result.BroadcastsReceived) fmt.Printf("Throughput: %.1f msg/s\n", result.MsgsPerSec) fmt.Printf("CPU Cores: %d\n", result.CPUCores) } }