A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

fix scanner bugs (SQLite busy/locked during job drain and re-dispatch; private-hold blob access for scanners) and firehose bugs (WebSocket pings unanswered without a read pump)

evan.jarrett.net 10b35642 abefcfd1

verified
+112 -31
+6 -6
pkg/appview/db/queries.go
··· 105 105 COALESCE(m.artifact_type, 'container-image'), 106 106 COALESCE((SELECT tag FROM tags WHERE did = m.did AND repository = m.repository ORDER BY created_at DESC LIMIT 1), ''), 107 107 COALESCE(m.digest, ''), 108 - COALESCE(rs.last_push, m.created_at), 108 + MAX(COALESCE(rs.last_push, m.created_at), m.created_at), 109 109 COALESCE(rp.avatar_cid, '') 110 110 FROM matching_repos mr 111 111 JOIN manifests m ON mr.latest_id = m.id ··· 113 113 JOIN repo_stats ON m.did = repo_stats.did AND m.repository = repo_stats.repository 114 114 LEFT JOIN repository_stats rs ON m.did = rs.did AND m.repository = rs.repository 115 115 LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository 116 - ORDER BY COALESCE(rs.last_push, m.created_at) DESC 116 + ORDER BY MAX(COALESCE(rs.last_push, m.created_at), m.created_at) DESC 117 117 LIMIT ? OFFSET ? 118 118 ` ··· 1743 1743 var orderBy string 1744 1744 switch sortOrder { 1745 1745 case SortByLastUpdate: 1746 - orderBy = "COALESCE(rs.last_push, m.created_at) DESC" 1746 + orderBy = "MAX(COALESCE(rs.last_push, m.created_at), m.created_at) DESC" 1747 1747 default: // SortByScore 1748 1748 orderBy = "(COALESCE(rs.pull_count, 0) + COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = m.did AND repository = m.repository), 0) * 10) DESC, m.created_at DESC" 1749 1749 } ··· 1768 1768 COALESCE(m.artifact_type, 'container-image'), 1769 1769 COALESCE((SELECT tag FROM tags WHERE did = m.did AND repository = m.repository ORDER BY created_at DESC LIMIT 1), ''), 1770 1770 COALESCE(m.digest, ''), 1771 - COALESCE(rs.last_push, m.created_at), 1771 + MAX(COALESCE(rs.last_push, m.created_at), m.created_at), 1772 1772 COALESCE(rp.avatar_cid, '') 1773 1773 FROM latest_manifests lm 1774 1774 JOIN manifests m ON lm.latest_id = m.id ··· 1841 1841 COALESCE(m.artifact_type, 'container-image'), 1842 1842 COALESCE((SELECT tag FROM tags WHERE did = m.did AND repository = m.repository ORDER BY created_at DESC LIMIT 1), ''), 1843 1843 COALESCE(m.digest, ''), 1844 - COALESCE(rs.last_push, m.created_at), 1844 + MAX(COALESCE(rs.last_push, 
m.created_at), m.created_at), 1845 1845 COALESCE(rp.avatar_cid, '') 1846 1846 FROM latest_manifests lm 1847 1847 JOIN manifests m ON lm.latest_id = m.id 1848 1848 JOIN users u ON m.did = u.did 1849 1849 LEFT JOIN repository_stats rs ON m.did = rs.did AND m.repository = rs.repository 1850 1850 LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository 1851 - ORDER BY COALESCE(rs.last_push, m.created_at) DESC 1851 + ORDER BY MAX(COALESCE(rs.last_push, m.created_at), m.created_at) DESC 1852 1852 ` 1853 1853 1854 1854 rows, err := db.Query(query, userDID, currentUserDID)
+5 -2
pkg/appview/server.go
··· 647 647 next.ServeHTTP(w, r) 648 648 649 649 case regDomains[host]: 650 - // Registry domain: allow /v2/*, redirect everything else 651 - if isV2 { 650 + // Registry domain: allow /v2/*, /auth/token, /auth/device/*, redirect everything else 651 + // Auth endpoints must be served directly to avoid 307 redirects that strip 652 + // the Authorization header on cross-host redirects (Go http.Client behavior). 653 + isAuth := path == "/auth/token" || strings.HasPrefix(path, "/auth/device/") 654 + if isV2 || isAuth { 652 655 next.ServeHTTP(w, r) 653 656 return 654 657 }
+6 -1
pkg/hold/pds/auth.go
··· 355 355 // If captain.public = false: Requires valid DPoP + OAuth and (captain OR crew with blob:read or blob:write permission). 356 356 // Note: blob:write implicitly grants blob:read access. 357 357 // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 358 - func ValidateBlobReadAccess(r *http.Request, pds *HoldPDS, httpClient HTTPClient) (*ValidatedUser, error) { 358 + // If scannerSecret is non-empty, a Bearer token matching it grants full read access (for scanner blob fetches). 359 + func ValidateBlobReadAccess(r *http.Request, pds *HoldPDS, httpClient HTTPClient, scannerSecret string) (*ValidatedUser, error) { 359 360 // Get captain record to check public setting 360 361 _, captain, err := pds.GetCaptainRecord(r.Context()) 361 362 if err != nil { ··· 372 373 var user *ValidatedUser 373 374 374 375 if strings.HasPrefix(authHeader, "Bearer ") { 376 + // Check if this is a scanner using the shared secret 377 + if scannerSecret != "" && strings.TrimPrefix(authHeader, "Bearer ") == scannerSecret { 378 + return &ValidatedUser{DID: "scanner"}, nil 379 + } 375 380 // Service token authentication (from AppView via getServiceAuth) 376 381 user, err = ValidateServiceToken(r, pds.did, httpClient) 377 382 if err != nil {
+5 -5
pkg/hold/pds/auth_test.go
··· 724 724 req := httptest.NewRequest(http.MethodGet, "/test", nil) 725 725 726 726 // This should return nil (public access allowed) for public holds 727 - user, err := ValidateBlobReadAccess(req, pds, nil) 727 + user, err := ValidateBlobReadAccess(req, pds, nil, "") 728 728 if err != nil { 729 729 t.Errorf("Expected public access for public hold, got error: %v", err) 730 730 } ··· 768 768 req := httptest.NewRequest(http.MethodGet, "/test", nil) 769 769 770 770 // This should return error (auth required) for private holds 771 - user, err := ValidateBlobReadAccess(req, pds, nil) 771 + user, err := ValidateBlobReadAccess(req, pds, nil, "") 772 772 if err == nil { 773 773 t.Error("Expected error for private hold without auth") 774 774 } ··· 816 816 } 817 817 818 818 // This should SUCCEED because blob:write implies blob:read 819 - user, err := ValidateBlobReadAccess(req, pds, mockClient) 819 + user, err := ValidateBlobReadAccess(req, pds, mockClient, "") 820 820 if err != nil { 821 821 t.Errorf("Expected blob:write to grant read access, got error: %v", err) 822 822 } ··· 846 846 t.Fatalf("Failed to add DPoP to request: %v", err) 847 847 } 848 848 849 - user, err := ValidateBlobReadAccess(req, pds, mockClient) 849 + user, err := ValidateBlobReadAccess(req, pds, mockClient, "") 850 850 if err != nil { 851 851 t.Errorf("Expected blob:read to grant read access, got error: %v", err) 852 852 } ··· 876 876 t.Fatalf("Failed to add DPoP to request: %v", err) 877 877 } 878 878 879 - _, err = ValidateBlobReadAccess(req, pds, mockClient) 879 + _, err = ValidateBlobReadAccess(req, pds, mockClient, "") 880 880 if err == nil { 881 881 t.Error("Expected error for crew without read or write permission") 882 882 }
+25
pkg/hold/pds/events.go
··· 415 415 // else cursor == currentSeq: relay is caught up, just stream new events 416 416 } 417 417 418 + // Start read pump to handle pings/pongs and detect disconnects. 419 + // gorilla/websocket requires an active reader to process control frames; 420 + // without one, pings go unanswered and the relay times out the connection. 421 + go b.readPump(sub) 422 + 418 423 // Start goroutine to handle sending events to this subscriber 419 424 go b.handleSubscriber(sub) 420 425 ··· 682 687 slog.Warn("Backfill timeout for subscriber", "seq", he.Seq) 683 688 return 684 689 } 690 + } 691 + } 692 + } 693 + 694 + // readPump reads from the WebSocket to process control frames (ping/pong/close). 695 + // gorilla/websocket automatically responds to pings with pongs when there is an 696 + // active reader. Without this, relays time out the connection. 697 + func (b *EventBroadcaster) readPump(sub *Subscriber) { 698 + defer func() { 699 + b.Unsubscribe(sub) 700 + sub.conn.Close() 701 + }() 702 + 703 + for { 704 + _, _, err := sub.conn.ReadMessage() 705 + if err != nil { 706 + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { 707 + slog.Warn("Firehose subscriber disconnected", "remote", sub.conn.RemoteAddr(), "error", err) 708 + } 709 + return 685 710 } 686 711 } 687 712 }
+34 -7
pkg/hold/pds/scan_broadcaster.go
··· 94 94 return nil, fmt.Errorf("failed to ping scan jobs database: %w", err) 95 95 } 96 96 97 + // Set WAL mode and busy timeout (libsql PRAGMAs return rows) 98 + var journalMode string 99 + if err := db.QueryRow("PRAGMA journal_mode = WAL").Scan(&journalMode); err != nil { 100 + db.Close() 101 + return nil, fmt.Errorf("failed to set journal mode: %w", err) 102 + } 103 + var busyTimeout int 104 + if err := db.QueryRow("PRAGMA busy_timeout = 5000").Scan(&busyTimeout); err != nil { 105 + db.Close() 106 + return nil, fmt.Errorf("failed to set busy_timeout: %w", err) 107 + } 108 + 97 109 sb := &ScanBroadcaster{ 98 110 subscribers: make([]*ScanSubscriber, 0), 99 111 db: db, ··· 509 521 "error", msg.Error) 510 522 } 511 523 512 - // drainPendingJobs sends pending/timed-out jobs to a newly connected scanner 524 + // drainPendingJobs sends pending/timed-out jobs to a newly connected scanner. 525 + // Collects all pending rows first, closes cursor, then assigns and dispatches 526 + // to avoid holding a SELECT cursor open during UPDATEs (prevents SQLite BUSY). 513 527 func (sb *ScanBroadcaster) drainPendingJobs(sub *ScanSubscriber, cursor int64) { 514 528 rows, err := sb.db.Query(` 515 529 SELECT seq, manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json ··· 521 535 slog.Error("Failed to drain pending scan jobs", "error", err) 522 536 return 523 537 } 524 - defer rows.Close() 525 538 526 - count := 0 539 + var jobs []*ScanJobEvent 527 540 for rows.Next() { 528 541 job := &ScanJobEvent{Type: "job"} 529 542 var configJSON, layersJSON string ··· 540 553 541 554 job.Config = json.RawMessage(configJSON) 542 555 job.Layers = json.RawMessage(layersJSON) 556 + jobs = append(jobs, job) 557 + } 558 + rows.Close() 543 559 544 - // Assign and dispatch 560 + count := 0 561 + for _, job := range jobs { 545 562 _, err = sb.db.Exec(` 546 563 UPDATE scan_jobs SET status = 'assigned', assigned_to = ?, assigned_at = ? 
547 564 WHERE seq = ? AND status = 'pending' ··· 578 595 } 579 596 } 580 597 581 - // reDispatchTimedOut finds jobs that were assigned but not acked/completed within timeout 598 + // reDispatchTimedOut finds jobs that were assigned but not acked/completed within timeout. 599 + // Collects timed-out rows first, closes cursor, then resets and re-dispatches 600 + // to avoid holding a SELECT cursor open during UPDATEs (prevents SQLite BUSY). 582 601 func (sb *ScanBroadcaster) reDispatchTimedOut() { 583 602 timeout := time.Now().Add(-sb.ackTimeout) 584 603 ··· 592 611 slog.Error("Failed to query timed-out scan jobs", "error", err) 593 612 return 594 613 } 595 - defer rows.Close() 596 614 615 + var jobs []*ScanJobEvent 597 616 for rows.Next() { 598 617 job := &ScanJobEvent{Type: "job"} 599 618 var configJSON, layersJSON string ··· 609 628 610 629 job.Config = json.RawMessage(configJSON) 611 630 job.Layers = json.RawMessage(layersJSON) 631 + jobs = append(jobs, job) 632 + } 633 + rows.Close() 612 634 613 - // Reset to pending and re-dispatch 635 + for _, job := range jobs { 614 636 _, err = sb.db.Exec(` 615 637 UPDATE scan_jobs SET status = 'pending', assigned_to = NULL, assigned_at = NULL 616 638 WHERE seq = ? ··· 633 655 return sb.db.Close() 634 656 } 635 657 return nil 658 + } 659 + 660 + // Secret returns the scanner shared secret for use in blob read authorization 661 + func (sb *ScanBroadcaster) Secret() string { 662 + return sb.secret 636 663 } 637 664 638 665 // ValidateScannerSecret checks if the provided secret matches
+5 -1
pkg/hold/pds/xrpc.go
··· 1108 1108 // Validate blob read access (hold access control) 1109 1109 // If captain.public = true, returns nil (public access allowed) 1110 1110 // If captain.public = false, validates auth and checks for blob:read permission 1111 - _, err := ValidateBlobReadAccess(r, h.pds, h.httpClient) 1111 + scannerSecret := "" 1112 + if h.scanBroadcaster != nil { 1113 + scannerSecret = h.scanBroadcaster.Secret() 1114 + } 1115 + _, err := ValidateBlobReadAccess(r, h.pds, h.httpClient, scannerSecret) 1112 1116 if err != nil { 1113 1117 slog.Warn("OCI blob authorization failed", "error", err, "digest", digest) 1114 1118 http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden)
+12 -3
scanner/internal/client/hold.go
··· 213 213 c.mu.Unlock() 214 214 } 215 215 216 - // GetBlobPresignedURL gets a presigned download URL from the hold service 217 - func GetBlobPresignedURL(holdEndpoint, holdDID, digest string) (string, error) { 216 + // GetBlobPresignedURL gets a presigned download URL from the hold service. 217 + // If secret is non-empty, it is sent as a Bearer token for private hold access. 218 + func GetBlobPresignedURL(holdEndpoint, holdDID, digest, secret string) (string, error) { 218 219 reqURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s&method=GET", 219 220 holdEndpoint, 220 221 url.QueryEscape(holdDID), 221 222 url.QueryEscape(digest)) 222 223 223 - resp, err := http.Get(reqURL) 224 + req, err := http.NewRequest("GET", reqURL, nil) 225 + if err != nil { 226 + return "", fmt.Errorf("failed to create request: %w", err) 227 + } 228 + if secret != "" { 229 + req.Header.Set("Authorization", "Bearer "+secret) 230 + } 231 + 232 + resp, err := http.DefaultClient.Do(req) 224 233 if err != nil { 225 234 return "", fmt.Errorf("failed to get presigned URL: %w", err) 226 235 }
+13 -5
scanner/internal/scan/extractor.go
··· 17 17 18 18 // extractLayers downloads and extracts all image layers via presigned URLs 19 19 // Returns the rootfs directory path and a cleanup function 20 - func extractLayers(job *scanner.ScanJob, tmpDir string) (string, func(), error) { 20 + func extractLayers(job *scanner.ScanJob, tmpDir, secret string) (string, func(), error) { 21 21 scanDir, err := os.MkdirTemp(tmpDir, "scan-*") 22 22 if err != nil { 23 23 return "", nil, fmt.Errorf("failed to create temp directory: %w", err) ··· 41 41 } 42 42 43 43 // Download and validate config blob 44 + if job.Config.Digest == "" { 45 + cleanup() 46 + return "", nil, fmt.Errorf("config blob has empty digest, cannot download") 47 + } 44 48 slog.Info("Downloading config blob", "digest", job.Config.Digest) 45 49 configPath := filepath.Join(imageDir, "config.json") 46 - if err := downloadBlobViaPresignedURL(job.HoldEndpoint, job.HoldDID, job.Config.Digest, configPath); err != nil { 50 + if err := downloadBlobViaPresignedURL(job.HoldEndpoint, job.HoldDID, job.Config.Digest, configPath, secret); err != nil { 47 51 cleanup() 48 52 return "", nil, fmt.Errorf("failed to download config blob: %w", err) 49 53 } ··· 61 65 62 66 // Download and extract each layer 63 67 for i, layer := range job.Layers { 68 + if layer.Digest == "" { 69 + slog.Warn("Skipping layer with empty digest", "index", i) 70 + continue 71 + } 64 72 slog.Info("Extracting layer", "index", i, "digest", layer.Digest, "size", layer.Size) 65 73 66 74 layerPath := filepath.Join(layersDir, fmt.Sprintf("layer-%d.tar.gz", i)) 67 - if err := downloadBlobViaPresignedURL(job.HoldEndpoint, job.HoldDID, layer.Digest, layerPath); err != nil { 75 + if err := downloadBlobViaPresignedURL(job.HoldEndpoint, job.HoldDID, layer.Digest, layerPath, secret); err != nil { 68 76 cleanup() 69 77 return "", nil, fmt.Errorf("failed to download layer %d: %w", i, err) 70 78 } ··· 91 99 } 92 100 93 101 // downloadBlobViaPresignedURL gets a presigned URL from the hold and downloads the blob 
94 - func downloadBlobViaPresignedURL(holdEndpoint, holdDID, digest, destPath string) error { 95 - presignedURL, err := client.GetBlobPresignedURL(holdEndpoint, holdDID, digest) 102 + func downloadBlobViaPresignedURL(holdEndpoint, holdDID, digest, destPath, secret string) error { 103 + presignedURL, err := client.GetBlobPresignedURL(holdEndpoint, holdDID, digest, secret) 96 104 if err != nil { 97 105 return fmt.Errorf("failed to get presigned URL for %s: %w", digest, err) 98 106 }
+1 -1
scanner/internal/scan/worker.go
··· 106 106 107 107 // Step 1: Extract image layers from hold via presigned URLs 108 108 slog.Info("Extracting image layers", "repository", job.Repository) 109 - imageDir, cleanup, err := extractLayers(job, wp.cfg.Vuln.TmpDir) 109 + imageDir, cleanup, err := extractLayers(job, wp.cfg.Vuln.TmpDir, wp.cfg.Hold.Secret) 110 110 if err != nil { 111 111 return nil, fmt.Errorf("failed to extract layers: %w", err) 112 112 }