Webhook-to-SSE gateway with hierarchical topic routing and signature verification

Add fsnotify config hot-reload and idiomatic error handling

WatchConfiguration now uses fsnotify to watch for config file changes
and automatically reload. The SSE handler uses comma-ok for the Flusher
type assertion (returns 500 if unsupported) and checks json.Marshal
errors instead of silently discarding them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+208 -2
+37
configuration.go
··· 1 1 package main 2 2 3 3 import ( 4 + "log" 4 5 "os" 5 6 "strings" 6 7 8 + "github.com/fsnotify/fsnotify" 7 9 "gopkg.in/yaml.v3" 8 10 ) 9 11 ··· 28 30 return nil, err 29 31 } 30 32 return &cfg, nil 33 + } 34 + 35 + var newWatcher = fsnotify.NewWatcher 36 + 37 + func WatchConfiguration(path string, callback func(*Configuration)) (func(), error) { 38 + watcher, err := newWatcher() 39 + if err != nil { 40 + return nil, err 41 + } 42 + if err := watcher.Add(path); err != nil { 43 + watcher.Close() 44 + return nil, err 45 + } 46 + 47 + done := make(chan struct{}) 48 + go func() { 49 + defer watcher.Close() 50 + for { 51 + select { 52 + case <-done: 53 + return 54 + case event := <-watcher.Events: 55 + if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) { 56 + cfg, err := LoadConfiguration(path) 57 + if err != nil { 58 + log.Printf("reloading configuration: %v", err) 59 + continue 60 + } 61 + callback(cfg) 62 + } 63 + } 64 + } 65 + }() 66 + 67 + return func() { close(done) }, nil 31 68 } 32 69 33 70 func (c *Configuration) LookupSubscribeSecret(path string) string {
+86
configuration_test.go
··· 1 1 package main 2 2 3 3 import ( 4 + "errors" 4 5 "os" 5 6 "path/filepath" 6 7 "testing" 8 + "time" 9 + 10 + "github.com/fsnotify/fsnotify" 7 11 ) 8 12 9 13 func TestLoadConfiguration_valid(t *testing.T) { ··· 128 132 pc = cfg.LookupVerification("github.com/chrisguidry/docketeer/subpath") 129 133 if pc != nil { 130 134 t.Fatal("verification should not inherit from parent") 135 + } 136 + } 137 + 138 + func TestWatchConfiguration_reloadsOnChange(t *testing.T) { 139 + dir := t.TempDir() 140 + path := filepath.Join(dir, "wicket.yaml") 141 + os.WriteFile(path, []byte(` 142 + paths: 143 + test/path: 144 + subscribe_secret: "original" 145 + `), 0644) 146 + 147 + reloaded := make(chan *Configuration, 1) 148 + stop, err := WatchConfiguration(path, func(cfg *Configuration) { 149 + reloaded <- cfg 150 + }) 151 + if err != nil { 152 + t.Fatalf("unexpected error: %v", err) 153 + } 154 + defer stop() 155 + 156 + os.WriteFile(path, []byte(` 157 + paths: 158 + test/path: 159 + subscribe_secret: "updated" 160 + `), 0644) 161 + 162 + select { 163 + case cfg := <-reloaded: 164 + secret := cfg.LookupSubscribeSecret("test/path") 165 + if secret != "updated" { 166 + t.Errorf("expected updated secret, got %s", secret) 167 + } 168 + case <-time.After(2 * time.Second): 169 + t.Fatal("timed out waiting for config reload") 170 + } 171 + } 172 + 173 + func TestWatchConfiguration_ignoresInvalidYAML(t *testing.T) { 174 + dir := t.TempDir() 175 + path := filepath.Join(dir, "wicket.yaml") 176 + os.WriteFile(path, []byte(` 177 + paths: 178 + test/path: 179 + subscribe_secret: "original" 180 + `), 0644) 181 + 182 + reloaded := make(chan *Configuration, 1) 183 + stop, err := WatchConfiguration(path, func(cfg *Configuration) { 184 + reloaded <- cfg 185 + }) 186 + if err != nil { 187 + t.Fatalf("unexpected error: %v", err) 188 + } 189 + defer stop() 190 + 191 + os.WriteFile(path, []byte("not: valid: yaml: [[["), 0644) 192 + 193 + select { 194 + case <-reloaded: 195 + t.Fatal("callback should not be called for invalid YAML") 196 + case <-time.After(500 * time.Millisecond): 197 + } 198 + } 199 + 200 + func TestWatchConfiguration_errorOnMissingFile(t *testing.T) { 201 + _, err := WatchConfiguration("/nonexistent/wicket.yaml", func(cfg *Configuration) {}) 202 + if err == nil { 203 + t.Fatal("expected error watching nonexistent file") 204 + } 205 + } 206 + 207 + func TestWatchConfiguration_errorCreatingWatcher(t *testing.T) { 208 + original := newWatcher 209 + newWatcher = func() (*fsnotify.Watcher, error) { 210 + return nil, errors.New("simulated watcher error") 211 + } 212 + defer func() { newWatcher = original }() 213 + 214 + _, err := WatchConfiguration("/any/path", func(cfg *Configuration) {}) 215 + if err == nil { 216 + t.Fatal("expected error when watcher creation fails") 131 217 } 132 218 } 133 219
+3
go.mod
··· 3 3 go 1.24 4 4 5 5 require ( 6 + github.com/fsnotify/fsnotify v1.9.0 6 7 github.com/google/uuid v1.6.0 7 8 gopkg.in/yaml.v3 v3.0.1 8 9 ) 10 + 11 + require golang.org/x/sys v0.13.0 // indirect
+4
go.sum
··· 1 + github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= 2 + github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= 1 3 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 2 4 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 5 + golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= 6 + golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 3 7 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= 4 8 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 5 9 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+9
main.go
··· 26 26 } 27 27 cfgPtr.Store(cfg) 28 28 log.Printf("loaded configuration from %s", *configPath) 29 + 30 + stop, err := WatchConfiguration(*configPath, func(cfg *Configuration) { 31 + cfgPtr.Store(cfg) 32 + log.Printf("reloaded configuration from %s", *configPath) 33 + }) 34 + if err != nil { 35 + log.Fatalf("watching configuration: %v", err) 36 + } 37 + defer stop() 29 38 } 30 39 31 40 broker := NewBroker(*bufferSize)
+11 -2
server.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "io" 8 + "log" 8 9 "net/http" 9 10 "strings" 10 11 "sync/atomic" ··· 106 107 } 107 108 } 108 109 109 - flusher := w.(http.Flusher) 110 + flusher, ok := w.(http.Flusher) 111 + if !ok { 112 + http.Error(w, "streaming unsupported", http.StatusInternalServerError) 113 + return 114 + } 110 115 111 116 filters := ParseFilters(r.URL.Query()) 112 117 lastEventID := r.Header.Get("Last-Event-ID") ··· 132 137 if !MatchAll(filters, event) { 133 138 continue 134 139 } 135 - data, _ := json.Marshal(event) 140 + data, err := json.Marshal(event) 141 + if err != nil { 142 + log.Printf("marshaling event %s: %v", event.ID, err) 143 + continue 144 + } 136 145 fmt.Fprintf(w, "id: %s\ndata: %s\n\n", event.ID, data) 137 146 flusher.Flush() 138 147 }
+26
server_test.go
··· 236 236 } 237 237 } 238 238 239 + type bareResponseWriter struct { 240 + code int 241 + headers http.Header 242 + body strings.Builder 243 + } 244 + 245 + func (w *bareResponseWriter) Header() http.Header { return w.headers } 246 + func (w *bareResponseWriter) Write(b []byte) (int, error) { return w.body.Write(b) } 247 + func (w *bareResponseWriter) WriteHeader(code int) { w.code = code } 248 + 249 + func TestServer_sseWithoutFlusher(t *testing.T) { 250 + broker := NewBroker(100) 251 + var cfgPtr atomic.Pointer[Configuration] 252 + handler := NewServer(broker, &cfgPtr) 253 + 254 + w := &bareResponseWriter{headers: make(http.Header)} 255 + req := httptest.NewRequest("GET", "/test/topic", nil) 256 + req.Header.Set("Accept", "text/event-stream") 257 + 258 + handler.ServeHTTP(w, req) 259 + 260 + if w.code != http.StatusInternalServerError { 261 + t.Errorf("expected 500, got %d", w.code) 262 + } 263 + } 264 + 239 265 func TestServer_postWithMissingSignatureOnSecuredPath(t *testing.T) { 240 266 cfg := &Configuration{ 241 267 Paths: map[string]PathConfiguration{
+32
sse_test.go
··· 5 5 "bytes" 6 6 "context" 7 7 "encoding/json" 8 + "math" 8 9 "net/http" 9 10 "net/http/httptest" 10 11 "strings" ··· 287 288 } 288 289 case <-time.After(time.Second): 289 290 t.Fatal("timed out waiting for SSE to close") 291 + } 292 + } 293 + 294 + func TestServer_sseSkipsUnmarshalableEvent(t *testing.T) { 295 + ts, broker := newTestServer(nil) 296 + defer ts.Close() 297 + 298 + ctx, cancel := context.WithCancel(context.Background()) 299 + defer cancel() 300 + 301 + events := sseSubscribe(ctx, ts.URL+"/test/topic", nil) 302 + time.Sleep(50 * time.Millisecond) 303 + 304 + broker.Publish(&Event{ 305 + ID: "bad-event", 306 + Path: "test/topic", 307 + Payload: math.NaN(), 308 + }) 309 + broker.Publish(&Event{ 310 + ID: "good-event", 311 + Path: "test/topic", 312 + Payload: map[string]any{"ok": true}, 313 + }) 314 + 315 + select { 316 + case event := <-events: 317 + if event.ID != "good-event" { 318 + t.Errorf("expected good-event, got %s", event.ID) 319 + } 320 + case <-time.After(2 * time.Second): 321 + t.Fatal("timed out waiting for good event after unmarshalable one") 290 322 } 291 323 } 292 324