auto-reconnecting jetstream proxy

use / as health check for upstream conn

+27 -2
+4
README.md
··· 7 NOTES: 8 - this should run as close to your infrastructure as possible. you then 9 would connect to `ws://localhost:6969/subscribe` 10 11 ``` 12 go build
··· 7 NOTES: 8 - this should run as close to your infrastructure as possible. you then 9 would connect to `ws://localhost:6969/subscribe` 10 + - no cursor support. since there will be multiple jetstream upstreams that 11 + run at separate cursor timelines, it would be pretty hard to rewrite cursors 12 + in such a way that everything works. if your application relies on cursors, 13 + then it's probably best for your application to deal with multi-upstream support. 14 15 ``` 16 go build
+23 -2
main.go
··· 7 "os" 8 "strings" 9 "sync" 10 "time" 11 12 "github.com/gorilla/websocket" ··· 26 type Broadcaster struct { 27 listeners []chan []byte 28 mu sync.Mutex 29 } 30 31 // Subscribe returns a new channel that will receive Jetstream events ··· 84 } 85 86 start := time.Now() 87 resp, err := client.Get(url) 88 if err != nil { 89 return 0, err ··· 99 }, 100 } 101 102 // handleSubscribe upgrades HTTP connection to websocket and streams events 103 func handleSubscribe(broadcaster *Broadcaster) http.HandlerFunc { 104 return func(w http.ResponseWriter, r *http.Request) { ··· 130 131 // connectToUpstream maintains a connection to the upstream websocket and broadcasts messages 132 func connectToUpstream(pool []string, broadcaster *Broadcaster) { 133 - backoff := time.Second 134 - maxBackoff := time.Minute 135 var currentUpstream string 136 137 for { ··· 157 conn, _, err := websocket.DefaultDialer.Dial(currentUpstream+"/subscribe", nil) 158 if err != nil { 159 slog.Error("Failed to connect to upstream", slog.String("url", currentUpstream), slog.Any("error", err)) 160 time.Sleep(backoff) 161 backoff *= 2 162 if backoff > maxBackoff { ··· 166 } 167 168 slog.Info("Connected to upstream", slog.String("url", currentUpstream)) 169 backoff = time.Second // Reset backoff on successful connection 170 171 // Read messages from upstream and broadcast them ··· 173 messageType, message, err := conn.ReadMessage() 174 if err != nil { 175 slog.Error("Error reading from upstream", slog.Any("error", err)) 176 conn.Close() 177 break 178 } ··· 271 go connectToUpstream(pool, broadcaster) 272 273 // Setup HTTP server 274 http.HandleFunc("/subscribe", handleSubscribe(broadcaster)) 275 276 slog.Info("Starting proxy server", slog.String("bind", bindAddr))
··· 7 "os" 8 "strings" 9 "sync" 10 + "sync/atomic" 11 "time" 12 13 "github.com/gorilla/websocket" ··· 27 type Broadcaster struct { 28 listeners []chan []byte 29 mu sync.Mutex 30 + connected atomic.Bool 31 } 32 33 // Subscribe returns a new channel that will receive Jetstream events ··· 86 } 87 88 start := time.Now() 89 + // jetstream instances return the "Welcome to jetstream!" banner on / which 90 + // should be useful enough for latency 91 resp, err := client.Get(url) 92 if err != nil { 93 return 0, err ··· 103 }, 104 } 105 106 + // handleHealth returns 200 if connected to upstream 107 + func handleHealth(broadcaster *Broadcaster) http.HandlerFunc { 108 + return func(w http.ResponseWriter, r *http.Request) { 109 + if broadcaster.connected.Load() { 110 + w.WriteHeader(http.StatusOK) 111 + w.Write([]byte("Welcome to jetstream!")) 112 + } else { 113 + w.WriteHeader(http.StatusServiceUnavailable) 114 + w.Write([]byte("Not connected to upstream")) 115 + } 116 + } 117 + } 118 + 119 // handleSubscribe upgrades HTTP connection to websocket and streams events 120 func handleSubscribe(broadcaster *Broadcaster) http.HandlerFunc { 121 return func(w http.ResponseWriter, r *http.Request) { ··· 147 148 // connectToUpstream maintains a connection to the upstream websocket and broadcasts messages 149 func connectToUpstream(pool []string, broadcaster *Broadcaster) { 150 + backoff := 50 * time.Millisecond 151 + maxBackoff := 20 * time.Second 152 var currentUpstream string 153 154 for { ··· 174 conn, _, err := websocket.DefaultDialer.Dial(currentUpstream+"/subscribe", nil) 175 if err != nil { 176 slog.Error("Failed to connect to upstream", slog.String("url", currentUpstream), slog.Any("error", err)) 177 + broadcaster.connected.Store(false) 178 time.Sleep(backoff) 179 backoff *= 2 180 if backoff > maxBackoff { ··· 184 } 185 186 slog.Info("Connected to upstream", slog.String("url", currentUpstream)) 187 + broadcaster.connected.Store(true) 188 backoff = time.Second // Reset backoff on successful connection 189 190 // Read messages from upstream and broadcast them ··· 192 messageType, message, err := conn.ReadMessage() 193 if err != nil { 194 slog.Error("Error reading from upstream", slog.Any("error", err)) 195 + broadcaster.connected.Store(false) 196 conn.Close() 197 break 198 } ··· 291 go connectToUpstream(pool, broadcaster) 292 293 // Setup HTTP server 294 + http.HandleFunc("/", handleHealth(broadcaster)) 295 http.HandleFunc("/subscribe", handleSubscribe(broadcaster)) 296 297 slog.Info("Starting proxy server", slog.String("bind", bindAddr))