A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at refactor 533 lines 17 kB view raw
1// Package jetstream provides an ATProto Jetstream consumer for real-time updates. 2// It connects to the Bluesky Jetstream WebSocket, processes repository events, 3// indexes manifests and tags, and populates the AppView database for the web UI. 4package jetstream 5 6import ( 7 "context" 8 "database/sql" 9 "encoding/json" 10 "fmt" 11 "log/slog" 12 "net/url" 13 "sync" 14 "time" 15 16 "atcr.io/pkg/appview/db" 17 "atcr.io/pkg/atproto" 18 "github.com/gorilla/websocket" 19 "github.com/klauspost/compress/zstd" 20) 21 22// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups 23type UserCache struct { 24 cache map[string]*db.User 25} 26 27// EventCallback is called for each processed event 28type EventCallback func(timeUS int64) 29 30// Worker consumes Jetstream events and populates the UI database 31type Worker struct { 32 db *sql.DB 33 jetstreamURL string 34 startCursor int64 35 wantedCollections []string 36 debugCollectionCount int 37 processor *Processor // Shared processor for DB operations 38 eventCallback EventCallback 39 connStartTime time.Time // Track when connection started for debugging 40 41 // Ping/pong tracking for connection health 42 pingsSent int64 43 pongsReceived int64 44 lastPongTime time.Time 45 pongMutex sync.Mutex 46 47 // In-memory cursor tracking for reconnects 48 lastCursor int64 49 cursorMutex sync.RWMutex 50} 51 52// NewWorker creates a new Jetstream worker 53// startCursor: Unix microseconds timestamp to start from (0 = start from now) 54func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker { 55 if jetstreamURL == "" { 56 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe" 57 } 58 59 return &Worker{ 60 db: database, 61 jetstreamURL: jetstreamURL, 62 startCursor: startCursor, 63 wantedCollections: []string{ 64 "io.atcr.*", // Subscribe to all ATCR collections 65 }, 66 processor: NewProcessor(database, true), // Use cache for live streaming 67 } 68} 69 70// Start begins consuming Jetstream events 71// This is a blocking function that runs until the context is cancelled 72func (w *Worker) Start(ctx context.Context) error { 73 // Build connection URL with filters 74 u, err := url.Parse(w.jetstreamURL) 75 if err != nil { 76 return fmt.Errorf("invalid jetstream URL: %w", err) 77 } 78 79 q := u.Query() 80 for _, collection := range w.wantedCollections { 81 q.Add("wantedCollections", collection) 82 } 83 84 // Add cursor if specified (for backfilling historical data or reconnects) 85 if w.startCursor > 0 { 86 q.Set("cursor", fmt.Sprintf("%d", w.startCursor)) 87 88 // Calculate lag (cursor is in microseconds) 89 now := time.Now().UnixMicro() 90 lagSeconds := float64(now-w.startCursor) / 1_000_000.0 91 slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds) 92 } 93 94 // Disable compression for now to debug 95 // q.Set("compress", "true") 96 u.RawQuery = q.Encode() 97 98 slog.Info("Connecting to Jetstream", "url", u.String()) 99 100 // Connect to Jetstream 101 conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil) 102 if err != nil { 103 return fmt.Errorf("failed to connect to jetstream: %w", err) 104 } 105 defer conn.Close() 106 107 // Track connection start time for debugging 108 w.connStartTime = time.Now() 109 110 // Reset ping/pong counters for this connection 111 w.pongMutex.Lock() 112 w.pingsSent = 0 113 w.pongsReceived = 0 114 w.lastPongTime = time.Now() 115 w.pongMutex.Unlock() 116 117 // Set up pong handler - called when server responds to our ping 118 conn.SetPongHandler(func(appData string) error { 119 w.pongMutex.Lock() 120 w.pongsReceived++ 121 w.lastPongTime = time.Now() 122 w.pongMutex.Unlock() 123 124 // Reset read deadline - we know connection is alive 125 // Allow 90 seconds for next pong (3x ping interval) 126 conn.SetReadDeadline(time.Now().Add(90 * time.Second)) 127 return nil 128 }) 129 130 // Set initial read deadline 131 conn.SetReadDeadline(time.Now().Add(90 * time.Second)) 132 133 // Create zstd decoder for decompressing messages 134 decoder, err := zstd.NewReader(nil) 135 if err != nil { 136 return fmt.Errorf("failed to create zstd decoder: %w", err) 137 } 138 defer decoder.Close() 139 140 slog.Info("Connected to Jetstream, listening for events...") 141 142 // Start heartbeat ticker to show Jetstream is alive 143 heartbeatTicker := time.NewTicker(30 * time.Second) 144 defer heartbeatTicker.Stop() 145 146 // Start ping ticker for keepalive 147 pingTicker := time.NewTicker(30 * time.Second) 148 defer pingTicker.Stop() 149 150 // Start ping sender goroutine 151 pingDone := make(chan struct{}) 152 defer close(pingDone) 153 154 go func() { 155 for { 156 select { 157 case <-ctx.Done(): 158 return 159 case <-pingDone: 160 return 161 case <-pingTicker.C: 162 // Check if we've received a pong recently 163 w.pongMutex.Lock() 164 timeSinceLastPong := time.Since(w.lastPongTime) 165 pingsTotal := w.pingsSent 166 pongsTotal := w.pongsReceived 167 w.pongMutex.Unlock() 168 169 // If no pong for 60 seconds, connection is likely dead 170 if timeSinceLastPong > 60*time.Second { 171 slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal) 172 conn.Close() 173 return 174 } 175 176 // Send ping with write deadline 177 conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) 178 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 179 slog.Warn("Jetstream failed to send ping", "error", err) 180 conn.Close() 181 return 182 } 183 184 w.pongMutex.Lock() 185 w.pingsSent++ 186 w.pongMutex.Unlock() 187 } 188 } 189 }() 190 191 eventCount := 0 192 lastHeartbeat := time.Now() 193 194 // Read messages 195 for { 196 select { 197 case <-ctx.Done(): 198 return ctx.Err() 199 case <-heartbeatTicker.C: 200 elapsed := time.Since(lastHeartbeat) 201 slog.Info("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds()) 202 eventCount = 0 203 lastHeartbeat = time.Now() 204 default: 205 _, message, err := conn.ReadMessage() 206 if err != nil { 207 // Calculate connection duration and idle time for debugging 208 connDuration := time.Since(w.connStartTime) 209 timeSinceLastEvent := time.Since(lastHeartbeat) 210 211 // Get ping/pong stats 212 w.pongMutex.Lock() 213 pingsTotal := w.pingsSent 214 pongsTotal := w.pongsReceived 215 timeSinceLastPong := time.Since(w.lastPongTime) 216 w.pongMutex.Unlock() 217 218 // Calculate ping/pong success rate 219 var pongRate float64 220 if pingsTotal > 0 { 221 pongRate = float64(pongsTotal) / float64(pingsTotal) * 100 222 } 223 224 // Determine diagnosis 225 var diagnosis string 226 if pongRate >= 95 && timeSinceLastPong < 60*time.Second { 227 diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption" 228 } else if timeSinceLastPong > 60*time.Second { 229 diagnosis = "Connection died (no pong for >60s), network issue detected" 230 } else if pongRate < 80 { 231 diagnosis = "Connection unstable (low pong rate), network quality issues" 232 } else { 233 diagnosis = "Connection closed unexpectedly" 234 } 235 236 // Log detailed context about the failure 237 slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis) 238 239 return fmt.Errorf("failed to read message: %w", err) 240 } 241 242 // For now, process uncompressed messages 243 // TODO: Re-enable compression once debugging is complete 244 _ = decoder // Keep decoder to avoid unused variable error 245 246 if err := w.processMessage(message); err != nil { 247 slog.Error("ERROR processing message", "error", err) 248 // Continue processing other messages 249 } else { 250 eventCount++ 251 } 252 } 253 } 254} 255 256// SetEventCallback sets a callback to be called for each event 257func (w *Worker) SetEventCallback(cb EventCallback) { 258 w.eventCallback = cb 259} 260 261// GetLastCursor returns the last processed cursor (time_us) for reconnects 262func (w *Worker) GetLastCursor() int64 { 263 w.cursorMutex.RLock() 264 defer w.cursorMutex.RUnlock() 265 return w.lastCursor 266} 267 268// processMessage processes a single Jetstream event 269func (w *Worker) processMessage(message []byte) error { 270 var event JetstreamEvent 271 if err := json.Unmarshal(message, &event); err != nil { 272 return fmt.Errorf("failed to unmarshal event: %w", err) 273 } 274 275 // Update cursor for reconnects (do this first, even if processing fails) 276 w.cursorMutex.Lock() 277 w.lastCursor = event.TimeUS 278 w.cursorMutex.Unlock() 279 280 // Call callback if set 281 if w.eventCallback != nil { 282 w.eventCallback(event.TimeUS) 283 } 284 285 // Process based on event kind 286 switch event.Kind { 287 case "commit": 288 commit := event.Commit 289 if commit == nil { 290 return nil 291 } 292 293 // Set DID on commit from parent event 294 commit.DID = event.DID 295 296 // Debug: log first few collections we see to understand what's coming through 297 if w.debugCollectionCount < 5 { 298 slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID) 299 w.debugCollectionCount++ 300 } 301 302 // Process based on collection 303 switch commit.Collection { 304 case atproto.ManifestCollection: 305 slog.Info("Jetstream processing manifest event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 306 return w.processManifest(commit) 307 case atproto.TagCollection: 308 slog.Info("Jetstream processing tag event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 309 return w.processTag(commit) 310 case atproto.StarCollection: 311 slog.Info("Jetstream processing star event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 312 return w.processStar(commit) 313 case atproto.RepoPageCollection: 314 slog.Info("Jetstream processing repo page event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey) 315 return w.processRepoPage(commit) 316 default: 317 // Ignore other collections 318 return nil 319 } 320 321 case "identity": 322 if event.Identity == nil { 323 return nil 324 } 325 return w.processIdentity(&event) 326 327 case "account": 328 if event.Account == nil { 329 return nil 330 } 331 return w.processAccount(&event) 332 333 default: 334 // Ignore unknown event kinds 335 return nil 336 } 337} 338 339// processManifest processes a manifest commit event 340func (w *Worker) processManifest(commit *CommitEvent) error { 341 // Resolve and upsert user with handle/PDS endpoint 342 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 343 return fmt.Errorf("failed to ensure user: %w", err) 344 } 345 346 if commit.Operation == "delete" { 347 // Delete manifest - rkey is just the digest, repository is not encoded 348 digest := commit.RKey 349 if err := db.DeleteManifest(w.db, commit.DID, "", digest); err != nil { 350 return err 351 } 352 // Clean up any orphaned tags pointing to this manifest 353 return db.CleanupOrphanedTags(w.db, commit.DID) 354 } 355 356 // Parse manifest record 357 if commit.Record == nil { 358 return nil // No record data, can't process 359 } 360 361 // Marshal map to bytes for processing 362 recordBytes, err := json.Marshal(commit.Record) 363 if err != nil { 364 return fmt.Errorf("failed to marshal record: %w", err) 365 } 366 367 // Use shared processor for DB operations 368 _, err = w.processor.ProcessManifest(context.Background(), commit.DID, recordBytes) 369 return err 370} 371 372// processTag processes a tag commit event 373func (w *Worker) processTag(commit *CommitEvent) error { 374 // Resolve and upsert user with handle/PDS endpoint 375 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 376 return fmt.Errorf("failed to ensure user: %w", err) 377 } 378 379 if commit.Operation == "delete" { 380 // Delete tag - decode rkey back to repository and tag 381 repo, tag := atproto.RKeyToRepositoryTag(commit.RKey) 382 slog.Info("Jetstream deleting tag", "did", commit.DID, "repository", repo, "tag", tag, "rkey", commit.RKey) 383 if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil { 384 slog.Error("Jetstream ERROR deleting tag", "error", err) 385 return err 386 } 387 slog.Info("Jetstream successfully deleted tag", "did", commit.DID, "repository", repo, "tag", tag) 388 return nil 389 } 390 391 // Parse tag record 392 if commit.Record == nil { 393 return nil 394 } 395 396 // Marshal map to bytes for processing 397 recordBytes, err := json.Marshal(commit.Record) 398 if err != nil { 399 return fmt.Errorf("failed to marshal record: %w", err) 400 } 401 402 // Use shared processor for DB operations 403 return w.processor.ProcessTag(context.Background(), commit.DID, recordBytes) 404} 405 406// processStar processes a star commit event 407func (w *Worker) processStar(commit *CommitEvent) error { 408 // Resolve and upsert the user who starred (starrer) 409 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 410 return fmt.Errorf("failed to ensure user: %w", err) 411 } 412 413 if commit.Operation == "delete" { 414 // Unstar - parse the rkey to get the subject (owner DID and repository) 415 // Delete events don't include the full record, but the rkey contains the info we need 416 ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey) 417 if err != nil { 418 return fmt.Errorf("failed to parse star rkey: %w", err) 419 } 420 421 // Delete the star record 422 return db.DeleteStar(w.db, commit.DID, ownerDID, repository) 423 } 424 425 // Parse star record 426 if commit.Record == nil { 427 return nil 428 } 429 430 // Marshal map to bytes for processing 431 recordBytes, err := json.Marshal(commit.Record) 432 if err != nil { 433 return fmt.Errorf("failed to marshal record: %w", err) 434 } 435 436 // Use shared processor for DB operations 437 return w.processor.ProcessStar(context.Background(), commit.DID, recordBytes) 438} 439 440// processRepoPage processes a repo page commit event 441func (w *Worker) processRepoPage(commit *CommitEvent) error { 442 // Resolve and upsert user with handle/PDS endpoint 443 if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 444 return fmt.Errorf("failed to ensure user: %w", err) 445 } 446 447 isDelete := commit.Operation == "delete" 448 449 if isDelete { 450 // Delete - rkey is the repository name 451 slog.Info("Jetstream deleting repo page", "did", commit.DID, "repository", commit.RKey) 452 if err := w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, nil, true); err != nil { 453 slog.Error("Jetstream ERROR deleting repo page", "error", err) 454 return err 455 } 456 slog.Info("Jetstream successfully deleted repo page", "did", commit.DID, "repository", commit.RKey) 457 return nil 458 } 459 460 // Parse repo page record 461 if commit.Record == nil { 462 return nil 463 } 464 465 // Marshal map to bytes for processing 466 recordBytes, err := json.Marshal(commit.Record) 467 if err != nil { 468 return fmt.Errorf("failed to marshal record: %w", err) 469 } 470 471 // Use shared processor for DB operations 472 return w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, recordBytes, false) 473} 474 475// processIdentity processes an identity event (handle change) 476func (w *Worker) processIdentity(event *JetstreamEvent) error { 477 if event.Identity == nil { 478 return nil 479 } 480 481 identity := event.Identity 482 // Process via shared processor (only ATCR users will be logged at Info level) 483 return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle) 484} 485 486// processAccount processes an account event (status change) 487func (w *Worker) processAccount(event *JetstreamEvent) error { 488 if event.Account == nil { 489 return nil 490 } 491 492 account := event.Account 493 // Process via shared processor (only ATCR users will be logged at Info level) 494 return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status) 495} 496 497// JetstreamEvent represents a Jetstream event 498type JetstreamEvent struct { 499 DID string `json:"did"` 500 TimeUS int64 `json:"time_us"` 501 Kind string `json:"kind"` // "commit", "identity", "account" 502 Commit *CommitEvent `json:"commit,omitempty"` 503 Identity *IdentityInfo `json:"identity,omitempty"` 504 Account *AccountInfo `json:"account,omitempty"` 505} 506 507// CommitEvent represents a commit event (create/update/delete) 508type CommitEvent struct { 509 Rev string `json:"rev"` 510 Operation string `json:"operation"` // "create", "update", "delete" 511 Collection string `json:"collection"` 512 RKey string `json:"rkey"` 513 Record map[string]any `json:"record,omitempty"` 514 CID string `json:"cid,omitempty"` 515 DID string `json:"-"` // Set from parent event 516} 517 518// IdentityInfo represents an identity event 519type IdentityInfo struct { 520 DID string `json:"did"` 521 Handle string `json:"handle"` 522 Seq int64 `json:"seq"` 523 Time string `json:"time"` 524} 525 526// AccountInfo represents an account status event 527type AccountInfo struct { 528 Active bool `json:"active"` 529 DID string `json:"did"` 530 Seq int64 `json:"seq"` 531 Time string `json:"time"` 532 Status string `json:"status,omitempty"` 533}