Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 127 lines 2.2 kB view raw
1package main 2 3import ( 4 "context" 5 "strings" 6 "sync" 7) 8 9type MemoryBackend struct { 10 mu sync.RWMutex 11 buffer *RingBuffer 12 listeners []chan *Event 13} 14 15func NewMemoryBackend(bufferSize int) *MemoryBackend { 16 return &MemoryBackend{ 17 buffer: NewRingBuffer(bufferSize), 18 } 19} 20 21func (m *MemoryBackend) Publish(event *Event) error { 22 m.buffer.Add(event) 23 24 m.mu.RLock() 25 defer m.mu.RUnlock() 26 27 for _, ch := range m.listeners { 28 select { 29 case ch <- event: 30 default: 31 } 32 } 33 return nil 34} 35 36func (m *MemoryBackend) Subscribe(ctx context.Context) <-chan *Event { 37 ch := make(chan *Event, 256) 38 39 m.mu.Lock() 40 m.listeners = append(m.listeners, ch) 41 m.mu.Unlock() 42 43 go func() { 44 <-ctx.Done() 45 m.mu.Lock() 46 defer m.mu.Unlock() 47 for i, l := range m.listeners { 48 if l == ch { 49 m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) 50 break 51 } 52 } 53 }() 54 55 return ch 56} 57 58func (m *MemoryBackend) Since(lastEventID string, subscribePath string) []*Event { 59 return m.buffer.Since(lastEventID, subscribePath) 60} 61 62func (m *MemoryBackend) Close() error { 63 return nil 64} 65 66type RingBuffer struct { 67 mu sync.RWMutex 68 buf []*Event 69 size int 70 write int 71 count int 72} 73 74func NewRingBuffer(size int) *RingBuffer { 75 return &RingBuffer{ 76 buf: make([]*Event, size), 77 size: size, 78 } 79} 80 81func (rb *RingBuffer) Add(event *Event) { 82 rb.mu.Lock() 83 defer rb.mu.Unlock() 84 rb.buf[rb.write%rb.size] = event 85 rb.write++ 86 if rb.count < rb.size { 87 rb.count++ 88 } 89} 90 91func (rb *RingBuffer) Since(lastEventID string, subscribePath string) []*Event { 92 rb.mu.RLock() 93 defer rb.mu.RUnlock() 94 95 start := rb.write - rb.count 96 found := false 97 foundIdx := start 98 99 for i := start; i < rb.write; i++ { 100 e := rb.buf[i%rb.size] 101 if e.ID == lastEventID { 102 found = true 103 foundIdx = i + 1 104 break 105 } 106 } 107 108 if !found { 109 foundIdx = start 110 } 111 112 var result []*Event 113 for i := foundIdx; i < rb.write; i++ { 114 e := rb.buf[i%rb.size] 115 if pathMatches(subscribePath, e.Path) { 116 result = append(result, e) 117 } 118 } 119 return result 120} 121 122func pathMatches(subscribePath, eventPath string) bool { 123 if subscribePath == "" { 124 return true 125 } 126 return eventPath == subscribePath || strings.HasPrefix(eventPath, subscribePath+"/") 127}