Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "math"
9 "net/http"
10 "net/http/httptest"
11 "strings"
12 "sync/atomic"
13 "testing"
14 "time"
15)
16
17func sseSubscribe(ctx context.Context, url string, headers map[string]string) <-chan *Event {
18 events := make(chan *Event, 10)
19 go func() {
20 defer close(events)
21 req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
22 req.Header.Set("Accept", "text/event-stream")
23 for k, v := range headers {
24 req.Header.Set(k, v)
25 }
26 resp, err := http.DefaultClient.Do(req)
27 if err != nil {
28 return
29 }
30 defer resp.Body.Close()
31 scanner := bufio.NewScanner(resp.Body)
32 for scanner.Scan() {
33 line := scanner.Text()
34 if strings.HasPrefix(line, "data: ") {
35 var event Event
36 json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &event)
37 select {
38 case events <- &event:
39 case <-ctx.Done():
40 return
41 }
42 }
43 }
44 }()
45 return events
46}
47
48func TestServer_postAndSSEReceive(t *testing.T) {
49 ts, _, cancel := newTestServer(nil)
50 defer cancel()
51 defer ts.Close()
52
53 ctx, cancelSSE := context.WithCancel(context.Background())
54 defer cancelSSE()
55
56 events := sseSubscribe(ctx, ts.URL+"/test/topic", nil)
57 time.Sleep(50 * time.Millisecond)
58
59 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`))
60
61 select {
62 case event := <-events:
63 if event.Path != "test/topic" {
64 t.Errorf("expected path test/topic, got %s", event.Path)
65 }
66 case <-time.After(2 * time.Second):
67 t.Fatal("timed out waiting for SSE event")
68 }
69}
70
71func TestServer_sseWithValidBearerToken(t *testing.T) {
72 cfg := &Configuration{
73 Paths: map[string]PathConfiguration{
74 "private/topic": {SubscribeSecret: "my-token"},
75 },
76 }
77 ts, _, cancel := newTestServer(cfg)
78 defer cancel()
79 defer ts.Close()
80
81 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil)
82 req.Header.Set("Accept", "text/event-stream")
83 req.Header.Set("Authorization", "Bearer my-token")
84
85 client := &http.Client{Timeout: 500 * time.Millisecond}
86 resp, err := client.Do(req)
87 if err != nil {
88 t.Fatalf("GET failed: %v", err)
89 }
90 defer resp.Body.Close()
91 if resp.StatusCode != http.StatusOK {
92 t.Errorf("expected 200, got %d", resp.StatusCode)
93 }
94}
95
96func TestServer_sseWithWrongToken(t *testing.T) {
97 cfg := &Configuration{
98 Paths: map[string]PathConfiguration{
99 "private/topic": {SubscribeSecret: "my-token"},
100 },
101 }
102 ts, _, cancel := newTestServer(cfg)
103 defer cancel()
104 defer ts.Close()
105
106 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil)
107 req.Header.Set("Accept", "text/event-stream")
108 req.Header.Set("Authorization", "Bearer wrong-token")
109 resp, err := http.DefaultClient.Do(req)
110 if err != nil {
111 t.Fatalf("GET failed: %v", err)
112 }
113 defer resp.Body.Close()
114 if resp.StatusCode != http.StatusUnauthorized {
115 t.Errorf("expected 401, got %d", resp.StatusCode)
116 }
117}
118
119func TestServer_sseToOpenPath(t *testing.T) {
120 ts, _, cancel := newTestServer(nil)
121 defer cancel()
122 defer ts.Close()
123
124 req, _ := http.NewRequest("GET", ts.URL+"/open/topic", nil)
125 req.Header.Set("Accept", "text/event-stream")
126
127 client := &http.Client{Timeout: 500 * time.Millisecond}
128 resp, err := client.Do(req)
129 if err != nil {
130 t.Fatalf("GET failed: %v", err)
131 }
132 defer resp.Body.Close()
133 if resp.StatusCode != http.StatusOK {
134 t.Errorf("expected 200, got %d", resp.StatusCode)
135 }
136 ct := resp.Header.Get("Content-Type")
137 if ct != "text/event-stream" {
138 t.Errorf("expected text/event-stream, got %s", ct)
139 }
140}
141
142func TestServer_prefixSubscription(t *testing.T) {
143 ts, _, cancel := newTestServer(nil)
144 defer cancel()
145 defer ts.Close()
146
147 ctx, cancelSSE := context.WithCancel(context.Background())
148 defer cancelSSE()
149
150 events := sseSubscribe(ctx, ts.URL+"/github.com/chrisguidry", nil)
151 time.Sleep(50 * time.Millisecond)
152
153 http.Post(ts.URL+"/github.com/chrisguidry/docketeer", "application/json", strings.NewReader(`{"ref":"main"}`))
154
155 select {
156 case event := <-events:
157 if event.Path != "github.com/chrisguidry/docketeer" {
158 t.Errorf("expected child path, got %s", event.Path)
159 }
160 case <-time.After(2 * time.Second):
161 t.Fatal("timed out waiting for SSE event")
162 }
163}
164
165func TestServer_prefixSubscriptionWithTrailingSlash(t *testing.T) {
166 ts, _, cancel := newTestServer(nil)
167 defer cancel()
168 defer ts.Close()
169
170 ctx, cancelSSE := context.WithCancel(context.Background())
171 defer cancelSSE()
172
173 events := sseSubscribe(ctx, ts.URL+"/test/", nil)
174 time.Sleep(50 * time.Millisecond)
175
176 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`))
177
178 select {
179 case event := <-events:
180 if event.Path != "test/topic" {
181 t.Errorf("expected test/topic, got %s", event.Path)
182 }
183 case <-time.After(2 * time.Second):
184 t.Fatal("timed out: trailing slash subscribe should receive child events")
185 }
186}
187
188func TestServer_lastEventIDReplay(t *testing.T) {
189 ts, broker, cancel := newTestServer(nil)
190 defer cancel()
191 defer ts.Close()
192
193 broker.Publish(&Event{
194 ID: "replay-1",
195 Path: "test/topic",
196 Payload: map[string]any{"n": 1},
197 })
198 broker.Publish(&Event{
199 ID: "replay-2",
200 Path: "test/topic",
201 Payload: map[string]any{"n": 2},
202 })
203
204 ctx, cancelSSE := context.WithCancel(context.Background())
205 defer cancelSSE()
206
207 events := sseSubscribe(ctx, ts.URL+"/test/topic", map[string]string{
208 "Last-Event-ID": "replay-1",
209 })
210
211 select {
212 case event := <-events:
213 if event.ID != "replay-2" {
214 t.Errorf("expected replay-2, got %s", event.ID)
215 }
216 case <-time.After(2 * time.Second):
217 t.Fatal("timed out waiting for replayed event")
218 }
219}
220
221func TestServer_filterQueryParam(t *testing.T) {
222 ts, _, cancel := newTestServer(nil)
223 defer cancel()
224 defer ts.Close()
225
226 ctx, cancelSSE := context.WithCancel(context.Background())
227 defer cancelSSE()
228
229 events := sseSubscribe(ctx, ts.URL+"/test/topic?filter=payload.ref:refs/heads/main", nil)
230 time.Sleep(50 * time.Millisecond)
231
232 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"ref":"refs/heads/develop"}`))
233 time.Sleep(20 * time.Millisecond)
234 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"ref":"refs/heads/main"}`))
235
236 select {
237 case event := <-events:
238 payload := event.Payload.(map[string]any)
239 if payload["ref"] != "refs/heads/main" {
240 t.Errorf("expected filtered event with ref=main, got %v", payload["ref"])
241 }
242 case <-time.After(2 * time.Second):
243 t.Fatal("timed out waiting for filtered event")
244 }
245}
246
247func TestServer_sseWithMissingAuth(t *testing.T) {
248 cfg := &Configuration{
249 Paths: map[string]PathConfiguration{
250 "private/topic": {SubscribeSecret: "my-token"},
251 },
252 }
253 ts, _, cancel := newTestServer(cfg)
254 defer cancel()
255 defer ts.Close()
256
257 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil)
258 req.Header.Set("Accept", "text/event-stream")
259 resp, err := http.DefaultClient.Do(req)
260 if err != nil {
261 t.Fatalf("GET failed: %v", err)
262 }
263 defer resp.Body.Close()
264 if resp.StatusCode != http.StatusUnauthorized {
265 t.Errorf("expected 401, got %d", resp.StatusCode)
266 }
267}
268
269func TestServer_sseChannelClosed(t *testing.T) {
270 backend := NewMemoryBackend(100)
271 broker := NewBroker(backend)
272 ctx, cancel := context.WithCancel(context.Background())
273 defer cancel()
274 broker.Start(ctx)
275 var cfgPtr atomic.Pointer[Configuration]
276 handler := NewServer(broker, &cfgPtr)
277 ts := httptest.NewServer(handler)
278 defer ts.Close()
279
280 sseCtx, sseCancel := context.WithCancel(context.Background())
281 defer sseCancel()
282
283 events := sseSubscribe(sseCtx, ts.URL+"/test/topic", nil)
284 time.Sleep(50 * time.Millisecond)
285
286 broker.mu.Lock()
287 for _, subs := range broker.subscribers {
288 for _, sub := range subs {
289 close(sub.ch)
290 }
291 }
292 broker.subscribers = make(map[string][]*subscriber)
293 broker.mu.Unlock()
294
295 time.Sleep(50 * time.Millisecond)
296
297 select {
298 case _, ok := <-events:
299 if ok {
300 t.Error("expected channel to be closed")
301 }
302 case <-time.After(time.Second):
303 t.Fatal("timed out waiting for SSE to close")
304 }
305}
306
307func TestServer_sseSkipsUnmarshalableEvent(t *testing.T) {
308 ts, broker, cancel := newTestServer(nil)
309 defer cancel()
310 defer ts.Close()
311
312 ctx, cancelSSE := context.WithCancel(context.Background())
313 defer cancelSSE()
314
315 events := sseSubscribe(ctx, ts.URL+"/test/topic", nil)
316 time.Sleep(50 * time.Millisecond)
317
318 broker.Publish(&Event{
319 ID: "bad-event",
320 Path: "test/topic",
321 Payload: math.NaN(),
322 })
323 broker.Publish(&Event{
324 ID: "good-event",
325 Path: "test/topic",
326 Payload: map[string]any{"ok": true},
327 })
328
329 select {
330 case event := <-events:
331 if event.ID != "good-event" {
332 t.Errorf("expected good-event, got %s", event.ID)
333 }
334 case <-time.After(2 * time.Second):
335 t.Fatal("timed out waiting for good event after unmarshalable one")
336 }
337}
338
339func TestServer_textPayloadStoredAsString(t *testing.T) {
340 ts, _, cancel := newTestServer(nil)
341 defer cancel()
342 defer ts.Close()
343
344 ctx, cancelSSE := context.WithCancel(context.Background())
345 defer cancelSSE()
346
347 done := make(chan string, 1)
348 go func() {
349 req, _ := http.NewRequestWithContext(ctx, "GET", ts.URL+"/test/topic", nil)
350 req.Header.Set("Accept", "text/event-stream")
351 resp, err := http.DefaultClient.Do(req)
352 if err != nil {
353 return
354 }
355 defer resp.Body.Close()
356 scanner := bufio.NewScanner(resp.Body)
357 for scanner.Scan() {
358 line := scanner.Text()
359 if strings.HasPrefix(line, "data: ") {
360 var envelope struct {
361 Payload json.RawMessage `json:"payload"`
362 }
363 json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope)
364 var s string
365 if json.Unmarshal(envelope.Payload, &s) == nil {
366 done <- s
367 return
368 }
369 }
370 }
371 }()
372
373 time.Sleep(50 * time.Millisecond)
374
375 resp, _ := http.Post(ts.URL+"/test/topic", "text/plain", bytes.NewReader([]byte("hello world")))
376 resp.Body.Close()
377
378 select {
379 case payload := <-done:
380 if payload != "hello world" {
381 t.Errorf("expected plain text payload, got %s", payload)
382 }
383 case <-time.After(2 * time.Second):
384 t.Fatal("timed out waiting for event")
385 }
386}