Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 195 lines 5.3 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/json" 7 "net/http" 8 "net/http/httptest" 9 "strings" 10 "testing" 11 "time" 12) 13 14func postAndReceive(t *testing.T, ts *httptest.Server, broker *Broker, contentType string, body string) *Event { 15 t.Helper() 16 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 17 defer cancel() 18 19 ch, unsub := broker.Subscribe("test", "") 20 defer unsub() 21 22 req, _ := http.NewRequestWithContext(ctx, "POST", ts.URL+"/test", strings.NewReader(body)) 23 req.Header.Set("Content-Type", contentType) 24 resp, err := http.DefaultClient.Do(req) 25 if err != nil { 26 t.Fatalf("POST failed: %v", err) 27 } 28 defer resp.Body.Close() 29 if resp.StatusCode != http.StatusAccepted { 30 t.Fatalf("expected 202, got %d", resp.StatusCode) 31 } 32 33 select { 34 case event := <-ch: 35 return event 36 case <-ctx.Done(): 37 t.Fatal("timed out waiting for event") 38 return nil 39 } 40} 41 42func TestServer_postFormDataStoredAsText(t *testing.T) { 43 ts, broker, cancel := newTestServer(nil) 44 defer cancel() 45 defer ts.Close() 46 47 event := postAndReceive(t, ts, broker, "application/x-www-form-urlencoded", "foo=bar&baz=qux") 48 49 s, ok := event.Payload.(string) 50 if !ok { 51 t.Fatalf("expected string payload, got %T", event.Payload) 52 } 53 if s != "foo=bar&baz=qux" { 54 t.Errorf("expected raw form data, got %s", s) 55 } 56} 57 58func TestServer_postPlainTextStoredAsText(t *testing.T) { 59 ts, broker, cancel := newTestServer(nil) 60 defer cancel() 61 defer ts.Close() 62 63 event := postAndReceive(t, ts, broker, "text/plain", "hello world") 64 65 s, ok := event.Payload.(string) 66 if !ok { 67 t.Fatalf("expected string payload, got %T", event.Payload) 68 } 69 if s != "hello world" { 70 t.Errorf("expected raw text, got %s", s) 71 } 72} 73 74func TestServer_postBinaryBase64Encoded(t *testing.T) { 75 ts, broker, cancel := newTestServer(nil) 76 defer cancel() 77 defer ts.Close() 78 79 event := postAndReceive(t, ts, broker, "application/octet-stream", "\x00\x01\x02\x03") 80 81 s, ok := event.Payload.(string) 82 if !ok { 83 t.Fatalf("expected string payload, got %T", event.Payload) 84 } 85 expected := base64.StdEncoding.EncodeToString([]byte("\x00\x01\x02\x03")) 86 if s != expected { 87 t.Errorf("expected base64 %s, got %s", expected, s) 88 } 89} 90 91func TestServer_postIncludesMethod(t *testing.T) { 92 ts, broker, cancel := newTestServer(nil) 93 defer cancel() 94 defer ts.Close() 95 96 event := postAndReceive(t, ts, broker, "application/json", `{"ok":true}`) 97 98 if event.Method != "POST" { 99 t.Errorf("expected method POST, got %s", event.Method) 100 } 101} 102 103func TestServer_postMalformedContentTypeFallsBackToBase64(t *testing.T) { 104 ts, broker, cancel := newTestServer(nil) 105 defer cancel() 106 defer ts.Close() 107 108 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 109 defer cancel() 110 111 ch, unsub := broker.Subscribe("test", "") 112 defer unsub() 113 114 req, _ := http.NewRequestWithContext(ctx, "POST", ts.URL+"/test", strings.NewReader("some data")) 115 req.Header.Set("Content-Type", ";;;malformed") 116 resp, err := http.DefaultClient.Do(req) 117 if err != nil { 118 t.Fatalf("POST failed: %v", err) 119 } 120 defer resp.Body.Close() 121 if resp.StatusCode != http.StatusAccepted { 122 t.Fatalf("expected 202, got %d", resp.StatusCode) 123 } 124 125 select { 126 case event := <-ch: 127 s, ok := event.Payload.(string) 128 if !ok { 129 t.Fatalf("expected string payload, got %T", event.Payload) 130 } 131 expected := base64.StdEncoding.EncodeToString([]byte("some data")) 132 if s != expected { 133 t.Errorf("expected base64 %s, got %s", expected, s) 134 } 135 case <-ctx.Done(): 136 t.Fatal("timed out waiting for event") 137 } 138} 139 140func TestIsTextContent(t *testing.T) { 141 tests := []struct { 142 name string 143 mediaType string 144 params map[string]string 145 want bool 146 }{ 147 {"text/plain", "text/plain", nil, true}, 148 {"text/html", "text/html", nil, true}, 149 {"text/xml", "text/xml", nil, true}, 150 {"form urlencoded", "application/x-www-form-urlencoded", nil, true}, 151 {"application/xml", "application/xml", nil, true}, 152 {"application/xhtml+xml", "application/xhtml+xml", nil, true}, 153 {"charset param on non-text", "application/octet-stream", map[string]string{"charset": "utf-8"}, true}, 154 {"application/json", "application/json", nil, false}, 155 {"application/octet-stream", "application/octet-stream", nil, false}, 156 {"image/png", "image/png", nil, false}, 157 } 158 for _, tt := range tests { 159 t.Run(tt.name, func(t *testing.T) { 160 got := isTextContent(tt.mediaType, tt.params) 161 if got != tt.want { 162 t.Errorf("isTextContent(%q, %v) = %v, want %v", tt.mediaType, tt.params, got, tt.want) 163 } 164 }) 165 } 166} 167 168func TestServer_postJSONViaSSERoundTrip(t *testing.T) { 169 ts, _, cancel := newTestServer(nil) 170 defer cancel() 171 defer ts.Close() 172 173 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 174 defer cancel() 175 176 events := sseSubscribe(ctx, ts.URL+"/test", nil) 177 time.Sleep(50 * time.Millisecond) 178 179 http.Post(ts.URL+"/test", "application/json", strings.NewReader(`{"key":"value"}`)) 180 181 select { 182 case event := <-events: 183 if event.Method != "POST" { 184 t.Errorf("expected method POST in SSE event, got %s", event.Method) 185 } 186 data, _ := json.Marshal(event.Payload) 187 var m map[string]string 188 json.Unmarshal(data, &m) 189 if m["key"] != "value" { 190 t.Errorf("expected key=value, got %v", m) 191 } 192 case <-ctx.Done(): 193 t.Fatal("timed out waiting for SSE event") 194 } 195}