Webhook-to-SSE gateway with hierarchical topic routing and signature verification

Pluggable pub/sub backend for multi-replica support

Wicket was single-replica only — the Broker fanned out events directly
to local SSE subscriber channels, so only the replica that received a
POST could deliver to its subscribers. This adds a Backend interface
that decouples event distribution from local subscriber management,
with two implementations:

- MemoryBackend (default): same behavior as before, ring buffer +
in-process fan-out. Extracts RingBuffer and pathMatches from broker.go.

- RedisBackend: uses Redis pub/sub for cross-replica delivery and a
Redis LIST for replay buffering. Pipelined LPUSH+LTRIM+PUBLISH keeps
it atomic. Tested with miniredis, including a two-backend
cross-replica delivery test.

The Broker now takes a Backend and has a Start(ctx) method that reads
from the backend's event channel. A monotonic sequence counter prevents
duplicate delivery when backend-buffered events race with Last-Event-ID
replay. New -backend and -redis-url CLI flags wire it up in main.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+936 -140
+10
backend.go
··· 1 + package main 2 + 3 + import "context" 4 + 5 + type Backend interface { 6 + Publish(event *Event) error 7 + Subscribe(ctx context.Context) <-chan *Event 8 + Since(lastEventID string, subscribePath string) []*Event 9 + Close() error 10 + }
+127
backend_memory.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "strings" 6 + "sync" 7 + ) 8 + 9 + type MemoryBackend struct { 10 + mu sync.RWMutex 11 + buffer *RingBuffer 12 + listeners []chan *Event 13 + } 14 + 15 + func NewMemoryBackend(bufferSize int) *MemoryBackend { 16 + return &MemoryBackend{ 17 + buffer: NewRingBuffer(bufferSize), 18 + } 19 + } 20 + 21 + func (m *MemoryBackend) Publish(event *Event) error { 22 + m.buffer.Add(event) 23 + 24 + m.mu.RLock() 25 + defer m.mu.RUnlock() 26 + 27 + for _, ch := range m.listeners { 28 + select { 29 + case ch <- event: 30 + default: 31 + } 32 + } 33 + return nil 34 + } 35 + 36 + func (m *MemoryBackend) Subscribe(ctx context.Context) <-chan *Event { 37 + ch := make(chan *Event, 256) 38 + 39 + m.mu.Lock() 40 + m.listeners = append(m.listeners, ch) 41 + m.mu.Unlock() 42 + 43 + go func() { 44 + <-ctx.Done() 45 + m.mu.Lock() 46 + defer m.mu.Unlock() 47 + for i, l := range m.listeners { 48 + if l == ch { 49 + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) 50 + break 51 + } 52 + } 53 + }() 54 + 55 + return ch 56 + } 57 + 58 + func (m *MemoryBackend) Since(lastEventID string, subscribePath string) []*Event { 59 + return m.buffer.Since(lastEventID, subscribePath) 60 + } 61 + 62 + func (m *MemoryBackend) Close() error { 63 + return nil 64 + } 65 + 66 + type RingBuffer struct { 67 + mu sync.RWMutex 68 + buf []*Event 69 + size int 70 + write int 71 + count int 72 + } 73 + 74 + func NewRingBuffer(size int) *RingBuffer { 75 + return &RingBuffer{ 76 + buf: make([]*Event, size), 77 + size: size, 78 + } 79 + } 80 + 81 + func (rb *RingBuffer) Add(event *Event) { 82 + rb.mu.Lock() 83 + defer rb.mu.Unlock() 84 + rb.buf[rb.write%rb.size] = event 85 + rb.write++ 86 + if rb.count < rb.size { 87 + rb.count++ 88 + } 89 + } 90 + 91 + func (rb *RingBuffer) Since(lastEventID string, subscribePath string) []*Event { 92 + rb.mu.RLock() 93 + defer rb.mu.RUnlock() 94 + 95 + start := rb.write - rb.count 96 + found := false 97 + foundIdx := start 98 + 99 + for i := start; i < rb.write; i++ { 100 + e := rb.buf[i%rb.size] 101 + if e.ID == lastEventID { 102 + found = true 103 + foundIdx = i + 1 104 + break 105 + } 106 + } 107 + 108 + if !found { 109 + foundIdx = start 110 + } 111 + 112 + var result []*Event 113 + for i := foundIdx; i < rb.write; i++ { 114 + e := rb.buf[i%rb.size] 115 + if pathMatches(subscribePath, e.Path) { 116 + result = append(result, e) 117 + } 118 + } 119 + return result 120 + } 121 + 122 + func pathMatches(subscribePath, eventPath string) bool { 123 + if subscribePath == "" { 124 + return true 125 + } 126 + return eventPath == subscribePath || strings.HasPrefix(eventPath, subscribePath+"/") 127 + }
+119
backend_memory_test.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "testing" 7 + "time" 8 + ) 9 + 10 + func TestMemoryBackend_publishAndSubscribe(t *testing.T) { 11 + backend := NewMemoryBackend(100) 12 + ctx, cancel := context.WithCancel(context.Background()) 13 + defer cancel() 14 + 15 + ch := backend.Subscribe(ctx) 16 + event := &Event{ID: "e1", Path: "test", Payload: map[string]any{}} 17 + backend.Publish(event) 18 + 19 + select { 20 + case got := <-ch: 21 + if got.ID != "e1" { 22 + t.Errorf("expected e1, got %s", got.ID) 23 + } 24 + case <-time.After(time.Second): 25 + t.Fatal("timed out waiting for event") 26 + } 27 + } 28 + 29 + func TestMemoryBackend_multipleListeners(t *testing.T) { 30 + backend := NewMemoryBackend(100) 31 + ctx, cancel := context.WithCancel(context.Background()) 32 + defer cancel() 33 + 34 + ch1 := backend.Subscribe(ctx) 35 + ch2 := backend.Subscribe(ctx) 36 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 37 + 38 + for i, ch := range []<-chan *Event{ch1, ch2} { 39 + select { 40 + case got := <-ch: 41 + if got.ID != "e1" { 42 + t.Errorf("listener %d: expected e1, got %s", i, got.ID) 43 + } 44 + case <-time.After(time.Second): 45 + t.Fatalf("listener %d: timed out", i) 46 + } 47 + } 48 + } 49 + 50 + func TestMemoryBackend_cancelRemovesListener(t *testing.T) { 51 + backend := NewMemoryBackend(100) 52 + ctx, cancel := context.WithCancel(context.Background()) 53 + backend.Subscribe(ctx) 54 + cancel() 55 + 56 + // Give the cleanup goroutine time to run 57 + time.Sleep(50 * time.Millisecond) 58 + 59 + backend.mu.RLock() 60 + n := len(backend.listeners) 61 + backend.mu.RUnlock() 62 + if n != 0 { 63 + t.Errorf("expected 0 listeners after cancel, got %d", n) 64 + } 65 + } 66 + 67 + func TestMemoryBackend_since(t *testing.T) { 68 + backend := NewMemoryBackend(100) 69 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 70 + backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 71 + backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}}) 72 + 73 + events := backend.Since("e1", "test") 74 + if len(events) != 1 { 75 + t.Fatalf("expected 1 event, got %d", len(events)) 76 + } 77 + if events[0].ID != "e2" { 78 + t.Errorf("expected e2, got %s", events[0].ID) 79 + } 80 + } 81 + 82 + func TestMemoryBackend_sinceEmptyPath(t *testing.T) { 83 + backend := NewMemoryBackend(100) 84 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 85 + backend.Publish(&Event{ID: "e2", Path: "other", Payload: map[string]any{}}) 86 + 87 + events := backend.Since("e1", "") 88 + if len(events) != 1 { 89 + t.Fatalf("expected 1 event, got %d", len(events)) 90 + } 91 + if events[0].ID != "e2" { 92 + t.Errorf("expected e2, got %s", events[0].ID) 93 + } 94 + } 95 + 96 + func TestMemoryBackend_sinceExpiredID(t *testing.T) { 97 + backend := NewMemoryBackend(3) 98 + for i := range 6 { 99 + backend.Publish(&Event{ID: fmt.Sprintf("e%d", i), Path: "test", Payload: map[string]any{}}) 100 + } 101 + 102 + events := backend.Since("e0", "test") 103 + if len(events) != 3 { 104 + t.Fatalf("expected 3 events (full buffer), got %d", len(events)) 105 + } 106 + if events[0].ID != "e3" { 107 + t.Errorf("expected e3, got %s", events[0].ID) 108 + } 109 + } 110 + 111 + func TestMemoryBackend_closeIsIdempotent(t *testing.T) { 112 + backend := NewMemoryBackend(100) 113 + if err := backend.Close(); err != nil { 114 + t.Errorf("first close: %v", err) 115 + } 116 + if err := backend.Close(); err != nil { 117 + t.Errorf("second close: %v", err) 118 + } 119 + }
+141
backend_redis.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log" 8 + "sync" 9 + 10 + "github.com/redis/go-redis/v9" 11 + ) 12 + 13 + const ( 14 + redisChannel = "wicket:events" 15 + redisBufferKey = "wicket:buffer" 16 + ) 17 + 18 + type RedisBackend struct { 19 + client *redis.Client 20 + bufferSize int 21 + mu sync.Mutex 22 + closed bool 23 + } 24 + 25 + func NewRedisBackend(url string, bufferSize int) (*RedisBackend, error) { 26 + opts, err := redis.ParseURL(url) 27 + if err != nil { 28 + return nil, fmt.Errorf("parsing redis URL: %w", err) 29 + } 30 + client := redis.NewClient(opts) 31 + if err := client.Ping(context.Background()).Err(); err != nil { 32 + return nil, fmt.Errorf("connecting to redis: %w", err) 33 + } 34 + return &RedisBackend{ 35 + client: client, 36 + bufferSize: bufferSize, 37 + }, nil 38 + } 39 + 40 + func newRedisBackendFromClient(client *redis.Client, bufferSize int) *RedisBackend { 41 + return &RedisBackend{ 42 + client: client, 43 + bufferSize: bufferSize, 44 + } 45 + } 46 + 47 + func (r *RedisBackend) Publish(event *Event) error { 48 + data, err := json.Marshal(event) 49 + if err != nil { 50 + return fmt.Errorf("marshaling event: %w", err) 51 + } 52 + ctx := context.Background() 53 + pipe := r.client.Pipeline() 54 + pipe.LPush(ctx, redisBufferKey, data) 55 + pipe.LTrim(ctx, redisBufferKey, 0, int64(r.bufferSize-1)) 56 + pipe.Publish(ctx, redisChannel, data) 57 + _, err = pipe.Exec(ctx) 58 + return err 59 + } 60 + 61 + func (r *RedisBackend) Subscribe(ctx context.Context) <-chan *Event { 62 + ch := make(chan *Event, 256) 63 + pubsub := r.client.Subscribe(ctx, redisChannel) 64 + 65 + go func() { 66 + defer close(ch) 67 + defer pubsub.Close() 68 + msgCh := pubsub.Channel() 69 + for { 70 + select { 71 + case <-ctx.Done(): 72 + return 73 + case msg := <-msgCh: 74 + if msg == nil { 75 + return 76 + } 77 + var event Event 78 + if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { 79 + log.Printf("redis: unmarshal event: %v", err) 80 + continue 81 + } 82 + select { 83 + case ch <- &event: 84 + default: 85 + } 86 + } 87 + } 88 + }() 89 + 90 + return ch 91 + } 92 + 93 + func (r *RedisBackend) Since(lastEventID string, subscribePath string) []*Event { 94 + ctx := context.Background() 95 + vals, err := r.client.LRange(ctx, redisBufferKey, 0, -1).Result() 96 + if err != nil { 97 + log.Printf("redis: LRANGE: %v", err) 98 + return nil 99 + } 100 + 101 + // Redis list is newest-first (LPUSH), reverse to chronological order 102 + events := make([]*Event, 0, len(vals)) 103 + for i := len(vals) - 1; i >= 0; i-- { 104 + var e Event 105 + if err := json.Unmarshal([]byte(vals[i]), &e); err != nil { 106 + continue 107 + } 108 + events = append(events, &e) 109 + } 110 + 111 + found := false 112 + foundIdx := 0 113 + for i, e := range events { 114 + if e.ID == lastEventID { 115 + found = true 116 + foundIdx = i + 1 117 + break 118 + } 119 + } 120 + if !found { 121 + foundIdx = 0 122 + } 123 + 124 + var result []*Event 125 + for i := foundIdx; i < len(events); i++ { 126 + if pathMatches(subscribePath, events[i].Path) { 127 + result = append(result, events[i]) 128 + } 129 + } 130 + return result 131 + } 132 + 133 + func (r *RedisBackend) Close() error { 134 + r.mu.Lock() 135 + defer r.mu.Unlock() 136 + if r.closed { 137 + return nil 138 + } 139 + r.closed = true 140 + return r.client.Close() 141 + }
+301
backend_redis_test.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + "time" 7 + 8 + "github.com/alicebob/miniredis/v2" 9 + "github.com/redis/go-redis/v9" 10 + ) 11 + 12 + func newTestRedisBackend(t *testing.T, bufferSize int) (*RedisBackend, *miniredis.Miniredis) { 13 + t.Helper() 14 + mr := miniredis.RunT(t) 15 + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 16 + return newRedisBackendFromClient(client, bufferSize), mr 17 + } 18 + 19 + func TestRedisBackend_publishAndSubscribe(t *testing.T) { 20 + backend, _ := newTestRedisBackend(t, 100) 21 + defer backend.Close() 22 + 23 + ctx, cancel := context.WithCancel(context.Background()) 24 + defer cancel() 25 + 26 + ch := backend.Subscribe(ctx) 27 + time.Sleep(50 * time.Millisecond) 28 + 29 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{"ok": true}}) 30 + 31 + select { 32 + case got := <-ch: 33 + if got.ID != "e1" { 34 + t.Errorf("expected e1, got %s", got.ID) 35 + } 36 + if got.Path != "test" { 37 + t.Errorf("expected path test, got %s", got.Path) 38 + } 39 + case <-time.After(2 * time.Second): 40 + t.Fatal("timed out waiting for event") 41 + } 42 + } 43 + 44 + func TestRedisBackend_since(t *testing.T) { 45 + backend, _ := newTestRedisBackend(t, 100) 46 + defer backend.Close() 47 + 48 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 49 + backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 50 + backend.Publish(&Event{ID: "e3", Path: "other", Payload: map[string]any{}}) 51 + 52 + events := backend.Since("e1", "test") 53 + if len(events) != 1 { 54 + t.Fatalf("expected 1 event, got %d", len(events)) 55 + } 56 + if events[0].ID != "e2" { 57 + t.Errorf("expected e2, got %s", events[0].ID) 58 + } 59 + } 60 + 61 + func TestRedisBackend_sinceExpired(t *testing.T) { 62 + backend, _ := newTestRedisBackend(t, 3) 63 + defer backend.Close() 64 + 65 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 66 + backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 67 + backend.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}}) 68 + backend.Publish(&Event{ID: "e4", Path: "test", Payload: map[string]any{}}) 69 + 70 + events := backend.Since("e1", "test") 71 + if len(events) != 3 { 72 + t.Fatalf("expected 3 events (full buffer), got %d", len(events)) 73 + } 74 + if events[0].ID != "e2" { 75 + t.Errorf("expected e2, got %s", events[0].ID) 76 + } 77 + } 78 + 79 + func TestRedisBackend_sincePathFiltering(t *testing.T) { 80 + backend, _ := newTestRedisBackend(t, 100) 81 + defer backend.Close() 82 + 83 + backend.Publish(&Event{ID: "e1", Path: "a/b", Payload: map[string]any{}}) 84 + backend.Publish(&Event{ID: "e2", Path: "a/b/c", Payload: map[string]any{}}) 85 + backend.Publish(&Event{ID: "e3", Path: "x/y", Payload: map[string]any{}}) 86 + 87 + events := backend.Since("e1", "a/b") 88 + if len(events) != 1 { 89 + t.Fatalf("expected 1 event, got %d", len(events)) 90 + } 91 + if events[0].ID != "e2" { 92 + t.Errorf("expected e2, got %s", events[0].ID) 93 + } 94 + } 95 + 96 + func TestRedisBackend_multiReplica(t *testing.T) { 97 + mr := miniredis.RunT(t) 98 + 99 + client1 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 100 + backend1 := newRedisBackendFromClient(client1, 100) 101 + defer backend1.Close() 102 + 103 + client2 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 104 + backend2 := newRedisBackendFromClient(client2, 100) 105 + defer backend2.Close() 106 + 107 + ctx, cancel := context.WithCancel(context.Background()) 108 + defer cancel() 109 + 110 + ch2 := backend2.Subscribe(ctx) 111 + time.Sleep(50 * time.Millisecond) 112 + 113 + backend1.Publish(&Event{ID: "cross-replica", Path: "test", Payload: map[string]any{"from": "replica1"}}) 114 + 115 + select { 116 + case got := <-ch2: 117 + if got.ID != "cross-replica" { 118 + t.Errorf("expected cross-replica, got %s", got.ID) 119 + } 120 + case <-time.After(2 * time.Second): 121 + t.Fatal("timed out waiting for cross-replica event") 122 + } 123 + } 124 + 125 + func TestRedisBackend_multiReplicaReplay(t *testing.T) { 126 + mr := miniredis.RunT(t) 127 + 128 + client1 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 129 + backend1 := newRedisBackendFromClient(client1, 100) 130 + defer backend1.Close() 131 + 132 + backend1.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 133 + backend1.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 134 + 135 + client2 := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 136 + backend2 := newRedisBackendFromClient(client2, 100) 137 + defer backend2.Close() 138 + 139 + events := backend2.Since("e1", "test") 140 + if len(events) != 1 { 141 + t.Fatalf("expected 1 event, got %d", len(events)) 142 + } 143 + if events[0].ID != "e2" { 144 + t.Errorf("expected e2, got %s", events[0].ID) 145 + } 146 + } 147 + 148 + func TestRedisBackend_closeIsIdempotent(t *testing.T) { 149 + backend, _ := newTestRedisBackend(t, 100) 150 + if err := backend.Close(); err != nil { 151 + t.Errorf("first close: %v", err) 152 + } 153 + if err := backend.Close(); err != nil { 154 + t.Errorf("second close: %v", err) 155 + } 156 + } 157 + 158 + func TestRedisBackend_bufferTrims(t *testing.T) { 159 + backend, _ := newTestRedisBackend(t, 3) 160 + defer backend.Close() 161 + 162 + for i := range 10 { 163 + backend.Publish(&Event{ID: string(rune('a' + i)), Path: "test", Payload: map[string]any{}}) 164 + } 165 + 166 + events := backend.Since("", "test") 167 + if len(events) != 3 { 168 + t.Fatalf("expected 3 events in trimmed buffer, got %d", len(events)) 169 + } 170 + } 171 + 172 + func TestNewRedisBackend_success(t *testing.T) { 173 + mr := miniredis.RunT(t) 174 + backend, err := NewRedisBackend("redis://"+mr.Addr(), 100) 175 + if err != nil { 176 + t.Fatalf("expected no error, got %v", err) 177 + } 178 + defer backend.Close() 179 + } 180 + 181 + func TestNewRedisBackend_badURL(t *testing.T) { 182 + _, err := NewRedisBackend("not-a-url", 100) 183 + if err == nil { 184 + t.Fatal("expected error for bad URL") 185 + } 186 + } 187 + 188 + func TestNewRedisBackend_unreachable(t *testing.T) { 189 + _, err := NewRedisBackend("redis://127.0.0.1:1", 100) 190 + if err == nil { 191 + t.Fatal("expected error for unreachable redis") 192 + } 193 + } 194 + 195 + func TestRedisBackend_sinceWithCorruptData(t *testing.T) { 196 + backend, mr := newTestRedisBackend(t, 100) 197 + defer backend.Close() 198 + 199 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 200 + mr.Lpush(redisBufferKey, "not-valid-json") 201 + backend.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 202 + 203 + events := backend.Since("e1", "test") 204 + if len(events) != 1 { 205 + t.Fatalf("expected 1 event (corrupt data skipped), got %d", len(events)) 206 + } 207 + if events[0].ID != "e2" { 208 + t.Errorf("expected e2, got %s", events[0].ID) 209 + } 210 + } 211 + 212 + func TestRedisBackend_subscribeContextCancel(t *testing.T) { 213 + backend, _ := newTestRedisBackend(t, 100) 214 + defer backend.Close() 215 + 216 + ctx, cancel := context.WithCancel(context.Background()) 217 + ch := backend.Subscribe(ctx) 218 + cancel() 219 + 220 + // Channel should close after context cancel 221 + select { 222 + case _, ok := <-ch: 223 + if ok { 224 + t.Error("expected channel to be closed") 225 + } 226 + case <-time.After(2 * time.Second): 227 + t.Fatal("timed out waiting for channel close") 228 + } 229 + } 230 + 231 + func TestRedisBackend_subscribeSkipsBadJSON(t *testing.T) { 232 + backend, _ := newTestRedisBackend(t, 100) 233 + defer backend.Close() 234 + 235 + ctx, cancel := context.WithCancel(context.Background()) 236 + defer cancel() 237 + 238 + ch := backend.Subscribe(ctx) 239 + time.Sleep(50 * time.Millisecond) 240 + 241 + // Publish garbage directly to the Redis channel 242 + backend.client.Publish(context.Background(), redisChannel, "not-json") 243 + // Then a valid event 244 + backend.Publish(&Event{ID: "valid", Path: "test", Payload: map[string]any{}}) 245 + 246 + select { 247 + case got := <-ch: 248 + if got.ID != "valid" { 249 + t.Errorf("expected valid, got %s", got.ID) 250 + } 251 + case <-time.After(2 * time.Second): 252 + t.Fatal("timed out") 253 + } 254 + } 255 + 256 + func TestRedisBackend_sinceClosedClient(t *testing.T) { 257 + backend, _ := newTestRedisBackend(t, 100) 258 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 259 + backend.client.Close() 260 + backend.closed = true 261 + 262 + events := backend.Since("", "test") 263 + if events != nil { 264 + t.Errorf("expected nil, got %v", events) 265 + } 266 + } 267 + 268 + func TestRedisBackend_subscribeNilMessage(t *testing.T) { 269 + mr := miniredis.RunT(t) 270 + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) 271 + backend := newRedisBackendFromClient(client, 100) 272 + 273 + ctx, cancel := context.WithCancel(context.Background()) 274 + defer cancel() 275 + 276 + ch := backend.Subscribe(ctx) 277 + time.Sleep(50 * time.Millisecond) 278 + 279 + // Close the client, which causes the pubsub channel to close 280 + client.Close() 281 + mr.Close() 282 + 283 + select { 284 + case _, ok := <-ch: 285 + if ok { 286 + t.Error("expected channel to be closed after client close") 287 + } 288 + case <-time.After(5 * time.Second): 289 + t.Fatal("timed out waiting for channel close") 290 + } 291 + } 292 + 293 + func TestRedisBackend_publishMarshalError(t *testing.T) { 294 + backend, _ := newTestRedisBackend(t, 100) 295 + defer backend.Close() 296 + 297 + err := backend.Publish(&Event{ID: "bad", Path: "test", Payload: make(chan int)}) 298 + if err == nil { 299 + t.Fatal("expected marshal error") 300 + } 301 + }
+34 -73
broker.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "strings" 5 6 "sync" 7 + "sync/atomic" 6 8 ) 7 9 8 10 type subscriber struct { 9 - ch chan *Event 10 - path string 11 + ch chan *Event 12 + path string 13 + startSeq int64 11 14 } 12 15 13 16 type Broker struct { 14 17 mu sync.RWMutex 15 18 subscribers map[string][]*subscriber 16 - buffer *RingBuffer 19 + backend Backend 20 + seq atomic.Int64 17 21 } 18 22 19 - func NewBroker(bufferSize int) *Broker { 23 + func NewBroker(backend Backend) *Broker { 20 24 return &Broker{ 21 25 subscribers: make(map[string][]*subscriber), 22 - buffer: NewRingBuffer(bufferSize), 26 + backend: backend, 23 27 } 24 28 } 25 29 26 - func (b *Broker) Publish(event *Event) { 27 - b.buffer.Add(event) 30 + func (b *Broker) Start(ctx context.Context) { 31 + ch := b.backend.Subscribe(ctx) 32 + go func() { 33 + var fanOutSeq int64 34 + for event := range ch { 35 + fanOutSeq++ 36 + b.fanOut(event, fanOutSeq) 37 + } 38 + }() 39 + } 28 40 41 + func (b *Broker) fanOut(event *Event, seq int64) { 29 42 b.mu.RLock() 30 43 defer b.mu.RUnlock() 31 44 32 45 for _, path := range publishPaths(event.Path) { 33 46 for _, sub := range b.subscribers[path] { 47 + if seq <= sub.startSeq { 48 + continue 49 + } 34 50 select { 35 51 case sub.ch <- event: 36 52 default: ··· 39 55 } 40 56 } 41 57 58 + func (b *Broker) Publish(event *Event) error { 59 + b.seq.Add(1) 60 + return b.backend.Publish(event) 61 + } 62 + 42 63 func (b *Broker) Subscribe(path string, lastEventID string) (<-chan *Event, func()) { 43 64 ch := make(chan *Event, 64) 44 - sub := &subscriber{ch: ch, path: path} 65 + sub := &subscriber{ch: ch, path: path, startSeq: b.seq.Load()} 45 66 46 67 b.mu.Lock() 47 68 b.subscribers[path] = append(b.subscribers[path], sub) 48 69 b.mu.Unlock() 49 70 50 71 if lastEventID != "" { 51 - events := b.buffer.Since(lastEventID, path) 72 + events := b.backend.Since(lastEventID, path) 52 73 for _, e := range events { 53 74 ch <- e 54 75 } ··· 64 85 break 65 86 } 66 87 } 67 - close(ch) 88 + func() { 89 + defer func() { recover() }() 90 + close(ch) 91 + }() 68 92 } 69 93 70 94 return ch, unsub ··· 85 109 } 86 110 return paths 87 111 } 88 - 89 - type RingBuffer struct { 90 - mu sync.RWMutex 91 - buf []*Event 92 - size int 93 - write int 94 - count int 95 - } 96 - 97 - func NewRingBuffer(size int) *RingBuffer { 98 - return &RingBuffer{ 99 - buf: make([]*Event, size), 100 - size: size, 101 - } 102 - } 103 - 104 - func (rb *RingBuffer) Add(event *Event) { 105 - rb.mu.Lock() 106 - defer rb.mu.Unlock() 107 - rb.buf[rb.write%rb.size] = event 108 - rb.write++ 109 - if rb.count < rb.size { 110 - rb.count++ 111 - } 112 - } 113 - 114 - func (rb *RingBuffer) Since(lastEventID string, subscribePath string) []*Event { 115 - rb.mu.RLock() 116 - defer rb.mu.RUnlock() 117 - 118 - start := rb.write - rb.count 119 - found := false 120 - foundIdx := start 121 - 122 - for i := start; i < rb.write; i++ { 123 - e := rb.buf[i%rb.size] 124 - if e.ID == lastEventID { 125 - found = true 126 - foundIdx = i + 1 127 - break 128 - } 129 - } 130 - 131 - if !found { 132 - foundIdx = start 133 - } 134 - 135 - var result []*Event 136 - for i := foundIdx; i < rb.write; i++ { 137 - e := rb.buf[i%rb.size] 138 - if pathMatches(subscribePath, e.Path) { 139 - result = append(result, e) 140 - } 141 - } 142 - return result 143 - } 144 - 145 - func pathMatches(subscribePath, eventPath string) bool { 146 - if subscribePath == "" { 147 - return true 148 - } 149 - return eventPath == subscribePath || strings.HasPrefix(eventPath, subscribePath+"/") 150 - }
+31 -11
broker_test.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 "testing" 6 7 "time" ··· 35 36 } 36 37 } 37 38 39 + func newTestBroker(bufferSize int) (*Broker, context.CancelFunc) { 40 + backend := NewMemoryBackend(bufferSize) 41 + broker := NewBroker(backend) 42 + ctx, cancel := context.WithCancel(context.Background()) 43 + broker.Start(ctx) 44 + return broker, cancel 45 + } 46 + 38 47 func TestBroker_exactPathDelivery(t *testing.T) { 39 - b := NewBroker(100) 48 + b, cancel := newTestBroker(100) 49 + defer cancel() 40 50 ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "") 41 51 defer unsub() 42 52 ··· 50 60 } 51 61 52 62 func TestBroker_parentReceivesChildEvents(t *testing.T) { 53 - b := NewBroker(100) 63 + b, cancel := newTestBroker(100) 64 + defer cancel() 54 65 ch, unsub := b.Subscribe("github.com/chrisguidry", "") 55 66 defer unsub() 56 67 ··· 64 75 } 65 76 66 77 func TestBroker_rootReceivesAll(t *testing.T) { 67 - b := NewBroker(100) 78 + b, cancel := newTestBroker(100) 79 + defer cancel() 68 80 ch, unsub := b.Subscribe("", "") 69 81 defer unsub() 70 82 ··· 78 90 } 79 91 80 92 func TestBroker_unrelatedSubscriberDoesNotReceive(t *testing.T) { 81 - b := NewBroker(100) 93 + b, cancel := newTestBroker(100) 94 + defer cancel() 82 95 ch, unsub := b.Subscribe("gitlab.com", "") 83 96 defer unsub() 84 97 ··· 88 101 } 89 102 90 103 func TestBroker_multipleSubscribersSamePath(t *testing.T) { 91 - b := NewBroker(100) 104 + b, cancel := newTestBroker(100) 105 + defer cancel() 92 106 ch1, unsub1 := b.Subscribe("github.com/chrisguidry/docketeer", "") 93 107 defer unsub1() 94 108 ch2, unsub2 := b.Subscribe("github.com/chrisguidry/docketeer", "") ··· 101 115 } 102 116 103 117 func TestBroker_unsubscribeStopsDelivery(t *testing.T) { 104 - b := NewBroker(100) 118 + b, cancel := newTestBroker(100) 119 + defer cancel() 105 120 ch, unsub := b.Subscribe("github.com/chrisguidry/docketeer", "") 106 121 unsub() 107 122 ··· 118 133 } 119 134 120 135 func TestBroker_ringBufferWraps(t *testing.T) { 121 - b := NewBroker(5) 136 + b, cancel := newTestBroker(5) 137 + defer cancel() 122 138 for i := range 10 { 123 139 b.Publish(&Event{ 124 140 ID: fmt.Sprintf("event-%d", i), ··· 140 156 } 141 157 142 158 func TestBroker_lastEventIDReplay(t *testing.T) { 143 - b := NewBroker(100) 159 + b, cancel := newTestBroker(100) 160 + defer cancel() 144 161 b.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 145 162 b.Publish(&Event{ID: "e2", Path: "test", Payload: map[string]any{}}) 146 163 b.Publish(&Event{ID: "e3", Path: "test", Payload: map[string]any{}}) ··· 156 173 } 157 174 158 175 func TestBroker_lastEventIDRespectsPathHierarchy(t *testing.T) { 159 - b := NewBroker(100) 176 + b, cancel := newTestBroker(100) 177 + defer cancel() 160 178 b.Publish(&Event{ID: "e1", Path: "github.com/chrisguidry/docketeer", Payload: map[string]any{}}) 161 179 b.Publish(&Event{ID: "e2", Path: "gitlab.com/other/project", Payload: map[string]any{}}) 162 180 b.Publish(&Event{ID: "e3", Path: "github.com/chrisguidry/other", Payload: map[string]any{}}) ··· 172 190 } 173 191 174 192 func TestBroker_lastEventIDReplayExactPath(t *testing.T) { 175 - b := NewBroker(100) 193 + b, cancel := newTestBroker(100) 194 + defer cancel() 176 195 b.Publish(&Event{ID: "e1", Path: "exact/path", Payload: map[string]any{}}) 177 196 b.Publish(&Event{ID: "e2", Path: "exact/path/child", Payload: map[string]any{}}) 178 197 b.Publish(&Event{ID: "e3", Path: "exact/path", Payload: map[string]any{}}) ··· 211 230 } 212 231 213 232 func TestBroker_lastEventIDExpiredFromBuffer(t *testing.T) { 214 - b := NewBroker(3) 233 + b, cancel := newTestBroker(3) 234 + defer cancel() 215 235 b.Publish(&Event{ID: "old1", Path: "test", Payload: map[string]any{}}) 216 236 b.Publish(&Event{ID: "old2", Path: "test", Payload: map[string]any{}}) 217 237 b.Publish(&Event{ID: "old3", Path: "test", Payload: map[string]any{}})
+9 -1
go.mod
··· 8 8 gopkg.in/yaml.v3 v3.0.1 9 9 ) 10 10 11 - require golang.org/x/sys v0.13.0 // indirect 11 + require ( 12 + github.com/alicebob/miniredis/v2 v2.37.0 // indirect 13 + github.com/cespare/xxhash/v2 v2.3.0 // indirect 14 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 15 + github.com/redis/go-redis/v9 v9.18.0 // indirect 16 + github.com/yuin/gopher-lua v1.1.1 // indirect 17 + go.uber.org/atomic v1.11.0 // indirect 18 + golang.org/x/sys v0.13.0 // indirect 19 + )
+12
go.sum
··· 1 + github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68= 2 + github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= 3 + github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 4 + github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 5 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= 6 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= 1 7 github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= 2 8 github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= 3 9 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 4 10 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 11 + github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= 12 + github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= 13 + github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= 14 + github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= 15 + go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= 16 + go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= 5 17 golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= 6 18 golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 7 19 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+23 -1
main.go
··· 16 16 address := flag.String("address", ":8080", "listen address") 17 17 configPath := flag.String("configuration", "", "path to configuration file") 18 18 bufferSize := flag.Int("buffer-size", 1000, "event replay buffer size") 19 + backendType := flag.String("backend", "memory", "pub/sub backend: memory or redis") 20 + redisURL := flag.String("redis-url", "redis://localhost:6379", "Redis connection URL (when backend=redis)") 19 21 flag.Parse() 20 22 21 23 var cfgPtr atomic.Pointer[Configuration] ··· 37 39 defer stop() 38 40 } 39 41 40 - broker := NewBroker(*bufferSize) 42 + var backend Backend 43 + switch *backendType { 44 + case "memory": 45 + backend = NewMemoryBackend(*bufferSize) 46 + case "redis": 47 + var err error 48 + backend, err = NewRedisBackend(*redisURL, *bufferSize) 49 + if err != nil { 50 + log.Fatalf("connecting to redis: %v", err) 51 + } 52 + default: 53 + log.Fatalf("unknown backend: %s", *backendType) 54 + } 55 + defer backend.Close() 56 + 57 + ctx, cancel := context.WithCancel(context.Background()) 58 + defer cancel() 59 + 60 + broker := NewBroker(backend) 61 + broker.Start(ctx) 41 62 handler := NewServer(broker, &cfgPtr) 42 63 43 64 server := &http.Server{ ··· 63 84 log.Printf("reloaded configuration from %s", *configPath) 64 85 case syscall.SIGINT, syscall.SIGTERM: 65 86 log.Printf("shutting down") 87 + cancel() 66 88 server.Shutdown(context.Background()) 67 89 return 68 90 }
+12 -6
payload_test.go
··· 40 40 } 41 41 42 42 func TestServer_postFormDataStoredAsText(t *testing.T) { 43 - ts, broker := newTestServer(nil) 43 + ts, broker, cancel := newTestServer(nil) 44 + defer cancel() 44 45 defer ts.Close() 45 46 46 47 event := postAndReceive(t, ts, broker, "application/x-www-form-urlencoded", "foo=bar&baz=qux") ··· 55 56 } 56 57 57 58 func TestServer_postPlainTextStoredAsText(t *testing.T) { 58 - ts, broker := newTestServer(nil) 59 + ts, broker, cancel := newTestServer(nil) 60 + defer cancel() 59 61 defer ts.Close() 60 62 61 63 event := postAndReceive(t, ts, broker, "text/plain", "hello world") ··· 70 72 } 71 73 72 74 func TestServer_postBinaryBase64Encoded(t *testing.T) { 73 - ts, broker := newTestServer(nil) 75 + ts, broker, cancel := newTestServer(nil) 76 + defer cancel() 74 77 defer ts.Close() 75 78 76 79 event := postAndReceive(t, ts, broker, "application/octet-stream", "\x00\x01\x02\x03") ··· 86 89 } 87 90 88 91 func TestServer_postIncludesMethod(t *testing.T) { 89 - ts, broker := newTestServer(nil) 92 + ts, broker, cancel := newTestServer(nil) 93 + defer cancel() 90 94 defer ts.Close() 91 95 92 96 event := postAndReceive(t, ts, broker, "application/json", `{"ok":true}`) ··· 97 101 } 98 102 99 103 func TestServer_postMalformedContentTypeFallsBackToBase64(t *testing.T) { 100 - ts, broker := newTestServer(nil) 104 + ts, broker, cancel := newTestServer(nil) 105 + defer cancel() 101 106 defer ts.Close() 102 107 103 108 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ··· 161 166 } 162 167 163 168 func TestServer_postJSONViaSSERoundTrip(t *testing.T) { 164 - ts, _ := newTestServer(nil) 169 + ts, _, cancel := newTestServer(nil) 170 + defer cancel() 165 171 defer ts.Close() 166 172 167 173 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+5 -1
server.go
··· 102 102 Payload: payload, 103 103 } 104 104 105 - s.broker.Publish(event) 105 + if err := s.broker.Publish(event); err != nil { 106 + log.Printf("publish failed for %s: %v", path, err) 107 + http.Error(w, "publish failed", http.StatusInternalServerError) 108 + return 109 + } 106 110 log.Printf("published %s %s (%s) id=%s", r.Method, path, r.Header.Get("Content-Type"), event.ID) 107 111 w.WriteHeader(http.StatusAccepted) 108 112 }
+68 -18
server_test.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "crypto/hmac" 5 6 "crypto/sha256" 6 7 "encoding/hex" 8 + "fmt" 7 9 "net/http" 8 10 "net/http/httptest" 9 11 "strings" ··· 12 14 "time" 13 15 ) 14 16 15 - func newTestServer(cfg *Configuration) (*httptest.Server, *Broker) { 16 - broker := NewBroker(100) 17 + func newTestServer(cfg *Configuration) (*httptest.Server, *Broker, context.CancelFunc) { 18 + backend := NewMemoryBackend(100) 19 + broker := NewBroker(backend) 20 + ctx, cancel := context.WithCancel(context.Background()) 21 + broker.Start(ctx) 17 22 var cfgPtr atomic.Pointer[Configuration] 18 23 if cfg != nil { 19 24 cfgPtr.Store(cfg) 20 25 } 21 26 handler := NewServer(broker, &cfgPtr) 22 - return httptest.NewServer(handler), broker 27 + return httptest.NewServer(handler), broker, cancel 23 28 } 24 29 25 30 func TestServer_healthEndpoint(t *testing.T) { 26 - ts, _ := newTestServer(nil) 31 + ts, _, cancel := newTestServer(nil) 32 + defer cancel() 27 33 defer ts.Close() 28 34 29 35 resp, err := http.Get(ts.URL + "/_health") ··· 42 48 } 43 49 44 50 func TestServer_postPublishesEvent(t *testing.T) { 45 - ts, _ := newTestServer(nil) 51 + ts, _, cancel := newTestServer(nil) 52 + defer cancel() 46 53 defer ts.Close() 47 54 48 55 resp, err := http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) ··· 56 63 } 57 64 58 65 func TestServer_postTrailingSlashNormalized(t *testing.T) { 59 - ts, broker := newTestServer(nil) 66 + ts, broker, cancel := newTestServer(nil) 67 + defer cancel() 60 68 defer ts.Close() 61 69 62 70 ch, unsub := broker.Subscribe("test/topic", "") ··· 84 92 }, 85 93 }, 86 94 } 87 - ts, _ := newTestServer(cfg) 95 + ts, _, cancel := newTestServer(cfg) 96 + defer cancel() 88 97 defer ts.Close() 89 98 90 99 body := `{"action":"push"}` ··· 115 124 }, 116 125 }, 117 126 } 118 - ts, _ := newTestServer(cfg) 127 + ts, _, cancel := newTestServer(cfg) 128 + defer cancel() 119 129 defer ts.Close() 120 130 121 131 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(`{"bad":"data"}`)) ··· 141 151 }, 142 152 }, 143 153 } 144 - ts, _ := newTestServer(cfg) 154 + ts, _, cancel := newTestServer(cfg) 155 + defer cancel() 145 156 defer ts.Close() 146 157 147 158 resp, err := http.Post(ts.URL+"/open/path", "application/json", strings.NewReader(`{"ok":true}`)) ··· 155 166 } 156 167 157 168 func TestServer_getWithoutSSEAccept(t *testing.T) { 158 - ts, _ := newTestServer(nil) 169 + ts, _, cancel := newTestServer(nil) 170 + defer cancel() 159 171 defer ts.Close() 160 172 161 173 resp, err := http.Get(ts.URL + "/test/topic") ··· 169 181 } 170 182 171 183 func TestServer_corsHeaders(t *testing.T) { 172 - ts, _ := newTestServer(nil) 184 + ts, _, cancel := newTestServer(nil) 185 + defer cancel() 173 186 defer ts.Close() 174 187 175 188 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`)) ··· 184 197 } 185 198 186 199 func TestServer_optionsPreflight(t *testing.T) { 187 - ts, _ := newTestServer(nil) 200 + ts, _, cancel := newTestServer(nil) 201 + defer cancel() 188 202 defer ts.Close() 189 203 190 204 req, _ := http.NewRequest("OPTIONS", ts.URL+"/test", nil) ··· 202 216 } 203 217 204 218 func TestServer_methodNotAllowed(t *testing.T) { 205 - ts, _ := newTestServer(nil) 219 + ts, _, cancel := newTestServer(nil) 220 + defer cancel() 206 221 defer ts.Close() 207 222 208 223 req, _ := http.NewRequest("DELETE", ts.URL+"/test", nil) ··· 226 241 }, 227 242 }, 228 243 } 229 - ts, _ := newTestServer(cfg) 244 + ts, _, cancel := newTestServer(cfg) 245 + defer cancel() 230 246 defer ts.Close() 231 247 232 248 req, _ := http.NewRequest("POST", ts.URL+"/bad/path", strings.NewReader(`{}`)) ··· 242 258 } 243 259 244 260 func TestServer_postInvalidJSON(t *testing.T) { 245 - ts, _ := newTestServer(nil) 261 + ts, _, cancel := newTestServer(nil) 262 + defer cancel() 246 263 defer ts.Close() 247 264 248 265 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{not json`)) ··· 266 283 func (w *bareResponseWriter) WriteHeader(code int) { w.code = code } 267 284 268 285 func TestServer_sseWithoutFlusher(t *testing.T) { 269 - broker := NewBroker(100) 286 + backend := NewMemoryBackend(100) 287 + broker := NewBroker(backend) 288 + ctx, cancel := context.WithCancel(context.Background()) 289 + defer cancel() 290 + broker.Start(ctx) 270 291 var cfgPtr atomic.Pointer[Configuration] 271 292 handler := NewServer(broker, &cfgPtr) 272 293 ··· 291 312 }, 292 313 }, 293 314 } 294 - ts, _ := newTestServer(cfg) 315 + ts, _, cancel := newTestServer(cfg) 316 + defer cancel() 295 317 defer ts.Close() 296 318 297 319 body := `{"action":"push"}` ··· 324 346 } 325 347 } 326 348 349 + type failingBackend struct{ MemoryBackend } 350 + 351 + func (f *failingBackend) Publish(*Event) error { 352 + return fmt.Errorf("backend unavailable") 353 + } 354 + 355 + func TestServer_postPublishError(t *testing.T) { 356 + backend := &failingBackend{MemoryBackend: *NewMemoryBackend(100)} 357 + broker := NewBroker(backend) 358 + ctx, cancel := context.WithCancel(context.Background()) 359 + defer cancel() 360 + broker.Start(ctx) 361 + var cfgPtr atomic.Pointer[Configuration] 362 + handler := NewServer(broker, &cfgPtr) 363 + ts := httptest.NewServer(handler) 364 + defer ts.Close() 365 + 366 + resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`)) 367 + if err != nil { 368 + t.Fatalf("POST failed: %v", err) 369 + } 370 + defer resp.Body.Close() 371 + if resp.StatusCode != http.StatusInternalServerError { 372 + t.Errorf("expected 500, got %d", resp.StatusCode) 373 + } 374 + } 375 + 327 376 func TestServer_postWithMissingSignatureOnSecuredPath(t *testing.T) { 328 377 cfg := &Configuration{ 329 378 Paths: map[string]PathConfiguration{ ··· 334 383 }, 335 384 }, 336 385 } 337 - ts, _ := newTestServer(cfg) 386 + ts, _, cancel := newTestServer(cfg) 387 + defer cancel() 338 388 defer ts.Close() 339 389 340 390 resp, err := http.Post(ts.URL+"/secure/path", "application/json", strings.NewReader(`{}`))
+44 -29
sse_test.go
··· 46 46 } 47 47 48 48 func TestServer_postAndSSEReceive(t *testing.T) { 49 - ts, _ := newTestServer(nil) 49 + ts, _, cancel := newTestServer(nil) 50 + defer cancel() 50 51 defer ts.Close() 51 52 52 - ctx, cancel := context.WithCancel(context.Background()) 53 - defer cancel() 53 + ctx, cancelSSE := context.WithCancel(context.Background()) 54 + defer cancelSSE() 54 55 55 56 events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) 56 57 time.Sleep(50 * time.Millisecond) ··· 73 74 "private/topic": {SubscribeSecret: "my-token"}, 74 75 }, 75 76 } 76 - ts, _ := newTestServer(cfg) 77 + ts, _, cancel := newTestServer(cfg) 78 + defer cancel() 77 79 defer ts.Close() 78 80 79 81 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) ··· 97 99 "private/topic": {SubscribeSecret: "my-token"}, 98 100 }, 99 101 } 100 - ts, _ := newTestServer(cfg) 102 + ts, _, cancel := newTestServer(cfg) 103 + defer cancel() 101 104 defer ts.Close() 102 105 103 106 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) ··· 114 117 } 115 118 116 119 func TestServer_sseToOpenPath(t *testing.T) { 117 - ts, _ := newTestServer(nil) 120 + ts, _, cancel := newTestServer(nil) 121 + defer cancel() 118 122 defer ts.Close() 119 123 120 124 req, _ := http.NewRequest("GET", ts.URL+"/open/topic", nil) ··· 136 140 } 137 141 138 142 func TestServer_prefixSubscription(t *testing.T) { 139 - ts, _ := newTestServer(nil) 143 + ts, _, cancel := newTestServer(nil) 144 + defer cancel() 140 145 defer ts.Close() 141 146 142 - ctx, cancel := context.WithCancel(context.Background()) 143 - defer cancel() 147 + ctx, cancelSSE := context.WithCancel(context.Background()) 148 + defer cancelSSE() 144 149 145 150 events := sseSubscribe(ctx, ts.URL+"/github.com/chrisguidry", nil) 146 151 time.Sleep(50 * time.Millisecond) ··· 158 163 } 159 164 160 165 func TestServer_prefixSubscriptionWithTrailingSlash(t *testing.T) { 161 - ts, _ := newTestServer(nil) 166 + ts, _, cancel := newTestServer(nil) 167 + defer cancel() 162 168 defer ts.Close() 163 169 164 - ctx, cancel := context.WithCancel(context.Background()) 165 - defer cancel() 170 + ctx, cancelSSE := context.WithCancel(context.Background()) 171 + defer cancelSSE() 166 172 167 173 events := sseSubscribe(ctx, ts.URL+"/test/", nil) 168 174 time.Sleep(50 * time.Millisecond) ··· 180 186 } 181 187 182 188 func TestServer_lastEventIDReplay(t *testing.T) { 183 - ts, broker := newTestServer(nil) 189 + ts, broker, cancel := newTestServer(nil) 190 + defer cancel() 184 191 defer ts.Close() 185 192 186 193 broker.Publish(&Event{ ··· 194 201 Payload: map[string]any{"n": 2}, 195 202 }) 196 203 197 - ctx, cancel := context.WithCancel(context.Background()) 198 - defer cancel() 204 + ctx, cancelSSE := context.WithCancel(context.Background()) 205 + defer cancelSSE() 199 206 200 207 events := sseSubscribe(ctx, ts.URL+"/test/topic", map[string]string{ 201 208 "Last-Event-ID": "replay-1", ··· 212 219 } 213 220 214 221 func TestServer_filterQueryParam(t *testing.T) { 215 - ts, _ := newTestServer(nil) 222 + ts, _, cancel := newTestServer(nil) 223 + defer cancel() 216 224 defer ts.Close() 217 225 218 - ctx, cancel := context.WithCancel(context.Background()) 219 - defer cancel() 226 + ctx, cancelSSE := context.WithCancel(context.Background()) 227 + defer cancelSSE() 220 228 221 229 events := sseSubscribe(ctx, ts.URL+"/test/topic?filter=payload.ref:refs/heads/main", nil) 222 230 time.Sleep(50 * time.Millisecond) ··· 242 250 "private/topic": {SubscribeSecret: "my-token"}, 243 251 }, 244 252 } 245 - ts, _ := newTestServer(cfg) 253 + ts, _, cancel := newTestServer(cfg) 254 + defer cancel() 246 255 defer ts.Close() 247 256 248 257 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) ··· 258 267 } 259 268 260 269 func TestServer_sseChannelClosed(t *testing.T) { 261 - broker := NewBroker(100) 270 + backend := NewMemoryBackend(100) 271 + broker := NewBroker(backend) 272 + ctx, cancel := context.WithCancel(context.Background()) 273 + defer cancel() 274 + broker.Start(ctx) 262 275 var cfgPtr atomic.Pointer[Configuration] 263 276 handler := NewServer(broker, &cfgPtr) 264 277 ts := httptest.NewServer(handler) 265 278 defer ts.Close() 266 279 267 - ctx, cancel := context.WithCancel(context.Background()) 268 - defer cancel() 280 + sseCtx, sseCancel := context.WithCancel(context.Background()) 281 + defer sseCancel() 269 282 270 - events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) 283 + events := sseSubscribe(sseCtx, ts.URL+"/test/topic", nil) 271 284 time.Sleep(50 * time.Millisecond) 272 285 273 286 broker.mu.Lock() ··· 292 305 } 293 306 294 307 func TestServer_sseSkipsUnmarshalableEvent(t *testing.T) { 295 - ts, broker := newTestServer(nil) 308 + ts, broker, cancel := newTestServer(nil) 309 + defer cancel() 296 310 defer ts.Close() 297 311 298 - ctx, cancel := context.WithCancel(context.Background()) 299 - defer cancel() 312 + ctx, cancelSSE := context.WithCancel(context.Background()) 313 + defer cancelSSE() 300 314 301 315 events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) 302 316 time.Sleep(50 * time.Millisecond) ··· 323 337 } 324 338 325 339 func TestServer_textPayloadStoredAsString(t *testing.T) { 326 - ts, _ := newTestServer(nil) 340 + ts, _, cancel := newTestServer(nil) 341 + defer cancel() 327 342 defer ts.Close() 328 343 329 - ctx, cancel := context.WithCancel(context.Background()) 330 - defer cancel() 344 + ctx, cancelSSE := context.WithCancel(context.Background()) 345 + defer cancelSSE() 331 346 332 347 done := make(chan string, 1) 333 348 go func() {