An experimental pub/sub client and server project.
1package server
2
3import (
4 "encoding/binary"
5 "fmt"
6 "log/slog"
7 "net"
8 "time"
9
10 "github.com/willdot/messagebroker/internal"
11)
12
13type subscriber struct {
14 peer *Peer
15 topic string
16 messages chan internal.Message
17 unsubscribeCh chan struct{}
18
19 ackDelay time.Duration
20 ackTimeout time.Duration
21}
22
23func newSubscriber(peer *Peer, topic *topic, ackDelay, ackTimeout time.Duration, startAt int) *subscriber {
24 s := &subscriber{
25 peer: peer,
26 topic: topic.name,
27 messages: make(chan internal.Message),
28 ackDelay: ackDelay,
29 ackTimeout: ackTimeout,
30 unsubscribeCh: make(chan struct{}, 1),
31 }
32
33 go s.sendMessages()
34
35 go func() {
36 topic.messageStore.ReadFrom(startAt, func(msg internal.Message) {
37 select {
38 case s.messages <- msg:
39 return
40 case <-s.unsubscribeCh:
41 return
42 }
43 })
44 }()
45
46 return s
47}
48
49func (s *subscriber) sendMessages() {
50 for {
51 select {
52 case <-s.unsubscribeCh:
53 return
54 case msg := <-s.messages:
55 ack, err := s.sendMessage(s.topic, msg)
56 if err != nil {
57 slog.Error("failed to send to message", "error", err, "peer", s.peer.Addr())
58 }
59
60 if ack {
61 continue
62 }
63
64 if msg.DeliveryCount >= 5 {
65 slog.Error("max delivery count for message. Dropping", "peer", s.peer.Addr())
66 continue
67 }
68
69 msg.DeliveryCount++
70 s.addMessage(msg, s.ackDelay)
71 }
72 }
73}
74
75func (s *subscriber) addMessage(msg internal.Message, delay time.Duration) {
76 go func() {
77 timer := time.NewTimer(delay)
78 defer timer.Stop()
79
80 select {
81 case <-s.unsubscribeCh:
82 return
83 case <-timer.C:
84 s.messages <- msg
85 }
86 }()
87}
88
89func (s *subscriber) sendMessage(topic string, msg internal.Message) (bool, error) {
90 var ack bool
91 op := func(conn net.Conn) error {
92 topicB := make([]byte, 2)
93 binary.BigEndian.PutUint16(topicB, uint16(len(topic)))
94
95 headers := topicB
96 headers = append(headers, []byte(topic)...)
97
98 // TODO: if message is empty, return error?
99 dataLenB := make([]byte, 8)
100 binary.BigEndian.PutUint64(dataLenB, uint64(len(msg.Data)))
101 headers = append(headers, dataLenB...)
102
103 _, err := conn.Write(append(headers, msg.Data...))
104 if err != nil {
105 return fmt.Errorf("failed to write to peer: %w", err)
106 }
107
108 if err := conn.SetReadDeadline(time.Now().Add(s.ackTimeout)); err != nil {
109 slog.Error("failed to set connection read deadline", "error", err, "peer", s.peer.Addr())
110 }
111 defer func() {
112 if err := conn.SetReadDeadline(time.Time{}); err != nil {
113 slog.Error("failed to reset connection read deadline", "error", err, "peer", s.peer.Addr())
114 }
115 }()
116 var ackRes Action
117 err = binary.Read(conn, binary.BigEndian, &ackRes)
118 if err != nil {
119 return fmt.Errorf("failed to read ack from peer: %w", err)
120 }
121
122 if ackRes == Ack {
123 ack = true
124 }
125
126 return nil
127 }
128
129 err := s.peer.RunConnOperation(op)
130
131 return ack, err
132}
133
134func (s *subscriber) unsubscribe() {
135 close(s.unsubscribeCh)
136}