package main import ( "context" "encoding/json" "fmt" "log/slog" "net/http" "os" "strings" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" ) var DEFAULT_POOL = []string{ "wss://jetstream1.us-east.bsky.network", "wss://jetstream2.us-east.bsky.network", "wss://jetstream1.us-west.bsky.network", "wss://jetstream2.us-west.bsky.network", "wss://jetstream.fire.hose.cam", "wss://jetstream2.fr.hose.cam", // want yours here? contact me } // Event represents a Jetstream event type Event struct { Did string `json:"did"` TimeUS int64 `json:"time_us"` Kind string `json:"kind,omitempty"` Commit *Commit `json:"commit,omitempty"` } // Commit represents a commit event type Commit struct { Rev string `json:"rev,omitempty"` Operation string `json:"operation,omitempty"` Collection string `json:"collection,omitempty"` RKey string `json:"rkey,omitempty"` Record json.RawMessage `json:"record,omitempty"` CID string `json:"cid,omitempty"` } // Message wraps a Jetstream event with both parsed and raw forms type Message struct { Event *Event Raw []byte } // Broadcaster manages subscribers to Jetstream events type Broadcaster struct { listeners []chan *Message mu sync.Mutex connected atomic.Bool lastMessageTime atomic.Int64 // Unix timestamp in seconds } // Subscribe returns a new channel that will receive Jetstream events func (b *Broadcaster) Subscribe() chan *Message { b.mu.Lock() defer b.mu.Unlock() // firehose can be more-than-1k events per second, // prefer to create a large buffer for the subscribers ch := make(chan *Message, 10000) b.listeners = append(b.listeners, ch) return ch } func (b *Broadcaster) Unsubscribe(ch chan *Message) { b.mu.Lock() defer b.mu.Unlock() for i, listener := range b.listeners { if listener == ch { b.listeners = append(b.listeners[:i], b.listeners[i+1:]...) close(ch) break } } } func (b *Broadcaster) Broadcast(rawMessage []byte) { b.lastMessageTime.Store(time.Now().Unix()) // Parse the event once var event Event if err := json.Unmarshal(rawMessage, &event); err != nil { slog.Debug("Failed to parse event", slog.Any("error", err)) // Broadcast anyway with nil event } msg := &Message{ Event: &event, Raw: rawMessage, } b.mu.Lock() defer b.mu.Unlock() for _, ch := range b.listeners { select { case ch <- msg: // event sent successfully. we don't want to block default: // channel full, skip to avoid blocking slog.Warn("jetstream broadcast: channel full, dropping event") } } } type latencyResult struct { url string latency time.Duration err error } func measureLatency(url string) (time.Duration, error) { url = strings.Replace(url, "wss://", "https://", 1) // also support non-tls upstreams url = strings.Replace(url, "ws://", "http://", 1) client := &http.Client{ Timeout: 20 * time.Second, } start := time.Now() // jetstream instances return the "Welcome to jetstream!" banner on / which // should be useful enough for latency resp, err := client.Get(url) if err != nil { return 0, err } defer resp.Body.Close() return time.Since(start), nil } var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // Allow all origins }, } // handleHealth returns 200 if connected to upstream func handleHealth(broadcaster *Broadcaster) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if broadcaster.connected.Load() { w.WriteHeader(http.StatusOK) w.Write([]byte("Welcome to jetstream!")) } else { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte("Not connected to upstream")) } } } // matchesCollection checks if an event matches any of the wanted collections func matchesCollection(event *Event, wantedCollections []string) bool { // Always pass through account and identity events if event.Kind == "account" || event.Kind == "identity" { return true } // If no wanted collections specified, pass everything if len(wantedCollections) == 0 { return true } // For commit events, check the collection if event.Commit == nil { return false } collection := event.Commit.Collection for _, wanted := range wantedCollections { // Support wildcard matching like "app.bsky.graph.*" if strings.HasSuffix(wanted, ".*") { prefix := strings.TrimSuffix(wanted, ".*") if strings.HasPrefix(collection, prefix+".") || collection == prefix { return true } } else if collection == wanted { return true } } return false } // handleSubscribe upgrades HTTP connection to websocket and streams events func handleSubscribe(broadcaster *Broadcaster) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { slog.Error("Failed to upgrade connection", slog.Any("error", err)) return } defer conn.Close() // Parse wantedCollections from query params wantedCollections := r.URL.Query()["wantedCollections"] if len(wantedCollections) > 100 { slog.Warn("Client requested too many collections, limiting to 100", slog.Int("requested", len(wantedCollections))) wantedCollections = wantedCollections[:100] } // Subscribe to broadcaster ch := broadcaster.Subscribe() defer broadcaster.Unsubscribe(ch) if len(wantedCollections) > 0 { slog.Info("Client connected", slog.String("remote", r.RemoteAddr), slog.Any("wantedCollections", wantedCollections)) } else { slog.Info("Client connected", slog.String("remote", r.RemoteAddr)) } // Stream events to client for msg := range ch { // If filtering is enabled, check the event if len(wantedCollections) > 0 && msg.Event != nil { // Check if event matches wanted collections if !matchesCollection(msg.Event, wantedCollections) { continue } } err := conn.WriteMessage(websocket.TextMessage, msg.Raw) if err != nil { slog.Debug("Client disconnected", slog.String("remote", r.RemoteAddr), slog.Any("error", err)) break } } slog.Info("Client disconnected", slog.String("remote", r.RemoteAddr)) } } // raceUpstreams connects to all upstreams simultaneously and returns the first one to deliver a message func raceUpstreams(pool []string) (string, error) { slog.Info("Racing upstreams to find fastest message delivery") type result struct { url string duration time.Duration } results := make(chan result, len(pool)) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() for _, url := range pool { go func(u string) { start := time.Now() conn, _, err := websocket.DefaultDialer.DialContext(ctx, u+"/subscribe", nil) if err != nil { slog.Debug("Failed to connect during race", slog.String("url", u), slog.Any("error", err)) return } defer conn.Close() // Wait for first message _, _, err = conn.ReadMessage() if err != nil { slog.Debug("Failed to read message during race", slog.String("url", u), slog.Any("error", err)) return } duration := time.Since(start) select { case results <- result{url: u, duration: duration}: case <-ctx.Done(): } }(url) } select { case res := <-results: slog.Info("Race winner", slog.String("url", res.url), slog.Duration("time_to_first_message", res.duration)) return res.url, nil case <-ctx.Done(): return "", fmt.Errorf("no upstream delivered a message within timeout") } } // watchdog monitors message activity and triggers reconnection if stalled // this is useful if bsky relay is down, which means all the jetstreams will be // reachable but not send any messages. we should swap to alternate relay infrastructure // in this case (via raceUpstreams) func watchdog(broadcaster *Broadcaster, trigger chan struct{}) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { if !broadcaster.connected.Load() { continue } lastMsg := broadcaster.lastMessageTime.Load() if lastMsg == 0 { // No messages received yet continue } timeSinceLastMsg := time.Since(time.Unix(lastMsg, 0)) if timeSinceLastMsg > 20*time.Second { slog.Warn("No messages received", slog.Duration("duration", timeSinceLastMsg)) select { case trigger <- struct{}{}: // Trigger sent default: // Trigger already pending } } } } // connectToUpstream maintains a connection to the upstream websocket and broadcasts messages func connectToUpstream(pool []string, broadcaster *Broadcaster, watchdogTrigger <-chan struct{}) { backoff := 50 * time.Millisecond maxBackoff := 20 * time.Second var currentUpstream string var raceTriggered bool for { if raceTriggered { slog.Info("Watchdog triggered, racing upstreams") // Watchdog triggered - race all upstreams bestUpstream, err := raceUpstreams(pool) if err != nil { slog.Error("Failed to race upstreams", slog.Any("error", err)) time.Sleep(backoff) backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } continue } currentUpstream = bestUpstream backoff = 50 * time.Millisecond // Reset backoff raceTriggered = false } else { // Find best upstream (re-evaluate on each connection attempt) bestUpstream, err := findBestUpstream(pool) if err != nil { slog.Error("Failed to find best upstream", slog.Any("error", err)) time.Sleep(backoff) backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } continue } if bestUpstream != currentUpstream { slog.Info("Switching to new upstream", slog.String("url", bestUpstream)) currentUpstream = bestUpstream } } slog.Info("Connecting to upstream", slog.String("url", currentUpstream)) conn, _, err := websocket.DefaultDialer.Dial(currentUpstream+"/subscribe", nil) if err != nil { slog.Error("Failed to connect to upstream", slog.String("url", currentUpstream), slog.Any("error", err)) broadcaster.connected.Store(false) time.Sleep(backoff) backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } continue } slog.Info("Connected to upstream", slog.String("url", currentUpstream)) broadcaster.connected.Store(true) broadcaster.lastMessageTime.Store(time.Now().Unix()) backoff = 50 * time.Millisecond // Reset backoff on successful connection // Read messages from upstream and broadcast them readDone := make(chan struct{}) go func() { defer close(readDone) for { messageType, message, err := conn.ReadMessage() if err != nil { slog.Error("Error reading from upstream", slog.Any("error", err)) return } // Only broadcast text/binary messages if messageType == websocket.TextMessage || messageType == websocket.BinaryMessage { broadcaster.Broadcast(message) } } }() // Wait for either read error or watchdog trigger select { case <-readDone: // Normal disconnection case <-watchdogTrigger: // Watchdog triggered disconnection slog.Info("Watchdog triggered disconnection") raceTriggered = true conn.Close() <-readDone // Wait for read goroutine to finish } broadcaster.connected.Store(false) if !raceTriggered { // Connection lost, will re-evaluate best upstream on next iteration slog.Info("Connection lost, finding new upstream", slog.Duration("backoff", backoff)) time.Sleep(backoff) backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } } } } func findBestUpstream(pool []string) (string, error) { // Measure latency concurrently results := make(chan latencyResult, len(pool)) var wg sync.WaitGroup for _, url := range pool { wg.Add(1) go func(u string) { defer wg.Done() latency, err := measureLatency(u) results <- latencyResult{url: u, latency: latency, err: err} }(url) } wg.Wait() close(results) // Find the best connection var best latencyResult best.latency = time.Hour // Start with a very high latency slog.Debug("Latency results:") var err error for result := range results { if result.err != nil { slog.Debug("connection error", slog.String("url", result.url), slog.Any("error", result.err)) continue } slog.Debug("latency measured", slog.String("url", result.url), slog.Duration("latency", result.latency)) if result.latency < best.latency { best = result } } if best.err == nil && best.latency < time.Hour { slog.Debug("Best connection", slog.String("url", best.url), slog.Duration("latency", best.latency)) return best.url, nil } else { slog.Debug("No valid connections found") return "", err } } func main() { maybePool := os.Getenv("POOL") pool := DEFAULT_POOL if maybePool != "" { pool = strings.Split(maybePool, ",") } if os.Getenv("DEBUG") == "1" { slog.SetLogLoggerLevel(slog.LevelDebug) } envPort := os.Getenv("PORT") port := envPort if envPort == "" { port = "8096" } envHost := os.Getenv("HOST") host := envHost if envHost == "" { // should be running on the same hardware as your service host = "127.0.0.1" } bindAddr := fmt.Sprintf("%s:%s", host, port) // Create broadcaster and start upstream connection // connectToUpstream will continuously find the best upstream and reconnect on failures broadcaster := &Broadcaster{} watchdogTrigger := make(chan struct{}, 1) go watchdog(broadcaster, watchdogTrigger) go connectToUpstream(pool, broadcaster, watchdogTrigger) // Setup HTTP server http.HandleFunc("/", handleHealth(broadcaster)) http.HandleFunc("/subscribe", handleSubscribe(broadcaster)) slog.Info("Starting proxy server", slog.String("bind", bindAddr)) if err := http.ListenAndServe(bindAddr, nil); err != nil { slog.Error("Server failed", slog.Any("error", err)) panic(err) } // TODO (future) let zlib compression be env'd // TODO: the proxy subscribes to all lexicons, but then filters out at client level. add env var for lex filtering too }