Webhook-to-SSE gateway with hierarchical topic routing and signature verification
1package main
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "net/http"
8 "net/http/httptest"
9 "strings"
10 "testing"
11 "time"
12)
13
14func postAndReceive(t *testing.T, ts *httptest.Server, broker *Broker, contentType string, body string) *Event {
15 t.Helper()
16 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
17 defer cancel()
18
19 ch, unsub := broker.Subscribe("test", "")
20 defer unsub()
21
22 req, _ := http.NewRequestWithContext(ctx, "POST", ts.URL+"/test", strings.NewReader(body))
23 req.Header.Set("Content-Type", contentType)
24 resp, err := http.DefaultClient.Do(req)
25 if err != nil {
26 t.Fatalf("POST failed: %v", err)
27 }
28 defer resp.Body.Close()
29 if resp.StatusCode != http.StatusAccepted {
30 t.Fatalf("expected 202, got %d", resp.StatusCode)
31 }
32
33 select {
34 case event := <-ch:
35 return event
36 case <-ctx.Done():
37 t.Fatal("timed out waiting for event")
38 return nil
39 }
40}
41
42func TestServer_postFormDataStoredAsText(t *testing.T) {
43 ts, broker, cancel := newTestServer(nil)
44 defer cancel()
45 defer ts.Close()
46
47 event := postAndReceive(t, ts, broker, "application/x-www-form-urlencoded", "foo=bar&baz=qux")
48
49 s, ok := event.Payload.(string)
50 if !ok {
51 t.Fatalf("expected string payload, got %T", event.Payload)
52 }
53 if s != "foo=bar&baz=qux" {
54 t.Errorf("expected raw form data, got %s", s)
55 }
56}
57
58func TestServer_postPlainTextStoredAsText(t *testing.T) {
59 ts, broker, cancel := newTestServer(nil)
60 defer cancel()
61 defer ts.Close()
62
63 event := postAndReceive(t, ts, broker, "text/plain", "hello world")
64
65 s, ok := event.Payload.(string)
66 if !ok {
67 t.Fatalf("expected string payload, got %T", event.Payload)
68 }
69 if s != "hello world" {
70 t.Errorf("expected raw text, got %s", s)
71 }
72}
73
74func TestServer_postBinaryBase64Encoded(t *testing.T) {
75 ts, broker, cancel := newTestServer(nil)
76 defer cancel()
77 defer ts.Close()
78
79 event := postAndReceive(t, ts, broker, "application/octet-stream", "\x00\x01\x02\x03")
80
81 s, ok := event.Payload.(string)
82 if !ok {
83 t.Fatalf("expected string payload, got %T", event.Payload)
84 }
85 expected := base64.StdEncoding.EncodeToString([]byte("\x00\x01\x02\x03"))
86 if s != expected {
87 t.Errorf("expected base64 %s, got %s", expected, s)
88 }
89}
90
91func TestServer_postIncludesMethod(t *testing.T) {
92 ts, broker, cancel := newTestServer(nil)
93 defer cancel()
94 defer ts.Close()
95
96 event := postAndReceive(t, ts, broker, "application/json", `{"ok":true}`)
97
98 if event.Method != "POST" {
99 t.Errorf("expected method POST, got %s", event.Method)
100 }
101}
102
103func TestServer_postMalformedContentTypeFallsBackToBase64(t *testing.T) {
104 ts, broker, cancel := newTestServer(nil)
105 defer cancel()
106 defer ts.Close()
107
108 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
109 defer cancel()
110
111 ch, unsub := broker.Subscribe("test", "")
112 defer unsub()
113
114 req, _ := http.NewRequestWithContext(ctx, "POST", ts.URL+"/test", strings.NewReader("some data"))
115 req.Header.Set("Content-Type", ";;;malformed")
116 resp, err := http.DefaultClient.Do(req)
117 if err != nil {
118 t.Fatalf("POST failed: %v", err)
119 }
120 defer resp.Body.Close()
121 if resp.StatusCode != http.StatusAccepted {
122 t.Fatalf("expected 202, got %d", resp.StatusCode)
123 }
124
125 select {
126 case event := <-ch:
127 s, ok := event.Payload.(string)
128 if !ok {
129 t.Fatalf("expected string payload, got %T", event.Payload)
130 }
131 expected := base64.StdEncoding.EncodeToString([]byte("some data"))
132 if s != expected {
133 t.Errorf("expected base64 %s, got %s", expected, s)
134 }
135 case <-ctx.Done():
136 t.Fatal("timed out waiting for event")
137 }
138}
139
140func TestIsTextContent(t *testing.T) {
141 tests := []struct {
142 name string
143 mediaType string
144 params map[string]string
145 want bool
146 }{
147 {"text/plain", "text/plain", nil, true},
148 {"text/html", "text/html", nil, true},
149 {"text/xml", "text/xml", nil, true},
150 {"form urlencoded", "application/x-www-form-urlencoded", nil, true},
151 {"application/xml", "application/xml", nil, true},
152 {"application/xhtml+xml", "application/xhtml+xml", nil, true},
153 {"charset param on non-text", "application/octet-stream", map[string]string{"charset": "utf-8"}, true},
154 {"application/json", "application/json", nil, false},
155 {"application/octet-stream", "application/octet-stream", nil, false},
156 {"image/png", "image/png", nil, false},
157 }
158 for _, tt := range tests {
159 t.Run(tt.name, func(t *testing.T) {
160 got := isTextContent(tt.mediaType, tt.params)
161 if got != tt.want {
162 t.Errorf("isTextContent(%q, %v) = %v, want %v", tt.mediaType, tt.params, got, tt.want)
163 }
164 })
165 }
166}
167
168func TestServer_postJSONViaSSERoundTrip(t *testing.T) {
169 ts, _, cancel := newTestServer(nil)
170 defer cancel()
171 defer ts.Close()
172
173 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
174 defer cancel()
175
176 events := sseSubscribe(ctx, ts.URL+"/test", nil)
177 time.Sleep(50 * time.Millisecond)
178
179 http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{"key":"value"}`))
180
181 select {
182 case event := <-events:
183 if event.Method != "POST" {
184 t.Errorf("expected method POST in SSE event, got %s", event.Method)
185 }
186 data, _ := json.Marshal(event.Payload)
187 var m map[string]string
188 json.Unmarshal(data, &m)
189 if m["key"] != "value" {
190 t.Errorf("expected key=value, got %v", m)
191 }
192 case <-ctx.Done():
193 t.Fatal("timed out waiting for SSE event")
194 }
195}