···11+package messagestore
22+33+import (
44+ "sync"
55+66+ "github.com/willdot/messagebroker/internal"
77+)
88+99+// MemoryStore allows messages to be stored in memory
1010+type MemoryStore struct {
1111+ mu sync.Mutex
1212+ msgs map[int]internal.Message
1313+ nextOffset int
1414+}
1515+1616+// NewMemoryStore initializes a new in memory store
1717+func NewMemoryStore() *MemoryStore {
1818+ return &MemoryStore{
1919+ msgs: make(map[int]internal.Message),
2020+ }
2121+}
2222+2323+// Write will write the provided message to the in memory store
2424+func (m *MemoryStore) Write(msg internal.Message) error {
2525+ m.mu.Lock()
2626+ defer m.mu.Unlock()
2727+2828+ m.msgs[m.nextOffset] = msg
2929+3030+ m.nextOffset++
3131+3232+ return nil
3333+}
3434+3535+// ReadFrom will read messages from (and including) the provided offset and pass them to the provided handler
3636+func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg internal.Message)) {
3737+ if offset < 0 || offset >= m.nextOffset {
3838+ return
3939+ }
4040+4141+ m.mu.Lock()
4242+ defer m.mu.Unlock()
4343+4444+ for i := offset; i < len(m.msgs); i++ {
4545+ handleFunc(m.msgs[i])
4646+ }
4747+}
+12
internal/messge.go
···11+package internal
22+33+// Message represents a message that can be sent / received
44+type Message struct {
55+ Data []byte
66+ DeliveryCount int
77+}
88+99+// NewMessage intializes a new message
1010+func NewMessage(data []byte) Message {
1111+ return Message{Data: data, DeliveryCount: 1}
1212+}
+1-1
pubsub/message.go
client/message.go
···11-package pubsub
11+package client
2233// Message represents a message that can be published or consumed
44type Message struct {
+4-4
pubsub/publisher.go
client/publisher.go
···11-package pubsub
11+package client
2233import (
44 "encoding/binary"
···66 "net"
77 "sync"
8899- "github.com/willdot/messagebroker/server"
99+ "github.com/willdot/messagebroker/internal/server"
1010)
11111212// Publisher allows messages to be published to a server
···4444 // send topic first
4545 topic := fmt.Sprintf("topic:%s", message.Topic)
46464747- topicLenB := make([]byte, 4)
4848- binary.BigEndian.PutUint32(topicLenB, uint32(len(topic)))
4747+ topicLenB := make([]byte, 2)
4848+ binary.BigEndian.PutUint16(topicLenB, uint16(len(topic)))
49495050 headers := append(topicLenB, []byte(topic)...)
5151
+6-6
pubsub/subscriber.go
client/subscriber.go
···11-package pubsub
11+package client
2233import (
44 "context"
···1010 "sync"
1111 "time"
12121313- "github.com/willdot/messagebroker/server"
1313+ "github.com/willdot/messagebroker/internal/server"
1414)
15151616type connOpp func(conn net.Conn) error
···9696 return nil
9797 }
98989999- var dataLen uint32
9999+ var dataLen uint16
100100 err = binary.Read(conn, binary.BigEndian, &dataLen)
101101 if err != nil {
102102 return fmt.Errorf("received status %s:", resp)
···140140 return nil
141141 }
142142143143- var dataLen uint32
143143+ var dataLen uint16
144144 err = binary.Read(conn, binary.BigEndian, &dataLen)
145145 if err != nil {
146146 return fmt.Errorf("received status %s:", resp)
···198198199199func (s *Subscriber) readMessage(ctx context.Context, msgChan chan *Message) error {
200200 op := func(conn net.Conn) error {
201201- err := s.conn.SetReadDeadline(time.Now().Add(time.Second))
201201+ err := s.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 300))
202202 if err != nil {
203203 return err
204204 }
205205206206- var topicLen uint64
206206+ var topicLen uint16
207207 err = binary.Read(s.conn, binary.BigEndian, &topicLen)
208208 if err != nil {
209209 // TODO: check if this is needed elsewhere. I'm not sure where the read deadline resets....
···11-package pubsub
11+package client
2233import (
44 "context"
···8899 "github.com/stretchr/testify/assert"
1010 "github.com/stretchr/testify/require"
1111-1212- "github.com/willdot/messagebroker/server"
1111+ "github.com/willdot/messagebroker/internal/server"
1312)
14131514const (
···134133 err = publisher.PublishMessage(msg)
135134 require.NoError(t, err)
136135137137- time.Sleep(time.Second)
136136+ // give the consumer some time to read the messages -- TODO: make better!
137137+ time.Sleep(time.Millisecond * 300)
138138 cancel()
139139140140 select {
···181181 }
182182183183 // give the consumer some time to read the messages -- TODO: make better!
184184- time.Sleep(time.Second)
184184+ time.Sleep(time.Millisecond * 300)
185185 cancel()
186186187187 select {
-45
server/message_store.go
···11-package server
22-33-import (
44- "sync"
55-)
66-77-// Memory store allows messages to be stored in memory
88-type MemoryStore struct {
99- mu sync.Mutex
1010- msgs map[int]message
1111- nextOffset int
1212-}
1313-1414-// New memory store initializes a new in memory store
1515-func NewMemoryStore() *MemoryStore {
1616- return &MemoryStore{
1717- msgs: make(map[int]message),
1818- }
1919-}
2020-2121-// Write will write the provided message to the in memory store
2222-func (m *MemoryStore) Write(msg message) error {
2323- m.mu.Lock()
2424- defer m.mu.Unlock()
2525-2626- m.msgs[m.nextOffset] = msg
2727-2828- m.nextOffset++
2929-3030- return nil
3131-}
3232-3333-// ReadFrom will read messages from (and including) the provided offset and pass them to the provided handler
3434-func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) {
3535- if offset < 0 || offset >= m.nextOffset {
3636- return
3737- }
3838-3939- m.mu.Lock()
4040- defer m.mu.Unlock()
4141-4242- for i := offset; i < len(m.msgs); i++ {
4343- handleFunc(m.msgs[i])
4444- }
4545-}
+2-2
server/peer/peer.go
internal/server/peer.go
···11-package peer
11+package server
2233import (
44 "net"
···1212}
13131414// New returns a new peer.
1515-func New(conn net.Conn) *Peer {
1515+func NewPeer(conn net.Conn) *Peer {
1616 return &Peer{
1717 conn: conn,
1818 }
+38-25
server/server.go
internal/server/server.go
···1313 "syscall"
1414 "time"
15151616- "github.com/willdot/messagebroker/server/peer"
1616+ "github.com/willdot/messagebroker/internal"
1717)
18181919// Action represents the type of action that a peer requests to do
···111111}
112112113113func (s *Server) handleConn(conn net.Conn) {
114114- peer := peer.New(conn)
114114+ peer := NewPeer(conn)
115115116116 slog.Info("handling connection", "peer", peer.Addr())
117117 defer slog.Info("ending connection", "peer", peer.Addr())
···137137 }
138138}
139139140140-func (s *Server) handleSubscribe(peer *peer.Peer) {
140140+func (s *Server) handleSubscribe(peer *Peer) {
141141 slog.Info("handling subscriber", "peer", peer.Addr())
142142 // subscribe the peer to the topic
143143 s.subscribePeerToTopic(peer)
144144145145+ s.waitForPeerAction(peer)
146146+}
147147+148148+func (s *Server) waitForPeerAction(peer *Peer) {
145149 // keep handling the peers connection, getting the action from the peer when it wishes to do something else.
146150 // once the peers connection ends, it will be unsubscribed from all topics and returned
147151 for {
···177181 }
178182}
179183180180-func (s *Server) subscribePeerToTopic(peer *peer.Peer) {
184184+func (s *Server) subscribePeerToTopic(peer *Peer) {
181185 op := func(conn net.Conn) error {
182186 // get the topics the peer wishes to subscribe to
183183- dataLen, err := dataLength(conn)
187187+ dataLen, err := dataLengthUint32(conn)
184188 if err != nil {
185189 slog.Error(err.Error(), "peer", peer.Addr())
186190 writeStatus(Error, "invalid data length of topics provided", conn)
···244248 _ = peer.RunConnOperation(op)
245249}
246250247247-func (s *Server) handleUnsubscribe(peer *peer.Peer) {
251251+func (s *Server) handleUnsubscribe(peer *Peer) {
248252 slog.Info("handling unsubscriber", "peer", peer.Addr())
249253 op := func(conn net.Conn) error {
250254 // get the topics the peer wishes to unsubscribe from
251251- dataLen, err := dataLength(conn)
255255+ dataLen, err := dataLengthUint32(conn)
252256 if err != nil {
253257 slog.Error(err.Error(), "peer", peer.Addr())
254258 writeStatus(Error, "invalid data length of topics provided", conn)
···284288 _ = peer.RunConnOperation(op)
285289}
286290287287-func (s *Server) handlePublish(peer *peer.Peer) {
291291+func (s *Server) handlePublish(peer *Peer) {
288292 slog.Info("handling publisher", "peer", peer.Addr())
289293 for {
290294 op := func(conn net.Conn) error {
291291- dataLen, err := dataLength(conn)
295295+ topicDataLen, err := dataLengthUint16(conn)
292296 if err != nil {
293297 if errors.Is(err, io.EOF) {
294298 return nil
···297301 writeStatus(Error, "invalid data length of data provided", conn)
298302 return nil
299303 }
300300- if dataLen == 0 {
304304+ if topicDataLen == 0 {
301305 return nil
302306 }
303303- topicBuf := make([]byte, dataLen)
307307+ topicBuf := make([]byte, topicDataLen)
304308 _, err = conn.Read(topicBuf)
305309 if err != nil {
306310 slog.Error("failed to read topic from peer", "error", err, "peer", peer.Addr())
···316320 }
317321 topicStr = strings.TrimPrefix(topicStr, "topic:")
318322319319- dataLen, err = dataLength(conn)
323323+ msgDataLen, err := dataLengthUint32(conn)
320324 if err != nil {
321325 slog.Error(err.Error(), "peer", peer.Addr())
322326 writeStatus(Error, "invalid data length of data provided", conn)
323327 return nil
324328 }
325325- if dataLen == 0 {
329329+ if msgDataLen == 0 {
326330 return nil
327331 }
328332329329- dataBuf := make([]byte, dataLen)
333333+ dataBuf := make([]byte, msgDataLen)
330334 _, err = conn.Read(dataBuf)
331335 if err != nil {
332336 slog.Error("failed to read data from peer", "error", err, "peer", peer.Addr())
···340344 s.topics[topicStr] = topic
341345 }
342346343343- message := newMessage(dataBuf)
347347+ message := internal.NewMessage(dataBuf)
344348345349 err = topic.sendMessageToSubscribers(message)
346350 if err != nil {
···356360 }
357361}
358362359359-func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string, startAt int) {
363363+func (s *Server) subscribeToTopics(peer *Peer, topics []string, startAt int) {
360364 slog.Info("subscribing peer to topics", "topics", topics, "peer", peer.Addr())
361365 for _, topic := range topics {
362366 s.addSubsciberToTopic(topic, peer, startAt)
363367 }
364368}
365369366366-func (s *Server) addSubsciberToTopic(topicName string, peer *peer.Peer, startAt int) {
370370+func (s *Server) addSubsciberToTopic(topicName string, peer *Peer, startAt int) {
367371 s.mu.Lock()
368372 defer s.mu.Unlock()
369373···377381 s.topics[topicName] = t
378382}
379383380380-func (s *Server) unsubscribeToTopics(peer *peer.Peer, topics []string) {
384384+func (s *Server) unsubscribeToTopics(peer *Peer, topics []string) {
381385 slog.Info("unsubscribing peer from topics", "topics", topics, "peer", peer.Addr())
382386 for _, topic := range topics {
383387 s.removeSubsciberFromTopic(topic, peer)
384388 }
385389}
386390387387-func (s *Server) removeSubsciberFromTopic(topicName string, peer *peer.Peer) {
391391+func (s *Server) removeSubsciberFromTopic(topicName string, peer *Peer) {
388392 s.mu.Lock()
389393 defer s.mu.Unlock()
390394···400404 delete(t.subscriptions, peer.Addr())
401405}
402406403403-func (s *Server) unsubscribePeerFromAllTopics(peer *peer.Peer) {
407407+func (s *Server) unsubscribePeerFromAllTopics(peer *Peer) {
404408 s.mu.Lock()
405409 defer s.mu.Unlock()
406410···425429 return nil
426430}
427431428428-func readAction(peer *peer.Peer, timeout time.Duration) (Action, error) {
432432+func readAction(peer *Peer, timeout time.Duration) (Action, error) {
429433 var action Action
430434 op := func(conn net.Conn) error {
431435 if timeout > 0 {
···454458 return action, nil
455459}
456460457457-func writeInvalidAction(peer *peer.Peer) {
461461+func writeInvalidAction(peer *Peer) {
458462 op := func(conn net.Conn) error {
459463 writeStatus(Error, "unknown action", conn)
460464 return nil
···463467 _ = peer.RunConnOperation(op)
464468}
465469466466-func dataLength(conn net.Conn) (uint32, error) {
470470+func dataLengthUint32(conn net.Conn) (uint32, error) {
467471 var dataLen uint32
472472+ err := binary.Read(conn, binary.BigEndian, &dataLen)
473473+ if err != nil {
474474+ return 0, err
475475+ }
476476+ return dataLen, nil
477477+}
478478+479479+func dataLengthUint16(conn net.Conn) (uint16, error) {
480480+ var dataLen uint16
468481 err := binary.Read(conn, binary.BigEndian, &dataLen)
469482 if err != nil {
470483 return 0, err
···479492 headers := statusB
480493481494 if len(message) > 0 {
482482- sizeB := make([]byte, 4)
483483- binary.BigEndian.PutUint32(sizeB, uint32(len(message)))
495495+ sizeB := make([]byte, 2)
496496+ binary.BigEndian.PutUint16(sizeB, uint16(len(message)))
484497 headers = append(headers, sizeB...)
485498 }
486499
···10101111 "github.com/stretchr/testify/assert"
1212 "github.com/stretchr/testify/require"
1313+ "github.com/willdot/messagebroker/internal/messagestore"
1314)
14151516const (
···3940 srv.topics[topicName] = &topic{
4041 name: topicName,
4142 subscriptions: make(map[net.Addr]*subscriber),
4242- messageStore: NewMemoryStore(),
4343+ messageStore: messagestore.NewMemoryStore(),
4344 }
44454546 return srv
···211212212213 expectedMessage := "unknown action"
213214214214- var dataLen uint32
215215+ var dataLen uint16
215216 err = binary.Read(conn, binary.BigEndian, &dataLen)
216217 require.NoError(t, err)
217218 assert.Equal(t, len(expectedMessage), int(dataLen))
···249250250251 expectedMessage := "topic data does not contain 'topic:' prefix"
251252252252- var dataLen uint32
253253+ var dataLen uint16
253254 err = binary.Read(publisherConn, binary.BigEndian, &dataLen)
254255 require.NoError(t, err)
255256 assert.Equal(t, len(expectedMessage), int(dataLen))
···352353353354 // check the subsribers got the data
354355 readMessage := func(conn net.Conn, ack Action) {
355355- var topicLen uint64
356356+ var topicLen uint16
356357 err = binary.Read(conn, binary.BigEndian, &topicLen)
357358 require.NoError(t, err)
358359···382383 readMessage(subscriberConn, Ack)
383384 // reading for another message should now timeout but give enough time for the ack delay to kick in
384385 // should the second read of the message not have been ack'd properly
385385- var topicLen uint64
386386+ var topicLen uint16
386387 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100))
387388 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
388389 require.Error(t, err)
···406407407408 // check the subsribers got the data
408409 readMessage := func(conn net.Conn, ack bool) {
409409- var topicLen uint64
410410+ var topicLen uint16
410411 err = binary.Read(conn, binary.BigEndian, &topicLen)
411412 require.NoError(t, err)
412413···440441441442 // reading for another message should now timeout but give enough time for the ack delay to kick in
442443 // should the second read of the message not have been ack'd properly
443443- var topicLen uint64
444444+ var topicLen uint16
444445 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100))
445446 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
446447 require.Error(t, err)
···464465465466 // check the subsribers got the data
466467 readMessage := func(conn net.Conn, ack bool) {
467467- var topicLen uint64
468468+ var topicLen uint16
468469 err = binary.Read(conn, binary.BigEndian, &topicLen)
469470 require.NoError(t, err)
470471···500501 readMessage(subscriberConn, false)
501502502503 // reading for the message should now timeout as we have nack'd the message too many times
503503- var topicLen uint64
504504+ var topicLen uint16
504505 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100))
505506 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
506507 require.Error(t, err)
···592593}
593594594595func readMessage(t *testing.T, subscriberConn net.Conn) []byte {
595595- var topicLen uint64
596596+ var topicLen uint16
596597 err := binary.Read(subscriberConn, binary.BigEndian, &topicLen)
597598 require.NoError(t, err)
598599