// Package jetstream provides an ATProto Jetstream consumer for real-time updates. // It connects to the Bluesky Jetstream WebSocket, processes repository events, // indexes manifests and tags, and populates the AppView database for the web UI. package jetstream import ( "context" "database/sql" "encoding/json" "fmt" "log/slog" "net/url" "sync" "time" "atcr.io/pkg/appview/db" "atcr.io/pkg/atproto" "github.com/gorilla/websocket" "github.com/klauspost/compress/zstd" ) // UserCache caches DID -> handle/PDS mappings to avoid repeated lookups type UserCache struct { cache map[string]*db.User } // EventCallback is called for each processed event type EventCallback func(timeUS int64) // Worker consumes Jetstream events and populates the UI database type Worker struct { db *sql.DB jetstreamURL string endpoints *EndpointRotator startCursor int64 wantedCollections []string debugCollectionCount int processor *Processor // Shared processor for DB operations statsCache *StatsCache // In-memory cache for stats aggregation across holds eventCallback EventCallback connStartTime time.Time // Track when connection started for debugging // Ping/pong tracking for connection health pingsSent int64 pongsReceived int64 lastPongTime time.Time pongMutex sync.Mutex // In-memory cursor tracking for reconnects lastCursor int64 cursorMutex sync.RWMutex } // NewWorker creates a new Jetstream worker // startCursor: Unix microseconds timestamp to start from (0 = start from now) func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker { if len(urls) == 0 { urls = []string{"wss://jetstream2.us-west.bsky.network/subscribe"} } rotator := NewEndpointRotator(urls) // Create shared stats cache for aggregating across holds statsCache := NewStatsCache() return &Worker{ db: database, jetstreamURL: rotator.Current(), endpoints: rotator, startCursor: startCursor, wantedCollections: []string{ "io.atcr.*", // Subscribe to all ATCR collections "app.bsky.actor.profile", // Subscribe to Bluesky profile updates for avatar sync }, statsCache: statsCache, processor: NewProcessor(database, true, statsCache), // Use cache for live streaming } } // Start begins consuming Jetstream events // This is a blocking function that runs until the context is cancelled func (w *Worker) Start(ctx context.Context) error { // Build connection URL with filters u, err := url.Parse(w.jetstreamURL) if err != nil { return fmt.Errorf("invalid jetstream URL: %w", err) } q := u.Query() for _, collection := range w.wantedCollections { q.Add("wantedCollections", collection) } // Add cursor if specified (for backfilling historical data or reconnects) if w.startCursor > 0 { q.Set("cursor", fmt.Sprintf("%d", w.startCursor)) // Calculate lag (cursor is in microseconds) now := time.Now().UnixMicro() lagSeconds := float64(now-w.startCursor) / 1_000_000.0 slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds) } // Disable compression for now to debug // q.Set("compress", "true") u.RawQuery = q.Encode() slog.Info("Connecting to Jetstream", "url", u.String()) // Connect to Jetstream conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) if err != nil { return fmt.Errorf("failed to connect to jetstream: %w", err) } defer conn.Close() // Track connection start time for debugging w.connStartTime = time.Now() // Reset ping/pong counters for this connection w.pongMutex.Lock() w.pingsSent = 0 w.pongsReceived = 0 w.lastPongTime = time.Now() w.pongMutex.Unlock() // Set up pong handler - called when server responds to our ping conn.SetPongHandler(func(appData string) error { w.pongMutex.Lock() w.pongsReceived++ w.lastPongTime = time.Now() w.pongMutex.Unlock() // Reset read deadline - we know connection is alive // Allow 90 seconds for next pong (3x ping interval) return conn.SetReadDeadline(time.Now().Add(90 * time.Second)) }) // Set initial read deadline if err := conn.SetReadDeadline(time.Now().Add(90 * time.Second)); err != nil { return fmt.Errorf("failed to set read deadline: %w", err) } // Create zstd decoder for decompressing messages decoder, err := zstd.NewReader(nil) if err != nil { return fmt.Errorf("failed to create zstd decoder: %w", err) } defer decoder.Close() slog.Info("Connected to Jetstream, listening for events...") // Start heartbeat ticker to show Jetstream is alive heartbeatTicker := time.NewTicker(30 * time.Second) defer heartbeatTicker.Stop() // Start ping ticker for keepalive pingTicker := time.NewTicker(30 * time.Second) defer pingTicker.Stop() // Start ping sender goroutine pingDone := make(chan struct{}) defer close(pingDone) go func() { for { select { case <-ctx.Done(): return case <-pingDone: return case <-pingTicker.C: // Check if we've received a pong recently w.pongMutex.Lock() timeSinceLastPong := time.Since(w.lastPongTime) pingsTotal := w.pingsSent pongsTotal := w.pongsReceived w.pongMutex.Unlock() // If no pong for 60 seconds, connection is likely dead if timeSinceLastPong > 60*time.Second { slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal) conn.Close() return } // Send ping with write deadline if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { slog.Warn("Jetstream failed to set write deadline", "error", err) conn.Close() return } if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { slog.Warn("Jetstream failed to send ping", "error", err) conn.Close() return } w.pongMutex.Lock() w.pingsSent++ w.pongMutex.Unlock() } } }() eventCount := 0 lastHeartbeat := time.Now() // Read messages for { select { case <-ctx.Done(): return ctx.Err() case <-heartbeatTicker.C: elapsed := time.Since(lastHeartbeat) slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds()) eventCount = 0 lastHeartbeat = time.Now() default: _, message, err := conn.ReadMessage() if err != nil { // Calculate connection duration and idle time for debugging connDuration := time.Since(w.connStartTime) timeSinceLastEvent := time.Since(lastHeartbeat) // Get ping/pong stats w.pongMutex.Lock() pingsTotal := w.pingsSent pongsTotal := w.pongsReceived timeSinceLastPong := time.Since(w.lastPongTime) w.pongMutex.Unlock() // Calculate ping/pong success rate var pongRate float64 if pingsTotal > 0 { pongRate = float64(pongsTotal) / float64(pingsTotal) * 100 } // Determine diagnosis var diagnosis string if pongRate >= 95 && timeSinceLastPong < 60*time.Second { diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption" } else if timeSinceLastPong > 60*time.Second { diagnosis = "Connection died (no pong for >60s), network issue detected" } else if pongRate < 80 { diagnosis = "Connection unstable (low pong rate), network quality issues" } else { diagnosis = "Connection closed unexpectedly" } // Log detailed context about the failure slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis) return fmt.Errorf("failed to read message: %w", err) } // For now, process uncompressed messages // TODO: Re-enable compression once debugging is complete _ = decoder // Keep decoder to avoid unused variable error if err := w.processMessage(message); err != nil { slog.Error("ERROR processing message", "error", err) // Continue processing other messages } else { eventCount++ } } } } // StartWithFailover runs the Jetstream worker with automatic failover across endpoints. // On disconnect it retries the same endpoint with escalating delays (1s, 5s, 10s). // If all retries fail, it fails over to the next endpoint and rewinds the cursor // 30 seconds to avoid missing events (events are idempotent DB upserts). // Cycles through all endpoints indefinitely and never gives up. func (w *Worker) StartWithFailover(ctx context.Context) { retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second} for { currentURL := w.endpoints.Current() w.jetstreamURL = currentURL slog.Info("Jetstream connecting", "url", currentURL) err := w.Start(ctx) if ctx.Err() != nil { return // Context cancelled, clean shutdown } // Capture cursor at disconnect time for rewind calculation disconnectCursor := w.GetLastCursor() // Retry same endpoint with escalating delays recovered := false for i, delay := range retryDelays { slog.Warn("Jetstream disconnected, retrying same endpoint", "url", currentURL, "attempt", i+1, "delay", delay, "error", err) time.Sleep(delay) if ctx.Err() != nil { return } w.jetstreamURL = currentURL err = w.Start(ctx) if ctx.Err() != nil { return } if err == nil { recovered = true break } // Update disconnect cursor if we got further if latest := w.GetLastCursor(); latest > disconnectCursor { disconnectCursor = latest } } if recovered { continue } // All retries failed — failover to next endpoint failedURL := currentURL nextURL := w.endpoints.Next() // Rewind cursor 30 seconds (30M microseconds) to avoid gaps if disconnectCursor > 0 { rewound := disconnectCursor - 30_000_000 if rewound < 0 { rewound = 0 } w.cursorMutex.Lock() w.lastCursor = rewound w.startCursor = rewound w.cursorMutex.Unlock() slog.Warn("Jetstream failing over to next endpoint", "failed_url", failedURL, "next_url", nextURL, "cursor_rewound_by", "30s") } else { slog.Warn("Jetstream failing over to next endpoint", "failed_url", failedURL, "next_url", nextURL) } } } // SetEventCallback sets a callback to be called for each event func (w *Worker) SetEventCallback(cb EventCallback) { w.eventCallback = cb } // Processor returns the worker's processor for configuration (e.g., setting webhook dispatcher) func (w *Worker) Processor() *Processor { return w.processor } // GetLastCursor returns the last processed cursor (time_us) for reconnects func (w *Worker) GetLastCursor() int64 { w.cursorMutex.RLock() defer w.cursorMutex.RUnlock() return w.lastCursor } // processMessage processes a single Jetstream event func (w *Worker) processMessage(message []byte) error { var event JetstreamEvent if err := json.Unmarshal(message, &event); err != nil { return fmt.Errorf("failed to unmarshal event: %w", err) } // Update cursor for reconnects (do this first, even if processing fails) w.cursorMutex.Lock() w.lastCursor = event.TimeUS w.cursorMutex.Unlock() // Call callback if set if w.eventCallback != nil { w.eventCallback(event.TimeUS) } // Process based on event kind switch event.Kind { case "commit": commit := event.Commit if commit == nil { return nil } // Set DID on commit from parent event commit.DID = event.DID // Debug: log first few collections we see to understand what's coming through if w.debugCollectionCount < 5 { slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID) w.debugCollectionCount++ } // Check if this is a collection we care about if !isRelevantCollection(commit.Collection) { return nil // Ignore irrelevant collections } // Marshal record to bytes for processing var recordBytes []byte if commit.Record != nil { var err error recordBytes, err = json.Marshal(commit.Record) if err != nil { return fmt.Errorf("failed to marshal record: %w", err) } } // Handle Bluesky profile updates separately (for avatar sync) if commit.Collection == BlueskyProfileCollection { // Only process creates/updates, not deletes (we don't clear avatars on profile delete) if commit.Operation == "delete" { return nil } return w.processor.ProcessProfileUpdate(context.Background(), commit.DID, recordBytes) } // Log ATCR events (but not Bluesky profile events - too noisy) slog.Info("Jetstream processing event", "collection", commit.Collection, "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) isDelete := commit.Operation == "delete" return w.processor.ProcessRecord(context.Background(), commit.DID, commit.Collection, commit.RKey, recordBytes, isDelete, nil) case "identity": if event.Identity == nil { return nil } return w.processIdentity(&event) case "account": if event.Account == nil { return nil } return w.processAccount(&event) default: // Ignore unknown event kinds return nil } } // processIdentity processes an identity event (handle change) func (w *Worker) processIdentity(event *JetstreamEvent) error { if event.Identity == nil { return nil } identity := event.Identity // Process via shared processor (only ATCR users will be logged at Info level) return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle) } // processAccount processes an account event (status change) func (w *Worker) processAccount(event *JetstreamEvent) error { if event.Account == nil { return nil } account := event.Account // Process via shared processor (only ATCR users will be logged at Info level) return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status) } // BlueskyProfileCollection is the collection for Bluesky actor profiles const BlueskyProfileCollection = "app.bsky.actor.profile" // isRelevantCollection returns true if the collection is one we process func isRelevantCollection(collection string) bool { switch collection { case atproto.ManifestCollection, atproto.TagCollection, atproto.StarCollection, atproto.RepoPageCollection, atproto.SailorProfileCollection, atproto.StatsCollection, atproto.CaptainCollection, atproto.CrewCollection, atproto.ScanCollection, BlueskyProfileCollection: // For avatar sync return true default: return false } } // JetstreamEvent represents a Jetstream event type JetstreamEvent struct { DID string `json:"did"` TimeUS int64 `json:"time_us"` Kind string `json:"kind"` // "commit", "identity", "account" Commit *CommitEvent `json:"commit,omitempty"` Identity *IdentityInfo `json:"identity,omitempty"` Account *AccountInfo `json:"account,omitempty"` } // CommitEvent represents a commit event (create/update/delete) type CommitEvent struct { Rev string `json:"rev"` Operation string `json:"operation"` // "create", "update", "delete" Collection string `json:"collection"` RKey string `json:"rkey"` Record map[string]any `json:"record,omitempty"` CID string `json:"cid,omitempty"` DID string `json:"-"` // Set from parent event } // IdentityInfo represents an identity event type IdentityInfo struct { DID string `json:"did"` Handle string `json:"handle"` Seq int64 `json:"seq"` Time string `json:"time"` } // AccountInfo represents an account status event type AccountInfo struct { Active bool `json:"active"` DID string `json:"did"` Seq int64 `json:"seq"` Time string `json:"time"` Status string `json:"status,omitempty"` }