A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "atcr.io/pkg/appview/db"
12 "atcr.io/pkg/atproto"
13 atpdata "github.com/bluesky-social/indigo/atproto/atdata"
14 "github.com/bluesky-social/indigo/atproto/identity"
15 "github.com/bluesky-social/indigo/atproto/lexicon"
16)
17
18// Processor handles shared database operations for both Worker (live) and Backfill (sync)
19// This eliminates code duplication between the two data ingestion paths
20type Processor struct {
21 db db.DBTX
22 userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
23 statsCache *StatsCache // In-memory cache for per-hold stats aggregation
24 useCache bool
25 catalog *lexicon.ResolvingCatalog // For debug logging of validation failures
26 webhookDispatcher WebhookDispatcher // Optional - only for live Worker (nil for Backfill)
27}
28
29// WebhookDispatcher is an interface for dispatching webhooks on scan completion.
30// Only the live Worker sets this; backfill does NOT (avoids spamming old scan results).
31type WebhookDispatcher interface {
32 DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string)
33}
34
35// SetWebhookDispatcher sets the webhook dispatcher for scan processing.
36// Only the live Worker should set this — backfill skips webhook dispatch.
37func (p *Processor) SetWebhookDispatcher(d WebhookDispatcher) {
38 p.webhookDispatcher = d
39}
40
41// NewProcessor creates a new shared processor
42// useCache: true for Worker (live streaming), false for Backfill (batch processing)
43// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
44func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor {
45 // Create lexicon catalog for debug validation logging
46 dir := identity.DefaultDirectory()
47 catalog := lexicon.NewResolvingCatalog()
48 catalog.Directory = dir
49
50 p := &Processor{
51 db: database,
52 useCache: useCache,
53 statsCache: statsCache,
54 catalog: catalog,
55 }
56
57 if useCache {
58 p.userCache = &UserCache{
59 cache: make(map[string]*db.User),
60 }
61 }
62
63 return p
64}
65
66// EnsureUser resolves and upserts a user by DID
67// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill)
68func (p *Processor) EnsureUser(ctx context.Context, did string) error {
69 // Check cache first (if enabled)
70 if p.useCache && p.userCache != nil {
71 if _, ok := p.userCache.cache[did]; ok {
72 // User in cache - just update last seen timestamp
73 return db.UpdateUserLastSeen(p.db, did)
74 }
75 } else if !p.useCache {
76 // No cache - check if user already exists in DB
77 existingUser, err := db.GetUserByDID(p.db, did)
78 if err == nil && existingUser != nil {
79 // User exists - just update last seen timestamp
80 return db.UpdateUserLastSeen(p.db, did)
81 }
82 }
83
84 // Resolve DID to get handle and PDS endpoint
85 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
86 if err != nil {
87 return err
88 }
89
90 // Fetch user's Bluesky profile record from their PDS (including avatar)
91 avatarURL := ""
92 client := atproto.NewClient(pdsEndpoint, "", "")
93 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
94 if err != nil {
95 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
96 // Continue without avatar
97 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
98 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
99 }
100
101 // Create user record
102 user := &db.User{
103 DID: resolvedDID,
104 Handle: handle,
105 PDSEndpoint: pdsEndpoint,
106 Avatar: avatarURL,
107 LastSeen: time.Now(),
108 }
109
110 // Cache if enabled
111 if p.useCache {
112 p.userCache.cache[did] = user
113 }
114
115 // Upsert to database
116 // Use UpsertUser if we successfully fetched an avatar (to update existing users)
117 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
118 if avatarURL != "" {
119 return db.UpsertUser(p.db, user)
120 }
121 return db.UpsertUserIgnoreAvatar(p.db, user)
122}
123
124// EnsureUserExists ensures a user row exists in the database without updating it.
125// Used by non-profile collections to avoid unnecessary writes during backfill.
126// If the user doesn't exist, resolves identity and inserts with ON CONFLICT DO NOTHING.
127func (p *Processor) EnsureUserExists(ctx context.Context, did string) error {
128 // Check cache first (if enabled)
129 if p.useCache && p.userCache != nil {
130 if _, ok := p.userCache.cache[did]; ok {
131 return nil // User in cache, nothing to do
132 }
133 } else if !p.useCache {
134 // No cache - check if user already exists in DB
135 existingUser, err := db.GetUserByDID(p.db, did)
136 if err == nil && existingUser != nil {
137 return nil // User exists, nothing to do
138 }
139 }
140
141 // User doesn't exist yet — resolve and insert
142 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
143 if err != nil {
144 return err
145 }
146
147 avatarURL := ""
148 client := atproto.NewClient(pdsEndpoint, "", "")
149 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
150 if err != nil {
151 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
152 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
153 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
154 }
155
156 user := &db.User{
157 DID: resolvedDID,
158 Handle: handle,
159 PDSEndpoint: pdsEndpoint,
160 Avatar: avatarURL,
161 LastSeen: time.Now(),
162 }
163
164 // Cache if enabled
165 if p.useCache {
166 p.userCache.cache[did] = user
167 }
168
169 return db.InsertUserIfNotExists(p.db, user)
170}
171
172// ValidateRecord performs validation on records.
173// - Full lexicon validation is logged for debugging but does NOT block ingestion
174// - Targeted validation (captain/crew DID checks) DOES block bogus records
175func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error {
176 recordData, err := atpdata.UnmarshalJSON(data)
177 if err != nil {
178 return fmt.Errorf("invalid JSON: %w", err)
179 }
180
181 // Debug: Full lexicon validation (log only, don't block)
182 if p.catalog != nil {
183 if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil {
184 slog.Debug("Record failed full lexicon validation (ingesting anyway)",
185 "component", "processor",
186 "collection", collection,
187 "error", err)
188 }
189 }
190
191 // Targeted validation for collections that had bogus data issues
192 // These DO block ingestion
193 switch collection {
194 case atproto.CaptainCollection:
195 // Captain must have non-empty owner DID
196 owner, _ := recordData["owner"].(string)
197 if owner == "" || !strings.HasPrefix(owner, "did:") {
198 return fmt.Errorf("captain record missing or invalid owner DID")
199 }
200
201 case atproto.CrewCollection:
202 // Crew must have non-empty member DID
203 member, _ := recordData["member"].(string)
204 if member == "" || !strings.HasPrefix(member, "did:") {
205 return fmt.Errorf("crew record missing or invalid member DID")
206 }
207 }
208
209 return nil
210}
211
212// ProcessRecord is the unified entry point for processing any ATCR record.
213// It handles:
214// 1. Schema validation against published lexicons
215// 2. User creation for user-activity collections
216// 3. Dispatch to the appropriate Process* method
217//
218// queryCaptainFn is optional - used by backfill for sailor profile processing
219func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error {
220 // Skip validation for deletes (no record data)
221 if !isDelete && data != nil {
222 if err := p.ValidateRecord(ctx, collection, data); err != nil {
223 slog.Warn("Record failed schema validation, skipping",
224 "component", "processor",
225 "collection", collection,
226 "did", did,
227 "error", err)
228 return nil // Skip invalid records silently
229 }
230 }
231
232 // User-activity collections create/update user entries
233 // Skip for deletes - user should already exist, and we don't need to resolve identity
234 if !isDelete {
235 switch collection {
236 case atproto.SailorProfileCollection:
237 // Sailor profile is the authoritative source for user data — full upsert
238 if err := p.EnsureUser(ctx, did); err != nil {
239 return fmt.Errorf("failed to ensure user: %w", err)
240 }
241 case atproto.ManifestCollection,
242 atproto.TagCollection,
243 atproto.StarCollection,
244 atproto.RepoPageCollection:
245 // Other user collections just need the row to exist — no update if unchanged
246 if err := p.EnsureUserExists(ctx, did); err != nil {
247 return fmt.Errorf("failed to ensure user exists: %w", err)
248 }
249 // Hold collections (captain, crew, stats) - don't create user entries
250 // These are records FROM holds, not user activity
251 }
252 }
253
254 // Dispatch to specific handler
255 switch collection {
256 case atproto.ManifestCollection:
257 if isDelete {
258 return db.DeleteManifest(p.db, did, "", rkey)
259 }
260 _, err := p.ProcessManifest(ctx, did, data)
261 return err
262
263 case atproto.TagCollection:
264 if isDelete {
265 repo, tag := atproto.RKeyToRepositoryTag(rkey)
266 return db.DeleteTag(p.db, did, repo, tag)
267 }
268 return p.ProcessTag(ctx, did, data)
269
270 case atproto.StarCollection:
271 if isDelete {
272 ownerDID, repository, err := atproto.ParseStarRecordKey(rkey)
273 if err != nil {
274 return err
275 }
276 return db.DeleteStar(p.db, did, ownerDID, repository)
277 }
278 return p.ProcessStar(ctx, did, data)
279
280 case atproto.RepoPageCollection:
281 return p.ProcessRepoPage(ctx, did, rkey, data, isDelete)
282
283 case atproto.SailorProfileCollection:
284 return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn)
285
286 case atproto.ScanCollection:
287 return p.ProcessScan(ctx, did, data, isDelete)
288
289 case atproto.StatsCollection:
290 return p.ProcessStats(ctx, did, data, isDelete)
291
292 case atproto.CaptainCollection:
293 if isDelete {
294 return db.DeleteCaptainRecord(p.db, did)
295 }
296 return p.ProcessCaptain(ctx, did, data)
297
298 case atproto.CrewCollection:
299 if isDelete {
300 return db.DeleteCrewMemberByRkey(p.db, did, rkey)
301 }
302 return p.ProcessCrew(ctx, did, rkey, data)
303
304 default:
305 return nil // Unknown collection, ignore
306 }
307}
308
309// ProcessManifest processes a manifest record and stores it in the database
310// Returns the manifest ID for further processing (layers/references)
311func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
312 // Unmarshal manifest record
313 var manifestRecord atproto.ManifestRecord
314 if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
315 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
316 }
317 // Detect manifest type
318 isManifestList := len(manifestRecord.Manifests) > 0
319
320 // Extract hold DID from manifest (with fallback for legacy manifests)
321 // New manifests use holdDid field (DID format)
322 // Old manifests use holdEndpoint field (URL format) - convert to DID
323 holdDID := manifestRecord.HoldDID
324 if holdDID == "" && manifestRecord.HoldEndpoint != "" {
325 // Legacy manifest - resolve URL to DID via /.well-known/atproto-did
326 if resolved, err := atproto.ResolveHoldDID(ctx, manifestRecord.HoldEndpoint); err != nil {
327 slog.Warn("Failed to resolve hold DID from legacy manifest endpoint", "holdEndpoint", manifestRecord.HoldEndpoint, "error", err)
328 } else {
329 holdDID = resolved
330 }
331 }
332
333 // Detect artifact type from config media type
334 artifactType := "container-image"
335 if !isManifestList && manifestRecord.Config != nil {
336 artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
337 }
338
339 // Prepare manifest for insertion (WITHOUT annotation fields)
340 manifest := &db.Manifest{
341 DID: did,
342 Repository: manifestRecord.Repository,
343 Digest: manifestRecord.Digest,
344 MediaType: manifestRecord.MediaType,
345 SchemaVersion: manifestRecord.SchemaVersion,
346 HoldEndpoint: holdDID,
347 ArtifactType: artifactType,
348 CreatedAt: manifestRecord.CreatedAt,
349 // Annotations removed - stored separately in repository_annotations table
350 }
351
352 // Set config fields only for image manifests (not manifest lists)
353 if !isManifestList && manifestRecord.Config != nil {
354 manifest.ConfigDigest = manifestRecord.Config.Digest
355 manifest.ConfigSize = manifestRecord.Config.Size
356 }
357
358 // Insert manifest
359 manifestID, err := db.InsertManifest(p.db, manifest)
360 if err != nil {
361 // For backfill: if manifest already exists, get its ID
362 if strings.Contains(err.Error(), "UNIQUE constraint failed") {
363 var existingID int64
364 err := p.db.QueryRow(`
365 SELECT id FROM manifests
366 WHERE did = ? AND repository = ? AND digest = ?
367 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
368
369 if err != nil {
370 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
371 }
372 manifestID = existingID
373 } else {
374 return 0, fmt.Errorf("failed to insert manifest: %w", err)
375 }
376 }
377
378 // Update repository annotations ONLY if manifest has at least one non-empty annotation
379 if manifestRecord.Annotations != nil {
380 hasData := false
381 for _, value := range manifestRecord.Annotations {
382 if value != "" {
383 hasData = true
384 break
385 }
386 }
387
388 if hasData {
389 // Replace all annotations for this repository
390 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
391 if err != nil {
392 return 0, fmt.Errorf("failed to upsert annotations: %w", err)
393 }
394 }
395 }
396
397 // Insert manifest references or layers
398 if isManifestList {
399 // Insert manifest references (for manifest lists/indexes)
400 for i, ref := range manifestRecord.Manifests {
401 platformArch := ""
402 platformOS := ""
403 platformVariant := ""
404 platformOSVersion := ""
405
406 if ref.Platform != nil {
407 platformArch = ref.Platform.Architecture
408 platformOS = ref.Platform.OS
409 platformVariant = ref.Platform.Variant
410 platformOSVersion = ref.Platform.OSVersion
411 }
412
413 // Detect attestation manifests from annotations
414 isAttestation := false
415 if ref.Annotations != nil {
416 if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
417 isAttestation = refType == "attestation-manifest"
418 }
419 }
420
421 if err := db.InsertManifestReference(p.db, &db.ManifestReference{
422 ManifestID: manifestID,
423 Digest: ref.Digest,
424 MediaType: ref.MediaType,
425 Size: ref.Size,
426 PlatformArchitecture: platformArch,
427 PlatformOS: platformOS,
428 PlatformVariant: platformVariant,
429 PlatformOSVersion: platformOSVersion,
430 IsAttestation: isAttestation,
431 ReferenceIndex: i,
432 }); err != nil {
433 // Continue on error - reference might already exist
434 continue
435 }
436 }
437 } else {
438 // Insert layers (for image manifests)
439 for i, layer := range manifestRecord.Layers {
440 if err := db.InsertLayer(p.db, &db.Layer{
441 ManifestID: manifestID,
442 Digest: layer.Digest,
443 MediaType: layer.MediaType,
444 Size: layer.Size,
445 LayerIndex: i,
446 Annotations: layer.Annotations,
447 }); err != nil {
448 // Continue on error - layer might already exist
449 continue
450 }
451 }
452 }
453
454 return manifestID, nil
455}
456
457// ProcessTag processes a tag record and stores it in the database
458func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
459 // Unmarshal tag record
460 var tagRecord atproto.TagRecord
461 if err := json.Unmarshal(recordData, &tagRecord); err != nil {
462 return fmt.Errorf("failed to unmarshal tag: %w", err)
463 }
464 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
465 manifestDigest, err := tagRecord.GetManifestDigest()
466 if err != nil {
467 return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
468 }
469
470 // Insert or update tag
471 return db.UpsertTag(p.db, &db.Tag{
472 DID: did,
473 Repository: tagRecord.Repository,
474 Tag: tagRecord.Tag,
475 Digest: manifestDigest,
476 CreatedAt: tagRecord.UpdatedAt,
477 })
478}
479
480// ProcessStar processes a star record and stores it in the database
481func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
482 // Unmarshal star record (handles both old object and new AT URI subject formats)
483 var starRecord atproto.StarRecord
484 if err := json.Unmarshal(recordData, &starRecord); err != nil {
485 return fmt.Errorf("failed to unmarshal star: %w", err)
486 }
487
488 // Extract owner DID and repository from subject AT URI
489 ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository()
490 if err != nil {
491 return fmt.Errorf("failed to parse star subject: %w", err)
492 }
493
494 // Ensure the starred repository's owner exists in the users table
495 // (the starrer is already ensured by ProcessRecord, but the owner
496 // may not have been processed yet during backfill or live events)
497 if err := p.EnsureUserExists(ctx, ownerDID); err != nil {
498 return fmt.Errorf("failed to ensure star subject user: %w", err)
499 }
500
501 // Upsert the star record (idempotent - won't duplicate)
502 // The DID here is the starrer (user who starred)
503 // Star count will be calculated on demand from the stars table
504 return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt)
505}
506
507// ProcessSailorProfile processes a sailor profile record
508// This is primarily used by backfill to cache captain records for holds
509func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
510 // Unmarshal sailor profile record
511 var profileRecord atproto.SailorProfileRecord
512 if err := json.Unmarshal(recordData, &profileRecord); err != nil {
513 return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
514 }
515
516 // Skip if no default hold set
517 if profileRecord.DefaultHold == "" {
518 return nil
519 }
520
521 // Convert hold URL/DID to canonical DID
522 holdDID, err := atproto.ResolveHoldDID(ctx, profileRecord.DefaultHold)
523 if err != nil {
524 slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold, "error", err)
525 return nil
526 }
527
528 // Cache default hold DID on the user record
529 if err := db.UpdateUserDefaultHold(p.db, did, holdDID); err != nil {
530 slog.Warn("Failed to cache default hold DID", "component", "processor", "did", did, "holdDid", holdDID, "error", err)
531 }
532
533 // Query and cache the captain record using provided function
534 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here
535 if queryCaptainFn != nil {
536 return queryCaptainFn(ctx, holdDID)
537 }
538
539 return nil
540}
541
542// ProcessRepoPage processes a repository page record
543// This is called when Jetstream receives a repo page create/update event
544func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
545 if isDelete {
546 // Delete the repo page from our cache
547 return db.DeleteRepoPage(p.db, did, rkey)
548 }
549
550 // Unmarshal repo page record
551 var pageRecord atproto.RepoPageRecord
552 if err := json.Unmarshal(recordData, &pageRecord); err != nil {
553 return fmt.Errorf("failed to unmarshal repo page: %w", err)
554 }
555
556 // Extract avatar CID if present
557 avatarCID := ""
558 if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" {
559 avatarCID = pageRecord.Avatar.Ref.Link
560 }
561
562 // Upsert to database
563 return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt)
564}
565
566// ProcessIdentity handles identity change events (handle updates)
567// This is called when Jetstream receives an identity event indicating a handle change.
568// The identity cache is invalidated to ensure the next lookup uses the new handle,
569// and the database is updated to reflect the change in the UI.
570//
571// Only processes events for users who already exist in our database (have ATCR activity).
572func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
573 // Check if user exists in our database - only update if they're an ATCR user
574 user, err := db.GetUserByDID(p.db, did)
575 if err != nil {
576 return fmt.Errorf("failed to check user existence: %w", err)
577 }
578
579 // Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
580 if user == nil {
581 return nil
582 }
583
584 // Update handle in database
585 if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
586 slog.Warn("Failed to update user handle in database",
587 "component", "processor",
588 "did", did,
589 "handle", newHandle,
590 "error", err)
591 // Continue to invalidate cache even if DB update fails
592 }
593
594 // Invalidate cached identity data to force re-resolution on next lookup
595 if err := atproto.InvalidateIdentity(ctx, did); err != nil {
596 slog.Warn("Failed to invalidate identity cache",
597 "component", "processor",
598 "did", did,
599 "error", err)
600 return err
601 }
602
603 slog.Info("Processed identity change event",
604 "component", "processor",
605 "did", did,
606 "old_handle", user.Handle,
607 "new_handle", newHandle)
608
609 return nil
610}
611
612// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR users
613// This refreshes the cached avatar URL when a user changes their Bluesky profile picture
614func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error {
615 // Check if user exists in our database - only update if they're an ATCR user
616 user, err := db.GetUserByDID(p.db, did)
617 if err != nil {
618 return fmt.Errorf("failed to check user existence: %w", err)
619 }
620
621 // Skip if user doesn't exist - they don't have any ATCR activity
622 if user == nil {
623 return nil
624 }
625
626 // Parse the profile record to extract avatar
627 var profile struct {
628 Avatar *atproto.ATProtoBlobRef `json:"avatar"`
629 }
630 if err := json.Unmarshal(recordData, &profile); err != nil {
631 return fmt.Errorf("failed to unmarshal profile: %w", err)
632 }
633
634 // Build new avatar URL
635 avatarURL := ""
636 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
637 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
638 }
639
640 // Update if changed
641 if avatarURL != user.Avatar {
642 slog.Info("Updating avatar from profile change",
643 "component", "processor",
644 "did", did,
645 "old_avatar", user.Avatar,
646 "new_avatar", avatarURL)
647 return db.UpdateUserAvatar(p.db, did, avatarURL)
648 }
649
650 return nil
651}
652
653// RefreshUserAvatar fetches the user's current Bluesky profile and updates their cached avatar
654// This is called during backfill to ensure avatars stay fresh for existing users
655func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error {
656 // Get user from database to compare avatar
657 user, err := db.GetUserByDID(p.db, did)
658 if err != nil || user == nil {
659 return nil // User doesn't exist, skip
660 }
661
662 // Fetch profile from PDS
663 client := atproto.NewClient(pdsEndpoint, "", "")
664 profile, err := client.GetProfileRecord(ctx, did)
665 if err != nil {
666 return fmt.Errorf("failed to fetch profile: %w", err)
667 }
668
669 // Build avatar URL
670 avatarURL := ""
671 if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
672 avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
673 }
674
675 // Update if changed
676 if avatarURL != user.Avatar {
677 slog.Info("Backfill refreshing avatar",
678 "component", "processor",
679 "did", did,
680 "old_avatar", user.Avatar,
681 "new_avatar", avatarURL)
682 return db.UpdateUserAvatar(p.db, did, avatarURL)
683 }
684
685 return nil
686}
687
688// ProcessScan handles scan record events from hold PDSes.
689// Caches scan results in the appview DB and dispatches webhooks (if dispatcher is set).
690func (p *Processor) ProcessScan(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
691 if isDelete {
692 return nil // Scan deletes are not processed (scans are immutable)
693 }
694
695 // Unmarshal scan record
696 var scanRecord atproto.ScanRecord
697 if err := json.Unmarshal(recordData, &scanRecord); err != nil {
698 return fmt.Errorf("failed to unmarshal scan record: %w", err)
699 }
700
701 // Extract manifest digest from the scan record's manifest AT-URI
702 manifestDigest := ""
703 if parts := strings.Split(scanRecord.Manifest, "/"); len(parts) > 0 {
704 manifestDigest = "sha256:" + parts[len(parts)-1]
705 }
706
707 // Parse scanned_at timestamp
708 scannedAt := time.Now()
709 if t, err := time.Parse(time.RFC3339, scanRecord.ScannedAt); err == nil {
710 scannedAt = t
711 }
712
713 scan := &db.Scan{
714 HoldDID: holdDID,
715 ManifestDigest: manifestDigest,
716 UserDID: scanRecord.UserDID,
717 Repository: scanRecord.Repository,
718 Critical: int(scanRecord.Critical),
719 High: int(scanRecord.High),
720 Medium: int(scanRecord.Medium),
721 Low: int(scanRecord.Low),
722 Total: int(scanRecord.Total),
723 ScannerVersion: scanRecord.ScannerVersion,
724 ScannedAt: scannedAt,
725 }
726
727 // Upsert scan to DB (returns previous scan for change detection)
728 previousScan, err := db.UpsertScan(p.db, scan)
729 if err != nil {
730 return fmt.Errorf("failed to upsert scan: %w", err)
731 }
732
733 // Dispatch webhooks if dispatcher is set (live Worker only, not backfill)
734 if p.webhookDispatcher != nil {
735 // Resolve user handle from cache or DB
736 userHandle := ""
737 user, userErr := db.GetUserByDID(p.db, scanRecord.UserDID)
738 if userErr == nil && user != nil {
739 userHandle = user.Handle
740 }
741
742 // Resolve tag for the manifest digest
743 tag := ""
744 if tagVal, tagErr := db.GetTagByDigest(p.db, scanRecord.UserDID, scanRecord.Repository, manifestDigest); tagErr == nil {
745 tag = tagVal
746 }
747
748 // Resolve hold endpoint URL
749 holdEndpoint := ""
750 if holdURL, holdErr := atproto.ResolveHoldURL(ctx, holdDID); holdErr == nil {
751 holdEndpoint = holdURL
752 }
753
754 p.webhookDispatcher.DispatchForScan(ctx, scan, previousScan, userHandle, tag, holdEndpoint)
755 }
756
757 return nil
758}
759
760// ProcessStats handles stats record events from hold PDSes
761// This is called when Jetstream receives a stats create/update/delete event from a hold
762// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository
763func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
764 // Skip if no stats cache configured
765 if p.statsCache == nil {
766 return nil
767 }
768
769 // Unmarshal stats record
770 var statsRecord atproto.StatsRecord
771 if err := json.Unmarshal(recordData, &statsRecord); err != nil {
772 return fmt.Errorf("failed to unmarshal stats record: %w", err)
773 }
774
775 if isDelete {
776 // Delete from in-memory cache
777 p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
778 } else {
779 // Parse timestamps
780 var lastPull, lastPush *time.Time
781 if statsRecord.LastPull != "" {
782 t, err := time.Parse(time.RFC3339, statsRecord.LastPull)
783 if err == nil {
784 lastPull = &t
785 }
786 }
787 if statsRecord.LastPush != "" {
788 t, err := time.Parse(time.RFC3339, statsRecord.LastPush)
789 if err == nil {
790 lastPush = &t
791 }
792 }
793
794 // Update in-memory cache
795 p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
796 statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
797 }
798
799 // Get aggregated stats across all holds
800 totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
801 statsRecord.OwnerDID, statsRecord.Repository)
802
803 // Upsert aggregated stats to repository_stats
804 return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
805 DID: statsRecord.OwnerDID,
806 Repository: statsRecord.Repository,
807 PullCount: int(totalPull),
808 PushCount: int(totalPush),
809 LastPull: latestPull,
810 LastPush: latestPush,
811 })
812}
813
814// ProcessCaptain handles captain record events from hold PDSes
815// This is called when Jetstream receives a captain create/update/delete event from a hold
816// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownership info
817func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error {
818 // Unmarshal captain record
819 var captainRecord atproto.CaptainRecord
820 if err := json.Unmarshal(recordData, &captainRecord); err != nil {
821 return fmt.Errorf("failed to unmarshal captain record: %w", err)
822 }
823
824 // Convert to db struct and upsert
825 record := &db.HoldCaptainRecord{
826 HoldDID: holdDID,
827 OwnerDID: captainRecord.Owner,
828 Public: captainRecord.Public,
829 AllowAllCrew: captainRecord.AllowAllCrew,
830 DeployedAt: captainRecord.DeployedAt,
831 Region: captainRecord.Region,
832 Successor: captainRecord.Successor,
833 UpdatedAt: time.Now(),
834 }
835
836 if err := db.UpsertCaptainRecord(p.db, record); err != nil {
837 return fmt.Errorf("failed to upsert captain record: %w", err)
838 }
839
840 slog.Info("Processed captain record",
841 "component", "processor",
842 "hold_did", holdDID,
843 "owner_did", captainRecord.Owner,
844 "public", captainRecord.Public,
845 "allow_all_crew", captainRecord.AllowAllCrew)
846
847 return nil
848}
849
850// ProcessCrew handles crew record events from hold PDSes
851// This is called when Jetstream receives a crew create/update/delete event from a hold
852// The holdDID is the DID of the hold PDS (event.DID), and the record contains member info
853func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error {
854 // Unmarshal crew record
855 var crewRecord atproto.CrewRecord
856 if err := json.Unmarshal(recordData, &crewRecord); err != nil {
857 return fmt.Errorf("failed to unmarshal crew record: %w", err)
858 }
859
860 // Marshal permissions to JSON string
861 permissionsJSON := ""
862 if len(crewRecord.Permissions) > 0 {
863 if jsonBytes, err := json.Marshal(crewRecord.Permissions); err == nil {
864 permissionsJSON = string(jsonBytes)
865 }
866 }
867
868 // Convert to db struct and upsert
869 member := &db.CrewMember{
870 HoldDID: holdDID,
871 MemberDID: crewRecord.Member,
872 Rkey: rkey,
873 Role: crewRecord.Role,
874 Permissions: permissionsJSON,
875 Tier: crewRecord.Tier,
876 AddedAt: crewRecord.AddedAt,
877 }
878
879 if err := db.UpsertCrewMember(p.db, member); err != nil {
880 return fmt.Errorf("failed to upsert crew member: %w", err)
881 }
882
883 return nil
884}
885
886// ProcessAccount handles account status events (deactivation/deletion/etc)
887// This is called when Jetstream receives an account event indicating status changes.
888//
889// Status handling:
890// - "deleted": Account permanently deleted - remove all cached data
891// - "deactivated": Could be PDS migration or permanent - invalidate cache only
892// - "takendown": Moderation action - invalidate cache only
893// - Other: Ignore
894//
895// For "deactivated", we don't delete data because it's ambiguous:
896// - Could be permanent deactivation (user deleted account)
897// - Could be PDS migration (account moves to new PDS)
898// Cache invalidation forces re-resolution on next lookup.
899//
900// Only processes events for users who already exist in our database (have ATCR activity).
901func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
902 // Skip active accounts or unknown statuses
903 if active {
904 return nil
905 }
906
907 // Check if user exists in our database - only process if they're an ATCR user
908 user, err := db.GetUserByDID(p.db, did)
909 if err != nil {
910 return fmt.Errorf("failed to check user existence: %w", err)
911 }
912
913 // Skip if user doesn't exist - they don't have any ATCR activity
914 if user == nil {
915 return nil
916 }
917
918 switch status {
919 case "deleted":
920 // Account permanently deleted - remove all cached data
921 if err := db.DeleteUserData(p.db, did); err != nil {
922 slog.Error("Failed to delete user data for deleted account",
923 "component", "processor",
924 "did", did,
925 "handle", user.Handle,
926 "error", err)
927 return err
928 }
929
930 // Also invalidate identity cache
931 _ = atproto.InvalidateIdentity(ctx, did)
932
933 slog.Info("Deleted user data for deleted account",
934 "component", "processor",
935 "did", did,
936 "handle", user.Handle)
937
938 case "deactivated", "takendown":
939 // Ambiguous status - invalidate cache but keep data
940 // For deactivated: could be PDS migration, will resolve on next lookup
941 // For takendown: moderation action, keep data in case of appeal
942 if err := atproto.InvalidateIdentity(ctx, did); err != nil {
943 slog.Warn("Failed to invalidate identity cache",
944 "component", "processor",
945 "did", did,
946 "status", status,
947 "error", err)
948 return err
949 }
950
951 slog.Info("Processed account status event - cache invalidated",
952 "component", "processor",
953 "did", did,
954 "handle", user.Handle,
955 "status", status)
956
957 default:
958 // Unknown status - ignore
959 slog.Debug("Ignoring unknown account status",
960 "component", "processor",
961 "did", did,
962 "status", status)
963 }
964
965 return nil
966}