An experimental pub/sub client and server project.
at main 136 lines 2.9 kB view raw
1package server 2 3import ( 4 "encoding/binary" 5 "fmt" 6 "log/slog" 7 "net" 8 "time" 9 10 "github.com/willdot/messagebroker/internal" 11) 12 13type subscriber struct { 14 peer *Peer 15 topic string 16 messages chan internal.Message 17 unsubscribeCh chan struct{} 18 19 ackDelay time.Duration 20 ackTimeout time.Duration 21} 22 23func newSubscriber(peer *Peer, topic *topic, ackDelay, ackTimeout time.Duration, startAt int) *subscriber { 24 s := &subscriber{ 25 peer: peer, 26 topic: topic.name, 27 messages: make(chan internal.Message), 28 ackDelay: ackDelay, 29 ackTimeout: ackTimeout, 30 unsubscribeCh: make(chan struct{}, 1), 31 } 32 33 go s.sendMessages() 34 35 go func() { 36 topic.messageStore.ReadFrom(startAt, func(msg internal.Message) { 37 select { 38 case s.messages <- msg: 39 return 40 case <-s.unsubscribeCh: 41 return 42 } 43 }) 44 }() 45 46 return s 47} 48 49func (s *subscriber) sendMessages() { 50 for { 51 select { 52 case <-s.unsubscribeCh: 53 return 54 case msg := <-s.messages: 55 ack, err := s.sendMessage(s.topic, msg) 56 if err != nil { 57 slog.Error("failed to send to message", "error", err, "peer", s.peer.Addr()) 58 } 59 60 if ack { 61 continue 62 } 63 64 if msg.DeliveryCount >= 5 { 65 slog.Error("max delivery count for message. Dropping", "peer", s.peer.Addr()) 66 continue 67 } 68 69 msg.DeliveryCount++ 70 s.addMessage(msg, s.ackDelay) 71 } 72 } 73} 74 75func (s *subscriber) addMessage(msg internal.Message, delay time.Duration) { 76 go func() { 77 timer := time.NewTimer(delay) 78 defer timer.Stop() 79 80 select { 81 case <-s.unsubscribeCh: 82 return 83 case <-timer.C: 84 s.messages <- msg 85 } 86 }() 87} 88 89func (s *subscriber) sendMessage(topic string, msg internal.Message) (bool, error) { 90 var ack bool 91 op := func(conn net.Conn) error { 92 topicB := make([]byte, 2) 93 binary.BigEndian.PutUint16(topicB, uint16(len(topic))) 94 95 headers := topicB 96 headers = append(headers, []byte(topic)...) 97 98 // TODO: if message is empty, return error? 99 dataLenB := make([]byte, 8) 100 binary.BigEndian.PutUint64(dataLenB, uint64(len(msg.Data))) 101 headers = append(headers, dataLenB...) 102 103 _, err := conn.Write(append(headers, msg.Data...)) 104 if err != nil { 105 return fmt.Errorf("failed to write to peer: %w", err) 106 } 107 108 if err := conn.SetReadDeadline(time.Now().Add(s.ackTimeout)); err != nil { 109 slog.Error("failed to set connection read deadline", "error", err, "peer", s.peer.Addr()) 110 } 111 defer func() { 112 if err := conn.SetReadDeadline(time.Time{}); err != nil { 113 slog.Error("failed to reset connection read deadline", "error", err, "peer", s.peer.Addr()) 114 } 115 }() 116 var ackRes Action 117 err = binary.Read(conn, binary.BigEndian, &ackRes) 118 if err != nil { 119 return fmt.Errorf("failed to read ack from peer: %w", err) 120 } 121 122 if ackRes == Ack { 123 ack = true 124 } 125 126 return nil 127 } 128 129 err := s.peer.RunConnOperation(op) 130 131 return ack, err 132} 133 134func (s *subscriber) unsubscribe() { 135 close(s.unsubscribeCh) 136}