An experimental pub/sub client and server project.
at main 62 lines 1.2 kB view raw
1package server 2 3import ( 4 "fmt" 5 "net" 6 "sync" 7 8 "github.com/willdot/messagebroker/internal" 9 "github.com/willdot/messagebroker/internal/messagestore" 10) 11 12type Store interface { 13 Write(msg internal.Message) error 14 ReadFrom(offset int, handleFunc func(msg internal.Message)) 15} 16 17type topic struct { 18 name string 19 subscriptions map[net.Addr]*subscriber 20 mu sync.Mutex 21 messageStore Store 22} 23 24func newTopic(name string) *topic { 25 messageStore := messagestore.NewMemoryStore() 26 return &topic{ 27 name: name, 28 subscriptions: make(map[net.Addr]*subscriber), 29 messageStore: messageStore, 30 } 31} 32 33func (t *topic) sendMessageToSubscribers(msg internal.Message) error { 34 err := t.messageStore.Write(msg) 35 if err != nil { 36 return fmt.Errorf("failed to write message to store: %w", err) 37 } 38 39 t.mu.Lock() 40 subscribers := t.subscriptions 41 t.mu.Unlock() 42 43 for _, subscriber := range subscribers { 44 subscriber.addMessage(msg, 0) 45 } 46 47 return nil 48} 49 50func (t *topic) findSubscription(addr net.Addr) *subscriber { 51 t.mu.Lock() 52 defer t.mu.Unlock() 53 54 return t.subscriptions[addr] 55} 56 57func (t *topic) removeSubscription(addr net.Addr) { 58 t.mu.Lock() 59 defer t.mu.Unlock() 60 61 delete(t.subscriptions, addr) 62}