An experimental pub/sub client and server project.
1package server
2
3import (
4 "fmt"
5 "net"
6 "sync"
7
8 "github.com/willdot/messagebroker/internal"
9 "github.com/willdot/messagebroker/internal/messagestore"
10)
11
12type Store interface {
13 Write(msg internal.Message) error
14 ReadFrom(offset int, handleFunc func(msg internal.Message))
15}
16
17type topic struct {
18 name string
19 subscriptions map[net.Addr]*subscriber
20 mu sync.Mutex
21 messageStore Store
22}
23
24func newTopic(name string) *topic {
25 messageStore := messagestore.NewMemoryStore()
26 return &topic{
27 name: name,
28 subscriptions: make(map[net.Addr]*subscriber),
29 messageStore: messageStore,
30 }
31}
32
33func (t *topic) sendMessageToSubscribers(msg internal.Message) error {
34 err := t.messageStore.Write(msg)
35 if err != nil {
36 return fmt.Errorf("failed to write message to store: %w", err)
37 }
38
39 t.mu.Lock()
40 subscribers := t.subscriptions
41 t.mu.Unlock()
42
43 for _, subscriber := range subscribers {
44 subscriber.addMessage(msg, 0)
45 }
46
47 return nil
48}
49
50func (t *topic) findSubscription(addr net.Addr) *subscriber {
51 t.mu.Lock()
52 defer t.mu.Unlock()
53
54 return t.subscriptions[addr]
55}
56
57func (t *topic) removeSubscription(addr net.Addr) {
58 t.mu.Lock()
59 defer t.mu.Unlock()
60
61 delete(t.subscriptions, addr)
62}