A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at main 534 lines 16 kB view raw
// Package jetstream provides an ATProto Jetstream consumer for real-time updates.
// It connects to the Bluesky Jetstream WebSocket, processes repository events,
// indexes manifests and tags, and populates the AppView database for the web UI.
package jetstream

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/url"
	"sync"
	"time"

	"atcr.io/pkg/appview/db"
	"atcr.io/pkg/atproto"
	"github.com/gorilla/websocket"
	"github.com/klauspost/compress/zstd"
)

// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups.
// NOTE(review): UserCache is not referenced anywhere in this file —
// presumably consumed elsewhere in the package; confirm it is still needed.
type UserCache struct {
	cache map[string]*db.User
}

// EventCallback is called for each processed event with the event's
// time_us cursor (Unix microseconds).
type EventCallback func(timeUS int64)

// Worker consumes Jetstream events and populates the UI database.
type Worker struct {
	db                   *sql.DB
	jetstreamURL         string           // Currently active endpoint URL (updated on failover)
	endpoints            *EndpointRotator // Rotates across configured Jetstream endpoints
	startCursor          int64            // Unix microseconds to resume from; 0 = start from "now"
	wantedCollections    []string         // Collection filters sent as wantedCollections query params
	debugCollectionCount int              // Counts the first few collections logged for debugging
	processor            *Processor       // Shared processor for DB operations
	statsCache           *StatsCache      // In-memory cache for stats aggregation across holds
	eventCallback        EventCallback    // Optional per-event hook (see SetEventCallback)
	connStartTime        time.Time        // Track when connection started for debugging

	// Ping/pong tracking for connection health; all fields below are
	// guarded by pongMutex (written by both Start's read path and the
	// ping goroutine).
	pingsSent     int64
	pongsReceived int64
	lastPongTime  time.Time
	pongMutex     sync.Mutex

	// In-memory cursor tracking for reconnects; guarded by cursorMutex.
	lastCursor  int64
	cursorMutex sync.RWMutex
}

// NewWorker creates a new Jetstream worker.
// startCursor: Unix microseconds timestamp to start from (0 = start from now).
// If urls is empty, a default Bluesky Jetstream endpoint is used.
func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker {
	if len(urls) == 0 {
		urls = []string{"wss://jetstream2.us-west.bsky.network/subscribe"}
	}

	rotator := NewEndpointRotator(urls)

	// Create shared stats cache for aggregating across holds
	statsCache := NewStatsCache()

	return &Worker{
		db:           database,
		jetstreamURL: rotator.Current(),
		endpoints:    rotator,
		startCursor:  startCursor,
		wantedCollections: []string{
			"io.atcr.*",              // Subscribe to all ATCR collections
			"app.bsky.actor.profile", // Subscribe to Bluesky profile updates for avatar sync
		},
		statsCache: statsCache,
		processor:  NewProcessor(database, true, statsCache), // Use cache for live streaming
	}
}

// Start begins consuming Jetstream events.
// This is a blocking function that runs until the context is cancelled,
// the server closes the connection, or the read deadline / ping health
// check declares the connection dead. It always returns a non-nil error
// on disconnect (or ctx.Err() on cancellation).
func (w *Worker) Start(ctx context.Context) error {
	// Build connection URL with filters
	u, err := url.Parse(w.jetstreamURL)
	if err != nil {
		return fmt.Errorf("invalid jetstream URL: %w", err)
	}

	q := u.Query()
	for _, collection := range w.wantedCollections {
		q.Add("wantedCollections", collection)
	}

	// Add cursor if specified (for backfilling historical data or reconnects)
	if w.startCursor > 0 {
		q.Set("cursor", fmt.Sprintf("%d", w.startCursor))

		// Calculate lag (cursor is in microseconds)
		now := time.Now().UnixMicro()
		lagSeconds := float64(now-w.startCursor) / 1_000_000.0
		slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
	}

	// Disable compression for now to debug
	// q.Set("compress", "true")
	u.RawQuery = q.Encode()

	slog.Info("Connecting to Jetstream", "url", u.String())

	// Connect to Jetstream
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect to jetstream: %w", err)
	}
	defer conn.Close()

	// Track connection start time for debugging
	w.connStartTime = time.Now()

	// Reset ping/pong counters for this connection
	w.pongMutex.Lock()
	w.pingsSent = 0
	w.pongsReceived = 0
	w.lastPongTime = time.Now()
	w.pongMutex.Unlock()

	// Set up pong handler - called when server responds to our ping
	conn.SetPongHandler(func(appData string) error {
		w.pongMutex.Lock()
		w.pongsReceived++
		w.lastPongTime = time.Now()
		w.pongMutex.Unlock()

		// Reset read deadline - we know connection is alive
		// Allow 90 seconds for next pong (3x ping interval)
		return conn.SetReadDeadline(time.Now().Add(90 * time.Second))
	})

	// Set initial read deadline
	if err := conn.SetReadDeadline(time.Now().Add(90 * time.Second)); err != nil {
		return fmt.Errorf("failed to set read deadline: %w", err)
	}

	// Create zstd decoder for decompressing messages
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return fmt.Errorf("failed to create zstd decoder: %w", err)
	}
	defer decoder.Close()

	slog.Info("Connected to Jetstream, listening for events...")

	// Start heartbeat ticker to show Jetstream is alive
	heartbeatTicker := time.NewTicker(30 * time.Second)
	defer heartbeatTicker.Stop()

	// Start ping ticker for keepalive (same 30s cadence as the heartbeat)
	pingTicker := time.NewTicker(30 * time.Second)
	defer pingTicker.Stop()

	// Start ping sender goroutine; closing pingDone on return guarantees
	// the goroutine exits when Start does, so it cannot leak.
	pingDone := make(chan struct{})
	defer close(pingDone)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-pingDone:
				return
			case <-pingTicker.C:
				// Check if we've received a pong recently
				w.pongMutex.Lock()
				timeSinceLastPong := time.Since(w.lastPongTime)
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				w.pongMutex.Unlock()

				// If no pong for 60 seconds, connection is likely dead.
				// Closing conn forces the blocked ReadMessage in the main
				// loop to return with an error.
				if timeSinceLastPong > 60*time.Second {
					slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
					conn.Close()
					return
				}

				// Send ping with write deadline
				if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
					slog.Warn("Jetstream failed to set write deadline", "error", err)
					conn.Close()
					return
				}
				if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
					slog.Warn("Jetstream failed to send ping", "error", err)
					conn.Close()
					return
				}

				w.pongMutex.Lock()
				w.pingsSent++
				w.pongMutex.Unlock()
			}
		}
	}()

	eventCount := 0
	lastHeartbeat := time.Now()

	// Read messages. Note: the default branch blocks in ReadMessage, so
	// ctx cancellation and the heartbeat tick are only observed between
	// messages (or when the read deadline/connection close unblocks the
	// read).
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-heartbeatTicker.C:
			elapsed := time.Since(lastHeartbeat)
			slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
			eventCount = 0
			lastHeartbeat = time.Now()
		default:
			_, message, err := conn.ReadMessage()
			if err != nil {
				// Calculate connection duration and idle time for debugging
				connDuration := time.Since(w.connStartTime)
				timeSinceLastEvent := time.Since(lastHeartbeat)

				// Get ping/pong stats
				w.pongMutex.Lock()
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				timeSinceLastPong := time.Since(w.lastPongTime)
				w.pongMutex.Unlock()

				// Calculate ping/pong success rate
				var pongRate float64
				if pingsTotal > 0 {
					pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
				}

				// Determine diagnosis
				var diagnosis string
				if pongRate >= 95 && timeSinceLastPong < 60*time.Second {
					diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
				} else if timeSinceLastPong > 60*time.Second {
					diagnosis = "Connection died (no pong for >60s), network issue detected"
				} else if pongRate < 80 {
					diagnosis = "Connection unstable (low pong rate), network quality issues"
				} else {
					diagnosis = "Connection closed unexpectedly"
				}

				// Log detailed context about the failure
				slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)

				return fmt.Errorf("failed to read message: %w", err)
			}

			// For now, process uncompressed messages
			// TODO: Re-enable compression once debugging is complete
			_ = decoder // Keep decoder to avoid unused variable error

			if err := w.processMessage(message); err != nil {
				slog.Error("ERROR processing message", "error", err)
				// Continue processing other messages
			} else {
				eventCount++
			}
		}
	}
}

// StartWithFailover runs the Jetstream worker with automatic failover across endpoints.
// On disconnect it retries the same endpoint with escalating delays (1s, 5s, 10s).
// If all retries fail, it fails over to the next endpoint and rewinds the cursor
// 30 seconds to avoid missing events (events are idempotent DB upserts).
// Cycles through all endpoints indefinitely and never gives up.
276func (w *Worker) StartWithFailover(ctx context.Context) { 277 retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second} 278 279 for { 280 currentURL := w.endpoints.Current() 281 w.jetstreamURL = currentURL 282 283 slog.Info("Jetstream connecting", "url", currentURL) 284 err := w.Start(ctx) 285 if ctx.Err() != nil { 286 return // Context cancelled, clean shutdown 287 } 288 289 // Capture cursor at disconnect time for rewind calculation 290 disconnectCursor := w.GetLastCursor() 291 292 // Retry same endpoint with escalating delays 293 recovered := false 294 for i, delay := range retryDelays { 295 slog.Warn("Jetstream disconnected, retrying same endpoint", 296 "url", currentURL, 297 "attempt", i+1, 298 "delay", delay, 299 "error", err) 300 time.Sleep(delay) 301 302 if ctx.Err() != nil { 303 return 304 } 305 306 w.jetstreamURL = currentURL 307 err = w.Start(ctx) 308 if ctx.Err() != nil { 309 return 310 } 311 if err == nil { 312 recovered = true 313 break 314 } 315 // Update disconnect cursor if we got further 316 if latest := w.GetLastCursor(); latest > disconnectCursor { 317 disconnectCursor = latest 318 } 319 } 320 321 if recovered { 322 continue 323 } 324 325 // All retries failed — failover to next endpoint 326 failedURL := currentURL 327 nextURL := w.endpoints.Next() 328 329 // Rewind cursor 30 seconds (30M microseconds) to avoid gaps 330 if disconnectCursor > 0 { 331 rewound := disconnectCursor - 30_000_000 332 if rewound < 0 { 333 rewound = 0 334 } 335 w.cursorMutex.Lock() 336 w.lastCursor = rewound 337 w.startCursor = rewound 338 w.cursorMutex.Unlock() 339 slog.Warn("Jetstream failing over to next endpoint", 340 "failed_url", failedURL, 341 "next_url", nextURL, 342 "cursor_rewound_by", "30s") 343 } else { 344 slog.Warn("Jetstream failing over to next endpoint", 345 "failed_url", failedURL, 346 "next_url", nextURL) 347 } 348 } 349} 350 351// SetEventCallback sets a callback to be called for each event 352func (w *Worker) 
SetEventCallback(cb EventCallback) { 353 w.eventCallback = cb 354} 355 356// Processor returns the worker's processor for configuration (e.g., setting webhook dispatcher) 357func (w *Worker) Processor() *Processor { 358 return w.processor 359} 360 361// GetLastCursor returns the last processed cursor (time_us) for reconnects 362func (w *Worker) GetLastCursor() int64 { 363 w.cursorMutex.RLock() 364 defer w.cursorMutex.RUnlock() 365 return w.lastCursor 366} 367 368// processMessage processes a single Jetstream event 369func (w *Worker) processMessage(message []byte) error { 370 var event JetstreamEvent 371 if err := json.Unmarshal(message, &event); err != nil { 372 return fmt.Errorf("failed to unmarshal event: %w", err) 373 } 374 375 // Update cursor for reconnects (do this first, even if processing fails) 376 w.cursorMutex.Lock() 377 w.lastCursor = event.TimeUS 378 w.cursorMutex.Unlock() 379 380 // Call callback if set 381 if w.eventCallback != nil { 382 w.eventCallback(event.TimeUS) 383 } 384 385 // Process based on event kind 386 switch event.Kind { 387 case "commit": 388 commit := event.Commit 389 if commit == nil { 390 return nil 391 } 392 393 // Set DID on commit from parent event 394 commit.DID = event.DID 395 396 // Debug: log first few collections we see to understand what's coming through 397 if w.debugCollectionCount < 5 { 398 slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID) 399 w.debugCollectionCount++ 400 } 401 402 // Check if this is a collection we care about 403 if !isRelevantCollection(commit.Collection) { 404 return nil // Ignore irrelevant collections 405 } 406 407 // Marshal record to bytes for processing 408 var recordBytes []byte 409 if commit.Record != nil { 410 var err error 411 recordBytes, err = json.Marshal(commit.Record) 412 if err != nil { 413 return fmt.Errorf("failed to marshal record: %w", err) 414 } 415 } 416 417 // Handle Bluesky profile updates separately (for avatar sync) 418 if 
commit.Collection == BlueskyProfileCollection { 419 // Only process creates/updates, not deletes (we don't clear avatars on profile delete) 420 if commit.Operation == "delete" { 421 return nil 422 } 423 return w.processor.ProcessProfileUpdate(context.Background(), commit.DID, recordBytes) 424 } 425 426 // Log ATCR events (but not Bluesky profile events - too noisy) 427 slog.Info("Jetstream processing event", 428 "collection", commit.Collection, 429 "did", commit.DID, 430 "operation", commit.Operation, 431 "rkey", commit.RKey) 432 433 isDelete := commit.Operation == "delete" 434 return w.processor.ProcessRecord(context.Background(), commit.DID, commit.Collection, commit.RKey, recordBytes, isDelete, nil) 435 436 case "identity": 437 if event.Identity == nil { 438 return nil 439 } 440 return w.processIdentity(&event) 441 442 case "account": 443 if event.Account == nil { 444 return nil 445 } 446 return w.processAccount(&event) 447 448 default: 449 // Ignore unknown event kinds 450 return nil 451 } 452} 453 454// processIdentity processes an identity event (handle change) 455func (w *Worker) processIdentity(event *JetstreamEvent) error { 456 if event.Identity == nil { 457 return nil 458 } 459 460 identity := event.Identity 461 // Process via shared processor (only ATCR users will be logged at Info level) 462 return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle) 463} 464 465// processAccount processes an account event (status change) 466func (w *Worker) processAccount(event *JetstreamEvent) error { 467 if event.Account == nil { 468 return nil 469 } 470 471 account := event.Account 472 // Process via shared processor (only ATCR users will be logged at Info level) 473 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status) 474} 475 476// BlueskyProfileCollection is the collection for Bluesky actor profiles 477const BlueskyProfileCollection = "app.bsky.actor.profile" 478 479// 
isRelevantCollection returns true if the collection is one we process 480func isRelevantCollection(collection string) bool { 481 switch collection { 482 case atproto.ManifestCollection, 483 atproto.TagCollection, 484 atproto.StarCollection, 485 atproto.RepoPageCollection, 486 atproto.SailorProfileCollection, 487 atproto.StatsCollection, 488 atproto.CaptainCollection, 489 atproto.CrewCollection, 490 atproto.ScanCollection, 491 BlueskyProfileCollection: // For avatar sync 492 return true 493 default: 494 return false 495 } 496} 497 498// JetstreamEvent represents a Jetstream event 499type JetstreamEvent struct { 500 DID string `json:"did"` 501 TimeUS int64 `json:"time_us"` 502 Kind string `json:"kind"` // "commit", "identity", "account" 503 Commit *CommitEvent `json:"commit,omitempty"` 504 Identity *IdentityInfo `json:"identity,omitempty"` 505 Account *AccountInfo `json:"account,omitempty"` 506} 507 508// CommitEvent represents a commit event (create/update/delete) 509type CommitEvent struct { 510 Rev string `json:"rev"` 511 Operation string `json:"operation"` // "create", "update", "delete" 512 Collection string `json:"collection"` 513 RKey string `json:"rkey"` 514 Record map[string]any `json:"record,omitempty"` 515 CID string `json:"cid,omitempty"` 516 DID string `json:"-"` // Set from parent event 517} 518 519// IdentityInfo represents an identity event 520type IdentityInfo struct { 521 DID string `json:"did"` 522 Handle string `json:"handle"` 523 Seq int64 `json:"seq"` 524 Time string `json:"time"` 525} 526 527// AccountInfo represents an account status event 528type AccountInfo struct { 529 Active bool `json:"active"` 530 DID string `json:"did"` 531 Seq int64 `json:"seq"` 532 Time string `json:"time"` 533 Status string `json:"status,omitempty"` 534}