// Webhook-to-SSE gateway with hierarchical topic routing and signature verification.
1package main
2
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)
14
15func main() {
16 address := flag.String("address", ":8080", "listen address")
17 configPath := flag.String("configuration", "", "path to configuration file")
18 bufferSize := flag.Int("buffer-size", 1000, "event replay buffer size")
19 backendType := flag.String("backend", "memory", "pub/sub backend: memory or redis")
20 redisURL := flag.String("redis-url", "redis://localhost:6379", "Redis connection URL (when backend=redis)")
21 flag.Parse()
22
23 var cfgPtr atomic.Pointer[Configuration]
24 if *configPath != "" {
25 cfg, err := LoadConfiguration(*configPath)
26 if err != nil {
27 log.Fatalf("loading configuration: %v", err)
28 }
29 cfgPtr.Store(cfg)
30 log.Printf("loaded configuration from %s", *configPath)
31
32 stop, err := WatchConfiguration(*configPath, func(cfg *Configuration) {
33 cfgPtr.Store(cfg)
34 log.Printf("reloaded configuration from %s", *configPath)
35 })
36 if err != nil {
37 log.Fatalf("watching configuration: %v", err)
38 }
39 defer stop()
40 }
41
42 var backend Backend
43 switch *backendType {
44 case "memory":
45 backend = NewMemoryBackend(*bufferSize)
46 case "redis":
47 var err error
48 backend, err = NewRedisBackend(*redisURL, *bufferSize)
49 if err != nil {
50 log.Fatalf("connecting to redis: %v", err)
51 }
52 default:
53 log.Fatalf("unknown backend: %s", *backendType)
54 }
55 defer backend.Close()
56
57 ctx, cancel := context.WithCancel(context.Background())
58 defer cancel()
59
60 broker := NewBroker(backend)
61 broker.Start(ctx)
62 handler := NewServer(broker, &cfgPtr)
63
64 server := &http.Server{
65 Addr: *address,
66 Handler: handler,
67 }
68
69 go func() {
70 sigs := make(chan os.Signal, 1)
71 signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
72 for sig := range sigs {
73 switch sig {
74 case syscall.SIGHUP:
75 if *configPath == "" {
76 continue
77 }
78 cfg, err := LoadConfiguration(*configPath)
79 if err != nil {
80 log.Printf("reloading configuration: %v", err)
81 continue
82 }
83 cfgPtr.Store(cfg)
84 log.Printf("reloaded configuration from %s", *configPath)
85 case syscall.SIGINT, syscall.SIGTERM:
86 log.Printf("shutting down")
87 cancel()
88 server.Shutdown(context.Background())
89 return
90 }
91 }
92 }()
93
94 fmt.Fprintf(os.Stderr, "wicket listening on %s\n", *address)
95 if err := server.ListenAndServe(); err != http.ErrServerClosed {
96 log.Fatalf("server error: %v", err)
97 }
98}