Webhook-to-SSE gateway with hierarchical topic routing and signature verification
at main 98 lines 2.4 kB view raw
1package main 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "log" 8 "net/http" 9 "os" 10 "os/signal" 11 "sync/atomic" 12 "syscall" 13) 14 15func main() { 16 address := flag.String("address", ":8080", "listen address") 17 configPath := flag.String("configuration", "", "path to configuration file") 18 bufferSize := flag.Int("buffer-size", 1000, "event replay buffer size") 19 backendType := flag.String("backend", "memory", "pub/sub backend: memory or redis") 20 redisURL := flag.String("redis-url", "redis://localhost:6379", "Redis connection URL (when backend=redis)") 21 flag.Parse() 22 23 var cfgPtr atomic.Pointer[Configuration] 24 if *configPath != "" { 25 cfg, err := LoadConfiguration(*configPath) 26 if err != nil { 27 log.Fatalf("loading configuration: %v", err) 28 } 29 cfgPtr.Store(cfg) 30 log.Printf("loaded configuration from %s", *configPath) 31 32 stop, err := WatchConfiguration(*configPath, func(cfg *Configuration) { 33 cfgPtr.Store(cfg) 34 log.Printf("reloaded configuration from %s", *configPath) 35 }) 36 if err != nil { 37 log.Fatalf("watching configuration: %v", err) 38 } 39 defer stop() 40 } 41 42 var backend Backend 43 switch *backendType { 44 case "memory": 45 backend = NewMemoryBackend(*bufferSize) 46 case "redis": 47 var err error 48 backend, err = NewRedisBackend(*redisURL, *bufferSize) 49 if err != nil { 50 log.Fatalf("connecting to redis: %v", err) 51 } 52 default: 53 log.Fatalf("unknown backend: %s", *backendType) 54 } 55 defer backend.Close() 56 57 ctx, cancel := context.WithCancel(context.Background()) 58 defer cancel() 59 60 broker := NewBroker(backend) 61 broker.Start(ctx) 62 handler := NewServer(broker, &cfgPtr) 63 64 server := &http.Server{ 65 Addr: *address, 66 Handler: handler, 67 } 68 69 go func() { 70 sigs := make(chan os.Signal, 1) 71 signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) 72 for sig := range sigs { 73 switch sig { 74 case syscall.SIGHUP: 75 if *configPath == "" { 76 continue 77 } 78 cfg, err := LoadConfiguration(*configPath) 79 if err != nil { 80 log.Printf("reloading configuration: %v", err) 81 continue 82 } 83 cfgPtr.Store(cfg) 84 log.Printf("reloaded configuration from %s", *configPath) 85 case syscall.SIGINT, syscall.SIGTERM: 86 log.Printf("shutting down") 87 cancel() 88 server.Shutdown(context.Background()) 89 return 90 } 91 } 92 }() 93 94 fmt.Fprintf(os.Stderr, "wicket listening on %s\n", *address) 95 if err := server.ListenAndServe(); err != http.ErrServerClosed { 96 log.Fatalf("server error: %v", err) 97 } 98}