A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at main 966 lines 32 kB view raw
package jetstream

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"atcr.io/pkg/appview/db"
	"atcr.io/pkg/atproto"
	atpdata "github.com/bluesky-social/indigo/atproto/atdata"
	"github.com/bluesky-social/indigo/atproto/identity"
	"github.com/bluesky-social/indigo/atproto/lexicon"
)

// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
//
// Construct it with NewProcessor; the webhook dispatcher is attached separately
// via SetWebhookDispatcher.
type Processor struct {
	db                db.DBTX                   // Database handle passed to every db.* helper
	userCache         *UserCache                // Optional - enabled for Worker, disabled for Backfill
	statsCache        *StatsCache               // In-memory cache for per-hold stats aggregation
	useCache          bool                      // Whether userCache is consulted before hitting the DB
	catalog           *lexicon.ResolvingCatalog // For debug logging of validation failures
	webhookDispatcher WebhookDispatcher         // Optional - only for live Worker (nil for Backfill)
}

// WebhookDispatcher is an interface for dispatching webhooks on scan completion.
// Only the live Worker sets this; backfill does NOT (avoids spamming old scan results).
type WebhookDispatcher interface {
	// DispatchForScan is called after a scan has been upserted. previousScan is
	// the value returned by db.UpsertScan for the same scan; userHandle, tag and
	// holdEndpoint are best-effort lookups and may be empty strings.
	DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string)
}

// SetWebhookDispatcher sets the webhook dispatcher for scan processing.
// Only the live Worker should set this — backfill skips webhook dispatch.
37func (p *Processor) SetWebhookDispatcher(d WebhookDispatcher) { 38 p.webhookDispatcher = d 39} 40 41// NewProcessor creates a new shared processor 42// useCache: true for Worker (live streaming), false for Backfill (batch processing) 43// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing) 44func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor { 45 // Create lexicon catalog for debug validation logging 46 dir := identity.DefaultDirectory() 47 catalog := lexicon.NewResolvingCatalog() 48 catalog.Directory = dir 49 50 p := &Processor{ 51 db: database, 52 useCache: useCache, 53 statsCache: statsCache, 54 catalog: catalog, 55 } 56 57 if useCache { 58 p.userCache = &UserCache{ 59 cache: make(map[string]*db.User), 60 } 61 } 62 63 return p 64} 65 66// EnsureUser resolves and upserts a user by DID 67// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill) 68func (p *Processor) EnsureUser(ctx context.Context, did string) error { 69 // Check cache first (if enabled) 70 if p.useCache && p.userCache != nil { 71 if _, ok := p.userCache.cache[did]; ok { 72 // User in cache - just update last seen timestamp 73 return db.UpdateUserLastSeen(p.db, did) 74 } 75 } else if !p.useCache { 76 // No cache - check if user already exists in DB 77 existingUser, err := db.GetUserByDID(p.db, did) 78 if err == nil && existingUser != nil { 79 // User exists - just update last seen timestamp 80 return db.UpdateUserLastSeen(p.db, did) 81 } 82 } 83 84 // Resolve DID to get handle and PDS endpoint 85 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did) 86 if err != nil { 87 return err 88 } 89 90 // Fetch user's Bluesky profile record from their PDS (including avatar) 91 avatarURL := "" 92 client := atproto.NewClient(pdsEndpoint, "", "") 93 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID) 94 if err != nil { 95 slog.Warn("Failed to fetch profile record", "component", 
"processor", "did", resolvedDID, "error", err) 96 // Continue without avatar 97 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" { 98 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link) 99 } 100 101 // Create user record 102 user := &db.User{ 103 DID: resolvedDID, 104 Handle: handle, 105 PDSEndpoint: pdsEndpoint, 106 Avatar: avatarURL, 107 LastSeen: time.Now(), 108 } 109 110 // Cache if enabled 111 if p.useCache { 112 p.userCache.cache[did] = user 113 } 114 115 // Upsert to database 116 // Use UpsertUser if we successfully fetched an avatar (to update existing users) 117 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars) 118 if avatarURL != "" { 119 return db.UpsertUser(p.db, user) 120 } 121 return db.UpsertUserIgnoreAvatar(p.db, user) 122} 123 124// EnsureUserExists ensures a user row exists in the database without updating it. 125// Used by non-profile collections to avoid unnecessary writes during backfill. 126// If the user doesn't exist, resolves identity and inserts with ON CONFLICT DO NOTHING. 
127func (p *Processor) EnsureUserExists(ctx context.Context, did string) error { 128 // Check cache first (if enabled) 129 if p.useCache && p.userCache != nil { 130 if _, ok := p.userCache.cache[did]; ok { 131 return nil // User in cache, nothing to do 132 } 133 } else if !p.useCache { 134 // No cache - check if user already exists in DB 135 existingUser, err := db.GetUserByDID(p.db, did) 136 if err == nil && existingUser != nil { 137 return nil // User exists, nothing to do 138 } 139 } 140 141 // User doesn't exist yet — resolve and insert 142 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did) 143 if err != nil { 144 return err 145 } 146 147 avatarURL := "" 148 client := atproto.NewClient(pdsEndpoint, "", "") 149 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID) 150 if err != nil { 151 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err) 152 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" { 153 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link) 154 } 155 156 user := &db.User{ 157 DID: resolvedDID, 158 Handle: handle, 159 PDSEndpoint: pdsEndpoint, 160 Avatar: avatarURL, 161 LastSeen: time.Now(), 162 } 163 164 // Cache if enabled 165 if p.useCache { 166 p.userCache.cache[did] = user 167 } 168 169 return db.InsertUserIfNotExists(p.db, user) 170} 171 172// ValidateRecord performs validation on records. 
173// - Full lexicon validation is logged for debugging but does NOT block ingestion 174// - Targeted validation (captain/crew DID checks) DOES block bogus records 175func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error { 176 recordData, err := atpdata.UnmarshalJSON(data) 177 if err != nil { 178 return fmt.Errorf("invalid JSON: %w", err) 179 } 180 181 // Debug: Full lexicon validation (log only, don't block) 182 if p.catalog != nil { 183 if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil { 184 slog.Debug("Record failed full lexicon validation (ingesting anyway)", 185 "component", "processor", 186 "collection", collection, 187 "error", err) 188 } 189 } 190 191 // Targeted validation for collections that had bogus data issues 192 // These DO block ingestion 193 switch collection { 194 case atproto.CaptainCollection: 195 // Captain must have non-empty owner DID 196 owner, _ := recordData["owner"].(string) 197 if owner == "" || !strings.HasPrefix(owner, "did:") { 198 return fmt.Errorf("captain record missing or invalid owner DID") 199 } 200 201 case atproto.CrewCollection: 202 // Crew must have non-empty member DID 203 member, _ := recordData["member"].(string) 204 if member == "" || !strings.HasPrefix(member, "did:") { 205 return fmt.Errorf("crew record missing or invalid member DID") 206 } 207 } 208 209 return nil 210} 211 212// ProcessRecord is the unified entry point for processing any ATCR record. 213// It handles: 214// 1. Schema validation against published lexicons 215// 2. User creation for user-activity collections 216// 3. 
Dispatch to the appropriate Process* method 217// 218// queryCaptainFn is optional - used by backfill for sailor profile processing 219func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error { 220 // Skip validation for deletes (no record data) 221 if !isDelete && data != nil { 222 if err := p.ValidateRecord(ctx, collection, data); err != nil { 223 slog.Warn("Record failed schema validation, skipping", 224 "component", "processor", 225 "collection", collection, 226 "did", did, 227 "error", err) 228 return nil // Skip invalid records silently 229 } 230 } 231 232 // User-activity collections create/update user entries 233 // Skip for deletes - user should already exist, and we don't need to resolve identity 234 if !isDelete { 235 switch collection { 236 case atproto.SailorProfileCollection: 237 // Sailor profile is the authoritative source for user data — full upsert 238 if err := p.EnsureUser(ctx, did); err != nil { 239 return fmt.Errorf("failed to ensure user: %w", err) 240 } 241 case atproto.ManifestCollection, 242 atproto.TagCollection, 243 atproto.StarCollection, 244 atproto.RepoPageCollection: 245 // Other user collections just need the row to exist — no update if unchanged 246 if err := p.EnsureUserExists(ctx, did); err != nil { 247 return fmt.Errorf("failed to ensure user exists: %w", err) 248 } 249 // Hold collections (captain, crew, stats) - don't create user entries 250 // These are records FROM holds, not user activity 251 } 252 } 253 254 // Dispatch to specific handler 255 switch collection { 256 case atproto.ManifestCollection: 257 if isDelete { 258 return db.DeleteManifest(p.db, did, "", rkey) 259 } 260 _, err := p.ProcessManifest(ctx, did, data) 261 return err 262 263 case atproto.TagCollection: 264 if isDelete { 265 repo, tag := atproto.RKeyToRepositoryTag(rkey) 266 return db.DeleteTag(p.db, did, repo, tag) 267 } 268 return 
p.ProcessTag(ctx, did, data) 269 270 case atproto.StarCollection: 271 if isDelete { 272 ownerDID, repository, err := atproto.ParseStarRecordKey(rkey) 273 if err != nil { 274 return err 275 } 276 return db.DeleteStar(p.db, did, ownerDID, repository) 277 } 278 return p.ProcessStar(ctx, did, data) 279 280 case atproto.RepoPageCollection: 281 return p.ProcessRepoPage(ctx, did, rkey, data, isDelete) 282 283 case atproto.SailorProfileCollection: 284 return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn) 285 286 case atproto.ScanCollection: 287 return p.ProcessScan(ctx, did, data, isDelete) 288 289 case atproto.StatsCollection: 290 return p.ProcessStats(ctx, did, data, isDelete) 291 292 case atproto.CaptainCollection: 293 if isDelete { 294 return db.DeleteCaptainRecord(p.db, did) 295 } 296 return p.ProcessCaptain(ctx, did, data) 297 298 case atproto.CrewCollection: 299 if isDelete { 300 return db.DeleteCrewMemberByRkey(p.db, did, rkey) 301 } 302 return p.ProcessCrew(ctx, did, rkey, data) 303 304 default: 305 return nil // Unknown collection, ignore 306 } 307} 308 309// ProcessManifest processes a manifest record and stores it in the database 310// Returns the manifest ID for further processing (layers/references) 311func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) { 312 // Unmarshal manifest record 313 var manifestRecord atproto.ManifestRecord 314 if err := json.Unmarshal(recordData, &manifestRecord); err != nil { 315 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err) 316 } 317 // Detect manifest type 318 isManifestList := len(manifestRecord.Manifests) > 0 319 320 // Extract hold DID from manifest (with fallback for legacy manifests) 321 // New manifests use holdDid field (DID format) 322 // Old manifests use holdEndpoint field (URL format) - convert to DID 323 holdDID := manifestRecord.HoldDID 324 if holdDID == "" && manifestRecord.HoldEndpoint != "" { 325 // Legacy manifest - resolve URL to DID via 
/.well-known/atproto-did 326 if resolved, err := atproto.ResolveHoldDID(ctx, manifestRecord.HoldEndpoint); err != nil { 327 slog.Warn("Failed to resolve hold DID from legacy manifest endpoint", "holdEndpoint", manifestRecord.HoldEndpoint, "error", err) 328 } else { 329 holdDID = resolved 330 } 331 } 332 333 // Detect artifact type from config media type 334 artifactType := "container-image" 335 if !isManifestList && manifestRecord.Config != nil { 336 artifactType = db.GetArtifactType(manifestRecord.Config.MediaType) 337 } 338 339 // Prepare manifest for insertion (WITHOUT annotation fields) 340 manifest := &db.Manifest{ 341 DID: did, 342 Repository: manifestRecord.Repository, 343 Digest: manifestRecord.Digest, 344 MediaType: manifestRecord.MediaType, 345 SchemaVersion: manifestRecord.SchemaVersion, 346 HoldEndpoint: holdDID, 347 ArtifactType: artifactType, 348 CreatedAt: manifestRecord.CreatedAt, 349 // Annotations removed - stored separately in repository_annotations table 350 } 351 352 // Set config fields only for image manifests (not manifest lists) 353 if !isManifestList && manifestRecord.Config != nil { 354 manifest.ConfigDigest = manifestRecord.Config.Digest 355 manifest.ConfigSize = manifestRecord.Config.Size 356 } 357 358 // Insert manifest 359 manifestID, err := db.InsertManifest(p.db, manifest) 360 if err != nil { 361 // For backfill: if manifest already exists, get its ID 362 if strings.Contains(err.Error(), "UNIQUE constraint failed") { 363 var existingID int64 364 err := p.db.QueryRow(` 365 SELECT id FROM manifests 366 WHERE did = ? AND repository = ? AND digest = ? 
367 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 368 369 if err != nil { 370 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err) 371 } 372 manifestID = existingID 373 } else { 374 return 0, fmt.Errorf("failed to insert manifest: %w", err) 375 } 376 } 377 378 // Update repository annotations ONLY if manifest has at least one non-empty annotation 379 if manifestRecord.Annotations != nil { 380 hasData := false 381 for _, value := range manifestRecord.Annotations { 382 if value != "" { 383 hasData = true 384 break 385 } 386 } 387 388 if hasData { 389 // Replace all annotations for this repository 390 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations) 391 if err != nil { 392 return 0, fmt.Errorf("failed to upsert annotations: %w", err) 393 } 394 } 395 } 396 397 // Insert manifest references or layers 398 if isManifestList { 399 // Insert manifest references (for manifest lists/indexes) 400 for i, ref := range manifestRecord.Manifests { 401 platformArch := "" 402 platformOS := "" 403 platformVariant := "" 404 platformOSVersion := "" 405 406 if ref.Platform != nil { 407 platformArch = ref.Platform.Architecture 408 platformOS = ref.Platform.OS 409 platformVariant = ref.Platform.Variant 410 platformOSVersion = ref.Platform.OSVersion 411 } 412 413 // Detect attestation manifests from annotations 414 isAttestation := false 415 if ref.Annotations != nil { 416 if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok { 417 isAttestation = refType == "attestation-manifest" 418 } 419 } 420 421 if err := db.InsertManifestReference(p.db, &db.ManifestReference{ 422 ManifestID: manifestID, 423 Digest: ref.Digest, 424 MediaType: ref.MediaType, 425 Size: ref.Size, 426 PlatformArchitecture: platformArch, 427 PlatformOS: platformOS, 428 PlatformVariant: platformVariant, 429 PlatformOSVersion: platformOSVersion, 430 IsAttestation: isAttestation, 431 ReferenceIndex: i, 432 }); err != 
nil { 433 // Continue on error - reference might already exist 434 continue 435 } 436 } 437 } else { 438 // Insert layers (for image manifests) 439 for i, layer := range manifestRecord.Layers { 440 if err := db.InsertLayer(p.db, &db.Layer{ 441 ManifestID: manifestID, 442 Digest: layer.Digest, 443 MediaType: layer.MediaType, 444 Size: layer.Size, 445 LayerIndex: i, 446 Annotations: layer.Annotations, 447 }); err != nil { 448 // Continue on error - layer might already exist 449 continue 450 } 451 } 452 } 453 454 return manifestID, nil 455} 456 457// ProcessTag processes a tag record and stores it in the database 458func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error { 459 // Unmarshal tag record 460 var tagRecord atproto.TagRecord 461 if err := json.Unmarshal(recordData, &tagRecord); err != nil { 462 return fmt.Errorf("failed to unmarshal tag: %w", err) 463 } 464 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 465 manifestDigest, err := tagRecord.GetManifestDigest() 466 if err != nil { 467 return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 468 } 469 470 // Insert or update tag 471 return db.UpsertTag(p.db, &db.Tag{ 472 DID: did, 473 Repository: tagRecord.Repository, 474 Tag: tagRecord.Tag, 475 Digest: manifestDigest, 476 CreatedAt: tagRecord.UpdatedAt, 477 }) 478} 479 480// ProcessStar processes a star record and stores it in the database 481func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error { 482 // Unmarshal star record (handles both old object and new AT URI subject formats) 483 var starRecord atproto.StarRecord 484 if err := json.Unmarshal(recordData, &starRecord); err != nil { 485 return fmt.Errorf("failed to unmarshal star: %w", err) 486 } 487 488 // Extract owner DID and repository from subject AT URI 489 ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository() 490 if err != nil { 491 return 
fmt.Errorf("failed to parse star subject: %w", err) 492 } 493 494 // Ensure the starred repository's owner exists in the users table 495 // (the starrer is already ensured by ProcessRecord, but the owner 496 // may not have been processed yet during backfill or live events) 497 if err := p.EnsureUserExists(ctx, ownerDID); err != nil { 498 return fmt.Errorf("failed to ensure star subject user: %w", err) 499 } 500 501 // Upsert the star record (idempotent - won't duplicate) 502 // The DID here is the starrer (user who starred) 503 // Star count will be calculated on demand from the stars table 504 return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt) 505} 506 507// ProcessSailorProfile processes a sailor profile record 508// This is primarily used by backfill to cache captain records for holds 509func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error { 510 // Unmarshal sailor profile record 511 var profileRecord atproto.SailorProfileRecord 512 if err := json.Unmarshal(recordData, &profileRecord); err != nil { 513 return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 514 } 515 516 // Skip if no default hold set 517 if profileRecord.DefaultHold == "" { 518 return nil 519 } 520 521 // Convert hold URL/DID to canonical DID 522 holdDID, err := atproto.ResolveHoldDID(ctx, profileRecord.DefaultHold) 523 if err != nil { 524 slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold, "error", err) 525 return nil 526 } 527 528 // Cache default hold DID on the user record 529 if err := db.UpdateUserDefaultHold(p.db, did, holdDID); err != nil { 530 slog.Warn("Failed to cache default hold DID", "component", "processor", "did", did, "holdDid", holdDID, "error", err) 531 } 532 533 // Query and cache the captain record using provided function 534 // This allows backfill-specific logic 
(retries, test mode handling) without duplicating it here 535 if queryCaptainFn != nil { 536 return queryCaptainFn(ctx, holdDID) 537 } 538 539 return nil 540} 541 542// ProcessRepoPage processes a repository page record 543// This is called when Jetstream receives a repo page create/update event 544func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error { 545 if isDelete { 546 // Delete the repo page from our cache 547 return db.DeleteRepoPage(p.db, did, rkey) 548 } 549 550 // Unmarshal repo page record 551 var pageRecord atproto.RepoPageRecord 552 if err := json.Unmarshal(recordData, &pageRecord); err != nil { 553 return fmt.Errorf("failed to unmarshal repo page: %w", err) 554 } 555 556 // Extract avatar CID if present 557 avatarCID := "" 558 if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" { 559 avatarCID = pageRecord.Avatar.Ref.Link 560 } 561 562 // Upsert to database 563 return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt) 564} 565 566// ProcessIdentity handles identity change events (handle updates) 567// This is called when Jetstream receives an identity event indicating a handle change. 568// The identity cache is invalidated to ensure the next lookup uses the new handle, 569// and the database is updated to reflect the change in the UI. 570// 571// Only processes events for users who already exist in our database (have ATCR activity). 572func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error { 573 // Check if user exists in our database - only update if they're an ATCR user 574 user, err := db.GetUserByDID(p.db, did) 575 if err != nil { 576 return fmt.Errorf("failed to check user existence: %w", err) 577 } 578 579 // Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.) 
580 if user == nil { 581 return nil 582 } 583 584 // Update handle in database 585 if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil { 586 slog.Warn("Failed to update user handle in database", 587 "component", "processor", 588 "did", did, 589 "handle", newHandle, 590 "error", err) 591 // Continue to invalidate cache even if DB update fails 592 } 593 594 // Invalidate cached identity data to force re-resolution on next lookup 595 if err := atproto.InvalidateIdentity(ctx, did); err != nil { 596 slog.Warn("Failed to invalidate identity cache", 597 "component", "processor", 598 "did", did, 599 "error", err) 600 return err 601 } 602 603 slog.Info("Processed identity change event", 604 "component", "processor", 605 "did", did, 606 "old_handle", user.Handle, 607 "new_handle", newHandle) 608 609 return nil 610} 611 612// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR users 613// This refreshes the cached avatar URL when a user changes their Bluesky profile picture 614func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error { 615 // Check if user exists in our database - only update if they're an ATCR user 616 user, err := db.GetUserByDID(p.db, did) 617 if err != nil { 618 return fmt.Errorf("failed to check user existence: %w", err) 619 } 620 621 // Skip if user doesn't exist - they don't have any ATCR activity 622 if user == nil { 623 return nil 624 } 625 626 // Parse the profile record to extract avatar 627 var profile struct { 628 Avatar *atproto.ATProtoBlobRef `json:"avatar"` 629 } 630 if err := json.Unmarshal(recordData, &profile); err != nil { 631 return fmt.Errorf("failed to unmarshal profile: %w", err) 632 } 633 634 // Build new avatar URL 635 avatarURL := "" 636 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" { 637 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link) 638 } 639 640 // Update if changed 641 if avatarURL != user.Avatar { 642 slog.Info("Updating avatar 
from profile change", 643 "component", "processor", 644 "did", did, 645 "old_avatar", user.Avatar, 646 "new_avatar", avatarURL) 647 return db.UpdateUserAvatar(p.db, did, avatarURL) 648 } 649 650 return nil 651} 652 653// RefreshUserAvatar fetches the user's current Bluesky profile and updates their cached avatar 654// This is called during backfill to ensure avatars stay fresh for existing users 655func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error { 656 // Get user from database to compare avatar 657 user, err := db.GetUserByDID(p.db, did) 658 if err != nil || user == nil { 659 return nil // User doesn't exist, skip 660 } 661 662 // Fetch profile from PDS 663 client := atproto.NewClient(pdsEndpoint, "", "") 664 profile, err := client.GetProfileRecord(ctx, did) 665 if err != nil { 666 return fmt.Errorf("failed to fetch profile: %w", err) 667 } 668 669 // Build avatar URL 670 avatarURL := "" 671 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" { 672 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link) 673 } 674 675 // Update if changed 676 if avatarURL != user.Avatar { 677 slog.Info("Backfill refreshing avatar", 678 "component", "processor", 679 "did", did, 680 "old_avatar", user.Avatar, 681 "new_avatar", avatarURL) 682 return db.UpdateUserAvatar(p.db, did, avatarURL) 683 } 684 685 return nil 686} 687 688// ProcessScan handles scan record events from hold PDSes. 689// Caches scan results in the appview DB and dispatches webhooks (if dispatcher is set). 
690func (p *Processor) ProcessScan(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error { 691 if isDelete { 692 return nil // Scan deletes are not processed (scans are immutable) 693 } 694 695 // Unmarshal scan record 696 var scanRecord atproto.ScanRecord 697 if err := json.Unmarshal(recordData, &scanRecord); err != nil { 698 return fmt.Errorf("failed to unmarshal scan record: %w", err) 699 } 700 701 // Extract manifest digest from the scan record's manifest AT-URI 702 manifestDigest := "" 703 if parts := strings.Split(scanRecord.Manifest, "/"); len(parts) > 0 { 704 manifestDigest = "sha256:" + parts[len(parts)-1] 705 } 706 707 // Parse scanned_at timestamp 708 scannedAt := time.Now() 709 if t, err := time.Parse(time.RFC3339, scanRecord.ScannedAt); err == nil { 710 scannedAt = t 711 } 712 713 scan := &db.Scan{ 714 HoldDID: holdDID, 715 ManifestDigest: manifestDigest, 716 UserDID: scanRecord.UserDID, 717 Repository: scanRecord.Repository, 718 Critical: int(scanRecord.Critical), 719 High: int(scanRecord.High), 720 Medium: int(scanRecord.Medium), 721 Low: int(scanRecord.Low), 722 Total: int(scanRecord.Total), 723 ScannerVersion: scanRecord.ScannerVersion, 724 ScannedAt: scannedAt, 725 } 726 727 // Upsert scan to DB (returns previous scan for change detection) 728 previousScan, err := db.UpsertScan(p.db, scan) 729 if err != nil { 730 return fmt.Errorf("failed to upsert scan: %w", err) 731 } 732 733 // Dispatch webhooks if dispatcher is set (live Worker only, not backfill) 734 if p.webhookDispatcher != nil { 735 // Resolve user handle from cache or DB 736 userHandle := "" 737 user, userErr := db.GetUserByDID(p.db, scanRecord.UserDID) 738 if userErr == nil && user != nil { 739 userHandle = user.Handle 740 } 741 742 // Resolve tag for the manifest digest 743 tag := "" 744 if tagVal, tagErr := db.GetTagByDigest(p.db, scanRecord.UserDID, scanRecord.Repository, manifestDigest); tagErr == nil { 745 tag = tagVal 746 } 747 748 // Resolve hold endpoint 
URL 749 holdEndpoint := "" 750 if holdURL, holdErr := atproto.ResolveHoldURL(ctx, holdDID); holdErr == nil { 751 holdEndpoint = holdURL 752 } 753 754 p.webhookDispatcher.DispatchForScan(ctx, scan, previousScan, userHandle, tag, holdEndpoint) 755 } 756 757 return nil 758} 759 760// ProcessStats handles stats record events from hold PDSes 761// This is called when Jetstream receives a stats create/update/delete event from a hold 762// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository 763func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error { 764 // Skip if no stats cache configured 765 if p.statsCache == nil { 766 return nil 767 } 768 769 // Unmarshal stats record 770 var statsRecord atproto.StatsRecord 771 if err := json.Unmarshal(recordData, &statsRecord); err != nil { 772 return fmt.Errorf("failed to unmarshal stats record: %w", err) 773 } 774 775 if isDelete { 776 // Delete from in-memory cache 777 p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository) 778 } else { 779 // Parse timestamps 780 var lastPull, lastPush *time.Time 781 if statsRecord.LastPull != "" { 782 t, err := time.Parse(time.RFC3339, statsRecord.LastPull) 783 if err == nil { 784 lastPull = &t 785 } 786 } 787 if statsRecord.LastPush != "" { 788 t, err := time.Parse(time.RFC3339, statsRecord.LastPush) 789 if err == nil { 790 lastPush = &t 791 } 792 } 793 794 // Update in-memory cache 795 p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository, 796 statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush) 797 } 798 799 // Get aggregated stats across all holds 800 totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated( 801 statsRecord.OwnerDID, statsRecord.Repository) 802 803 // Upsert aggregated stats to repository_stats 804 return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{ 805 DID: statsRecord.OwnerDID, 806 Repository: 
statsRecord.Repository, 807 PullCount: int(totalPull), 808 PushCount: int(totalPush), 809 LastPull: latestPull, 810 LastPush: latestPush, 811 }) 812} 813 814// ProcessCaptain handles captain record events from hold PDSes 815// This is called when Jetstream receives a captain create/update/delete event from a hold 816// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownership info 817func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error { 818 // Unmarshal captain record 819 var captainRecord atproto.CaptainRecord 820 if err := json.Unmarshal(recordData, &captainRecord); err != nil { 821 return fmt.Errorf("failed to unmarshal captain record: %w", err) 822 } 823 824 // Convert to db struct and upsert 825 record := &db.HoldCaptainRecord{ 826 HoldDID: holdDID, 827 OwnerDID: captainRecord.Owner, 828 Public: captainRecord.Public, 829 AllowAllCrew: captainRecord.AllowAllCrew, 830 DeployedAt: captainRecord.DeployedAt, 831 Region: captainRecord.Region, 832 Successor: captainRecord.Successor, 833 UpdatedAt: time.Now(), 834 } 835 836 if err := db.UpsertCaptainRecord(p.db, record); err != nil { 837 return fmt.Errorf("failed to upsert captain record: %w", err) 838 } 839 840 slog.Info("Processed captain record", 841 "component", "processor", 842 "hold_did", holdDID, 843 "owner_did", captainRecord.Owner, 844 "public", captainRecord.Public, 845 "allow_all_crew", captainRecord.AllowAllCrew) 846 847 return nil 848} 849 850// ProcessCrew handles crew record events from hold PDSes 851// This is called when Jetstream receives a crew create/update/delete event from a hold 852// The holdDID is the DID of the hold PDS (event.DID), and the record contains member info 853func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error { 854 // Unmarshal crew record 855 var crewRecord atproto.CrewRecord 856 if err := json.Unmarshal(recordData, &crewRecord); err != nil { 857 return 
fmt.Errorf("failed to unmarshal crew record: %w", err) 858 } 859 860 // Marshal permissions to JSON string 861 permissionsJSON := "" 862 if len(crewRecord.Permissions) > 0 { 863 if jsonBytes, err := json.Marshal(crewRecord.Permissions); err == nil { 864 permissionsJSON = string(jsonBytes) 865 } 866 } 867 868 // Convert to db struct and upsert 869 member := &db.CrewMember{ 870 HoldDID: holdDID, 871 MemberDID: crewRecord.Member, 872 Rkey: rkey, 873 Role: crewRecord.Role, 874 Permissions: permissionsJSON, 875 Tier: crewRecord.Tier, 876 AddedAt: crewRecord.AddedAt, 877 } 878 879 if err := db.UpsertCrewMember(p.db, member); err != nil { 880 return fmt.Errorf("failed to upsert crew member: %w", err) 881 } 882 883 return nil 884} 885 886// ProcessAccount handles account status events (deactivation/deletion/etc) 887// This is called when Jetstream receives an account event indicating status changes. 888// 889// Status handling: 890// - "deleted": Account permanently deleted - remove all cached data 891// - "deactivated": Could be PDS migration or permanent - invalidate cache only 892// - "takendown": Moderation action - invalidate cache only 893// - Other: Ignore 894// 895// For "deactivated", we don't delete data because it's ambiguous: 896// - Could be permanent deactivation (user deleted account) 897// - Could be PDS migration (account moves to new PDS) 898// Cache invalidation forces re-resolution on next lookup. 899// 900// Only processes events for users who already exist in our database (have ATCR activity). 
901func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error { 902 // Skip active accounts or unknown statuses 903 if active { 904 return nil 905 } 906 907 // Check if user exists in our database - only process if they're an ATCR user 908 user, err := db.GetUserByDID(p.db, did) 909 if err != nil { 910 return fmt.Errorf("failed to check user existence: %w", err) 911 } 912 913 // Skip if user doesn't exist - they don't have any ATCR activity 914 if user == nil { 915 return nil 916 } 917 918 switch status { 919 case "deleted": 920 // Account permanently deleted - remove all cached data 921 if err := db.DeleteUserData(p.db, did); err != nil { 922 slog.Error("Failed to delete user data for deleted account", 923 "component", "processor", 924 "did", did, 925 "handle", user.Handle, 926 "error", err) 927 return err 928 } 929 930 // Also invalidate identity cache 931 _ = atproto.InvalidateIdentity(ctx, did) 932 933 slog.Info("Deleted user data for deleted account", 934 "component", "processor", 935 "did", did, 936 "handle", user.Handle) 937 938 case "deactivated", "takendown": 939 // Ambiguous status - invalidate cache but keep data 940 // For deactivated: could be PDS migration, will resolve on next lookup 941 // For takendown: moderation action, keep data in case of appeal 942 if err := atproto.InvalidateIdentity(ctx, did); err != nil { 943 slog.Warn("Failed to invalidate identity cache", 944 "component", "processor", 945 "did", did, 946 "status", status, 947 "error", err) 948 return err 949 } 950 951 slog.Info("Processed account status event - cache invalidated", 952 "component", "processor", 953 "did", did, 954 "handle", user.Handle, 955 "status", status) 956 957 default: 958 // Unknown status - ignore 959 slog.Debug("Ignoring unknown account status", 960 "component", "processor", 961 "did", did, 962 "status", status) 963 } 964 965 return nil 966}