Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 398 lines 11 kB view raw
1package main 2 3import ( 4 "context" 5 "crypto/hmac" 6 "crypto/sha256" 7 "encoding/hex" 8 "fmt" 9 "net/http" 10 "net/http/httptest" 11 "strings" 12 "sync/atomic" 13 "testing" 14 "time" 15) 16 17func newTestServer(cfg *Configuration) (*httptest.Server, *Broker, context.CancelFunc) { 18 backend := NewMemoryBackend(100) 19 broker := NewBroker(backend) 20 ctx, cancel := context.WithCancel(context.Background()) 21 broker.Start(ctx) 22 var cfgPtr atomic.Pointer[Configuration] 23 if cfg != nil { 24 cfgPtr.Store(cfg) 25 } 26 handler := NewServer(broker, &cfgPtr) 27 return httptest.NewServer(handler), broker, cancel 28} 29 30func TestServer_healthEndpoint(t *testing.T) { 31 ts, _, cancel := newTestServer(nil) 32 defer cancel() 33 defer ts.Close() 34 35 resp, err := http.Get(ts.URL + "/_health") 36 if err != nil { 37 t.Fatalf("GET /_health failed: %v", err) 38 } 39 defer resp.Body.Close() 40 if resp.StatusCode != http.StatusNoContent { 41 t.Errorf("expected 204, got %d", resp.StatusCode) 42 } 43 buf := make([]byte, 1) 44 n, _ := resp.Body.Read(buf) 45 if n != 0 { 46 t.Errorf("expected empty body, got %d bytes", n) 47 } 48} 49 50func TestServer_postPublishesEvent(t *testing.T) { 51 ts, _, cancel := newTestServer(nil) 52 defer cancel() 53 defer ts.Close() 54 55 resp, err := http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`)) 56 if err != nil { 57 t.Fatalf("POST failed: %v", err) 58 } 59 defer resp.Body.Close() 60 if resp.StatusCode != http.StatusAccepted { 61 t.Errorf("expected 202, got %d", resp.StatusCode) 62 } 63} 64 65func TestServer_postTrailingSlashNormalized(t *testing.T) { 66 ts, broker, cancel := newTestServer(nil) 67 defer cancel() 68 defer ts.Close() 69 70 ch, unsub := broker.Subscribe("test/topic", "") 71 defer unsub() 72 73 http.Post(ts.URL+"/test/topic/", "application/json", strings.NewReader(`{}`)) 74 75 select { 76 case event := <-ch: 77 if event.Path != "test/topic" { 78 t.Errorf("expected normalized path test/topic, got %s", event.Path) 79 } 80 case <-time.After(time.Second): 81 t.Fatal("timed out: POST with trailing slash should deliver to normalized path") 82 } 83} 84 85func TestServer_postWithValidHMAC(t *testing.T) { 86 cfg := &Configuration{ 87 Paths: map[string]PathConfiguration{ 88 "secure/path": { 89 Verify: "hmac-sha256", 90 Secret: "test-secret", 91 SignatureHeader: "X-Hub-Signature-256", 92 }, 93 }, 94 } 95 ts, _, cancel := newTestServer(cfg) 96 defer cancel() 97 defer ts.Close() 98 99 body := `{"action":"push"}` 100 mac := hmac.New(sha256.New, []byte("test-secret")) 101 mac.Write([]byte(body)) 102 sig := "sha256=" + hex.EncodeToString(mac.Sum(nil)) 103 104 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(body)) 105 req.Header.Set("Content-Type", "application/json") 106 req.Header.Set("X-Hub-Signature-256", sig) 107 resp, err := http.DefaultClient.Do(req) 108 if err != nil { 109 t.Fatalf("POST failed: %v", err) 110 } 111 defer resp.Body.Close() 112 if resp.StatusCode != http.StatusAccepted { 113 t.Errorf("expected 202, got %d", resp.StatusCode) 114 } 115} 116 117func TestServer_postWithInvalidSignature(t *testing.T) { 118 cfg := &Configuration{ 119 Paths: map[string]PathConfiguration{ 120 "secure/path": { 121 Verify: "hmac-sha256", 122 Secret: "test-secret", 123 SignatureHeader: "X-Hub-Signature-256", 124 }, 125 }, 126 } 127 ts, _, cancel := newTestServer(cfg) 128 defer cancel() 129 defer ts.Close() 130 131 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(`{"bad":"data"}`)) 132 req.Header.Set("Content-Type", "application/json") 133 req.Header.Set("X-Hub-Signature-256", "sha256=deadbeef") 134 resp, err := http.DefaultClient.Do(req) 135 if err != nil { 136 t.Fatalf("POST failed: %v", err) 137 } 138 defer resp.Body.Close() 139 if resp.StatusCode != http.StatusForbidden { 140 t.Errorf("expected 403, got %d", resp.StatusCode) 141 } 142} 143 144func TestServer_postToUnconfiguredPath(t *testing.T) { 145 cfg := &Configuration{ 146 Paths: map[string]PathConfiguration{ 147 "secure/path": { 148 Verify: "hmac-sha256", 149 Secret: "test-secret", 150 SignatureHeader: "X-Hub-Signature-256", 151 }, 152 }, 153 } 154 ts, _, cancel := newTestServer(cfg) 155 defer cancel() 156 defer ts.Close() 157 158 resp, err := http.Post(ts.URL+"/open/path", "application/json", strings.NewReader(`{"ok":true}`)) 159 if err != nil { 160 t.Fatalf("POST failed: %v", err) 161 } 162 defer resp.Body.Close() 163 if resp.StatusCode != http.StatusAccepted { 164 t.Errorf("expected 202, got %d", resp.StatusCode) 165 } 166} 167 168func TestServer_getWithoutSSEAccept(t *testing.T) { 169 ts, _, cancel := newTestServer(nil) 170 defer cancel() 171 defer ts.Close() 172 173 resp, err := http.Get(ts.URL + "/test/topic") 174 if err != nil { 175 t.Fatalf("GET failed: %v", err) 176 } 177 defer resp.Body.Close() 178 if resp.StatusCode != http.StatusNotFound { 179 t.Errorf("expected 404, got %d", resp.StatusCode) 180 } 181} 182 183func TestServer_corsHeaders(t *testing.T) { 184 ts, _, cancel := newTestServer(nil) 185 defer cancel() 186 defer ts.Close() 187 188 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`)) 189 if err != nil { 190 t.Fatalf("POST failed: %v", err) 191 } 192 defer resp.Body.Close() 193 194 if resp.Header.Get("Access-Control-Allow-Origin") != "*" { 195 t.Error("missing CORS Allow-Origin header") 196 } 197} 198 199func TestServer_optionsPreflight(t *testing.T) { 200 ts, _, cancel := newTestServer(nil) 201 defer cancel() 202 defer ts.Close() 203 204 req, _ := http.NewRequest("OPTIONS", ts.URL+"/test", nil) 205 resp, err := http.DefaultClient.Do(req) 206 if err != nil { 207 t.Fatalf("OPTIONS failed: %v", err) 208 } 209 defer resp.Body.Close() 210 if resp.StatusCode != http.StatusNoContent { 211 t.Errorf("expected 204, got %d", resp.StatusCode) 212 } 213 if resp.Header.Get("Access-Control-Allow-Methods") == "" { 214 t.Error("missing CORS Allow-Methods header") 215 } 216} 217 218func TestServer_methodNotAllowed(t *testing.T) { 219 ts, _, cancel := newTestServer(nil) 220 defer cancel() 221 defer ts.Close() 222 223 req, _ := http.NewRequest("DELETE", ts.URL+"/test", nil) 224 resp, err := http.DefaultClient.Do(req) 225 if err != nil { 226 t.Fatalf("DELETE failed: %v", err) 227 } 228 defer resp.Body.Close() 229 if resp.StatusCode != http.StatusMethodNotAllowed { 230 t.Errorf("expected 405, got %d", resp.StatusCode) 231 } 232} 233 234func TestServer_postWithBadVerifierConfig(t *testing.T) { 235 cfg := &Configuration{ 236 Paths: map[string]PathConfiguration{ 237 "bad/path": { 238 Verify: "unknown-method", 239 Secret: "secret", 240 SignatureHeader: "X-Signature", 241 }, 242 }, 243 } 244 ts, _, cancel := newTestServer(cfg) 245 defer cancel() 246 defer ts.Close() 247 248 req, _ := http.NewRequest("POST", ts.URL+"/bad/path", strings.NewReader(`{}`)) 249 req.Header.Set("Content-Type", "application/json") 250 resp, err := http.DefaultClient.Do(req) 251 if err != nil { 252 t.Fatalf("POST failed: %v", err) 253 } 254 defer resp.Body.Close() 255 if resp.StatusCode != http.StatusInternalServerError { 256 t.Errorf("expected 500, got %d", resp.StatusCode) 257 } 258} 259 260func TestServer_postInvalidJSON(t *testing.T) { 261 ts, _, cancel := newTestServer(nil) 262 defer cancel() 263 defer ts.Close() 264 265 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{not json`)) 266 if err != nil { 267 t.Fatalf("POST failed: %v", err) 268 } 269 defer resp.Body.Close() 270 if resp.StatusCode != http.StatusBadRequest { 271 t.Errorf("expected 400, got %d", resp.StatusCode) 272 } 273} 274 275type bareResponseWriter struct { 276 code int 277 headers http.Header 278 body strings.Builder 279} 280 281func (w *bareResponseWriter) Header() http.Header { return w.headers } 282func (w *bareResponseWriter) Write(b []byte) (int, error) { return w.body.Write(b) } 283func (w *bareResponseWriter) WriteHeader(code int) { w.code = code } 284 285func TestServer_sseWithoutFlusher(t *testing.T) { 286 backend := NewMemoryBackend(100) 287 broker := NewBroker(backend) 288 ctx, cancel := context.WithCancel(context.Background()) 289 defer cancel() 290 broker.Start(ctx) 291 var cfgPtr atomic.Pointer[Configuration] 292 handler := NewServer(broker, &cfgPtr) 293 294 w := &bareResponseWriter{headers: make(http.Header)} 295 req := httptest.NewRequest("GET", "/test/topic", nil) 296 req.Header.Set("Accept", "text/event-stream") 297 298 handler.ServeHTTP(w, req) 299 300 if w.code != http.StatusInternalServerError { 301 t.Errorf("expected 500, got %d", w.code) 302 } 303} 304 305func TestServer_postInheritsVerificationFromParent(t *testing.T) { 306 cfg := &Configuration{ 307 Paths: map[string]PathConfiguration{ 308 "github.com": { 309 Verify: "hmac-sha256", 310 Secret: "parent-secret", 311 SignatureHeader: "X-Hub-Signature-256", 312 }, 313 }, 314 } 315 ts, _, cancel := newTestServer(cfg) 316 defer cancel() 317 defer ts.Close() 318 319 body := `{"action":"push"}` 320 mac := hmac.New(sha256.New, []byte("parent-secret")) 321 mac.Write([]byte(body)) 322 sig := "sha256=" + hex.EncodeToString(mac.Sum(nil)) 323 324 req, _ := http.NewRequest("POST", ts.URL+"/github.com/org/repo", strings.NewReader(body)) 325 req.Header.Set("Content-Type", "application/json") 326 req.Header.Set("X-Hub-Signature-256", sig) 327 resp, err := http.DefaultClient.Do(req) 328 if err != nil { 329 t.Fatalf("POST failed: %v", err) 330 } 331 defer resp.Body.Close() 332 if resp.StatusCode != http.StatusAccepted { 333 t.Errorf("expected 202 with valid signature, got %d", resp.StatusCode) 334 } 335 336 req, _ = http.NewRequest("POST", ts.URL+"/github.com/org/repo", strings.NewReader(body)) 337 req.Header.Set("Content-Type", "application/json") 338 req.Header.Set("X-Hub-Signature-256", "sha256=deadbeef") 339 resp, err = http.DefaultClient.Do(req) 340 if err != nil { 341 t.Fatalf("POST failed: %v", err) 342 } 343 defer resp.Body.Close() 344 if resp.StatusCode != http.StatusForbidden { 345 t.Errorf("expected 403 with invalid signature, got %d", resp.StatusCode) 346 } 347} 348 349type failingBackend struct{ MemoryBackend } 350 351func (f *failingBackend) Publish(*Event) error { 352 return fmt.Errorf("backend unavailable") 353} 354 355func TestServer_postPublishError(t *testing.T) { 356 backend := &failingBackend{MemoryBackend: *NewMemoryBackend(100)} 357 broker := NewBroker(backend) 358 ctx, cancel := context.WithCancel(context.Background()) 359 defer cancel() 360 broker.Start(ctx) 361 var cfgPtr atomic.Pointer[Configuration] 362 handler := NewServer(broker, &cfgPtr) 363 ts := httptest.NewServer(handler) 364 defer ts.Close() 365 366 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`)) 367 if err != nil { 368 t.Fatalf("POST failed: %v", err) 369 } 370 defer resp.Body.Close() 371 if resp.StatusCode != http.StatusInternalServerError { 372 t.Errorf("expected 500, got %d", resp.StatusCode) 373 } 374} 375 376func TestServer_postWithMissingSignatureOnSecuredPath(t *testing.T) { 377 cfg := &Configuration{ 378 Paths: map[string]PathConfiguration{ 379 "secure/path": { 380 Verify: "hmac-sha256", 381 Secret: "test-secret", 382 SignatureHeader: "X-Hub-Signature-256", 383 }, 384 }, 385 } 386 ts, _, cancel := newTestServer(cfg) 387 defer cancel() 388 defer ts.Close() 389 390 resp, err := http.Post(ts.URL+"/secure/path", "application/json", strings.NewReader(`{}`)) 391 if err != nil { 392 t.Fatalf("POST failed: %v", err) 393 } 394 defer resp.Body.Close() 395 if resp.StatusCode != http.StatusForbidden { 396 t.Errorf("expected 403, got %d", resp.StatusCode) 397 } 398}