···141 Secret string `yaml:"secret" comment:"Shared secret for scanner WebSocket auth. Empty disables scanning."`
142143 // Minimum interval between re-scans of the same manifest. 0 disables proactive scanning.
144- RescanInterval time.Duration `yaml:"rescan_interval" comment:"Minimum interval between re-scans of the same manifest. When set, the hold proactively scans manifests when the scanner is idle. Default: 24h. Set to 0 to disable."`
145}
146147// DatabaseConfig defines embedded PDS database settings
···223 v.SetDefault("gc.enabled", false)
224 // Scanner defaults
225 v.SetDefault("scanner.secret", "")
226- v.SetDefault("scanner.rescan_interval", "24h")
227228 // Log shipper defaults
229 v.SetDefault("log_shipper.batch_size", 100)
···141 Secret string `yaml:"secret" comment:"Shared secret for scanner WebSocket auth. Empty disables scanning."`
142143 // Minimum interval between re-scans of the same manifest. 0 disables proactive scanning.
144+ RescanInterval time.Duration `yaml:"rescan_interval" comment:"Minimum interval between re-scans of the same manifest. When set, the hold proactively scans manifests when the scanner is idle. Default: 168h (7 days). Set to 0 to disable."`
145}
146147// DatabaseConfig defines embedded PDS database settings
···223 v.SetDefault("gc.enabled", false)
224 // Scanner defaults
225 v.SetDefault("scanner.secret", "")
226+ v.SetDefault("scanner.rescan_interval", "168h") // 7 days
227228 // Log shipper defaults
229 v.SetDefault("log_shipper.batch_size", 100)
+112
pkg/hold/gc/gc.go
···296 return preview, nil
297}
2980000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000299// analyzeRecords performs Phase 1 analysis: builds referenced set, finds orphaned records,
300// and identifies missing layer records. Pure analysis — no mutations.
301// Discovers users, fetches manifests, scans records, identifies missing records.
···296 return preview, nil
297}
298299+// Reconcile creates missing layer records without deleting anything.
300+// Requires a prior Preview() to identify missing records.
301+func (gc *GarbageCollector) Reconcile(ctx context.Context) (*GCResult, error) {
302+ if !gc.tryStart() {
303+ return nil, fmt.Errorf("GC operation already in progress")
304+ }
305+ defer gc.finish()
306+307+ gc.mu.Lock()
308+ preview := gc.lastPreview
309+ gc.mu.Unlock()
310+311+ if preview == nil {
312+ return nil, fmt.Errorf("no preview available — run Scan first")
313+ }
314+ if len(preview.MissingRecords) == 0 {
315+ return &GCResult{}, nil
316+ }
317+318+ start := time.Now()
319+ result := &GCResult{}
320+321+ gc.logger.Info("Starting reconciliation", "missingRecords", len(preview.MissingRecords))
322+ gc.reconcileMissingRecords(ctx, preview.MissingRecords, result)
323+ result.Duration = time.Since(start)
324+325+ gc.mu.Lock()
326+ gc.lastResult = result
327+ gc.lastResultAt = time.Now()
328+ gc.mu.Unlock()
329+330+ return result, nil
331+}
332+333+// DeleteOrphanedRecords deletes layer records whose manifests no longer exist.
334+// Requires a prior Preview() to identify orphaned records.
335+func (gc *GarbageCollector) DeleteOrphanedRecords(ctx context.Context) (*GCResult, error) {
336+ if !gc.tryStart() {
337+ return nil, fmt.Errorf("GC operation already in progress")
338+ }
339+ defer gc.finish()
340+341+ gc.mu.Lock()
342+ preview := gc.lastPreview
343+ gc.mu.Unlock()
344+345+ if preview == nil {
346+ return nil, fmt.Errorf("no preview available — run Scan first")
347+ }
348+ if len(preview.OrphanedRecords) == 0 {
349+ return &GCResult{}, nil
350+ }
351+352+ start := time.Now()
353+ result := &GCResult{
354+ OrphanedRecords: int64(len(preview.OrphanedRecords)),
355+ }
356+357+ rkeys := make([]string, len(preview.OrphanedRecords))
358+ for i, r := range preview.OrphanedRecords {
359+ rkeys[i] = r.Rkey
360+ }
361+362+ gc.logger.Info("Deleting orphaned records", "count", len(rkeys))
363+ if err := gc.deleteOrphanedRecords(ctx, rkeys, result); err != nil {
364+ return nil, fmt.Errorf("delete orphaned records: %w", err)
365+ }
366+ result.Duration = time.Since(start)
367+368+ gc.mu.Lock()
369+ gc.lastResult = result
370+ gc.lastResultAt = time.Now()
371+ gc.mu.Unlock()
372+373+ return result, nil
374+}
375+376+// DeleteOrphanedBlobs walks S3 and deletes blobs not referenced by any manifest.
377+// Runs a fresh analysis to build the current referenced set (reflects any reconciliation
378+// done since the last preview).
379+func (gc *GarbageCollector) DeleteOrphanedBlobs(ctx context.Context) (*GCResult, error) {
380+ if !gc.tryStart() {
381+ return nil, fmt.Errorf("GC operation already in progress")
382+ }
383+ defer gc.finish()
384+385+ start := time.Now()
386+ result := &GCResult{}
387+388+ gc.logger.Info("Starting orphaned blob deletion (fresh analysis)")
389+390+ // Fresh analysis so the referenced set includes any records reconciled since preview
391+ analysis, err := gc.analyzeRecords(ctx)
392+ if err != nil {
393+ return nil, fmt.Errorf("analyze records: %w", err)
394+ }
395+396+ result.ReferencedBlobs = int64(len(analysis.referenced))
397+398+ if err := gc.deleteOrphanedBlobs(ctx, analysis.referenced, result); err != nil {
399+ return nil, fmt.Errorf("delete orphaned blobs: %w", err)
400+ }
401+ result.Duration = time.Since(start)
402+403+ gc.mu.Lock()
404+ gc.lastResult = result
405+ gc.lastResultAt = time.Now()
406+ gc.mu.Unlock()
407+408+ return result, nil
409+}
410+411// analyzeRecords performs Phase 1 analysis: builds referenced set, finds orphaned records,
412// and identifies missing layer records. Pure analysis — no mutations.
413// Discovers users, fetches manifests, scans records, identifies missing records.
+2-2
pkg/hold/oci/xrpc.go
···380 }
381 }
382383- // Enqueue scan job if scanner is connected
384- if h.scanBroadcaster != nil {
385 tier := "deckhand"
386 if stats != nil && stats.Tier != "" {
387 tier = stats.Tier
···380 }
381 }
382383+ // Enqueue scan job if scanner is connected (skip manifest lists — children get their own jobs)
384+ if h.scanBroadcaster != nil && !isMultiArch {
385 tier := "deckhand"
386 if stats != nil && stats.Tier != "" {
387 tier = stats.Tier
+146-48
pkg/hold/pds/scan_broadcaster.go
···551 "total", msg.Summary.Total)
552}
553554-// handleError marks a job as failed
0555func (sb *ScanBroadcaster) handleError(sub *ScanSubscriber, msg ScannerMessage) {
556- _, err := sb.db.Exec(`
000000000000000000000000000557 UPDATE scan_jobs SET status = 'failed', completed_at = ?
558 WHERE seq = ?
559 `, time.Now(), msg.Seq)
···567 "seq", msg.Seq,
568 "subscriberId", sub.id,
569 "error", msg.Error)
0000000570}
571572// drainPendingJobs sends pending/timed-out jobs to a newly connected scanner.
···650 }
651}
652653-// reDispatchTimedOut finds jobs that were assigned but not acked/completed within timeout.
0654// Collects timed-out rows first, closes cursor, then resets and re-dispatches
655// to avoid holding a SELECT cursor open during UPDATEs (prevents SQLite BUSY).
656func (sb *ScanBroadcaster) reDispatchTimedOut() {
657 timeout := time.Now().Add(-sb.ackTimeout)
658000000000000659 rows, err := sb.db.Query(`
660 SELECT seq, manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json
661 FROM scan_jobs
···798func (sb *ScanBroadcaster) proactiveScanLoop() {
799 defer sb.wg.Done()
800801- // Wait a bit before starting to let the system settle
802 select {
803 case <-sb.stopCh:
804 return
805- case <-time.After(30 * time.Second):
806 }
8070000808 ticker := time.NewTicker(60 * time.Second)
809 defer ticker.Stop()
810···824// Uses the cached DID list from the relay (refreshed by refreshManifestDIDsLoop).
825func (sb *ScanBroadcaster) tryEnqueueProactiveScan() {
826 if !sb.hasConnectedScanners() {
0827 return
828 }
829 if sb.hasActiveJobs() {
0830 return
831 }
832···839 sb.manifestDIDsMu.RUnlock()
840841 if len(userDIDs) == 0 {
0842 return
843 }
844···854 }
855}
85600000000857// tryEnqueueForUser fetches manifests from a user's PDS and enqueues a scan for the
858-// first one that needs scanning. Returns true if a job was enqueued.
0859func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string) bool {
860 // Resolve user DID to PDS endpoint and handle
861 did, userHandle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, userDID)
···865 return false
866 }
867868- // Fetch manifest records from user's PDS
000869 client := atproto.NewClient(pdsEndpoint, did, "")
870 var cursor string
871 for {
···879 for _, record := range records {
880 var manifest atproto.ManifestRecord
881 if err := json.Unmarshal(record.Value, &manifest); err != nil {
882- slog.Debug("Proactive scan: failed to unmarshal manifest record",
883- "uri", record.URI, "error", err)
884 continue
885 }
886···898 continue
899 }
900901- // Skip if config is nil (shouldn't happen for image manifests, but be safe)
902 if manifest.Config == nil {
903 continue
904 }
905906- // Check if already scanned recently
907- if sb.isRecentlyScanned(ctx, manifest.Digest) {
0000000908 continue
909 }
910911- // Construct and enqueue scan job
912- configJSON, _ := json.Marshal(manifest.Config)
913- layersJSON, _ := json.Marshal(manifest.Layers)
0000000914915- slog.Info("Enqueuing proactive scan",
916- "manifestDigest", manifest.Digest,
917- "repository", manifest.Repository,
918- "userDID", did)
919920- if err := sb.Enqueue(&ScanJobEvent{
921- ManifestDigest: manifest.Digest,
922- Repository: manifest.Repository,
923- UserDID: did,
924- UserHandle: userHandle,
925- Tier: "deckhand",
926- Config: configJSON,
927- Layers: layersJSON,
928- }); err != nil {
929- slog.Error("Proactive scan: failed to enqueue",
930- "manifest", manifest.Digest, "error", err)
931- return false
932 }
933- return true
934 }
935936 if nextCursor == "" || len(records) == 0 {
···939 cursor = nextCursor
940 }
941942- return false
000000000000000000000000000000000000000943}
944945// isOurManifest checks if a manifest's holdDID matches this hold directly,
···1026 }
10271028 return false
1029-}
1030-1031-// isRecentlyScanned checks if a manifest has been scanned within the rescan interval.
1032-func (sb *ScanBroadcaster) isRecentlyScanned(ctx context.Context, manifestDigest string) bool {
1033- _, scanRecord, err := sb.pds.GetScanRecord(ctx, manifestDigest)
1034- if err != nil {
1035- return false // Not scanned or error reading → needs scanning
1036- }
1037-1038- scannedAt, err := time.Parse(time.RFC3339, scanRecord.ScannedAt)
1039- if err != nil {
1040- return false // Can't parse timestamp → treat as needing scan
1041- }
1042-1043- return time.Since(scannedAt) < sb.rescanInterval
1044}
10451046// hasConnectedScanners returns true if at least one scanner is connected.
···551 "total", msg.Summary.Total)
552}
553554+// handleError marks a job as failed and creates a scan record so the proactive
555+// scanner treats it as "stale" rather than "never scanned" (avoids retry loops).
556func (sb *ScanBroadcaster) handleError(sub *ScanSubscriber, msg ScannerMessage) {
557+ ctx := context.Background()
558+559+ // Get job details to create failure scan record
560+ var manifestDigest, repository, userDID string
561+ err := sb.db.QueryRow(`
562+ SELECT manifest_digest, repository, user_did
563+ FROM scan_jobs WHERE seq = ?
564+ `, msg.Seq).Scan(&manifestDigest, &repository, &userDID)
565+ if err != nil {
566+ slog.Error("Failed to get job details for failure record",
567+ "seq", msg.Seq, "error", err)
568+ } else {
569+ // Create a scan record with zero counts and nil blobs — marks it as
570+ // "scanned" so the proactive scheduler won't retry until rescan interval
571+ scanRecord := atproto.NewScanRecord(
572+ manifestDigest, repository, userDID,
573+ nil, nil, // no SBOM or vuln report
574+ 0, 0, 0, 0, 0,
575+ "failed: "+truncateError(msg.Error, 200),
576+ )
577+ if _, _, err := sb.pds.CreateScanRecord(ctx, scanRecord); err != nil {
578+ slog.Error("Failed to store failure scan record",
579+ "seq", msg.Seq, "error", err)
580+ }
581+ }
582+583+ // Mark job as failed
584+ _, err = sb.db.Exec(`
585 UPDATE scan_jobs SET status = 'failed', completed_at = ?
586 WHERE seq = ?
587 `, time.Now(), msg.Seq)
···595 "seq", msg.Seq,
596 "subscriberId", sub.id,
597 "error", msg.Error)
598+}
// truncateError caps an error string at maxLen bytes for storage in a scan
// record. The cut is moved back to the nearest rune boundary so the result
// is always valid UTF-8 (a naive s[:maxLen] can split a multi-byte rune).
// A non-positive maxLen yields an empty string instead of panicking.
func truncateError(s string, maxLen int) string {
	if maxLen <= 0 {
		return ""
	}
	if len(s) <= maxLen {
		return s
	}
	// Back up past any UTF-8 continuation bytes (0b10xxxxxx) so the cut
	// lands on the start of a rune.
	cut := maxLen
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut]
}
606607// drainPendingJobs sends pending/timed-out jobs to a newly connected scanner.
···685 }
686}
687688+// reDispatchTimedOut finds jobs that were assigned but not acked/completed within timeout,
689+// and also marks stuck processing jobs as failed.
690// Collects timed-out rows first, closes cursor, then resets and re-dispatches
691// to avoid holding a SELECT cursor open during UPDATEs (prevents SQLite BUSY).
692func (sb *ScanBroadcaster) reDispatchTimedOut() {
693 timeout := time.Now().Add(-sb.ackTimeout)
694695+ // Fail processing jobs stuck for >10 minutes (scanner likely crashed mid-scan)
696+ processingTimeout := time.Now().Add(-10 * time.Minute)
697+ res, err := sb.db.Exec(`
698+ UPDATE scan_jobs SET status = 'failed', completed_at = ?
699+ WHERE status = 'processing' AND assigned_at < ?
700+ `, time.Now(), processingTimeout)
701+ if err != nil {
702+ slog.Error("Failed to clean up stuck processing jobs", "error", err)
703+ } else if n, _ := res.RowsAffected(); n > 0 {
704+ slog.Warn("Cleaned up stuck processing jobs", "count", n)
705+ }
706+707 rows, err := sb.db.Query(`
708 SELECT seq, manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json
709 FROM scan_jobs
···846func (sb *ScanBroadcaster) proactiveScanLoop() {
847 defer sb.wg.Done()
848849+ // Wait for the system to settle and DID list to populate
850 select {
851 case <-sb.stopCh:
852 return
853+ case <-time.After(45 * time.Second):
854 }
855856+ // Run immediately on startup, then every 60s
857+ slog.Info("Proactive scan loop started")
858+ sb.tryEnqueueProactiveScan()
859+860 ticker := time.NewTicker(60 * time.Second)
861 defer ticker.Stop()
862···876// Uses the cached DID list from the relay (refreshed by refreshManifestDIDsLoop).
877func (sb *ScanBroadcaster) tryEnqueueProactiveScan() {
878 if !sb.hasConnectedScanners() {
879+ slog.Debug("Proactive scan: no scanners connected, skipping")
880 return
881 }
882 if sb.hasActiveJobs() {
883+ slog.Debug("Proactive scan: active jobs in queue, skipping")
884 return
885 }
886···893 sb.manifestDIDsMu.RUnlock()
894895 if len(userDIDs) == 0 {
896+ slog.Debug("Proactive scan: no manifest DIDs cached from relay, skipping")
897 return
898 }
899···909 }
910}
// scanCandidate is a manifest that needs scanning, paired with the identity
// of its owner and the freshness of its last scan. Candidates are collected
// while paging through a user's manifest records; never-scanned manifests
// are preferred over stale ones when picking the next scan job.
type scanCandidate struct {
	manifest   atproto.ManifestRecord // manifest record fetched from the user's PDS
	userDID    string                 // resolved DID of the manifest owner
	userHandle string                 // resolved handle of the manifest owner
	scannedAt  time.Time // zero value = never scanned (or unparsable scan timestamp)
}
919+920// tryEnqueueForUser fetches manifests from a user's PDS and enqueues a scan for the
921+// one that most needs it: never-scanned manifests first, then the stalest scan.
922+// Returns true if a job was enqueued.
923func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string) bool {
924 // Resolve user DID to PDS endpoint and handle
925 did, userHandle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, userDID)
···929 return false
930 }
931932+ // Collect all scannable manifests with their scan age
933+ var unscanned []scanCandidate
934+ var oldest *scanCandidate
935+936 client := atproto.NewClient(pdsEndpoint, did, "")
937 var cursor string
938 for {
···946 for _, record := range records {
947 var manifest atproto.ManifestRecord
948 if err := json.Unmarshal(record.Value, &manifest); err != nil {
00949 continue
950 }
951···963 continue
964 }
965966+ // Skip if config is nil
967 if manifest.Config == nil {
968 continue
969 }
970971+ // Check scan status
972+ _, scanRecord, err := sb.pds.GetScanRecord(ctx, manifest.Digest)
973+ if err != nil {
974+ // No scan record — never scanned
975+ unscanned = append(unscanned, scanCandidate{
976+ manifest: manifest,
977+ userDID: did,
978+ userHandle: userHandle,
979+ })
980 continue
981 }
982983+ scannedAt, err := time.Parse(time.RFC3339, scanRecord.ScannedAt)
984+ if err != nil {
985+ // Can't parse timestamp — treat as never scanned
986+ unscanned = append(unscanned, scanCandidate{
987+ manifest: manifest,
988+ userDID: did,
989+ userHandle: userHandle,
990+ })
991+ continue
992+ }
993994+ // Skip if scanned recently
995+ if time.Since(scannedAt) < sb.rescanInterval {
996+ continue
997+ }
998999+ // Stale scan — track the oldest
1000+ if oldest == nil || scannedAt.Before(oldest.scannedAt) {
1001+ oldest = &scanCandidate{
1002+ manifest: manifest,
1003+ userDID: did,
1004+ userHandle: userHandle,
1005+ scannedAt: scannedAt,
1006+ }
00001007 }
01008 }
10091010 if nextCursor == "" || len(records) == 0 {
···1013 cursor = nextCursor
1014 }
10151016+ // Prefer never-scanned, then oldest stale scan
1017+ var pick *scanCandidate
1018+ if len(unscanned) > 0 {
1019+ pick = &unscanned[0]
1020+ } else if oldest != nil {
1021+ pick = oldest
1022+ }
1023+1024+ if pick == nil {
1025+ return false
1026+ }
1027+1028+ configJSON, _ := json.Marshal(pick.manifest.Config)
1029+ layersJSON, _ := json.Marshal(pick.manifest.Layers)
1030+1031+ reason := "never scanned"
1032+ if !pick.scannedAt.IsZero() {
1033+ reason = fmt.Sprintf("last scanned %s ago", time.Since(pick.scannedAt).Truncate(time.Minute))
1034+ }
1035+1036+ slog.Info("Enqueuing proactive scan",
1037+ "manifestDigest", pick.manifest.Digest,
1038+ "repository", pick.manifest.Repository,
1039+ "userDID", pick.userDID,
1040+ "reason", reason)
1041+1042+ if err := sb.Enqueue(&ScanJobEvent{
1043+ ManifestDigest: pick.manifest.Digest,
1044+ Repository: pick.manifest.Repository,
1045+ UserDID: pick.userDID,
1046+ UserHandle: pick.userHandle,
1047+ Tier: "deckhand",
1048+ Config: configJSON,
1049+ Layers: layersJSON,
1050+ }); err != nil {
1051+ slog.Error("Proactive scan: failed to enqueue",
1052+ "manifest", pick.manifest.Digest, "error", err)
1053+ return false
1054+ }
1055+ return true
1056}
10571058// isOurManifest checks if a manifest's holdDID matches this hold directly,
···1139 }
11401141 return false
0000000000000001142}
11431144// hasConnectedScanners returns true if at least one scanner is connected.