package main import ( "context" "strings" "sync" ) type MemoryBackend struct { mu sync.RWMutex buffer *RingBuffer listeners []chan *Event } func NewMemoryBackend(bufferSize int) *MemoryBackend { return &MemoryBackend{ buffer: NewRingBuffer(bufferSize), } } func (m *MemoryBackend) Publish(event *Event) error { m.buffer.Add(event) m.mu.RLock() defer m.mu.RUnlock() for _, ch := range m.listeners { select { case ch <- event: default: } } return nil } func (m *MemoryBackend) Subscribe(ctx context.Context) <-chan *Event { ch := make(chan *Event, 256) m.mu.Lock() m.listeners = append(m.listeners, ch) m.mu.Unlock() go func() { <-ctx.Done() m.mu.Lock() defer m.mu.Unlock() for i, l := range m.listeners { if l == ch { m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) break } } }() return ch } func (m *MemoryBackend) Since(lastEventID string, subscribePath string) []*Event { return m.buffer.Since(lastEventID, subscribePath) } func (m *MemoryBackend) Close() error { return nil } type RingBuffer struct { mu sync.RWMutex buf []*Event size int write int count int } func NewRingBuffer(size int) *RingBuffer { return &RingBuffer{ buf: make([]*Event, size), size: size, } } func (rb *RingBuffer) Add(event *Event) { rb.mu.Lock() defer rb.mu.Unlock() rb.buf[rb.write%rb.size] = event rb.write++ if rb.count < rb.size { rb.count++ } } func (rb *RingBuffer) Since(lastEventID string, subscribePath string) []*Event { rb.mu.RLock() defer rb.mu.RUnlock() start := rb.write - rb.count found := false foundIdx := start for i := start; i < rb.write; i++ { e := rb.buf[i%rb.size] if e.ID == lastEventID { found = true foundIdx = i + 1 break } } if !found { foundIdx = start } var result []*Event for i := foundIdx; i < rb.write; i++ { e := rb.buf[i%rb.size] if pathMatches(subscribePath, e.Path) { result = append(result, e) } } return result } func pathMatches(subscribePath, eventPath string) bool { if subscribePath == "" { return true } return eventPath == subscribePath || strings.HasPrefix(eventPath, subscribePath+"/") }