···
+package pds
+
+import (
+    "context"
+    "crypto/rand"
+    "crypto/sha256"
+    "crypto/subtle"
+    "database/sql"
+    "encoding/hex"
+    "encoding/json"
+    "fmt"
+    "log/slog"
+    "strings"
+    "sync"
+    "time"
+
+    storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
+    "github.com/gorilla/websocket"
+)
+
+// ScanBroadcaster manages scanner WebSocket connections and dispatches scan jobs
+// using a competing-consumer pattern. Jobs are persisted in SQLite and dispatched
+// round-robin to connected scanners.
+type ScanBroadcaster struct {
+    mu           sync.RWMutex
+    subscribers  []*ScanSubscriber
+    nextIdx      int // Round-robin index for dispatch
+    db           *sql.DB
+    holdDID      string
+    holdEndpoint string
+    driver       storagedriver.StorageDriver
+    pds          *HoldPDS
+    ackTimeout   time.Duration
+    secret       string // Shared secret for scanner authentication
+}
+
+// ScanSubscriber represents a connected scanner WebSocket client
+type ScanSubscriber struct {
+    conn *websocket.Conn
+    send chan *ScanJobEvent
+    id   string // Unique subscriber ID
+    done chan struct{}
+}
+
+// ScanJobEvent is the message sent from hold to scanner over WebSocket
+type ScanJobEvent struct {
+    Type           string          `json:"type"` // Always "job"
+    Seq            int64           `json:"seq"`
+    ManifestDigest string          `json:"manifestDigest"`
+    Repository     string          `json:"repository"`
+    Tag            string          `json:"tag"`
+    UserDID        string          `json:"userDid"`
+    UserHandle     string          `json:"userHandle"`
+    HoldDID        string          `json:"holdDid"`
+    HoldEndpoint   string          `json:"holdEndpoint"`
+    Tier           string          `json:"tier"`
+    Config         json.RawMessage `json:"config"`
+    Layers         json.RawMessage `json:"layers"`
+}
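+
+// An example job event as sent on the wire (illustrative values only; field
+// names follow the JSON tags above):
+//
+//    {"type":"job","seq":42,"manifestDigest":"sha256:abc...","repository":"alice/myapp",
+//     "tag":"latest","userDid":"did:plc:example","userHandle":"alice.example",
+//     "holdDid":"did:web:hold.example","holdEndpoint":"https://hold.example",
+//     "tier":"deckhand","config":{...},"layers":[...]}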
+
+// ScannerMessage is a message received from scanner over WebSocket
+type ScannerMessage struct {
+    Type       string                `json:"type"` // "ack", "result", "error"
+    Seq        int64                 `json:"seq"`  // Job sequence number
+    SBOM       string                `json:"sbom,omitempty"`
+    VulnReport string                `json:"vulnReport,omitempty"`
+    Summary    *VulnerabilitySummary `json:"summary,omitempty"`
+    Error      string                `json:"error,omitempty"`
+}
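+
+// Example scanner replies (illustrative values only):
+//
+//    {"type":"ack","seq":42}
+//    {"type":"error","seq":42,"error":"layer extraction failed"}
+//    {"type":"result","seq":42,"sbom":"...","vulnReport":"...",
+//     "summary":{"critical":1,"high":3,"medium":7,"low":2,"total":13}}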
+
+// VulnerabilitySummary contains counts of vulnerabilities by severity
+type VulnerabilitySummary struct {
+    Critical int `json:"critical"`
+    High     int `json:"high"`
+    Medium   int `json:"medium"`
+    Low      int `json:"low"`
+    Total    int `json:"total"`
+}
+
+// NewScanBroadcaster creates a new scan job broadcaster.
+// dbPath should point to a SQLite database file (e.g., "/path/to/pds/db.sqlite3").
+func NewScanBroadcaster(holdDID, holdEndpoint, secret, dbPath string, driver storagedriver.StorageDriver, holdPDS *HoldPDS) (*ScanBroadcaster, error) {
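+    // sql.Open only validates its arguments; it assumes a "sqlite3" driver is
+    // registered elsewhere in the build (typically a blank import such as
+    // _ "github.com/mattn/go-sqlite3" -- an assumption, not shown in this diff).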
+    db, err := sql.Open("sqlite3", dbPath)
+    if err != nil {
+        return nil, fmt.Errorf("failed to open scan jobs database: %w", err)
+    }
+    if err := db.Ping(); err != nil {
+        db.Close()
+        return nil, fmt.Errorf("failed to ping scan jobs database: %w", err)
+    }
+
+    sb := &ScanBroadcaster{
+        subscribers:  make([]*ScanSubscriber, 0),
+        db:           db,
+        holdDID:      holdDID,
+        holdEndpoint: holdEndpoint,
+        driver:       driver,
+        pds:          holdPDS,
+        ackTimeout:   5 * time.Minute,
+        secret:       secret,
+    }
+
+    if err := sb.initSchema(); err != nil {
+        db.Close()
+        return nil, fmt.Errorf("failed to initialize scan_jobs schema: %w", err)
+    }
+
+    // Start re-dispatch loop for timed-out jobs
+    go sb.reDispatchLoop()
+
+    return sb, nil
+}
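+
+// Typical wiring (sketch; the argument values are placeholders):
+//
+//    sb, err := NewScanBroadcaster(holdDID, "https://hold.example", secret,
+//        "/var/lib/pds/db.sqlite3", storageDriver, holdPDS)
+//    if err != nil { ... }
+//    xrpcHandler.SetScanBroadcaster(sb)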
+
+// initSchema creates the scan_jobs table if it doesn't exist
+func (sb *ScanBroadcaster) initSchema() error {
+    schema := `
+    CREATE TABLE IF NOT EXISTS scan_jobs (
+        seq INTEGER PRIMARY KEY AUTOINCREMENT,
+        manifest_digest TEXT NOT NULL,
+        repository TEXT NOT NULL,
+        tag TEXT,
+        user_did TEXT NOT NULL,
+        user_handle TEXT,
+        hold_did TEXT NOT NULL,
+        hold_endpoint TEXT NOT NULL,
+        tier TEXT NOT NULL DEFAULT 'deckhand',
+        config_json TEXT NOT NULL,
+        layers_json TEXT NOT NULL,
+        status TEXT NOT NULL DEFAULT 'pending',
+        assigned_to TEXT,
+        assigned_at TIMESTAMP,
+        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+        completed_at TIMESTAMP
+    );
+    CREATE INDEX IF NOT EXISTS idx_scan_jobs_status ON scan_jobs(status);
+    CREATE INDEX IF NOT EXISTS idx_scan_jobs_assigned ON scan_jobs(assigned_to, status);
+    `
+    _, err := sb.db.Exec(schema)
+    return err
+}
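+
+// Job lifecycle, as recorded in scan_jobs.status:
+//
+//    pending    --dispatchJob/drainPendingJobs--> assigned
+//    assigned   --ack--> processing
+//    processing --result--> completed, or --error--> failed
+//    assigned   --scanner disconnect or ackTimeout--> pending (re-dispatched)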
+
+// Enqueue inserts a scan job into SQLite and dispatches to the next available scanner
+func (sb *ScanBroadcaster) Enqueue(job *ScanJobEvent) error {
+    job.Type = "job"
+    job.HoldDID = sb.holdDID
+    job.HoldEndpoint = sb.holdEndpoint
+
+    // Insert into database
+    result, err := sb.db.Exec(`
+        INSERT INTO scan_jobs (manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json, status)
+        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')
+    `, job.ManifestDigest, job.Repository, job.Tag, job.UserDID, job.UserHandle, job.HoldDID, job.HoldEndpoint, job.Tier, string(job.Config), string(job.Layers))
+    if err != nil {
+        return fmt.Errorf("failed to insert scan job: %w", err)
+    }
+
+    seq, err := result.LastInsertId()
+    if err != nil {
+        return fmt.Errorf("failed to get job seq: %w", err)
+    }
+    job.Seq = seq
+
+    slog.Info("Scan job enqueued",
+        "seq", seq,
+        "repository", job.Repository,
+        "tag", job.Tag,
+        "tier", job.Tier)
+
+    // Try to dispatch immediately
+    sb.dispatchJob(job)
+
+    return nil
+}
+
+// Subscribe adds a new scanner WebSocket subscriber and drains pending jobs to it
+func (sb *ScanBroadcaster) Subscribe(conn *websocket.Conn, cursor int64) *ScanSubscriber {
+    id := generateSubscriberID()
+    sub := &ScanSubscriber{
+        conn: conn,
+        send: make(chan *ScanJobEvent, 20),
+        id:   id,
+        done: make(chan struct{}),
+    }
+
+    sb.mu.Lock()
+    sb.subscribers = append(sb.subscribers, sub)
+    total := len(sb.subscribers) // read under the lock to avoid a data race
+    sb.mu.Unlock()
+
+    slog.Info("Scanner subscribed",
+        "id", id,
+        "remote", conn.RemoteAddr(),
+        "cursor", cursor,
+        "totalSubscribers", total)
+
+    // Start writer goroutine (sends jobs to scanner)
+    go sb.handleWriter(sub)
+
+    // Start reader goroutine (receives acks/results/errors from scanner)
+    go sb.handleReader(sub)
+
+    // Drain pending and timed-out jobs from database
+    go sb.drainPendingJobs(sub, cursor)
+
+    return sub
+}
+
+// Unsubscribe removes a scanner subscriber and makes its jobs re-dispatchable
+func (sb *ScanBroadcaster) Unsubscribe(sub *ScanSubscriber) {
+    sb.mu.Lock()
+    defer sb.mu.Unlock()
+
+    found := false
+    for i, s := range sb.subscribers {
+        if s == sub {
+            sb.subscribers = append(sb.subscribers[:i], sb.subscribers[i+1:]...)
+            found = true
+            break
+        }
+    }
+    if !found {
+        // Already removed: the reader and writer goroutines can both observe a
+        // disconnect, and a second call must not close sub.send again.
+        return
+    }
+
+    // Mark assigned jobs as pending again so they can be re-dispatched
+    _, err := sb.db.Exec(`
+        UPDATE scan_jobs SET status = 'pending', assigned_to = NULL, assigned_at = NULL
+        WHERE assigned_to = ? AND status IN ('pending', 'assigned')
+    `, sub.id)
+    if err != nil {
+        slog.Error("Failed to unassign jobs from disconnected scanner",
+            "subscriberId", sub.id,
+            "error", err)
+    }
+
+    close(sub.send)
+
+    slog.Info("Scanner unsubscribed",
+        "id", sub.id,
+        "totalSubscribers", len(sb.subscribers))
+}
+
+// dispatchJob sends a job to the next available scanner via round-robin
+func (sb *ScanBroadcaster) dispatchJob(job *ScanJobEvent) {
+    sb.mu.Lock()
+    defer sb.mu.Unlock()
+
+    if len(sb.subscribers) == 0 {
+        slog.Debug("No scanners connected, job will wait in queue", "seq", job.Seq)
+        return
+    }
+
+    // Round-robin dispatch
+    sub := sb.subscribers[sb.nextIdx%len(sb.subscribers)]
+    sb.nextIdx++
+
+    // Mark as assigned in database
+    _, err := sb.db.Exec(`
+        UPDATE scan_jobs SET status = 'assigned', assigned_to = ?, assigned_at = ?
+        WHERE seq = ? AND status = 'pending'
+    `, sub.id, time.Now(), job.Seq)
+    if err != nil {
+        slog.Error("Failed to assign scan job", "seq", job.Seq, "error", err)
+        return
+    }
+
+    // Send to subscriber
+    select {
+    case sub.send <- job:
+        slog.Info("Scan job dispatched",
+            "seq", job.Seq,
+            "repository", job.Repository,
+            "subscriberId", sub.id)
+    default:
+        slog.Warn("Scanner buffer full, re-marking job as pending",
+            "seq", job.Seq,
+            "subscriberId", sub.id)
+        if _, err := sb.db.Exec(`UPDATE scan_jobs SET status = 'pending', assigned_to = NULL, assigned_at = NULL WHERE seq = ?`, job.Seq); err != nil {
+            slog.Error("Failed to re-mark scan job as pending", "seq", job.Seq, "error", err)
+        }
+    }
+}
+
+// handleWriter sends jobs to a scanner over its WebSocket connection
+func (sb *ScanBroadcaster) handleWriter(sub *ScanSubscriber) {
+    defer func() {
+        sub.conn.Close()
+        close(sub.done)
+    }()
+
+    for job := range sub.send {
+        data, err := json.Marshal(job)
+        if err != nil {
+            slog.Error("Failed to marshal scan job", "seq", job.Seq, "error", err)
+            continue
+        }
+
+        if err := sub.conn.WriteMessage(websocket.TextMessage, data); err != nil {
+            slog.Error("Failed to write scan job to WebSocket",
+                "seq", job.Seq,
+                "subscriberId", sub.id,
+                "error", err)
+            sb.Unsubscribe(sub)
+            return
+        }
+    }
+}
+
+// handleReader receives ack/result/error messages from a scanner
+func (sb *ScanBroadcaster) handleReader(sub *ScanSubscriber) {
+    defer sb.Unsubscribe(sub)
+
+    for {
+        _, data, err := sub.conn.ReadMessage()
+        if err != nil {
+            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
+                slog.Error("Scanner WebSocket read error",
+                    "subscriberId", sub.id,
+                    "error", err)
+            }
+            return
+        }
+
+        var msg ScannerMessage
+        if err := json.Unmarshal(data, &msg); err != nil {
+            slog.Error("Failed to unmarshal scanner message",
+                "subscriberId", sub.id,
+                "error", err)
+            continue
+        }
+
+        switch msg.Type {
+        case "ack":
+            sb.handleAck(sub, msg.Seq)
+        case "result":
+            sb.handleResult(sub, msg)
+        case "error":
+            sb.handleError(sub, msg)
+        default:
+            slog.Warn("Unknown scanner message type",
+                "type", msg.Type,
+                "subscriberId", sub.id)
+        }
+    }
+}
+
+// handleAck marks a job as processing (scanner received and queued it)
+func (sb *ScanBroadcaster) handleAck(sub *ScanSubscriber, seq int64) {
+    _, err := sb.db.Exec(`
+        UPDATE scan_jobs SET status = 'processing'
+        WHERE seq = ? AND assigned_to = ? AND status = 'assigned'
+    `, seq, sub.id)
+    if err != nil {
+        slog.Error("Failed to update job status to processing",
+            "seq", seq,
+            "subscriberId", sub.id,
+            "error", err)
+        return
+    }
+
+    slog.Info("Scan job acknowledged",
+        "seq", seq,
+        "subscriberId", sub.id)
+}
+
+// handleResult processes a completed scan result: stores ORAS manifest + marks completed
+func (sb *ScanBroadcaster) handleResult(sub *ScanSubscriber, msg ScannerMessage) {
+    ctx := context.Background()
+
+    slog.Info("Scan result received",
+        "seq", msg.Seq,
+        "subscriberId", sub.id,
+        "hasSBOM", msg.SBOM != "",
+        "hasVulnReport", msg.VulnReport != "",
+        "summary", msg.Summary)
+
+    // Get job details from database
+    var (
+        manifestDigest string
+        repository     string
+        tag            string
+        userDID        string
+    )
+    err := sb.db.QueryRow(`
+        SELECT manifest_digest, repository, tag, user_did
+        FROM scan_jobs WHERE seq = ?
+    `, msg.Seq).Scan(&manifestDigest, &repository, &tag, &userDID)
+    if err != nil {
+        slog.Error("Failed to get job details for result storage",
+            "seq", msg.Seq,
+            "error", err)
+        return
+    }
+
+    // Store vulnerability report blob in S3
+    if msg.VulnReport != "" {
+        vulnJSON := []byte(msg.VulnReport)
+        vulnDigest := fmt.Sprintf("sha256:%x", sha256.Sum256(vulnJSON))
+
+        if err := sb.uploadBlob(ctx, vulnDigest, vulnJSON); err != nil {
+            slog.Error("Failed to upload vulnerability report blob",
+                "seq", msg.Seq,
+                "error", err)
+        }
+
+        // Build and store ORAS manifest
+        if msg.Summary != nil {
+            if err := sb.storeORASManifest(ctx, manifestDigest, repository, userDID, vulnDigest, vulnJSON, *msg.Summary); err != nil {
+                slog.Error("Failed to store ORAS manifest",
+                    "seq", msg.Seq,
+                    "error", err)
+            }
+        }
+    }
+
+    // Store SBOM blob if provided
+    if msg.SBOM != "" {
+        sbomJSON := []byte(msg.SBOM)
+        sbomDigest := fmt.Sprintf("sha256:%x", sha256.Sum256(sbomJSON))
+
+        if err := sb.uploadBlob(ctx, sbomDigest, sbomJSON); err != nil {
+            slog.Error("Failed to upload SBOM blob",
+                "seq", msg.Seq,
+                "error", err)
+        }
+    }
+
+    // Mark job as completed
+    _, err = sb.db.Exec(`
+        UPDATE scan_jobs SET status = 'completed', completed_at = ?
+        WHERE seq = ?
+    `, time.Now(), msg.Seq)
+    if err != nil {
+        slog.Error("Failed to mark scan job as completed",
+            "seq", msg.Seq,
+            "error", err)
+    }
+
+    // msg.Summary may be nil (a result can arrive without a summary), so guard
+    // before dereferencing it for the completion log.
+    critical, high, total := 0, 0, 0
+    if msg.Summary != nil {
+        critical, high, total = msg.Summary.Critical, msg.Summary.High, msg.Summary.Total
+    }
+    slog.Info("Scan job completed",
+        "seq", msg.Seq,
+        "repository", repository,
+        "tag", tag,
+        "critical", critical,
+        "high", high,
+        "total", total)
+}
+
+// handleError marks a job as failed
+func (sb *ScanBroadcaster) handleError(sub *ScanSubscriber, msg ScannerMessage) {
+    _, err := sb.db.Exec(`
+        UPDATE scan_jobs SET status = 'failed', completed_at = ?
+        WHERE seq = ?
+    `, time.Now(), msg.Seq)
+    if err != nil {
+        slog.Error("Failed to mark scan job as failed",
+            "seq", msg.Seq,
+            "error", err)
+    }
+
+    slog.Warn("Scan job failed",
+        "seq", msg.Seq,
+        "subscriberId", sub.id,
+        "error", msg.Error)
+}
+
+// drainPendingJobs sends pending/timed-out jobs to a newly connected scanner.
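+// Only rows with seq > cursor are replayed; a reconnecting scanner can pass its
+// last processed seq as the cursor, or -1 to receive every pending job.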
+func (sb *ScanBroadcaster) drainPendingJobs(sub *ScanSubscriber, cursor int64) {
+    rows, err := sb.db.Query(`
+        SELECT seq, manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json
+        FROM scan_jobs
+        WHERE status = 'pending' AND seq > ?
+        ORDER BY seq ASC
+    `, cursor)
+    if err != nil {
+        slog.Error("Failed to drain pending scan jobs", "error", err)
+        return
+    }
+    defer rows.Close()
+
+    count := 0
+    for rows.Next() {
+        job := &ScanJobEvent{Type: "job"}
+        var configJSON, layersJSON string
+
+        err := rows.Scan(
+            &job.Seq, &job.ManifestDigest, &job.Repository, &job.Tag,
+            &job.UserDID, &job.UserHandle, &job.HoldDID, &job.HoldEndpoint,
+            &job.Tier, &configJSON, &layersJSON,
+        )
+        if err != nil {
+            slog.Error("Failed to scan pending job row", "error", err)
+            continue
+        }
+
+        job.Config = json.RawMessage(configJSON)
+        job.Layers = json.RawMessage(layersJSON)
+
+        // Assign and dispatch
+        _, err = sb.db.Exec(`
+            UPDATE scan_jobs SET status = 'assigned', assigned_to = ?, assigned_at = ?
+            WHERE seq = ? AND status = 'pending'
+        `, sub.id, time.Now(), job.Seq)
+        if err != nil {
+            continue
+        }
+
+        select {
+        case sub.send <- job:
+            count++
+        case <-sub.done:
+            return
+        case <-time.After(5 * time.Second):
+            slog.Warn("Drain timeout for scanner", "subscriberId", sub.id)
+            return
+        }
+    }
+
+    if count > 0 {
+        slog.Info("Drained pending jobs to scanner",
+            "subscriberId", sub.id,
+            "jobsDrained", count)
+    }
+}
+
+// reDispatchLoop periodically checks for timed-out jobs and re-dispatches them
+func (sb *ScanBroadcaster) reDispatchLoop() {
+    ticker := time.NewTicker(30 * time.Second)
+    defer ticker.Stop()
+
+    for range ticker.C {
+        sb.reDispatchTimedOut()
+    }
+}
+
+// reDispatchTimedOut finds jobs that were assigned but not acked/completed within the timeout
+func (sb *ScanBroadcaster) reDispatchTimedOut() {
+    timeout := time.Now().Add(-sb.ackTimeout)
+
+    rows, err := sb.db.Query(`
+        SELECT seq, manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json
+        FROM scan_jobs
+        WHERE status = 'assigned' AND assigned_at < ?
+        ORDER BY seq ASC
+    `, timeout)
+    if err != nil {
+        slog.Error("Failed to query timed-out scan jobs", "error", err)
+        return
+    }
+    defer rows.Close()
+
+    for rows.Next() {
+        job := &ScanJobEvent{Type: "job"}
+        var configJSON, layersJSON string
+
+        err := rows.Scan(
+            &job.Seq, &job.ManifestDigest, &job.Repository, &job.Tag,
+            &job.UserDID, &job.UserHandle, &job.HoldDID, &job.HoldEndpoint,
+            &job.Tier, &configJSON, &layersJSON,
+        )
+        if err != nil {
+            continue
+        }
+
+        job.Config = json.RawMessage(configJSON)
+        job.Layers = json.RawMessage(layersJSON)
+
+        // Reset to pending and re-dispatch
+        _, err = sb.db.Exec(`
+            UPDATE scan_jobs SET status = 'pending', assigned_to = NULL, assigned_at = NULL
+            WHERE seq = ?
+        `, job.Seq)
+        if err != nil {
+            continue
+        }
+
+        slog.Info("Re-dispatching timed-out scan job",
+            "seq", job.Seq,
+            "repository", job.Repository)
+
+        sb.dispatchJob(job)
+    }
+}
+
+// Close closes the scan broadcaster's database connection
+func (sb *ScanBroadcaster) Close() error {
+    if sb.db != nil {
+        return sb.db.Close()
+    }
+    return nil
+}
+
+// ValidateScannerSecret checks if the provided secret matches.
+// The constant-time comparison avoids leaking match position through timing.
+func (sb *ScanBroadcaster) ValidateScannerSecret(secret string) bool {
+    return sb.secret != "" &&
+        subtle.ConstantTimeCompare([]byte(secret), []byte(sb.secret)) == 1
+}
+
+// storeORASManifest creates an ORAS vulnerability manifest as a blob in S3.
+// The ORAS manifest's "subject" field references the original manifest by digest,
+// enabling OCI referrers API discovery.
+func (sb *ScanBroadcaster) storeORASManifest(ctx context.Context, manifestDigest, repository, userDID, vulnDigest string, vulnJSON []byte, summary VulnerabilitySummary) error {
+    scannerVersion := "atcr-scanner-v1.0.0"
+
+    // Create ORAS manifest
+    orasManifest := map[string]interface{}{
+        "schemaVersion": 2,
+        "mediaType":     "application/vnd.oci.image.manifest.v1+json",
+        "artifactType":  "application/vnd.atcr.vulnerabilities+json",
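+        // This is the canonical OCI empty descriptor: the digest and size of
+        // the two-byte JSON blob {} (application/vnd.oci.empty.v1+json).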
+        "config": map[string]interface{}{
+            "mediaType": "application/vnd.oci.empty.v1+json",
+            "digest":    "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a",
+            "size":      2,
+        },
+        "subject": map[string]interface{}{
+            "mediaType": "application/vnd.oci.image.manifest.v1+json",
+            "digest":    manifestDigest,
+            "size":      0,
+        },
+        "layers": []map[string]interface{}{
+            {
+                "mediaType": "application/json",
+                "digest":    vulnDigest,
+                "size":      len(vulnJSON),
+                "annotations": map[string]string{
+                    "org.opencontainers.image.title": "vulnerability-report.json",
+                },
+            },
+        },
+        "annotations": map[string]string{
+            "io.atcr.vuln.critical":       fmt.Sprintf("%d", summary.Critical),
+            "io.atcr.vuln.high":           fmt.Sprintf("%d", summary.High),
+            "io.atcr.vuln.medium":         fmt.Sprintf("%d", summary.Medium),
+            "io.atcr.vuln.low":            fmt.Sprintf("%d", summary.Low),
+            "io.atcr.vuln.total":          fmt.Sprintf("%d", summary.Total),
+            "io.atcr.vuln.scannedAt":      time.Now().Format(time.RFC3339),
+            "io.atcr.vuln.scannerVersion": scannerVersion,
+            "io.atcr.vuln.repository":     repository,
+            "io.atcr.vuln.ownerDid":       userDID,
+            "io.atcr.vuln.holdDid":        sb.holdDID,
+        },
+    }
+
+    orasManifestJSON, err := json.Marshal(orasManifest)
+    if err != nil {
+        return fmt.Errorf("failed to encode ORAS manifest: %w", err)
+    }
+
+    orasHash := sha256.Sum256(orasManifestJSON)
+    orasDigest := fmt.Sprintf("sha256:%x", orasHash)
+
+    // Upload ORAS manifest blob to S3
+    if err := sb.uploadBlob(ctx, orasDigest, orasManifestJSON); err != nil {
+        return fmt.Errorf("failed to upload ORAS manifest blob: %w", err)
+    }
+
+    slog.Info("ORAS manifest stored",
+        "digest", orasDigest,
+        "repository", repository,
+        "userDid", userDID,
+        "critical", summary.Critical,
+        "high", summary.High,
+        "total", summary.Total)
+
+    return nil
+}
+
+// uploadBlob uploads a blob to S3 storage
+func (sb *ScanBroadcaster) uploadBlob(ctx context.Context, digest string, data []byte) error {
+    // TrimPrefix (rather than slicing) avoids a panic on digests shorter than
+    // the "sha256:" prefix and rejects digests that don't carry it.
+    digestHex := strings.TrimPrefix(digest, "sha256:")
+    if digestHex == digest || len(digestHex) < 2 {
+        return fmt.Errorf("invalid digest: %s", digest)
+    }
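+
+    // distribution shards blobs by the first two hex characters, e.g.
+    // sha256:4734bc89... -> /docker/registry/v2/blobs/sha256/47/4734bc89.../data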
+    blobPath := fmt.Sprintf("/docker/registry/v2/blobs/sha256/%s/%s/data",
+        digestHex[:2], digestHex)
+
+    writer, err := sb.driver.Writer(ctx, blobPath, false)
+    if err != nil {
+        return fmt.Errorf("failed to create storage writer: %w", err)
+    }
+    defer writer.Close()
+
+    if _, err := writer.Write(data); err != nil {
+        writer.Cancel(ctx)
+        return fmt.Errorf("failed to write blob data: %w", err)
+    }
+
+    return writer.Commit(ctx)
+}
+
+func generateSubscriberID() string {
+    b := make([]byte, 8)
+    _, _ = rand.Read(b) // crypto/rand; an error here is effectively impossible on supported platforms
+    return hex.EncodeToString(b)
+}
+70-20
pkg/hold/pds/xrpc.go
···
 	"github.com/multiformats/go-multihash"
 
-	awss3 "github.com/aws/aws-sdk-go/service/s3"
+	awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
 )
 
 // XRPC handler for ATProto endpoints
···
 // XRPCHandler handles XRPC requests for the embedded PDS
 type XRPCHandler struct {
-	pds           *HoldPDS
-	s3Service     s3.S3Service
-	storageDriver driver.StorageDriver
-	broadcaster   *EventBroadcaster
-	httpClient    HTTPClient     // For testing - allows injecting mock HTTP client
-	quotaMgr      *quota.Manager // Quota manager for tier-based limits
+	pds             *HoldPDS
+	s3Service       s3.S3Service
+	storageDriver   driver.StorageDriver
+	broadcaster     *EventBroadcaster
+	scanBroadcaster *ScanBroadcaster // Scan job dispatcher for connected scanners
+	httpClient      HTTPClient       // For testing - allows injecting mock HTTP client
+	quotaMgr        *quota.Manager   // Quota manager for tier-based limits
 }
 
 // PartInfo represents a completed part in a multipart upload
···
 		httpClient: httpClient,
 		quotaMgr:   quotaMgr,
 	}
+}
+
+// SetScanBroadcaster sets the scan broadcaster for dispatching scan jobs to scanners
+func (h *XRPCHandler) SetScanBroadcaster(sb *ScanBroadcaster) {
+	h.scanBroadcaster = sb
 }
 
 // CORSMiddleware returns a simple CORS middleware configured for ATProto
···
 
 	// Public quota endpoint (no auth - quota is per-user, just needs userDid param)
 	r.Get(atproto.HoldGetQuota, h.HandleGetQuota)
+
+	// Scanner WebSocket endpoint (shared secret auth)
+	r.Get(atproto.HoldSubscribeScanJobs, h.HandleSubscribeScanJobs)
 }
 
 // HandleHealth returns health check information
···
 	// Subscribe to events
 	// The broadcaster's handleSubscriber goroutine will manage this connection
 	// and handle cleanup when the client disconnects
-	h.broadcaster.Subscribe(conn, cursor)
+	h.broadcaster.Subscribe(conn, cursor, r.UserAgent())
+}
+
+// HandleSubscribeScanJobs handles WebSocket connections from scanners.
+// Scanners connect here to receive scan jobs and send back results.
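+//
+// A scanner client might connect like this (sketch; the host and the concrete
+// path behind atproto.HoldSubscribeScanJobs are placeholders):
+//
+//	hdr := http.Header{"X-Scanner-Secret": []string{secret}}
+//	conn, _, err := websocket.DefaultDialer.Dial(
+//		"wss://hold.example/xrpc/<HoldSubscribeScanJobs>?cursor=-1", hdr)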
+func (h *XRPCHandler) HandleSubscribeScanJobs(w http.ResponseWriter, r *http.Request) {
+	if h.scanBroadcaster == nil {
+		http.Error(w, "scanning not enabled", http.StatusNotImplemented)
+		return
+	}
+
+	// Authenticate via shared secret (query param or header)
+	secret := r.URL.Query().Get("secret")
+	if secret == "" {
+		secret = r.Header.Get("X-Scanner-Secret")
+	}
+	if !h.scanBroadcaster.ValidateScannerSecret(secret) {
+		http.Error(w, "invalid scanner secret", http.StatusUnauthorized)
+		return
+	}
+
+	// Get optional cursor for backfill
+	var cursor int64 = -1
+	if cursorStr := r.URL.Query().Get("cursor"); cursorStr != "" {
+		var err error
+		cursor, err = strconv.ParseInt(cursorStr, 10, 64)
+		if err != nil {
+			http.Error(w, "invalid cursor parameter", http.StatusBadRequest)
+			return
+		}
+	}
+
+	// Upgrade to WebSocket
+	conn, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		slog.Error("Scanner WebSocket upgrade failed", "error", err)
+		return
+	}
+
+	h.scanBroadcaster.Subscribe(conn, cursor)
+}
+
+// ScanBroadcasterRef returns the scan broadcaster (used by OCI handler to enqueue jobs)
+func (h *XRPCHandler) ScanBroadcasterRef() *ScanBroadcaster {
+	return h.scanBroadcaster
 }
 
 // HandleUploadBlob handles blob uploads with support for multipart operations
···
 		s3Key = h.s3Service.PathPrefix + "/" + s3Key
 	}
 
-	// Create appropriate S3 request based on operation
-	var req interface {
-		Presign(time.Duration) (string, error)
-	}
+	// Generate presigned URL with 15 minute expiry
+	var url string
+	var err error
 	contentType := "application/octet-stream"
 	switch operation {
 	case http.MethodGet:
 		// Note: Don't use ResponseContentType - not supported by all S3-compatible services
-		req = h.s3Service.Client.GetObjectPresignable(&awss3.GetObjectInput{
+		url, err = h.s3Service.Client.PresignGetObject(ctx, &awss3.GetObjectInput{
 			Bucket: &h.s3Service.Bucket,
 			Key:    &s3Key,
-		})
+		}, 15*time.Minute)
 
 	case http.MethodHead:
-		req = h.s3Service.Client.HeadObjectPresignable(&awss3.HeadObjectInput{
+		url, err = h.s3Service.Client.PresignHeadObject(ctx, &awss3.HeadObjectInput{
 			Bucket: &h.s3Service.Bucket,
 			Key:    &s3Key,
-		})
+		}, 15*time.Minute)
 
 	case http.MethodPut:
-		req = h.s3Service.Client.PutObjectPresignable(&awss3.PutObjectInput{
+		url, err = h.s3Service.Client.PresignPutObject(ctx, &awss3.PutObjectInput{
 			Bucket:      &h.s3Service.Bucket,
 			Key:         &s3Key,
 			ContentType: &contentType,
-		})
+		}, 15*time.Minute)
 
 	default:
 		return "", fmt.Errorf("unsupported operation: %s", operation)
 	}
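+
+	// The Presign* methods above appear to be this codebase's own wrappers; on
+	// the raw aws-sdk-go-v2 API the equivalent is roughly (a sketch with
+	// assumed variable names, not part of this change):
+	//
+	//	pc := awss3.NewPresignClient(rawClient)
+	//	req, err := pc.PresignGetObject(ctx, input, awss3.WithPresignExpires(15*time.Minute))
+	//	url := req.URL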
 
-	// Generate presigned URL with 15 minute expiry
-	url, err := req.Presign(15 * time.Minute)
 	if err != nil {
 		slog.Warn("Presign failed, falling back to XRPC endpoint",
 			"error", err,
-270
pkg/hold/scanner/extractor.go
···
-package scanner
-
-import (
-    "archive/tar"
-    "compress/gzip"
-    "context"
-    "encoding/json"
-    "fmt"
-    "io"
-    "log/slog"
-    "os"
-    "path/filepath"
-    "strings"
-)
-
-// extractLayers extracts all image layers from storage to a temporary directory
-// Returns the directory path and a cleanup function
-func (w *Worker) extractLayers(ctx context.Context, job *ScanJob) (string, func(), error) {
-    // Create temp directory for extraction
-    // Use the database directory as the base (since we're in a scratch container with no /tmp)
-    scanTmpBase := filepath.Join(w.config.Database.Path, "scanner-tmp")
-    if err := os.MkdirAll(scanTmpBase, 0755); err != nil {
-        return "", nil, fmt.Errorf("failed to create scanner temp base: %w", err)
-    }
-
-    tmpDir, err := os.MkdirTemp(scanTmpBase, "scan-*")
-    if err != nil {
-        return "", nil, fmt.Errorf("failed to create temp directory: %w", err)
-    }
-
-    cleanup := func() {
-        if err := os.RemoveAll(tmpDir); err != nil {
-            slog.Warn("Failed to clean up temp directory", "dir", tmpDir, "error", err)
-        }
-    }
-
-    // Create image directory structure
-    imageDir := filepath.Join(tmpDir, "image")
-    if err := os.MkdirAll(imageDir, 0755); err != nil {
-        cleanup()
-        return "", nil, fmt.Errorf("failed to create image directory: %w", err)
-    }
-
-    // Download and extract config blob
-    slog.Info("Downloading config blob", "digest", job.Config.Digest)
-    configPath := filepath.Join(imageDir, "config.json")
-    if err := w.downloadBlob(ctx, job.Config.Digest, configPath); err != nil {
-        cleanup()
-        return "", nil, fmt.Errorf("failed to download config blob: %w", err)
-    }
-
-    // Validate config is valid JSON
-    configData, err := os.ReadFile(configPath)
-    if err != nil {
-        cleanup()
-        return "", nil, fmt.Errorf("failed to read config: %w", err)
-    }
-    var configObj map[string]interface{}
-    if err := json.Unmarshal(configData, &configObj); err != nil {
-        cleanup()
-        return "", nil, fmt.Errorf("invalid config JSON: %w", err)
-    }
-
-    // Create layers directory for extracted content
-    layersDir := filepath.Join(imageDir, "layers")
-    if err := os.MkdirAll(layersDir, 0755); err != nil {
-        cleanup()
-        return "", nil, fmt.Errorf("failed to create layers directory: %w", err)
-    }
-
-    // Download and extract each layer in order (creating overlayfs-style filesystem)
-    rootfsDir := filepath.Join(imageDir, "rootfs")
-    if err := os.MkdirAll(rootfsDir, 0755); err != nil {
-        cleanup()
-        return "", nil, fmt.Errorf("failed to create rootfs directory: %w", err)
-    }
-
-    for i, layer := range job.Layers {
-        slog.Info("Extracting layer", "index", i, "digest", layer.Digest, "size", layer.Size)
-
-        // Download layer blob to temp file
-        layerPath := filepath.Join(layersDir, fmt.Sprintf("layer-%d.tar.gz", i))
-        if err := w.downloadBlob(ctx, layer.Digest, layerPath); err != nil {
-            cleanup()
-            return "", nil, fmt.Errorf("failed to download layer %d: %w", i, err)
-        }
-
-        // Extract layer on top of rootfs (overlayfs style)
-        if err := w.extractTarGz(layerPath, rootfsDir); err != nil {
-            cleanup()
-            return "", nil, fmt.Errorf("failed to extract layer %d: %w", i, err)
-        }
-
-        // Remove layer tar.gz to save space
-        os.Remove(layerPath)
-    }
-
-    // Check what was extracted
-    entries, err := os.ReadDir(rootfsDir)
-    if err != nil {
-        slog.Warn("Failed to read rootfs directory", "error", err)
-    } else {
-        slog.Info("Successfully extracted image",
-            "layers", len(job.Layers),
-            "rootfs", rootfsDir,
-            "topLevelEntries", len(entries),
-            "sampleEntries", func() []string {
-                var samples []string
-                for i, e := range entries {
-                    if i >= 10 {
-                        break
-                    }
-                    samples = append(samples, e.Name())
-                }
-                return samples
-            }())
-    }
-
-    return rootfsDir, cleanup, nil
-}
-
-// downloadBlob downloads a blob from storage to a local file
-func (w *Worker) downloadBlob(ctx context.Context, digest, destPath string) error {
-    // Convert digest to storage path using distribution's sharding scheme
-    // Format: /docker/registry/v2/blobs/sha256/47/4734bc89.../data
-    // where 47 is the first 2 characters of the hash for directory sharding
-    blobPath := blobPathForDigest(digest)
-
-    // Open blob from storage driver
-    reader, err := w.driver.Reader(ctx, blobPath, 0)
-    if err != nil {
-        return fmt.Errorf("failed to open blob %s: %w", digest, err)
-    }
-    defer reader.Close()
-
-    // Create destination file
-    dest, err := os.Create(destPath)
-    if err != nil {
-        return fmt.Errorf("failed to create destination file: %w", err)
-    }
-    defer dest.Close()
-
-    // Copy blob data to file
-    if _, err := io.Copy(dest, reader); err != nil {
-        return fmt.Errorf("failed to copy blob data: %w", err)
-    }
-
-    return nil
-}
-
-// extractTarGz extracts a tar.gz file to a destination directory (overlayfs style)
-func (w *Worker) extractTarGz(tarGzPath, destDir string) error {
-    // Open tar.gz file
-    file, err := os.Open(tarGzPath)
-    if err != nil {
-        return fmt.Errorf("failed to open tar.gz: %w", err)
-    }
-    defer file.Close()
-
-    // Create gzip reader
-    gzr, err := gzip.NewReader(file)
-    if err != nil {
-        return fmt.Errorf("failed to create gzip reader: %w", err)
-    }
-    defer gzr.Close()
-
-    // Create tar reader
-    tr := tar.NewReader(gzr)
-
-    // Extract each file
-    for {
-        header, err := tr.Next()
-        if err == io.EOF {
-            break
-        }
-        if err != nil {
-            return fmt.Errorf("failed to read tar header: %w", err)
-        }
-
-        // Build target path (clean to prevent path traversal)
-        target := filepath.Join(destDir, filepath.Clean(header.Name))
-
-        // Ensure target is within destDir (security check)
-        if !strings.HasPrefix(target, filepath.Clean(destDir)+string(os.PathSeparator)) {
-            slog.Warn("Skipping path outside destination", "path", header.Name)
-            continue
-        }
-
-        switch header.Typeflag {
-        case tar.TypeDir:
-            // Create directory
-            if err := os.MkdirAll(target, os.FileMode(header.Mode)); err != nil {
-                return fmt.Errorf("failed to create directory %s: %w", target, err)
-            }
-
-        case tar.TypeReg:
-            // Create parent directory
-            if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
-                return fmt.Errorf("failed to create parent directory: %w", err)
-            }
-
-            // Create file
-            outFile, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode))
-            if err != nil {
-                return fmt.Errorf("failed to create file %s: %w", target, err)
-            }
-
-            // Copy file contents
-            if _, err := io.Copy(outFile, tr); err != nil {
-                outFile.Close()
-                return fmt.Errorf("failed to write file %s: %w", target, err)
-            }
-            outFile.Close()
-
-        case tar.TypeSymlink:
-            // Create symlink
-            if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
-                return fmt.Errorf("failed to create parent directory for symlink: %w", err)
-            }
-
-            // Remove existing file/symlink if it exists
-            os.Remove(target)
-
-            if err := os.Symlink(header.Linkname, target); err != nil {
-                slog.Warn("Failed to create symlink", "target", target, "link", header.Linkname, "error", err)
-            }
-
-        case tar.TypeLink:
-            // Create hard link
-            linkTarget := filepath.Join(destDir, filepath.Clean(header.Linkname))
-            if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
-                return fmt.Errorf("failed to create parent directory for hardlink: %w", err)
-            }
-
-            // Remove existing file if it exists
-            os.Remove(target)
-
-            if err := os.Link(linkTarget, target); err != nil {
-                slog.Warn("Failed to create hardlink", "target", target, "link", linkTarget, "error", err)
-            }
-
-        default:
-            slog.Debug("Skipping unsupported tar entry type", "type", header.Typeflag, "name", header.Name)
-        }
-    }
-
-    return nil
-}
-
-// blobPathForDigest converts a digest to a storage path using distribution's sharding scheme
-// Format: /docker/registry/v2/blobs/sha256/47/4734bc89.../data
-// where 47 is the first 2 characters of the hash for directory sharding
-func blobPathForDigest(digest string) string {
-    // Split digest into algorithm and hash
-    parts := strings.SplitN(digest, ":", 2)
-    if len(parts) != 2 {
-        // Fallback for malformed digest
-        return fmt.Sprintf("/docker/registry/v2/blobs/%s/data", digest)
-    }
-
-    algorithm := parts[0]
-    hash := parts[1]
-
-    // Use first 2 characters for sharding
-    if len(hash) < 2 {
-        return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/data", algorithm, hash)
-    }
-
-    return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", algorithm, hash[:2], hash)
-}
-351
pkg/hold/scanner/grype.go
···
-package scanner
-
-import (
-    "context"
-    "crypto/sha256"
-    "encoding/json"
-    "fmt"
-    "log/slog"
-    "os"
-    "path/filepath"
-    "sync"
-
-    "github.com/anchore/grype/grype"
-    "github.com/anchore/grype/grype/db/v6/distribution"
-    "github.com/anchore/grype/grype/db/v6/installation"
-    "github.com/anchore/grype/grype/distro"
-    "github.com/anchore/grype/grype/match"
-    "github.com/anchore/grype/grype/matcher"
-    "github.com/anchore/grype/grype/matcher/dotnet"
-    "github.com/anchore/grype/grype/matcher/golang"
-    "github.com/anchore/grype/grype/matcher/java"
-    "github.com/anchore/grype/grype/matcher/javascript"
-    "github.com/anchore/grype/grype/matcher/python"
-    "github.com/anchore/grype/grype/matcher/ruby"
-    "github.com/anchore/grype/grype/matcher/stock"
-    grypePkg "github.com/anchore/grype/grype/pkg"
-    "github.com/anchore/grype/grype/vulnerability"
-    "github.com/anchore/syft/syft/sbom"
-)
-
-// Global vulnerability database (shared across workers)
-var (
-    vulnDB     vulnerability.Provider
-    vulnDBLock sync.RWMutex
-)
-
-// scanVulnerabilities scans an SBOM for vulnerabilities using Grype
-// Returns vulnerability report JSON, digest, summary, and any error
-func (w *Worker) scanVulnerabilities(ctx context.Context, s *sbom.SBOM) ([]byte, string, VulnerabilitySummary, error) {
-    slog.Info("Scanning for vulnerabilities with Grype")
-
-    // Load vulnerability database (cached globally)
-    store, err := w.loadVulnDatabase(ctx)
-    if err != nil {
-        return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to load vulnerability database: %w", err)
-    }
-
-    // Create package context from SBOM (need distro for synthesis)
-    var grypeDistro *distro.Distro
-    if s.Artifacts.LinuxDistribution != nil {
-        grypeDistro = distro.FromRelease(s.Artifacts.LinuxDistribution, nil)
-        if grypeDistro != nil {
-            slog.Info("Using distro for package synthesis",
-                "name", grypeDistro.Name(),
-                "version", grypeDistro.Version,
-                "type", grypeDistro.Type,
-                "codename", grypeDistro.Codename)
-        }
-    }
-
-    // Convert Syft packages to Grype packages WITH distro info
-    synthesisConfig := grypePkg.SynthesisConfig{
-        GenerateMissingCPEs: true,
-        Distro: grypePkg.DistroConfig{
-            Override: grypeDistro,
-        },
-    }
-    grypePackages := grypePkg.FromCollection(s.Artifacts.Packages, synthesisConfig)
-
-    slog.Info("Converted packages for vulnerability scanning",
-        "syftPackages", s.Artifacts.Packages.PackageCount(),
-        "grypePackages", len(grypePackages),
-        "distro", func() string {
-            if s.Artifacts.LinuxDistribution != nil {
-                return fmt.Sprintf("%s %s", s.Artifacts.LinuxDistribution.Name, s.Artifacts.LinuxDistribution.Version)
-            }
-            return "none"
-        }())
-
-    // Create matchers
-    matchers := matcher.NewDefaultMatchers(matcher.Config{
-        Java:       java.MatcherConfig{},
-        Ruby:       ruby.MatcherConfig{},
-        Python:     python.MatcherConfig{},
-        Dotnet:     dotnet.MatcherConfig{},
-        Javascript: javascript.MatcherConfig{},
-        Golang:     golang.MatcherConfig{},
-        Stock:      stock.MatcherConfig{},
-    })
-
-    // Create package context with the same distro we used for synthesis
-    pkgContext := grypePkg.Context{
-        Source: &s.Source,
-        Distro: grypeDistro,
-    }
-
-    // Create vulnerability matcher
-    vulnerabilityMatcher := &grype.VulnerabilityMatcher{
-        VulnerabilityProvider: store,
-        Matchers:              matchers,
-        NormalizeByCVE:        true,
-    }
-
-    // Find vulnerabilities
-    slog.Info("Matching vulnerabilities",
-        "packages", len(grypePackages),
-        "distro", func() string {
-            if grypeDistro != nil {
-                return fmt.Sprintf("%s %s", grypeDistro.Name(), grypeDistro.Version)
-            }
-            return "none"
-        }())
-    allMatches, _, err := vulnerabilityMatcher.FindMatches(grypePackages, pkgContext)
-    if err != nil {
-        return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to find vulnerabilities: %w", err)
-    }
-
-    slog.Info("Vulnerability matching complete",
-        "totalMatches", allMatches.Count())
-
-    // If we found 0 matches, log some diagnostic info
-    if allMatches.Count() == 0 {
-        slog.Warn("No vulnerability matches found - this may indicate an issue",
-            "distro", func() string {
-                if grypeDistro != nil {
-                    return fmt.Sprintf("%s %s", grypeDistro.Name(), grypeDistro.Version)
-                }
-                return "none"
-            }(),
-            "packages", len(grypePackages),
-            "databaseBuilt", func() string {
-                vulnDBLock.RLock()
-                defer vulnDBLock.RUnlock()
-                if vulnDB == nil {
-                    return "not loaded"
-                }
-                // We can't easily get the build date here without exposing internal state
-                return "loaded"
-            }())
-    }
-
-    // Count vulnerabilities by severity
-    summary := w.countVulnerabilitiesBySeverity(*allMatches)
-
-    slog.Info("Vulnerability scan complete",
-        "critical", summary.Critical,
-        "high", summary.High,
-        "medium", summary.Medium,
-        "low", summary.Low,
-        "total", summary.Total)
-
-    // Create vulnerability report JSON
-    report := map[string]interface{}{
-        "matches": allMatches.Sorted(),
-        "source":  s.Source,
-        "distro":  s.Artifacts.LinuxDistribution,
-        "descriptor": map[string]interface{}{
-            "name":    "grype",
-            "version": "v0.102.0", // TODO: Get actual Grype version
-        },
-        "summary": summary,
-    }
-
-    // Encode report to JSON
-    reportJSON, err := json.MarshalIndent(report, "", " ")
-    if err != nil {
-        return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to encode vulnerability report: %w", err)
-    }
-
-    // Calculate digest
-    hash := sha256.Sum256(reportJSON)
-    digest := fmt.Sprintf("sha256:%x", hash)
-
-    slog.Info("Vulnerability report generated", "size", len(reportJSON), "digest", digest)
-
-    // Upload report blob to storage
-    if err := w.uploadBlob(ctx, digest, reportJSON); err != nil {
-        return nil, "", VulnerabilitySummary{}, fmt.Errorf("failed to upload vulnerability report: %w", err)
-    }
-
-    return reportJSON, digest, summary, nil
-}
-
-// loadVulnDatabase loads the Grype vulnerability database (with caching)
-func (w *Worker) loadVulnDatabase(ctx context.Context) (vulnerability.Provider, error) {
-    // Check if database is already loaded
-    vulnDBLock.RLock()
-    if vulnDB != nil {
-        vulnDBLock.RUnlock()
-        return vulnDB, nil
-    }
-    vulnDBLock.RUnlock()
-
-    // Acquire write lock to load database
-    vulnDBLock.Lock()
-    defer vulnDBLock.Unlock()
-
-    // Check again (another goroutine might have loaded it)
-    if vulnDB != nil {
-        return vulnDB, nil
-    }
-
-    slog.Info("Loading Grype vulnerability database", "path", w.config.Scanner.VulnDBPath)
-
-    // Ensure database directory exists
-    if err := ensureDir(w.config.Scanner.VulnDBPath); err != nil {
-        return nil, fmt.Errorf("failed to create vulnerability database directory: %w", err)
-    }
-
-    // Configure database distribution
-    distConfig := distribution.DefaultConfig()
-
-    // Configure database installation
-    installConfig := installation.Config{
-        DBRootDir:          w.config.Scanner.VulnDBPath,
-        ValidateAge:        true,
-        ValidateChecksum:   true,
-        MaxAllowedBuiltAge: w.config.Scanner.VulnDBUpdateInterval,
-    }
-
-    // Load database (should already be downloaded by initializeVulnDatabase)
-    store, status, err := grype.LoadVulnerabilityDB(distConfig, installConfig, false)
-    if err != nil {
-        return nil, fmt.Errorf("failed to load vulnerability database (status=%v): %w (hint: database may still be downloading)", status, err)
-    }
-
-    slog.Info("Vulnerability database loaded",
-        "status", status,
-        "built", status.Built,
-        "location", status.Path,
-        "schemaVersion", status.SchemaVersion)
-
-    // Check database file size to verify it has content
-    if stat, err := os.Stat(status.Path); err == nil {
-        slog.Info("Vulnerability database file stats",
-            "size", stat.Size(),
-            "sizeMB", stat.Size()/1024/1024)
-    }
-
-    // Cache database globally
-    vulnDB = store
-
-    slog.Info("Vulnerability database loaded successfully")
-    return vulnDB, nil
-}
-
-// countVulnerabilitiesBySeverity counts vulnerabilities by severity level
-func (w *Worker) countVulnerabilitiesBySeverity(matches match.Matches) VulnerabilitySummary {
-    summary := VulnerabilitySummary{}
-
-    for m := range matches.Enumerate() {
-        summary.Total++
-
-        // Get severity from vulnerability metadata
-        if m.Vulnerability.Metadata != nil {
-            severity := m.Vulnerability.Metadata.Severity
-            switch severity {
-            case "Critical":
-                summary.Critical++
-            case "High":
-                summary.High++
-            case "Medium":
-                summary.Medium++
-            case "Low":
-                summary.Low++
-            }
-        }
-    }
-
-    return summary
-}
-
-// initializeVulnDatabase downloads and initializes the vulnerability database on startup
-func (w *Worker) initializeVulnDatabase(ctx context.Context) error {
-    slog.Info("Initializing vulnerability database", "path", w.config.Scanner.VulnDBPath)
-
-    // Ensure database directory exists
-    if err := ensureDir(w.config.Scanner.VulnDBPath); err != nil {
-        return fmt.Errorf("failed to create vulnerability database directory: %w", err)
-    }
-
-    // Create temp directory for Grype downloads (scratch container has no /tmp)
-    tmpDir := filepath.Join(w.config.Database.Path, "tmp")
-    if err := ensureDir(tmpDir); err != nil {
-        return fmt.Errorf("failed to create temp directory: %w", err)
-    }
-
-    // Set TMPDIR environment variable so Grype uses our temp directory
-    oldTmpDir := os.Getenv("TMPDIR")
-    os.Setenv("TMPDIR", tmpDir)
-    defer func() {
-        if oldTmpDir != "" {
-            os.Setenv("TMPDIR", oldTmpDir)
-        } else {
-            os.Unsetenv("TMPDIR")
-        }
-    }()
-
-    // Configure database distribution
-    distConfig := distribution.DefaultConfig()
-
-    // Configure database installation
-    installConfig := installation.Config{
-        DBRootDir:          w.config.Scanner.VulnDBPath,
-        ValidateAge:        true,
-        ValidateChecksum:   true,
-        MaxAllowedBuiltAge: w.config.Scanner.VulnDBUpdateInterval,
-    }
-
-    // Create distribution client for downloading
-    downloader, err := distribution.NewClient(distConfig)
-    if err != nil {
-        return fmt.Errorf("failed to create database downloader: %w", err)
-    }
-
-    // Create curator to manage database
-    curator, err := installation.NewCurator(installConfig, downloader)
-    if err != nil {
-        return fmt.Errorf("failed to create database curator: %w", err)
-    }
-
-    // Check if database already exists
-    status := curator.Status()
-    if !status.Built.IsZero() && status.Error == nil {
-        slog.Info("Vulnerability database already exists", "built", status.Built, "schema", status.SchemaVersion)
-        return nil
-    }
-
-    // Download database (this may take several minutes)
-    slog.Info("Downloading vulnerability database (this may take 5-10 minutes)...")
-    updated, err := curator.Update()
-    if err != nil {
-        return fmt.Errorf("failed to download vulnerability database: %w", err)
-    }
-
-    if updated {
-        slog.Info("Vulnerability database downloaded successfully")
-    } else {
-        slog.Info("Vulnerability database is up to date")
-    }
-
-    return nil
-}
-
-// ensureDir creates a directory if it doesn't exist
-func ensureDir(path string) error {
-    if err := os.MkdirAll(path, 0755); err != nil {
-        return fmt.Errorf("failed to create directory %s: %w", path, err)
-    }
-    return nil
-}
-67
pkg/hold/scanner/job.go
···
-package scanner
-
-import (
-    "time"
-
-    "atcr.io/pkg/atproto"
-)
-
-// ScanJob represents a vulnerability scanning job for a container image
-type ScanJob struct {
-    // ManifestDigest is the digest of the manifest to scan
-    ManifestDigest string
-
-    // Repository is the repository name (e.g., "alice/myapp")
-    Repository string
-
-    // Tag is the tag name (e.g., "latest")
-    Tag string
-
-    // UserDID is the DID of the user who owns this image
-    UserDID string
-
-    // UserHandle is the handle of the user (for display)
-    UserHandle string
-
-    // Config is the image config blob descriptor
-    Config atproto.BlobReference
-
-    // Layers are the image layer blob descriptors (in order)
-    Layers []atproto.BlobReference
-
-    // EnqueuedAt is when this job was enqueued
-    EnqueuedAt time.Time
-}
-
-// ScanResult represents the result of a vulnerability scan
-type ScanResult struct {
-    // Job is the original scan job
-    Job *ScanJob
-
-    // VulnerabilitiesJSON is the raw Grype JSON output
-    VulnerabilitiesJSON []byte
-
-    // Summary contains vulnerability counts by severity
-    Summary VulnerabilitySummary
-
-    // SBOMDigest is the digest of the SBOM blob (if SBOM was generated)
-    SBOMDigest string
-
-    // VulnDigest is the digest of the vulnerability report blob
-    VulnDigest string
-
-    // ScannedAt is when the scan completed
-    ScannedAt time.Time
-
-    // ScannerVersion is the version of the scanner used
-    ScannerVersion string
-}
-
-// VulnerabilitySummary contains counts of vulnerabilities by severity
-type VulnerabilitySummary struct {
-    Critical int `json:"critical"`
-    High     int `json:"high"`
-    Medium   int `json:"medium"`
-    Low      int `json:"low"`
-    Total    int `json:"total"`
-}
-226
pkg/hold/scanner/queue.go
···11-package scanner
22-33-import (
44- "context"
55- "fmt"
66- "log/slog"
77- "sync"
88-99- "atcr.io/pkg/atproto"
1010-)
1111-1212-// Queue manages a pool of workers for scanning container images
1313-type Queue struct {
1414- jobs chan *ScanJob
1515- results chan *ScanResult
1616- workers int
1717- wg sync.WaitGroup
1818- ctx context.Context
1919- cancel context.CancelFunc
2020-}
2121-2222-// NewQueue creates a new scanner queue with the specified number of workers
2323-func NewQueue(workers int, bufferSize int) *Queue {
2424- ctx, cancel := context.WithCancel(context.Background())
2525-2626- return &Queue{
2727- jobs: make(chan *ScanJob, bufferSize),
2828- results: make(chan *ScanResult, bufferSize),
2929- workers: workers,
3030- ctx: ctx,
3131- cancel: cancel,
3232- }
3333-}
3434-3535-// Start starts the worker pool
3636-// The workerFunc is called for each job to perform the actual scanning
3737-func (q *Queue) Start(workerFunc func(context.Context, *ScanJob) (*ScanResult, error)) {
3838- slog.Info("Starting scanner worker pool", "workers", q.workers)
3939-4040- for i := 0; i < q.workers; i++ {
4141- q.wg.Add(1)
4242- go q.worker(i, workerFunc)
4343- }
4444-4545- // Start result handler goroutine
4646- q.wg.Add(1)
4747- go q.resultHandler()
4848-}
4949-5050-// worker processes jobs from the queue
5151-func (q *Queue) worker(id int, workerFunc func(context.Context, *ScanJob) (*ScanResult, error)) {
5252- defer q.wg.Done()
5353-5454- slog.Info("Scanner worker started", "worker_id", id)
5555-5656- for {
5757- select {
5858- case <-q.ctx.Done():
5959- slog.Info("Scanner worker shutting down", "worker_id", id)
6060- return
6161-6262- case job, ok := <-q.jobs:
6363- if !ok {
6464- slog.Info("Scanner worker: jobs channel closed", "worker_id", id)
6565- return
6666- }
6767-6868- slog.Info("Scanner worker processing job",
6969- "worker_id", id,
7070- "repository", job.Repository,
7171- "tag", job.Tag,
7272- "digest", job.ManifestDigest)
7373-7474- result, err := workerFunc(q.ctx, job)
7575- if err != nil {
7676- slog.Error("Scanner worker failed to process job",
7777- "worker_id", id,
7878- "repository", job.Repository,
7979- "tag", job.Tag,
8080- "error", err)
8181- continue
8282- }
8383-8484- // Send result to results channel
8585- select {
8686- case q.results <- result:
8787- slog.Info("Scanner worker completed job",
8888- "worker_id", id,
8989- "repository", job.Repository,
9090- "tag", job.Tag,
9191- "vulnerabilities", result.Summary.Total)
9292- case <-q.ctx.Done():
9393- return
9494- }
9595- }
9696- }
9797-}
9898-9999-// resultHandler processes scan results (for logging and metrics)
100100-func (q *Queue) resultHandler() {
101101- defer q.wg.Done()
102102-103103- for {
104104- select {
105105- case <-q.ctx.Done():
106106- return
107107-108108- case result, ok := <-q.results:
109109- if !ok {
110110- return
111111- }
112112-113113- // Log the result
114114- slog.Info("Scan completed",
115115- "repository", result.Job.Repository,
116116- "tag", result.Job.Tag,
117117- "digest", result.Job.ManifestDigest,
118118- "critical", result.Summary.Critical,
119119- "high", result.Summary.High,
120120- "medium", result.Summary.Medium,
121121- "low", result.Summary.Low,
122122- "total", result.Summary.Total,
123123- "scanner", result.ScannerVersion)
124124- }
125125- }
126126-}
// Enqueue adds a job to the queue
func (q *Queue) Enqueue(jobAny any) error {
	// Accept either a typed *ScanJob or the map form produced by HandleNotifyManifest
	var job *ScanJob

	switch v := jobAny.(type) {
	case *ScanJob:
		job = v
	case map[string]interface{}:
		// Convert map to ScanJob (from HandleNotifyManifest). Checked type
		// assertions return an error on a malformed map instead of panicking.
		manifestDigest, okDigest := v["manifestDigest"].(string)
		repository, okRepo := v["repository"].(string)
		tag, okTag := v["tag"].(string)
		userDID, okDID := v["userDID"].(string)
		userHandle, okHandle := v["userHandle"].(string)
		if !okDigest || !okRepo || !okTag || !okDID || !okHandle {
			return fmt.Errorf("scan job map is missing required string fields")
		}
		job = &ScanJob{
			ManifestDigest: manifestDigest,
			Repository:     repository,
			Tag:            tag,
			UserDID:        userDID,
			UserHandle:     userHandle,
		}

		// Parse the config blob reference. The nested keys are produced by
		// HandleNotifyManifest and are assumed well-formed here.
		if configMap, ok := v["config"].(map[string]interface{}); ok {
			job.Config = atproto.BlobReference{
				Digest:    configMap["digest"].(string),
				Size:      convertToInt64(configMap["size"]),
				MediaType: configMap["mediaType"].(string),
			}
		}

		// Parse layers
		if layersSlice, ok := v["layers"].([]interface{}); ok {
			slog.Info("Parsing layers from scan job",
				"layersFound", len(layersSlice))
			job.Layers = make([]atproto.BlobReference, len(layersSlice))
			for i, layerAny := range layersSlice {
				if layerMap, ok := layerAny.(map[string]interface{}); ok {
					job.Layers[i] = atproto.BlobReference{
						Digest:    layerMap["digest"].(string),
						Size:      convertToInt64(layerMap["size"]),
						MediaType: layerMap["mediaType"].(string),
					}
				}
			}
		} else {
			slog.Warn("No layers found in scan job map",
				"layersType", fmt.Sprintf("%T", v["layers"]),
				"layersValue", v["layers"])
		}
	default:
		return fmt.Errorf("invalid job type: %T", jobAny)
	}

	select {
	case q.jobs <- job:
		slog.Info("Enqueued scan job",
			"repository", job.Repository,
			"tag", job.Tag,
			"digest", job.ManifestDigest)
		return nil
	case <-q.ctx.Done():
		return q.ctx.Err()
	}
}
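
// Illustrative sketch (not part of the original source): the two payload
// shapes Enqueue accepts. The literal values below are hypothetical.
//
//	// Directly with a typed job:
//	_ = q.Enqueue(&ScanJob{Repository: "alice/app", Tag: "latest"})
//
//	// Or with the map form produced by HandleNotifyManifest:
//	_ = q.Enqueue(map[string]interface{}{
//		"manifestDigest": "sha256:0123abcd",
//		"repository":     "alice/app",
//		"tag":            "latest",
//		"userDID":        "did:plc:example",
//		"userHandle":     "alice.example.com",
//	})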
// Shutdown gracefully shuts down the queue, waiting for all workers to finish
func (q *Queue) Shutdown() {
	slog.Info("Shutting down scanner queue")

	// Close the jobs channel to signal no more jobs; workers exit once it is drained
	close(q.jobs)

	// Wait for all workers to finish
	q.wg.Wait()

	// Close the results channel so the result handler drains it and exits,
	// then wait for it. Closing before waiting avoids deadlocking on a
	// goroutine that is still blocked reading results.
	close(q.results)
	<-q.resultDone

	// Release the context
	q.cancel()

	slog.Info("Scanner queue shutdown complete")
}
// Len returns the number of jobs currently in the queue
func (q *Queue) Len() int {
	return len(q.jobs)
}
// convertToInt64 converts an interface{} number to int64, handling float64
// (what JSON decoding produces), int64, and int
func convertToInt64(v interface{}) int64 {
	switch n := v.(type) {
	case float64:
		return int64(n)
	case int64:
		return n
	case int:
		return int64(n)
	default:
		return 0
	}
}
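
// Illustrative sketch (not part of the original source): a minimal queue
// lifecycle, assuming a hypothetical scan function scanImage.
//
//	q := NewQueue(4, 100) // 4 workers, buffer for 100 jobs
//	q.Start(func(ctx context.Context, job *ScanJob) (*ScanResult, error) {
//		return scanImage(ctx, job) // hypothetical
//	})
//	// ... enqueue jobs ...
//	q.Shutdown() // drains queued jobs, waits for workers and the result handler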
pkg/hold/scanner/storage.go
package scanner
import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"atcr.io/pkg/atproto"
)
// storeResults uploads scan results and creates ORAS manifest records in the hold's PDS
func (w *Worker) storeResults(ctx context.Context, job *ScanJob, sbomDigest, vulnDigest string, vulnJSON []byte, summary VulnerabilitySummary) error {
	if !w.config.Scanner.VulnEnabled {
		slog.Info("Vulnerability scanning disabled, skipping result storage")
		return nil
	}

	slog.Info("Storing scan results as ORAS artifact",
		"repository", job.Repository,
		"subjectDigest", job.ManifestDigest,
		"vulnDigest", vulnDigest)

	// Create ORAS manifest for the vulnerability report
	orasManifest := map[string]interface{}{
		"schemaVersion": 2,
		"mediaType":     "application/vnd.oci.image.manifest.v1+json",
		"artifactType":  "application/vnd.atcr.vulnerabilities+json",
		"config": map[string]interface{}{
			"mediaType": "application/vnd.oci.empty.v1+json",
			"digest":    "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", // Empty JSON object "{}"
			"size":      2,
		},
		"subject": map[string]interface{}{
			"mediaType": "application/vnd.oci.image.manifest.v1+json",
			"digest":    job.ManifestDigest,
			"size":      0, // We don't have the size, but it's optional
		},
		"layers": []map[string]interface{}{
			{
				"mediaType": "application/json",
				"digest":    vulnDigest,
				"size":      len(vulnJSON),
				"annotations": map[string]string{
					"org.opencontainers.image.title": "vulnerability-report.json",
				},
			},
		},
		"annotations": map[string]string{
			"io.atcr.vuln.critical":       fmt.Sprintf("%d", summary.Critical),
			"io.atcr.vuln.high":           fmt.Sprintf("%d", summary.High),
			"io.atcr.vuln.medium":         fmt.Sprintf("%d", summary.Medium),
			"io.atcr.vuln.low":            fmt.Sprintf("%d", summary.Low),
			"io.atcr.vuln.total":          fmt.Sprintf("%d", summary.Total),
			"io.atcr.vuln.scannedAt":      time.Now().Format(time.RFC3339),
			"io.atcr.vuln.scannerVersion": w.getScannerVersion(),
		},
	}

	// Encode ORAS manifest to JSON
	orasManifestJSON, err := json.Marshal(orasManifest)
	if err != nil {
		return fmt.Errorf("failed to encode ORAS manifest: %w", err)
	}

	// Calculate ORAS manifest digest
	orasDigest := fmt.Sprintf("sha256:%x", sha256Bytes(orasManifestJSON))

	// Upload ORAS manifest blob to storage
	if err := w.uploadBlob(ctx, orasDigest, orasManifestJSON); err != nil {
		return fmt.Errorf("failed to upload ORAS manifest blob: %w", err)
	}

	// Create manifest record in the hold's PDS
	if err := w.createManifestRecord(ctx, job, orasDigest, orasManifestJSON, summary); err != nil {
		return fmt.Errorf("failed to create manifest record: %w", err)
	}

	slog.Info("Successfully stored scan results", "orasDigest", orasDigest)
	return nil
}
// createManifestRecord creates an ORAS manifest record in the hold's PDS
func (w *Worker) createManifestRecord(ctx context.Context, job *ScanJob, orasDigest string, orasManifestJSON []byte, summary VulnerabilitySummary) error {
	// Create ManifestRecord from the ORAS manifest
	record, err := atproto.NewManifestRecord(job.Repository, orasDigest, orasManifestJSON)
	if err != nil {
		return fmt.Errorf("failed to create manifest record: %w", err)
	}

	// Set SBOM/vulnerability-specific fields
	record.OwnerDID = job.UserDID
	record.ScannedAt = time.Now().Format(time.RFC3339)
	record.ScannerVersion = w.getScannerVersion()

	// Add hold DID (this ORAS artifact is stored in the hold's PDS)
	record.HoldDID = w.pds.DID()

	// Convert digest to record key (remove the "sha256:" prefix)
	rkey := orasDigest[len("sha256:"):]

	// Store the record in the hold's PDS
	slog.Info("Creating manifest record in hold's PDS",
		"collection", atproto.ManifestCollection,
		"rkey", rkey,
		"ownerDid", job.UserDID)

	_, _, err = w.pds.CreateManifestRecord(ctx, record, rkey)
	if err != nil {
		return fmt.Errorf("failed to put record in PDS: %w", err)
	}

	slog.Info("Manifest record created successfully",
		"uri", fmt.Sprintf("at://%s/%s/%s", w.pds.DID(), atproto.ManifestCollection, rkey))
	return nil
}
// sha256Bytes calculates the SHA-256 hash of a byte slice
func sha256Bytes(data []byte) []byte {
	hash := sha256.Sum256(data)
	return hash[:]
}
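
// Sanity check (not part of the original source): the hard-coded config digest
// in storeResults is the well-known digest of the two-byte empty JSON object
// "{}" defined by the OCI empty descriptor. It can be reproduced with:
//
//	sum := sha256.Sum256([]byte("{}"))
//	fmt.Printf("sha256:%x\n", sum)
//	// Output: sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a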
pkg/hold/scanner/syft.go
package scanner
import (
	"context"
	"crypto/sha256"
	"fmt"
	"log/slog"
	"os"
	"strings"

	"github.com/anchore/syft/syft"
	"github.com/anchore/syft/syft/format"
	"github.com/anchore/syft/syft/format/spdxjson"
	"github.com/anchore/syft/syft/sbom"
	"github.com/anchore/syft/syft/source/directorysource"
)
// generateSBOM generates an SBOM using Syft from an extracted image directory.
// It returns the SBOM object, the SBOM JSON, its digest, and any error.
func (w *Worker) generateSBOM(ctx context.Context, imageDir string) (*sbom.SBOM, []byte, string, error) {
	slog.Info("Generating SBOM with Syft", "imageDir", imageDir)

	// Check that the directory exists and is accessible
	entries, err := os.ReadDir(imageDir)
	if err != nil {
		return nil, nil, "", fmt.Errorf("failed to read image directory: %w", err)
	}
	slog.Info("Image directory contents",
		"path", imageDir,
		"entries", len(entries),
		"sampleFiles", func() []string {
			var samples []string
			for i, e := range entries {
				if i >= 20 {
					break
				}
				samples = append(samples, e.Name())
			}
			return samples
		}())

	// Create a Syft source from the directory
	src, err := directorysource.NewFromPath(imageDir)
	if err != nil {
		return nil, nil, "", fmt.Errorf("failed to create Syft source: %w", err)
	}
	defer src.Close()

	// Generate the SBOM
	slog.Info("Running Syft cataloging")
	sbomResult, err := syft.CreateSBOM(ctx, src, nil)
	if err != nil {
		return nil, nil, "", fmt.Errorf("failed to generate SBOM: %w", err)
	}

	if sbomResult == nil {
		return nil, nil, "", fmt.Errorf("Syft returned nil SBOM")
	}

	slog.Info("SBOM generated",
		"packages", sbomResult.Artifacts.Packages.PackageCount(),
		"distro", func() string {
			if sbomResult.Artifacts.LinuxDistribution != nil {
				return fmt.Sprintf("%s %s", sbomResult.Artifacts.LinuxDistribution.Name, sbomResult.Artifacts.LinuxDistribution.Version)
			}
			return "none"
		}())

	// Encode the SBOM to SPDX JSON format
	encoder, err := spdxjson.NewFormatEncoderWithConfig(spdxjson.DefaultEncoderConfig())
	if err != nil {
		return nil, nil, "", fmt.Errorf("failed to create SPDX encoder: %w", err)
	}

	sbomJSON, err := format.Encode(*sbomResult, encoder)
	if err != nil {
		return nil, nil, "", fmt.Errorf("failed to encode SBOM to SPDX JSON: %w", err)
	}

	// Calculate the digest
	hash := sha256.Sum256(sbomJSON)
	digest := fmt.Sprintf("sha256:%x", hash)

	slog.Info("SBOM encoded", "format", "spdx-json", "size", len(sbomJSON), "digest", digest)

	// Upload the SBOM blob to storage
	if err := w.uploadBlob(ctx, digest, sbomJSON); err != nil {
		return nil, nil, "", fmt.Errorf("failed to upload SBOM blob: %w", err)
	}

	return sbomResult, sbomJSON, digest, nil
}
// uploadBlob uploads a blob to storage
func (w *Worker) uploadBlob(ctx context.Context, digest string, data []byte) error {
	// Convert the digest to a storage path (same layout distribution uses).
	// Path format: /docker/registry/v2/blobs/sha256/ab/abcd1234.../data
	algorithm := "sha256"
	if !strings.HasPrefix(digest, algorithm+":") {
		return fmt.Errorf("invalid digest: %s", digest)
	}
	digestHex := digest[len(algorithm)+1:]
	if len(digestHex) < 2 {
		return fmt.Errorf("invalid digest: %s", digest)
	}

	blobPath := fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data",
		algorithm,
		digestHex[:2],
		digestHex)

	slog.Info("Uploading blob to storage", "digest", digest, "size", len(data), "path", blobPath)

	// Write the blob to storage
	writer, err := w.driver.Writer(ctx, blobPath, false)
	if err != nil {
		return fmt.Errorf("failed to create storage writer: %w", err)
	}
	defer writer.Close()

	if _, err := writer.Write(data); err != nil {
		writer.Cancel(ctx)
		return fmt.Errorf("failed to write blob data: %w", err)
	}

	if err := writer.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit blob: %w", err)
	}

	slog.Info("Successfully uploaded blob", "digest", digest)
	return nil
}
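
// Illustrative sketch (not part of the original source): the path layout
// uploadBlob produces, with the first two hex characters used as a fan-out
// directory, matching distribution's on-disk blob store:
//
//	uploadBlob(ctx, "sha256:44136fa3...", data)
//	// writes to /docker/registry/v2/blobs/sha256/44/44136fa3.../data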
pkg/hold/scanner/worker.go
package scanner
import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"atcr.io/pkg/hold"
	"atcr.io/pkg/hold/pds"
	"github.com/distribution/distribution/v3/registry/storage/driver"
)
// Worker performs vulnerability scanning on container images
type Worker struct {
	config *hold.Config
	driver driver.StorageDriver
	pds    *pds.HoldPDS
	queue  *Queue
}

// NewWorker creates a new scanner worker
func NewWorker(config *hold.Config, driver driver.StorageDriver, pds *pds.HoldPDS) *Worker {
	return &Worker{
		config: config,
		driver: driver,
		pds:    pds,
	}
}
// Start starts the worker pool and initializes the vulnerability database
func (w *Worker) Start(queue *Queue) {
	w.queue = queue

	// Initialize the vulnerability database on startup if scanning is enabled.
	// This runs in the background so queue startup is not blocked.
	if w.config.Scanner.VulnEnabled {
		go func() {
			ctx := context.Background()
			if err := w.initializeVulnDatabase(ctx); err != nil {
				slog.Error("Failed to initialize vulnerability database", "error", err)
				slog.Warn("Vulnerability scanning will be disabled until database is available")
			}
		}()
	}

	queue.Start(w.processJob)
}
// processJob processes a single scan job
func (w *Worker) processJob(ctx context.Context, job *ScanJob) (*ScanResult, error) {
	slog.Info("Processing scan job",
		"repository", job.Repository,
		"tag", job.Tag,
		"digest", job.ManifestDigest,
		"layers", len(job.Layers))

	startTime := time.Now()

	// Step 1: Extract image layers from storage
	slog.Info("Extracting image layers", "repository", job.Repository)
	imageDir, cleanup, err := w.extractLayers(ctx, job)
	if err != nil {
		return nil, fmt.Errorf("failed to extract layers: %w", err)
	}
	defer cleanup()

	// Step 2: Generate SBOM with Syft
	slog.Info("Generating SBOM", "repository", job.Repository)
	sbomResult, _, sbomDigest, err := w.generateSBOM(ctx, imageDir)
	if err != nil {
		return nil, fmt.Errorf("failed to generate SBOM: %w", err)
	}

	// Step 3: Scan SBOM with Grype (if enabled)
	var vulnJSON []byte
	var vulnDigest string
	var summary VulnerabilitySummary

	if w.config.Scanner.VulnEnabled {
		slog.Info("Scanning for vulnerabilities", "repository", job.Repository)
		vulnJSON, vulnDigest, summary, err = w.scanVulnerabilities(ctx, sbomResult)
		if err != nil {
			return nil, fmt.Errorf("failed to scan vulnerabilities: %w", err)
		}
	}

	// Step 4: Upload results to storage and create ORAS manifests
	slog.Info("Storing scan results", "repository", job.Repository)
	err = w.storeResults(ctx, job, sbomDigest, vulnDigest, vulnJSON, summary)
	if err != nil {
		return nil, fmt.Errorf("failed to store results: %w", err)
	}

	duration := time.Since(startTime)
	slog.Info("Scan job completed",
		"repository", job.Repository,
		"tag", job.Tag,
		"duration", duration,
		"vulnerabilities", summary.Total)

	return &ScanResult{
		Job:                 job,
		VulnerabilitiesJSON: vulnJSON,
		Summary:             summary,
		SBOMDigest:          sbomDigest,
		VulnDigest:          vulnDigest,
		ScannedAt:           time.Now(),
		ScannerVersion:      w.getScannerVersion(),
	}, nil
}
// getScannerVersion returns the version string for the scanner
func (w *Worker) getScannerVersion() string {
	// TODO: Get actual Syft and Grype versions dynamically
	return "syft-v1.36.0/grype-v0.102.0"
}