My implementation of gossip-gloomers - the fly.io distributed systems challenge (https://fly.io/dist-sys/)
1package main
2
3import (
4 "encoding/json"
5 "slices"
6
7 "github.com/avast/retry-go/v4"
8 maelstrom "github.com/jepsen-io/maelstrom/demo/go"
9)
10
11type broadcastMessage struct {
12 Type string `json:"type"`
13 Message int `json:"message"`
14}
15
16func (s *server) handleBroadcast(msg maelstrom.Message) error {
17 var body broadcastMessage
18 if err := json.Unmarshal(msg.Body, &body); err != nil {
19 return err
20 }
21
22 s.mu.Lock()
23 defer s.mu.Unlock()
24
25 if !slices.Contains(s.ids, body.Message) {
26 s.ids = append(s.ids, body.Message)
27 s.relayMessage(body, msg.Src)
28 }
29
30 resp := map[string]any{
31 "type": "broadcast_ok",
32 }
33 return s.node.Reply(msg, resp)
34}
35
36type readMessage struct {
37 Type string `json:"type"`
38 Messages []int `json:"messages"`
39}
40
41func (s *server) handleRead(msg maelstrom.Message) error {
42 s.mu.Lock()
43 defer s.mu.Unlock()
44
45 resp := readMessage{
46 Type: "read_ok",
47 Messages: s.ids,
48 }
49
50 return s.node.Reply(msg, resp)
51}
52
53func (s *server) relayMessage(msg broadcastMessage, src string) {
54 resp := map[string]any{
55 "type": "broadcast",
56 "message": msg.Message,
57 }
58
59 nodes := s.neighbours.getNeighbours()
60
61 for _, node := range nodes {
62 if src == node || node == s.node.ID() {
63 continue
64 }
65
66 retry.Do(func() error {
67 return s.node.RPC(node, resp, func(msg maelstrom.Message) error { return nil })
68 })
69
70 }
71}
72
73type topology struct {
74 Topology map[string][]string `json:"topology"`
75}
76
77func (s *server) handleTopology(msg maelstrom.Message) error {
78 var topology topology
79 if err := json.Unmarshal(msg.Body, &topology); err != nil {
80 return err
81 }
82
83 s.neighbours.addNeighbours(topology.Topology[s.node.ID()])
84
85 resp := map[string]any{
86 "type": "topology_ok",
87 }
88
89 return s.node.Reply(msg, resp)
90}