Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 386 lines 9.9 kB view raw
1package main 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "encoding/json" 8 "math" 9 "net/http" 10 "net/http/httptest" 11 "strings" 12 "sync/atomic" 13 "testing" 14 "time" 15) 16 17func sseSubscribe(ctx context.Context, url string, headers map[string]string) <-chan *Event { 18 events := make(chan *Event, 10) 19 go func() { 20 defer close(events) 21 req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) 22 req.Header.Set("Accept", "text/event-stream") 23 for k, v := range headers { 24 req.Header.Set(k, v) 25 } 26 resp, err := http.DefaultClient.Do(req) 27 if err != nil { 28 return 29 } 30 defer resp.Body.Close() 31 scanner := bufio.NewScanner(resp.Body) 32 for scanner.Scan() { 33 line := scanner.Text() 34 if strings.HasPrefix(line, "data: ") { 35 var event Event 36 json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &event) 37 select { 38 case events <- &event: 39 case <-ctx.Done(): 40 return 41 } 42 } 43 } 44 }() 45 return events 46} 47 48func TestServer_postAndSSEReceive(t *testing.T) { 49 ts, _, cancel := newTestServer(nil) 50 defer cancel() 51 defer ts.Close() 52 53 ctx, cancelSSE := context.WithCancel(context.Background()) 54 defer cancelSSE() 55 56 events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) 57 time.Sleep(50 * time.Millisecond) 58 59 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) 60 61 select { 62 case event := <-events: 63 if event.Path != "test/topic" { 64 t.Errorf("expected path test/topic, got %s", event.Path) 65 } 66 case <-time.After(2 * time.Second): 67 t.Fatal("timed out waiting for SSE event") 68 } 69} 70 71func TestServer_sseWithValidBearerToken(t *testing.T) { 72 cfg := &Configuration{ 73 Paths: map[string]PathConfiguration{ 74 "private/topic": {SubscribeSecret: "my-token"}, 75 }, 76 } 77 ts, _, cancel := newTestServer(cfg) 78 defer cancel() 79 defer ts.Close() 80 81 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) 82 req.Header.Set("Accept", "text/event-stream") 83 req.Header.Set("Authorization", "Bearer my-token") 84 85 client := &http.Client{Timeout: 500 * time.Millisecond} 86 resp, err := client.Do(req) 87 if err != nil { 88 t.Fatalf("GET failed: %v", err) 89 } 90 defer resp.Body.Close() 91 if resp.StatusCode != http.StatusOK { 92 t.Errorf("expected 200, got %d", resp.StatusCode) 93 } 94} 95 96func TestServer_sseWithWrongToken(t *testing.T) { 97 cfg := &Configuration{ 98 Paths: map[string]PathConfiguration{ 99 "private/topic": {SubscribeSecret: "my-token"}, 100 }, 101 } 102 ts, _, cancel := newTestServer(cfg) 103 defer cancel() 104 defer ts.Close() 105 106 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) 107 req.Header.Set("Accept", "text/event-stream") 108 req.Header.Set("Authorization", "Bearer wrong-token") 109 resp, err := http.DefaultClient.Do(req) 110 if err != nil { 111 t.Fatalf("GET failed: %v", err) 112 } 113 defer resp.Body.Close() 114 if resp.StatusCode != http.StatusUnauthorized { 115 t.Errorf("expected 401, got %d", resp.StatusCode) 116 } 117} 118 119func TestServer_sseToOpenPath(t *testing.T) { 120 ts, _, cancel := newTestServer(nil) 121 defer cancel() 122 defer ts.Close() 123 124 req, _ := http.NewRequest("GET", ts.URL+"/open/topic", nil) 125 req.Header.Set("Accept", "text/event-stream") 126 127 client := &http.Client{Timeout: 500 * time.Millisecond} 128 resp, err := client.Do(req) 129 if err != nil { 130 t.Fatalf("GET failed: %v", err) 131 } 132 defer resp.Body.Close() 133 if resp.StatusCode != http.StatusOK { 134 t.Errorf("expected 200, got %d", resp.StatusCode) 135 } 136 ct := resp.Header.Get("Content-Type") 137 if ct != "text/event-stream" { 138 t.Errorf("expected text/event-stream, got %s", ct) 139 } 140} 141 142func TestServer_prefixSubscription(t *testing.T) { 143 ts, _, cancel := newTestServer(nil) 144 defer cancel() 145 defer ts.Close() 146 147 ctx, cancelSSE := context.WithCancel(context.Background()) 148 defer cancelSSE() 149 150 events := sseSubscribe(ctx, ts.URL+"/github.com/chrisguidry", nil) 151 time.Sleep(50 * time.Millisecond) 152 153 http.Post(ts.URL+"/github.com/chrisguidry/docketeer", "application/json", strings.NewReader(`{"ref":"main"}`)) 154 155 select { 156 case event := <-events: 157 if event.Path != "github.com/chrisguidry/docketeer" { 158 t.Errorf("expected child path, got %s", event.Path) 159 } 160 case <-time.After(2 * time.Second): 161 t.Fatal("timed out waiting for SSE event") 162 } 163} 164 165func TestServer_prefixSubscriptionWithTrailingSlash(t *testing.T) { 166 ts, _, cancel := newTestServer(nil) 167 defer cancel() 168 defer ts.Close() 169 170 ctx, cancelSSE := context.WithCancel(context.Background()) 171 defer cancelSSE() 172 173 events := sseSubscribe(ctx, ts.URL+"/test/", nil) 174 time.Sleep(50 * time.Millisecond) 175 176 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) 177 178 select { 179 case event := <-events: 180 if event.Path != "test/topic" { 181 t.Errorf("expected test/topic, got %s", event.Path) 182 } 183 case <-time.After(2 * time.Second): 184 t.Fatal("timed out: trailing slash subscribe should receive child events") 185 } 186} 187 188func TestServer_lastEventIDReplay(t *testing.T) { 189 ts, broker, cancel := newTestServer(nil) 190 defer cancel() 191 defer ts.Close() 192 193 broker.Publish(&Event{ 194 ID: "replay-1", 195 Path: "test/topic", 196 Payload: map[string]any{"n": 1}, 197 }) 198 broker.Publish(&Event{ 199 ID: "replay-2", 200 Path: "test/topic", 201 Payload: map[string]any{"n": 2}, 202 }) 203 204 ctx, cancelSSE := context.WithCancel(context.Background()) 205 defer cancelSSE() 206 207 events := sseSubscribe(ctx, ts.URL+"/test/topic", map[string]string{ 208 "Last-Event-ID": "replay-1", 209 }) 210 211 select { 212 case event := <-events: 213 if event.ID != "replay-2" { 214 t.Errorf("expected replay-2, got %s", event.ID) 215 } 216 case <-time.After(2 * time.Second): 217 t.Fatal("timed out waiting for replayed event") 218 } 219} 220 221func TestServer_filterQueryParam(t *testing.T) { 222 ts, _, cancel := newTestServer(nil) 223 defer cancel() 224 defer ts.Close() 225 226 ctx, cancelSSE := context.WithCancel(context.Background()) 227 defer cancelSSE() 228 229 events := sseSubscribe(ctx, ts.URL+"/test/topic?filter=payload.ref:refs/heads/main", nil) 230 time.Sleep(50 * time.Millisecond) 231 232 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"ref":"refs/heads/develop"}`)) 233 time.Sleep(20 * time.Millisecond) 234 http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"ref":"refs/heads/main"}`)) 235 236 select { 237 case event := <-events: 238 payload := event.Payload.(map[string]any) 239 if payload["ref"] != "refs/heads/main" { 240 t.Errorf("expected filtered event with ref=main, got %v", payload["ref"]) 241 } 242 case <-time.After(2 * time.Second): 243 t.Fatal("timed out waiting for filtered event") 244 } 245} 246 247func TestServer_sseWithMissingAuth(t *testing.T) { 248 cfg := &Configuration{ 249 Paths: map[string]PathConfiguration{ 250 "private/topic": {SubscribeSecret: "my-token"}, 251 }, 252 } 253 ts, _, cancel := newTestServer(cfg) 254 defer cancel() 255 defer ts.Close() 256 257 req, _ := http.NewRequest("GET", ts.URL+"/private/topic", nil) 258 req.Header.Set("Accept", "text/event-stream") 259 resp, err := http.DefaultClient.Do(req) 260 if err != nil { 261 t.Fatalf("GET failed: %v", err) 262 } 263 defer resp.Body.Close() 264 if resp.StatusCode != http.StatusUnauthorized { 265 t.Errorf("expected 401, got %d", resp.StatusCode) 266 } 267} 268 269func TestServer_sseChannelClosed(t *testing.T) { 270 backend := NewMemoryBackend(100) 271 broker := NewBroker(backend) 272 ctx, cancel := context.WithCancel(context.Background()) 273 defer cancel() 274 broker.Start(ctx) 275 var cfgPtr atomic.Pointer[Configuration] 276 handler := NewServer(broker, &cfgPtr) 277 ts := httptest.NewServer(handler) 278 defer ts.Close() 279 280 sseCtx, sseCancel := context.WithCancel(context.Background()) 281 defer sseCancel() 282 283 events := sseSubscribe(sseCtx, ts.URL+"/test/topic", nil) 284 time.Sleep(50 * time.Millisecond) 285 286 broker.mu.Lock() 287 for _, subs := range broker.subscribers { 288 for _, sub := range subs { 289 close(sub.ch) 290 } 291 } 292 broker.subscribers = make(map[string][]*subscriber) 293 broker.mu.Unlock() 294 295 time.Sleep(50 * time.Millisecond) 296 297 select { 298 case _, ok := <-events: 299 if ok { 300 t.Error("expected channel to be closed") 301 } 302 case <-time.After(time.Second): 303 t.Fatal("timed out waiting for SSE to close") 304 } 305} 306 307func TestServer_sseSkipsUnmarshalableEvent(t *testing.T) { 308 ts, broker, cancel := newTestServer(nil) 309 defer cancel() 310 defer ts.Close() 311 312 ctx, cancelSSE := context.WithCancel(context.Background()) 313 defer cancelSSE() 314 315 events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) 316 time.Sleep(50 * time.Millisecond) 317 318 broker.Publish(&Event{ 319 ID: "bad-event", 320 Path: "test/topic", 321 Payload: math.NaN(), 322 }) 323 broker.Publish(&Event{ 324 ID: "good-event", 325 Path: "test/topic", 326 Payload: map[string]any{"ok": true}, 327 }) 328 329 select { 330 case event := <-events: 331 if event.ID != "good-event" { 332 t.Errorf("expected good-event, got %s", event.ID) 333 } 334 case <-time.After(2 * time.Second): 335 t.Fatal("timed out waiting for good event after unmarshalable one") 336 } 337} 338 339func TestServer_textPayloadStoredAsString(t *testing.T) { 340 ts, _, cancel := newTestServer(nil) 341 defer cancel() 342 defer ts.Close() 343 344 ctx, cancelSSE := context.WithCancel(context.Background()) 345 defer cancelSSE() 346 347 done := make(chan string, 1) 348 go func() { 349 req, _ := http.NewRequestWithContext(ctx, "GET", ts.URL+"/test/topic", nil) 350 req.Header.Set("Accept", "text/event-stream") 351 resp, err := http.DefaultClient.Do(req) 352 if err != nil { 353 return 354 } 355 defer resp.Body.Close() 356 scanner := bufio.NewScanner(resp.Body) 357 for scanner.Scan() { 358 line := scanner.Text() 359 if strings.HasPrefix(line, "data: ") { 360 var envelope struct { 361 Payload json.RawMessage `json:"payload"` 362 } 363 json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope) 364 var s string 365 if json.Unmarshal(envelope.Payload, &s) == nil { 366 done <- s 367 return 368 } 369 } 370 } 371 }() 372 373 time.Sleep(50 * time.Millisecond) 374 375 resp, _ := http.Post(ts.URL+"/test/topic", "text/plain", bytes.NewReader([]byte("hello world"))) 376 resp.Body.Close() 377 378 select { 379 case payload := <-done: 380 if payload != "hello world" { 381 t.Errorf("expected plain text payload, got %s", payload) 382 } 383 case <-time.After(2 * time.Second): 384 t.Fatal("timed out waiting for event") 385 } 386}