···55 "sync"
66)
7788-// Peer represents a remote connection to the server such as a publisher or subscriber
88+// Peer represents a remote connection to the server such as a publisher or subscriber.
99type Peer struct {
1010 conn net.Conn
1111 connMu sync.Mutex
1212}
13131414-// New returns a new peer
1414+// New returns a new peer.
1515func New(conn net.Conn) *Peer {
1616 return &Peer{
1717 conn: conn,
1818 }
1919}
20202121-// Addr returns the peers connections address
2121+// Addr returns the peers connections address.
2222func (p *Peer) Addr() net.Addr {
2323 return p.conn.RemoteAddr()
2424}
25252626-// ConnOpp represents a set of actions on a connection that can be used synchrnously
2626+// ConnOpp represents a set of actions on a connection that can be used synchrnously.
2727type ConnOpp func(conn net.Conn) error
28282929// RunConnOperation will run the provided operation. It ensures that it is the only operation that is being
+30-14
server/server.go
···55 "encoding/json"
66 "errors"
77 "fmt"
88+ "io"
89 "log/slog"
910 "net"
1011 "strings"
1112 "sync"
1313+ "syscall"
1214 "time"
13151416 "github.com/willdot/messagebroker/server/peer"
···5153 lis net.Listener
52545355 mu sync.Mutex
5454- topics map[string]topic
5656+ topics map[string]*topic
5557}
56585759// New creates and starts a new server
···63656466 srv := &Server{
6567 lis: lis,
6666- topics: map[string]topic{},
6868+ topics: map[string]*topic{},
6769 }
68706971 go srv.start()
···979998100 action, err := readAction(peer, 0)
99101 if err != nil {
100100- slog.Error("failed to read action from peer", "error", err, "peer", peer.Addr())
102102+ if !errors.Is(err, io.EOF) {
103103+ slog.Error("failed to read action from peer", "error", err, "peer", peer.Addr())
104104+ }
101105 return
102106 }
103107···123127 for {
124128 action, err := readAction(peer, time.Millisecond*100)
125129 if err != nil {
130130+ // if the error is a timeout, it means the peer hasn't sent an action indicating it wishes to do something so sleep
131131+ // for a little bit to allow for other actions to happen on the connection
126132 var neterr net.Error
127133 if errors.As(err, &neterr) && neterr.Timeout() {
128128- time.Sleep(time.Second)
134134+ time.Sleep(time.Millisecond * 500)
129135 continue
130136 }
131131- // TODO: see if there's a way to check if the peers connection has been ended etc
132132- slog.Error("failed to read action from subscriber", "error", err, "peer", peer.Addr())
137137+138138+ if !errors.Is(err, io.EOF) {
139139+ slog.Error("failed to read action from subscriber", "error", err, "peer", peer.Addr())
140140+ }
133141134134- s.unsubscribePeerFromAllTopics(*peer)
142142+ s.unsubscribePeerFromAllTopics(peer)
135143136144 return
137145 }
···218226 return nil
219227 }
220228221221- s.unsubscribeToTopics(*peer, topics)
229229+ s.unsubscribeToTopics(peer, topics)
222230 writeStatus(Unsubscribed, "", conn)
223231224232 return nil
···239247 op := func(conn net.Conn) error {
240248 dataLen, err := dataLength(conn)
241249 if err != nil {
250250+ if errors.Is(err, io.EOF) {
251251+ return nil
252252+ }
242253 slog.Error("failed to read data length", "error", err, "peer", peer.Addr())
243254 writeStatus(Error, "invalid data length of data provided", conn)
244255 return nil
···325336 s.topics[topicName] = t
326337}
327338328328-func (s *Server) unsubscribeToTopics(peer peer.Peer, topics []string) {
339339+func (s *Server) unsubscribeToTopics(peer *peer.Peer, topics []string) {
329340 for _, topic := range topics {
330341 s.removeSubsciberFromTopic(topic, peer)
331342 }
332343}
333344334334-func (s *Server) removeSubsciberFromTopic(topicName string, peer peer.Peer) {
345345+func (s *Server) removeSubsciberFromTopic(topicName string, peer *peer.Peer) {
335346 s.mu.Lock()
336347 defer s.mu.Unlock()
337348···343354 delete(t.subscriptions, peer.Addr())
344355}
345356346346-func (s *Server) unsubscribePeerFromAllTopics(peer peer.Peer) {
357357+func (s *Server) unsubscribePeerFromAllTopics(peer *peer.Peer) {
347358 s.mu.Lock()
348359 defer s.mu.Unlock()
349360···357368 defer s.mu.Unlock()
358369359370 if topic, ok := s.topics[topicName]; ok {
360360- return &topic
371371+ return topic
361372 }
362373363374 return nil
···367378 var action Action
368379 op := func(conn net.Conn) error {
369380 if timeout > 0 {
370370- conn.SetReadDeadline(time.Now().Add(timeout))
381381+ err := conn.SetReadDeadline(time.Now().Add(timeout))
382382+ if err != nil {
383383+ slog.Error("failed to set connection read deadline", "error", err, "peer", peer.Addr())
384384+ }
371385 }
372386373387 err := binary.Read(conn, binary.BigEndian, &action)
···406420func writeStatus(status Status, message string, conn net.Conn) {
407421 err := binary.Write(conn, binary.BigEndian, status)
408422 if err != nil {
409409- slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr())
423423+ if !errors.Is(err, syscall.EPIPE) {
424424+ slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr())
425425+ }
410426 return
411427 }
412428