···77777878type Store interface {
7979 Write(msg MessageToSend) error
8080- ReadFrom(offset int, handleFunc func(msgs []MessageToSend)) error
8080+ ReadFrom(offset int, handleFunc func(msg MessageToSend)) error
8181}
82828383// Server accepts subscribe and publish connections and passes messages around
···90909191 ackDelay time.Duration
9292 ackTimeout time.Duration
9393-9494- messageStore Store
9593}
96949795// New creates and starts a new server
9898-func New(Addr string, ackDelay, ackTimeout time.Duration, messageStore Store) (*Server, error) {
9696+func New(Addr string, ackDelay, ackTimeout time.Duration) (*Server, error) {
9997 lis, err := net.Listen("tcp", Addr)
10098 if err != nil {
10199 return nil, fmt.Errorf("failed to listen: %w", err)
102100 }
103101104102 srv := &Server{
105105- lis: lis,
106106- topics: map[string]*topic{},
107107- ackDelay: ackDelay,
108108- ackTimeout: ackTimeout,
109109- messageStore: messageStore,
103103+ lis: lis,
104104+ topics: map[string]*topic{},
105105+ ackDelay: ackDelay,
106106+ ackTimeout: ackTimeout,
110107 }
111108112109 go srv.start()
···375372376373 topic := s.getTopic(message.topic)
377374 if topic == nil {
378378- topic = newTopic(message.topic, s.messageStore)
375375+ topic = newTopic(message.topic)
379376 s.topics[message.topic] = topic
380377 }
381378···406403407404 t, ok := s.topics[topicName]
408405 if !ok {
409409- t = newTopic(topicName, s.messageStore)
406406+ t = newTopic(topicName)
410407 }
411408412412- t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout, s.messageStore, startAt)
409409+ t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout, t.messageStore, startAt)
413410414411 s.topics[topicName] = t
415412}
+12-5
server/server_test.go
···2424)
25252626func createServer(t *testing.T) *Server {
2727- store := NewMemoryStore()
2828- srv, err := New(serverAddr, ackDelay, ackTimeout, store)
2727+ srv, err := New(serverAddr, ackDelay, ackTimeout)
2928 require.NoError(t, err)
30293130 t.Cleanup(func() {
···4039 srv.topics[topicName] = &topic{
4140 name: topicName,
4241 subscriptions: make(map[net.Addr]*subscriber),
4242+ messageStore: NewMemoryStore(),
4343 }
44444545 return srv
···516516 messages = append(messages, fmt.Sprintf("message %d", i))
517517 }
518518519519- // send messages first
520519 topic := fmt.Sprintf("topic:%s", topicA)
521520522522- // send multiple messages
523521 for _, msg := range messages {
524522 sendMessage(t, publisherConn, topic, []byte(msg))
525523 }
524524+525525+ // send some messages for topic B as well
526526+ sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1"))
527527+ sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2"))
528528+ sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3"))
526529527530 subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 0)
528531 results := make([]string, 0, len(messages))
···547550 messages = append(messages, fmt.Sprintf("message %d", i))
548551 }
549552550550- // send messages first
551553 topic := fmt.Sprintf("topic:%s", topicA)
552554553555 // send multiple messages
554556 for _, msg := range messages {
555557 sendMessage(t, publisherConn, topic, []byte(msg))
556558 }
559559+560560+ // send some messages for topic B as well
561561+ sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1"))
562562+ sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2"))
563563+ sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3"))
557564558565 subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 3)
559566
+2-7
server/subscriber.go
···4444 offset := startAt
45454646 go func() {
4747- // here we need to replay all messages from the store for the topic.
4848- err := messageStore.ReadFrom(offset, func(msgs []MessageToSend) {
4949- // go func() {
5050- for _, msg := range msgs {
5151- s.messages <- newMessage(msg.data)
5252- }
5353- // }()
4747+ err := messageStore.ReadFrom(offset, func(msg MessageToSend) {
4848+ s.messages <- newMessage(msg.data)
5449 })
5550 if err != nil {
5651 slog.Error("failed to replay messages from offset", "error", err, "offset", offset)