Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "crypto/hmac"
6 "crypto/sha256"
7 "encoding/hex"
8 "fmt"
9 "net/http"
10 "net/http/httptest"
11 "strings"
12 "sync/atomic"
13 "testing"
14 "time"
15)
16
17func newTestServer(cfg *Configuration) (*httptest.Server, *Broker, context.CancelFunc) {
18 backend := NewMemoryBackend(100)
19 broker := NewBroker(backend)
20 ctx, cancel := context.WithCancel(context.Background())
21 broker.Start(ctx)
22 var cfgPtr atomic.Pointer[Configuration]
23 if cfg != nil {
24 cfgPtr.Store(cfg)
25 }
26 handler := NewServer(broker, &cfgPtr)
27 return httptest.NewServer(handler), broker, cancel
28}
29
30func TestServer_healthEndpoint(t *testing.T) {
31 ts, _, cancel := newTestServer(nil)
32 defer cancel()
33 defer ts.Close()
34
35 resp, err := http.Get(ts.URL + "/_health")
36 if err != nil {
37 t.Fatalf("GET /_health failed: %v", err)
38 }
39 defer resp.Body.Close()
40 if resp.StatusCode != http.StatusNoContent {
41 t.Errorf("expected 204, got %d", resp.StatusCode)
42 }
43 buf := make([]byte, 1)
44 n, _ := resp.Body.Read(buf)
45 if n != 0 {
46 t.Errorf("expected empty body, got %d bytes", n)
47 }
48}
49
50func TestServer_postPublishesEvent(t *testing.T) {
51 ts, _, cancel := newTestServer(nil)
52 defer cancel()
53 defer ts.Close()
54
55 resp, err := http.Post(ts.URL+"/test/topic", "application/json", strings.NewReader(`{"hello":"world"}`))
56 if err != nil {
57 t.Fatalf("POST failed: %v", err)
58 }
59 defer resp.Body.Close()
60 if resp.StatusCode != http.StatusAccepted {
61 t.Errorf("expected 202, got %d", resp.StatusCode)
62 }
63}
64
65func TestServer_postTrailingSlashNormalized(t *testing.T) {
66 ts, broker, cancel := newTestServer(nil)
67 defer cancel()
68 defer ts.Close()
69
70 ch, unsub := broker.Subscribe("test/topic", "")
71 defer unsub()
72
73 http.Post(ts.URL+"/test/topic/", "application/json", strings.NewReader(`{}`))
74
75 select {
76 case event := <-ch:
77 if event.Path != "test/topic" {
78 t.Errorf("expected normalized path test/topic, got %s", event.Path)
79 }
80 case <-time.After(time.Second):
81 t.Fatal("timed out: POST with trailing slash should deliver to normalized path")
82 }
83}
84
85func TestServer_postWithValidHMAC(t *testing.T) {
86 cfg := &Configuration{
87 Paths: map[string]PathConfiguration{
88 "secure/path": {
89 Verify: "hmac-sha256",
90 Secret: "test-secret",
91 SignatureHeader: "X-Hub-Signature-256",
92 },
93 },
94 }
95 ts, _, cancel := newTestServer(cfg)
96 defer cancel()
97 defer ts.Close()
98
99 body := `{"action":"push"}`
100 mac := hmac.New(sha256.New, []byte("test-secret"))
101 mac.Write([]byte(body))
102 sig := "sha256=" + hex.EncodeToString(mac.Sum(nil))
103
104 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(body))
105 req.Header.Set("Content-Type", "application/json")
106 req.Header.Set("X-Hub-Signature-256", sig)
107 resp, err := http.DefaultClient.Do(req)
108 if err != nil {
109 t.Fatalf("POST failed: %v", err)
110 }
111 defer resp.Body.Close()
112 if resp.StatusCode != http.StatusAccepted {
113 t.Errorf("expected 202, got %d", resp.StatusCode)
114 }
115}
116
117func TestServer_postWithInvalidSignature(t *testing.T) {
118 cfg := &Configuration{
119 Paths: map[string]PathConfiguration{
120 "secure/path": {
121 Verify: "hmac-sha256",
122 Secret: "test-secret",
123 SignatureHeader: "X-Hub-Signature-256",
124 },
125 },
126 }
127 ts, _, cancel := newTestServer(cfg)
128 defer cancel()
129 defer ts.Close()
130
131 req, _ := http.NewRequest("POST", ts.URL+"/secure/path", strings.NewReader(`{"bad":"data"}`))
132 req.Header.Set("Content-Type", "application/json")
133 req.Header.Set("X-Hub-Signature-256", "sha256=deadbeef")
134 resp, err := http.DefaultClient.Do(req)
135 if err != nil {
136 t.Fatalf("POST failed: %v", err)
137 }
138 defer resp.Body.Close()
139 if resp.StatusCode != http.StatusForbidden {
140 t.Errorf("expected 403, got %d", resp.StatusCode)
141 }
142}
143
144func TestServer_postToUnconfiguredPath(t *testing.T) {
145 cfg := &Configuration{
146 Paths: map[string]PathConfiguration{
147 "secure/path": {
148 Verify: "hmac-sha256",
149 Secret: "test-secret",
150 SignatureHeader: "X-Hub-Signature-256",
151 },
152 },
153 }
154 ts, _, cancel := newTestServer(cfg)
155 defer cancel()
156 defer ts.Close()
157
158 resp, err := http.Post(ts.URL+"/open/path", "application/json", strings.NewReader(`{"ok":true}`))
159 if err != nil {
160 t.Fatalf("POST failed: %v", err)
161 }
162 defer resp.Body.Close()
163 if resp.StatusCode != http.StatusAccepted {
164 t.Errorf("expected 202, got %d", resp.StatusCode)
165 }
166}
167
168func TestServer_getWithoutSSEAccept(t *testing.T) {
169 ts, _, cancel := newTestServer(nil)
170 defer cancel()
171 defer ts.Close()
172
173 resp, err := http.Get(ts.URL + "/test/topic")
174 if err != nil {
175 t.Fatalf("GET failed: %v", err)
176 }
177 defer resp.Body.Close()
178 if resp.StatusCode != http.StatusNotFound {
179 t.Errorf("expected 404, got %d", resp.StatusCode)
180 }
181}
182
183func TestServer_corsHeaders(t *testing.T) {
184 ts, _, cancel := newTestServer(nil)
185 defer cancel()
186 defer ts.Close()
187
188 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`))
189 if err != nil {
190 t.Fatalf("POST failed: %v", err)
191 }
192 defer resp.Body.Close()
193
194 if resp.Header.Get("Access-Control-Allow-Origin") != "*" {
195 t.Error("missing CORS Allow-Origin header")
196 }
197}
198
199func TestServer_optionsPreflight(t *testing.T) {
200 ts, _, cancel := newTestServer(nil)
201 defer cancel()
202 defer ts.Close()
203
204 req, _ := http.NewRequest("OPTIONS", ts.URL+"/test", nil)
205 resp, err := http.DefaultClient.Do(req)
206 if err != nil {
207 t.Fatalf("OPTIONS failed: %v", err)
208 }
209 defer resp.Body.Close()
210 if resp.StatusCode != http.StatusNoContent {
211 t.Errorf("expected 204, got %d", resp.StatusCode)
212 }
213 if resp.Header.Get("Access-Control-Allow-Methods") == "" {
214 t.Error("missing CORS Allow-Methods header")
215 }
216}
217
218func TestServer_methodNotAllowed(t *testing.T) {
219 ts, _, cancel := newTestServer(nil)
220 defer cancel()
221 defer ts.Close()
222
223 req, _ := http.NewRequest("DELETE", ts.URL+"/test", nil)
224 resp, err := http.DefaultClient.Do(req)
225 if err != nil {
226 t.Fatalf("DELETE failed: %v", err)
227 }
228 defer resp.Body.Close()
229 if resp.StatusCode != http.StatusMethodNotAllowed {
230 t.Errorf("expected 405, got %d", resp.StatusCode)
231 }
232}
233
234func TestServer_postWithBadVerifierConfig(t *testing.T) {
235 cfg := &Configuration{
236 Paths: map[string]PathConfiguration{
237 "bad/path": {
238 Verify: "unknown-method",
239 Secret: "secret",
240 SignatureHeader: "X-Signature",
241 },
242 },
243 }
244 ts, _, cancel := newTestServer(cfg)
245 defer cancel()
246 defer ts.Close()
247
248 req, _ := http.NewRequest("POST", ts.URL+"/bad/path", strings.NewReader(`{}`))
249 req.Header.Set("Content-Type", "application/json")
250 resp, err := http.DefaultClient.Do(req)
251 if err != nil {
252 t.Fatalf("POST failed: %v", err)
253 }
254 defer resp.Body.Close()
255 if resp.StatusCode != http.StatusInternalServerError {
256 t.Errorf("expected 500, got %d", resp.StatusCode)
257 }
258}
259
260func TestServer_postInvalidJSON(t *testing.T) {
261 ts, _, cancel := newTestServer(nil)
262 defer cancel()
263 defer ts.Close()
264
265 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{not json`))
266 if err != nil {
267 t.Fatalf("POST failed: %v", err)
268 }
269 defer resp.Body.Close()
270 if resp.StatusCode != http.StatusBadRequest {
271 t.Errorf("expected 400, got %d", resp.StatusCode)
272 }
273}
274
// bareResponseWriter is a minimal http.ResponseWriter for tests. It records
// the last status code passed to WriteHeader, exposes a caller-supplied header
// map and accumulates written bytes in memory. It declares only the three
// ResponseWriter methods below.
type bareResponseWriter struct {
	code    int             // last value passed to WriteHeader; zero until then
	headers http.Header     // returned as-is by Header; must be set by the creator
	body    strings.Builder // everything passed to Write, in order
}

// Header returns the header map stored in the writer.
func (rw *bareResponseWriter) Header() http.Header {
	return rw.headers
}

// Write appends p to the in-memory body and reports the builder's result.
func (rw *bareResponseWriter) Write(p []byte) (int, error) {
	return rw.body.Write(p)
}

// WriteHeader records status as the writer's status code.
func (rw *bareResponseWriter) WriteHeader(status int) {
	rw.code = status
}
284
285func TestServer_sseWithoutFlusher(t *testing.T) {
286 backend := NewMemoryBackend(100)
287 broker := NewBroker(backend)
288 ctx, cancel := context.WithCancel(context.Background())
289 defer cancel()
290 broker.Start(ctx)
291 var cfgPtr atomic.Pointer[Configuration]
292 handler := NewServer(broker, &cfgPtr)
293
294 w := &bareResponseWriter{headers: make(http.Header)}
295 req := httptest.NewRequest("GET", "/test/topic", nil)
296 req.Header.Set("Accept", "text/event-stream")
297
298 handler.ServeHTTP(w, req)
299
300 if w.code != http.StatusInternalServerError {
301 t.Errorf("expected 500, got %d", w.code)
302 }
303}
304
305func TestServer_postInheritsVerificationFromParent(t *testing.T) {
306 cfg := &Configuration{
307 Paths: map[string]PathConfiguration{
308 "github.com": {
309 Verify: "hmac-sha256",
310 Secret: "parent-secret",
311 SignatureHeader: "X-Hub-Signature-256",
312 },
313 },
314 }
315 ts, _, cancel := newTestServer(cfg)
316 defer cancel()
317 defer ts.Close()
318
319 body := `{"action":"push"}`
320 mac := hmac.New(sha256.New, []byte("parent-secret"))
321 mac.Write([]byte(body))
322 sig := "sha256=" + hex.EncodeToString(mac.Sum(nil))
323
324 req, _ := http.NewRequest("POST", ts.URL+"/github.com/org/repo", strings.NewReader(body))
325 req.Header.Set("Content-Type", "application/json")
326 req.Header.Set("X-Hub-Signature-256", sig)
327 resp, err := http.DefaultClient.Do(req)
328 if err != nil {
329 t.Fatalf("POST failed: %v", err)
330 }
331 defer resp.Body.Close()
332 if resp.StatusCode != http.StatusAccepted {
333 t.Errorf("expected 202 with valid signature, got %d", resp.StatusCode)
334 }
335
336 req, _ = http.NewRequest("POST", ts.URL+"/github.com/org/repo", strings.NewReader(body))
337 req.Header.Set("Content-Type", "application/json")
338 req.Header.Set("X-Hub-Signature-256", "sha256=deadbeef")
339 resp, err = http.DefaultClient.Do(req)
340 if err != nil {
341 t.Fatalf("POST failed: %v", err)
342 }
343 defer resp.Body.Close()
344 if resp.StatusCode != http.StatusForbidden {
345 t.Errorf("expected 403 with invalid signature, got %d", resp.StatusCode)
346 }
347}
348
349type failingBackend struct{ MemoryBackend }
350
351func (f *failingBackend) Publish(*Event) error {
352 return fmt.Errorf("backend unavailable")
353}
354
355func TestServer_postPublishError(t *testing.T) {
356 backend := &failingBackend{MemoryBackend: *NewMemoryBackend(100)}
357 broker := NewBroker(backend)
358 ctx, cancel := context.WithCancel(context.Background())
359 defer cancel()
360 broker.Start(ctx)
361 var cfgPtr atomic.Pointer[Configuration]
362 handler := NewServer(broker, &cfgPtr)
363 ts := httptest.NewServer(handler)
364 defer ts.Close()
365
366 resp, err := http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{}`))
367 if err != nil {
368 t.Fatalf("POST failed: %v", err)
369 }
370 defer resp.Body.Close()
371 if resp.StatusCode != http.StatusInternalServerError {
372 t.Errorf("expected 500, got %d", resp.StatusCode)
373 }
374}
375
376func TestServer_postWithMissingSignatureOnSecuredPath(t *testing.T) {
377 cfg := &Configuration{
378 Paths: map[string]PathConfiguration{
379 "secure/path": {
380 Verify: "hmac-sha256",
381 Secret: "test-secret",
382 SignatureHeader: "X-Hub-Signature-256",
383 },
384 },
385 }
386 ts, _, cancel := newTestServer(cfg)
387 defer cancel()
388 defer ts.Close()
389
390 resp, err := http.Post(ts.URL+"/secure/path", "application/json", strings.NewReader(`{}`))
391 if err != nil {
392 t.Fatalf("POST failed: %v", err)
393 }
394 defer resp.Body.Close()
395 if resp.StatusCode != http.StatusForbidden {
396 t.Errorf("expected 403, got %d", resp.StatusCode)
397 }
398}