Webhook-to-SSE gateway with hierarchical topic routing and signature verification

Clean up empty subscriber map entries and add TTL to Redis buffer

The broker's subscriber map accumulated empty slices for paths that no
longer had any subscribers. Now it deletes the key when the last one
unsubscribes. The Redis buffer key also had no expiry, so it would
stick around forever after all instances shut down — now it gets a 24h
TTL refreshed on each publish.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+43
+2
backend_redis.go
··· 6 6 "fmt" 7 7 "log" 8 8 "sync" 9 + "time" 9 10 10 11 "github.com/redis/go-redis/v9" 11 12 ) ··· 53 54 pipe := r.client.Pipeline() 54 55 pipe.LPush(ctx, redisBufferKey, data) 55 56 pipe.LTrim(ctx, redisBufferKey, 0, int64(r.bufferSize-1)) 57 + pipe.Expire(ctx, redisBufferKey, 24*time.Hour) 56 58 pipe.Publish(ctx, redisChannel, data) 57 59 _, err = pipe.Exec(ctx) 58 60 return err
+15
backend_redis_test.go
··· 169 169 } 170 170 } 171 171 172 + func TestRedisBackend_bufferHasTTL(t *testing.T) { 173 + backend, mr := newTestRedisBackend(t, 100) 174 + defer backend.Close() 175 + 176 + backend.Publish(&Event{ID: "e1", Path: "test", Payload: map[string]any{}}) 177 + 178 + ttl := mr.TTL(redisBufferKey) 179 + if ttl <= 0 { 180 + t.Fatalf("expected positive TTL on buffer key, got %v", ttl) 181 + } 182 + if ttl > 24*time.Hour { 183 + t.Fatalf("expected TTL <= 24h, got %v", ttl) 184 + } 185 + } 186 + 172 187 func TestNewRedisBackend_success(t *testing.T) { 173 188 mr := miniredis.RunT(t) 174 189 backend, err := NewRedisBackend("redis://"+mr.Addr(), 100)
+3
broker.go
··· 85 85 break 86 86 } 87 87 } 88 + if len(b.subscribers[path]) == 0 { 89 + delete(b.subscribers, path) 90 + } 88 91 func() { 89 92 defer func() { recover() }() 90 93 close(ch)
+23
broker_test.go
··· 132 132 } 133 133 } 134 134 135 + func TestBroker_unsubscribeCleansUpEmptyPath(t *testing.T) { 136 + b, cancel := newTestBroker(100) 137 + defer cancel() 138 + 139 + _, unsub := b.Subscribe("ephemeral/path", "") 140 + 141 + b.mu.RLock() 142 + _, exists := b.subscribers["ephemeral/path"] 143 + b.mu.RUnlock() 144 + if !exists { 145 + t.Fatal("expected subscriber map entry to exist") 146 + } 147 + 148 + unsub() 149 + 150 + b.mu.RLock() 151 + _, exists = b.subscribers["ephemeral/path"] 152 + b.mu.RUnlock() 153 + if exists { 154 + t.Fatal("expected subscriber map entry to be deleted after last unsub") 155 + } 156 + } 157 + 135 158 func TestBroker_ringBufferWraps(t *testing.T) { 136 159 b, cancel := newTestBroker(5) 137 160 defer cancel()