My implementation of gossip-gloomers - the fly.io distributed systems challenge (https://fly.io/dist-sys/)

Up to challenge 3c complete

Signed-off-by: Will Andrews <will7989@hotmail.com>
Signed-off-by: Will Andrews <did:plc:dadhhalkfcq3gucaq25hjqon>

authored by willdot.net and committed by willdot.net e21d5e4e 89769d91

+214 -16
+1
.gitignore
··· 1 + /results
+90
broadcast.go
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + "slices" 6 + 7 + "github.com/avast/retry-go/v4" 8 + maelstrom "github.com/jepsen-io/maelstrom/demo/go" 9 + ) 10 + 11 + type broadcastMessage struct { 12 + Type string `json:"type"` 13 + Message int `json:"message"` 14 + } 15 + 16 + func (s *server) handleBroadcast(msg maelstrom.Message) error { 17 + var body broadcastMessage 18 + if err := json.Unmarshal(msg.Body, &body); err != nil { 19 + return err 20 + } 21 + 22 + s.mu.Lock() 23 + defer s.mu.Unlock() 24 + 25 + if !slices.Contains(s.ids, body.Message) { 26 + s.ids = append(s.ids, body.Message) 27 + s.relayMessage(body, msg.Src) 28 + } 29 + 30 + resp := map[string]any{ 31 + "type": "broadcast_ok", 32 + } 33 + return s.node.Reply(msg, resp) 34 + } 35 + 36 + type readMessage struct { 37 + Type string `json:"type"` 38 + Messages []int `json:"messages"` 39 + } 40 + 41 + func (s *server) handleRead(msg maelstrom.Message) error { 42 + s.mu.Lock() 43 + defer s.mu.Unlock() 44 + 45 + resp := readMessage{ 46 + Type: "read_ok", 47 + Messages: s.ids, 48 + } 49 + 50 + return s.node.Reply(msg, resp) 51 + } 52 + 53 + func (s *server) relayMessage(msg broadcastMessage, src string) { 54 + resp := map[string]any{ 55 + "type": "broadcast", 56 + "message": msg.Message, 57 + } 58 + 59 + nodes := s.neighbours.getNeighbours() 60 + 61 + for _, node := range nodes { 62 + if src == node || node == s.node.ID() { 63 + continue 64 + } 65 + 66 + retry.Do(func() error { 67 + return s.node.RPC(node, resp, func(msg maelstrom.Message) error { return nil }) 68 + }) 69 + 70 + } 71 + } 72 + 73 + type topology struct { 74 + Topology map[string][]string `json:"topology"` 75 + } 76 + 77 + func (s *server) handleTopology(msg maelstrom.Message) error { 78 + var topology topology 79 + if err := json.Unmarshal(msg.Body, &topology); err != nil { 80 + return err 81 + } 82 + 83 + s.neighbours.addNeighbours(topology.Topology[s.node.ID()]) 84 + 85 + resp := map[string]any{ 86 + "type": "topology_ok", 87 + } 88 + 89 + return s.node.Reply(msg, resp) 90 + }
+21
echo.go
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + 6 + maelstrom "github.com/jepsen-io/maelstrom/demo/go" 7 + ) 8 + 9 + func (s *server) handleEcho(msg maelstrom.Message) error { 10 + // Unmarshal the message body as an loosely-typed map. 11 + var body map[string]any 12 + if err := json.Unmarshal(msg.Body, &body); err != nil { 13 + return err 14 + } 15 + 16 + // Update the message type to return back. 17 + body["type"] = "echo_ok" 18 + 19 + // Echo the original message back with the updated message type. 20 + return s.node.Reply(msg, body) 21 + }
+4 -1
go.mod
··· 2 2 3 3 go 1.25.0 4 4 5 - require github.com/jepsen-io/maelstrom/demo/go v0.0.0-20251128144731-cb7f07239012 5 + require ( 6 + github.com/avast/retry-go/v4 v4.7.0 7 + github.com/jepsen-io/maelstrom/demo/go v0.0.0-20251128144731-cb7f07239012 8 + )
+10
go.sum
··· 1 + github.com/avast/retry-go/v4 v4.7.0 h1:yjDs35SlGvKwRNSykujfjdMxMhMQQM0TnIjJaHB+Zio= 2 + github.com/avast/retry-go/v4 v4.7.0/go.mod h1:ZMPDa3sY2bKgpLtap9JRUgk2yTAba7cgiFhqxY2Sg6Q= 3 + github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 4 + github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 1 5 github.com/jepsen-io/maelstrom/demo/go v0.0.0-20251128144731-cb7f07239012 h1:j2FpC/930Px9SWIn8lgzxEiEZOvaQ9EUs37+e1QCNLA= 2 6 github.com/jepsen-io/maelstrom/demo/go v0.0.0-20251128144731-cb7f07239012/go.mod h1:i6aVIs5AIOOaQF1lAisBm7DDeWM1Iopf+26UxjagsCU= 7 + github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 8 + github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 9 + github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= 10 + github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= 11 + gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 12 + gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+49 -14
main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "encoding/json" 5 4 "log" 5 + "log/slog" 6 + "sync" 6 7 7 8 maelstrom "github.com/jepsen-io/maelstrom/demo/go" 8 9 ) 9 10 10 11 func main() { 12 + slog.Info("starting server") 11 13 n := maelstrom.NewNode() 12 - n.Handle("echo", func(msg maelstrom.Message) error { 13 - // Unmarshal the message body as an loosely-typed map. 14 - var body map[string]any 15 - if err := json.Unmarshal(msg.Body, &body); err != nil { 16 - return err 17 - } 18 14 19 - // Update the message type to return back. 20 - body["type"] = "echo_ok" 15 + server := newServer(n) 21 16 22 - // Echo the original message back with the updated message type. 23 - return n.Reply(msg, body) 24 - }) 17 + if err := server.run(); err != nil { 18 + log.Fatal(err) 19 + } 20 + } 25 21 26 - if err := n.Run(); err != nil { 27 - log.Fatal(err) 22 + type server struct { 23 + node *maelstrom.Node 24 + 25 + neighbours *neigbours 26 + 27 + ids []int 28 + mu sync.Mutex 29 + } 30 + 31 + func newServer(node *maelstrom.Node) *server { 32 + server := server{ 33 + node: node, 34 + neighbours: &neigbours{}, 28 35 } 36 + node.Handle("echo", server.handleEcho) 37 + node.Handle("generate", server.handleGenerate) 38 + node.Handle("broadcast", server.handleBroadcast) 39 + node.Handle("read", server.handleRead) 40 + node.Handle("topology", server.handleTopology) 41 + 42 + return &server 43 + } 44 + 45 + func (s *server) run() error { 46 + return s.node.Run() 47 + } 48 + 49 + type neigbours struct { 50 + nodes []string 51 + mu sync.Mutex 52 + } 53 + 54 + func (n *neigbours) addNeighbours(nodes []string) { 55 + n.mu.Lock() 56 + defer n.mu.Unlock() 57 + n.nodes = nodes 58 + } 59 + 60 + func (n *neigbours) getNeighbours() []string { 61 + n.mu.Lock() 62 + defer n.mu.Unlock() 63 + return n.nodes 29 64 }
+14 -1
run.sh
··· 2 2 3 3 go install . 4 4 5 + rm results/results.edn 6 + 5 7 # Go back to where maelstrom is so that the additional files that are created when running it 6 8 # don't end up in the mounted volume and on our local machines 7 9 cd /home/linuxbrew/maelstrom/ 8 10 9 11 if [ "$TEST_TO_RUN" = 'echo' ]; then 10 - ./maelstrom test -w $TEST_TO_RUN --bin /home/linuxbrew/go/bin/gossip-gloomers --node-count 1 --time-limit 10 12 + ./maelstrom test -w echo --bin ../go/bin/gossip-gloomers --node-count 1 --time-limit 10 13 + elif [ "$TEST_TO_RUN" = 'unique-ids' ]; then 14 + ./maelstrom test -w unique-ids --bin ../go/bin/gossip-gloomers --node-count 3 --time-limit 30 --availability total --nemesis partition 15 + elif [ "$TEST_TO_RUN" = 'broadcast-single' ]; then 16 + ./maelstrom test -w broadcast --bin ../go/bin/gossip-gloomers --node-count 1 --time-limit 20 --rate 10 17 + elif [ "$TEST_TO_RUN" = 'broadcast-multi' ]; then 18 + ./maelstrom test -w broadcast --bin ../go/bin/gossip-gloomers --node-count 5 --time-limit 20 --rate 10 19 + elif [ "$TEST_TO_RUN" = 'broadcast-fault' ]; then 20 + ./maelstrom test -w broadcast --bin ../go/bin/gossip-gloomers --node-count 5 --time-limit 20 --rate 10 --nemesis partition 21 + elif [ "$TEST_TO_RUN" = 'broadcast-efficiency-1' ]; then 22 + ./maelstrom test -w broadcast --bin ../go/bin/gossip-gloomers --node-count 25 --time-limit 20 --rate 100 --latency 100 23 + cp store/latest/results.edn ../app/results/results.edn 11 24 else 12 25 echo "invalid input" 13 26 fi
+25
unique_ids.go
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + "math/rand" 6 + 7 + maelstrom "github.com/jepsen-io/maelstrom/demo/go" 8 + ) 9 + 10 + func (s *server) handleGenerate(msg maelstrom.Message) error { 11 + // Unmarshal the message body as an loosely-typed map. 12 + var body map[string]any 13 + if err := json.Unmarshal(msg.Body, &body); err != nil { 14 + return err 15 + } 16 + 17 + x := rand.Int63() 18 + 19 + // Update the message type to return back. 20 + body["type"] = "generate_ok" 21 + body["id"] = x 22 + 23 + // Return the original message back with the updated message type. 24 + return s.node.Reply(msg, body) 25 + }