A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

refactor jetstream code to unify shared functionality between that and backfill. add tests

evan.jarrett.net a118904c 9daf364d

verified
+1683 -573
+703
docs/ANNOTATIONS_REFACTOR.md
··· 1 + # Annotations Table Refactoring 2 + 3 + ## Overview 4 + 5 + Refactor manifest annotations from individual columns (`title`, `description`, `source_url`, etc.) to a normalized key-value table. This enables flexible annotation storage without schema changes for new OCI annotations. 6 + 7 + ## Motivation 8 + 9 + **Current Problems:** 10 + - Each new annotation (e.g., `org.opencontainers.image.version`) requires schema change 11 + - Many NULL columns in manifests table 12 + - Rigid schema doesn't match OCI's flexible annotation model 13 + 14 + **Benefits:** 15 + - ✅ Add any annotation without code/schema changes 16 + - ✅ Normalized database design 17 + - ✅ Easy to query "all repos with annotation X" 18 + - ✅ Simple queries (no joins needed for repository pages) 19 + 20 + ## Database Schema Changes 21 + 22 + ### 1. New Table: `repository_annotations` 23 + 24 + ```sql 25 + CREATE TABLE IF NOT EXISTS repository_annotations ( 26 + did TEXT NOT NULL, 27 + repository TEXT NOT NULL, 28 + key TEXT NOT NULL, 29 + value TEXT NOT NULL, 30 + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 31 + PRIMARY KEY(did, repository, key), 32 + FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 33 + ); 34 + CREATE INDEX IF NOT EXISTS idx_repository_annotations_did_repo ON repository_annotations(did, repository); 35 + CREATE INDEX IF NOT EXISTS idx_repository_annotations_key ON repository_annotations(key); 36 + ``` 37 + 38 + **Key Design Decisions:** 39 + - Primary key: `(did, repository, key)` - one value per annotation per repository 40 + - No `manifest_id` foreign key - annotations are repository-level, not manifest-level 41 + - `updated_at` - track when annotation was last updated (from most recent manifest) 42 + - Stored at repository level because that's where they're displayed 43 + 44 + ### 2. Drop Columns from `manifests` Table 45 + 46 + Remove these columns (migration will preserve data by copying to annotations table): 47 + - `title` 48 + - `description` 49 + - `source_url` 50 + - `documentation_url` 51 + - `licenses` 52 + - `icon_url` 53 + - `readme_url` 54 + - `version` 55 + 56 + Keep only core manifest metadata: 57 + - `id`, `did`, `repository`, `digest` 58 + - `hold_endpoint`, `schema_version`, `media_type` 59 + - `config_digest`, `config_size` 60 + - `created_at` 61 + 62 + ## Migration Strategy 63 + 64 + ### Migration File: `0004_refactor_annotations_table.yaml` 65 + 66 + ```yaml 67 + description: Migrate manifest annotations to separate table 68 + query: | 69 + -- Step 1: Create new annotations table 70 + CREATE TABLE IF NOT EXISTS repository_annotations ( 71 + did TEXT NOT NULL, 72 + repository TEXT NOT NULL, 73 + key TEXT NOT NULL, 74 + value TEXT NOT NULL, 75 + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 76 + PRIMARY KEY(did, repository, key), 77 + FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 78 + ); 79 + CREATE INDEX IF NOT EXISTS idx_repository_annotations_did_repo ON repository_annotations(did, repository); 80 + CREATE INDEX IF NOT EXISTS idx_repository_annotations_key ON repository_annotations(key); 81 + 82 + -- Step 2: Migrate existing data from manifests to annotations 83 + -- For each repository, use the most recent manifest with non-empty data 84 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 85 + SELECT 86 + m.did, 87 + m.repository, 88 + 'org.opencontainers.image.title' as key, 89 + m.title as value, 90 + m.created_at as updated_at 91 + FROM manifests m 92 + WHERE m.title IS NOT NULL AND m.title != '' 93 + AND m.created_at = ( 94 + SELECT MAX(created_at) FROM manifests m2 95 + WHERE m2.did = m.did AND m2.repository = m.repository 96 + AND m2.title IS NOT NULL AND m2.title != '' 97 + ); 98 + 99 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 100 + SELECT m.did, m.repository, 'org.opencontainers.image.description', m.description, m.created_at 101 + FROM manifests m 102 + WHERE m.description IS NOT NULL AND m.description != '' 103 + AND m.created_at = ( 104 + SELECT MAX(created_at) FROM manifests m2 105 + WHERE m2.did = m.did AND m2.repository = m.repository 106 + AND m2.description IS NOT NULL AND m2.description != '' 107 + ); 108 + 109 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 110 + SELECT m.did, m.repository, 'org.opencontainers.image.source', m.source_url, m.created_at 111 + FROM manifests m 112 + WHERE m.source_url IS NOT NULL AND m.source_url != '' 113 + AND m.created_at = ( 114 + SELECT MAX(created_at) FROM manifests m2 115 + WHERE m2.did = m.did AND m2.repository = m.repository 116 + AND m2.source_url IS NOT NULL AND m2.source_url != '' 117 + ); 118 + 119 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 120 + SELECT m.did, m.repository, 'org.opencontainers.image.documentation', m.documentation_url, m.created_at 121 + FROM manifests m 122 + WHERE m.documentation_url IS NOT NULL AND m.documentation_url != '' 123 + AND m.created_at = ( 124 + SELECT MAX(created_at) FROM manifests m2 125 + WHERE m2.did = m.did AND m2.repository = m.repository 126 + AND m2.documentation_url IS NOT NULL AND m2.documentation_url != '' 127 + ); 128 + 129 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 130 + SELECT m.did, m.repository, 'org.opencontainers.image.licenses', m.licenses, m.created_at 131 + FROM manifests m 132 + WHERE m.licenses IS NOT NULL AND m.licenses != '' 133 + AND m.created_at = ( 134 + SELECT MAX(created_at) FROM manifests m2 135 + WHERE m2.did = m.did AND m2.repository = m.repository 136 + AND m2.licenses IS NOT NULL AND m2.licenses != '' 137 + ); 138 + 139 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 140 + SELECT m.did, m.repository, 'io.atcr.icon', m.icon_url, m.created_at 141 + FROM manifests m 142 + WHERE m.icon_url IS NOT NULL AND m.icon_url != '' 143 + AND m.created_at = ( 144 + SELECT MAX(created_at) FROM manifests m2 145 + WHERE m2.did = m.did AND m2.repository = m.repository 146 + AND m2.icon_url IS NOT NULL AND m2.icon_url != '' 147 + ); 148 + 149 + INSERT OR REPLACE INTO repository_annotations (did, repository, key, value, updated_at) 150 + SELECT m.did, m.repository, 'io.atcr.readme', m.readme_url, m.created_at 151 + FROM manifests m 152 + WHERE m.readme_url IS NOT NULL AND m.readme_url != '' 153 + AND m.created_at = ( 154 + SELECT MAX(created_at) FROM manifests m2 155 + WHERE m2.did = m.did AND m2.repository = m.repository 156 + AND m2.readme_url IS NOT NULL AND m2.readme_url != '' 157 + ); 158 + 159 + -- Step 3: Drop old columns from manifests table 160 + -- SQLite requires recreating table to drop columns 161 + CREATE TABLE manifests_new ( 162 + id INTEGER PRIMARY KEY AUTOINCREMENT, 163 + did TEXT NOT NULL, 164 + repository TEXT NOT NULL, 165 + digest TEXT NOT NULL, 166 + hold_endpoint TEXT NOT NULL, 167 + schema_version INTEGER NOT NULL, 168 + media_type TEXT NOT NULL, 169 + config_digest TEXT, 170 + config_size INTEGER, 171 + created_at TIMESTAMP NOT NULL, 172 + UNIQUE(did, repository, digest), 173 + FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 174 + ); 175 + 176 + -- Copy data to new table 177 + INSERT INTO manifests_new 178 + SELECT id, did, repository, digest, hold_endpoint, schema_version, media_type, 179 + config_digest, config_size, created_at 180 + FROM manifests; 181 + 182 + -- Replace old table 183 + DROP TABLE manifests; 184 + ALTER TABLE manifests_new RENAME TO manifests; 185 + 186 + -- Recreate indexes 187 + CREATE INDEX IF NOT EXISTS idx_manifests_did_repo ON manifests(did, repository); 188 + CREATE INDEX IF NOT EXISTS idx_manifests_created_at ON manifests(created_at DESC); 189 + CREATE INDEX IF NOT EXISTS idx_manifests_digest ON manifests(digest); 190 + ``` 191 + 192 + ## Code Changes 193 + 194 + ### 1. Database Helper Functions 195 + 196 + **New file: `pkg/appview/db/annotations.go`** 197 + 198 + ```go 199 + package db 200 + 201 + import ( 202 + "database/sql" 203 + "time" 204 + ) 205 + 206 + // GetRepositoryAnnotations retrieves all annotations for a repository 207 + func GetRepositoryAnnotations(db *sql.DB, did, repository string) (map[string]string, error) { 208 + rows, err := db.Query(` 209 + SELECT key, value 210 + FROM repository_annotations 211 + WHERE did = ? AND repository = ? 212 + `, did, repository) 213 + if err != nil { 214 + return nil, err 215 + } 216 + defer rows.Close() 217 + 218 + annotations := make(map[string]string) 219 + for rows.Next() { 220 + var key, value string 221 + if err := rows.Scan(&key, &value); err != nil { 222 + return nil, err 223 + } 224 + annotations[key] = value 225 + } 226 + 227 + return annotations, rows.Err() 228 + } 229 + 230 + // UpsertRepositoryAnnotations replaces all annotations for a repository 231 + // Only called when manifest has at least one non-empty annotation 232 + func UpsertRepositoryAnnotations(db *sql.DB, did, repository string, annotations map[string]string) error { 233 + tx, err := db.Begin() 234 + if err != nil { 235 + return err 236 + } 237 + defer tx.Rollback() 238 + 239 + // Delete existing annotations 240 + _, err = tx.Exec(` 241 + DELETE FROM repository_annotations 242 + WHERE did = ? AND repository = ? 243 + `, did, repository) 244 + if err != nil { 245 + return err 246 + } 247 + 248 + // Insert new annotations 249 + stmt, err := tx.Prepare(` 250 + INSERT INTO repository_annotations (did, repository, key, value, updated_at) 251 + VALUES (?, ?, ?, ?, ?) 252 + `) 253 + if err != nil { 254 + return err 255 + } 256 + defer stmt.Close() 257 + 258 + now := time.Now() 259 + for key, value := range annotations { 260 + _, err = stmt.Exec(did, repository, key, value, now) 261 + if err != nil { 262 + return err 263 + } 264 + } 265 + 266 + return tx.Commit() 267 + } 268 + 269 + // DeleteRepositoryAnnotations removes all annotations for a repository 270 + func DeleteRepositoryAnnotations(db *sql.DB, did, repository string) error { 271 + _, err := db.Exec(` 272 + DELETE FROM repository_annotations 273 + WHERE did = ? AND repository = ? 274 + `, did, repository) 275 + return err 276 + } 277 + ``` 278 + 279 + ### 2. Update Backfill Worker 280 + 281 + **File: `pkg/appview/jetstream/backfill.go`** 282 + 283 + In `processManifestRecord()` function, after extracting annotations: 284 + 285 + ```go 286 + // Extract OCI annotations from manifest 287 + var title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL string 288 + if manifestRecord.Annotations != nil { 289 + title = manifestRecord.Annotations["org.opencontainers.image.title"] 290 + description = manifestRecord.Annotations["org.opencontainers.image.description"] 291 + sourceURL = manifestRecord.Annotations["org.opencontainers.image.source"] 292 + documentationURL = manifestRecord.Annotations["org.opencontainers.image.documentation"] 293 + licenses = manifestRecord.Annotations["org.opencontainers.image.licenses"] 294 + iconURL = manifestRecord.Annotations["io.atcr.icon"] 295 + readmeURL = manifestRecord.Annotations["io.atcr.readme"] 296 + } 297 + 298 + // Prepare manifest for insertion (WITHOUT annotation fields) 299 + manifest := &db.Manifest{ 300 + DID: did, 301 + Repository: manifestRecord.Repository, 302 + Digest: manifestRecord.Digest, 303 + MediaType: manifestRecord.MediaType, 304 + SchemaVersion: manifestRecord.SchemaVersion, 305 + HoldEndpoint: manifestRecord.HoldEndpoint, 306 + CreatedAt: manifestRecord.CreatedAt, 307 + // NO annotation fields 308 + } 309 + 310 + // Set config fields only for image manifests (not manifest lists) 311 + if !isManifestList && manifestRecord.Config != nil { 312 + manifest.ConfigDigest = manifestRecord.Config.Digest 313 + manifest.ConfigSize = manifestRecord.Config.Size 314 + } 315 + 316 + // Insert manifest 317 + manifestID, err := db.InsertManifest(b.db, manifest) 318 + if err != nil { 319 + return fmt.Errorf("failed to insert manifest: %w", err) 320 + } 321 + 322 + // Update repository annotations ONLY if manifest has at least one non-empty annotation 323 + if manifestRecord.Annotations != nil { 324 + hasData := false 325 + for _, value := range manifestRecord.Annotations { 326 + if value != "" { 327 + hasData = true 328 + break 329 + } 330 + } 331 + 332 + if hasData { 333 + // Replace all annotations for this repository 334 + err = db.UpsertRepositoryAnnotations(b.db, did, manifestRecord.Repository, manifestRecord.Annotations) 335 + if err != nil { 336 + return fmt.Errorf("failed to upsert annotations: %w", err) 337 + } 338 + } 339 + } 340 + ``` 341 + 342 + ### 3. Update Jetstream Worker 343 + 344 + **File: `pkg/appview/jetstream/worker.go`** 345 + 346 + Same changes as backfill - in `processManifestCommit()` function: 347 + 348 + ```go 349 + // Extract OCI annotations from manifest 350 + var title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL string 351 + if manifestRecord.Annotations != nil { 352 + title = manifestRecord.Annotations["org.opencontainers.image.title"] 353 + description = manifestRecord.Annotations["org.opencontainers.image.description"] 354 + sourceURL = manifestRecord.Annotations["org.opencontainers.image.source"] 355 + documentationURL = manifestRecord.Annotations["org.opencontainers.image.documentation"] 356 + licenses = manifestRecord.Annotations["org.opencontainers.image.licenses"] 357 + iconURL = manifestRecord.Annotations["io.atcr.icon"] 358 + readmeURL = manifestRecord.Annotations["io.atcr.readme"] 359 + } 360 + 361 + // Prepare manifest for insertion (WITHOUT annotation fields) 362 + manifest := &db.Manifest{ 363 + DID: commit.DID, 364 + Repository: manifestRecord.Repository, 365 + Digest: manifestRecord.Digest, 366 + MediaType: manifestRecord.MediaType, 367 + SchemaVersion: manifestRecord.SchemaVersion, 368 + HoldEndpoint: manifestRecord.HoldEndpoint, 369 + CreatedAt: manifestRecord.CreatedAt, 370 + // NO annotation fields 371 + } 372 + 373 + // Set config fields only for image manifests (not manifest lists) 374 + if !isManifestList && manifestRecord.Config != nil { 375 + manifest.ConfigDigest = manifestRecord.Config.Digest 376 + manifest.ConfigSize = manifestRecord.Config.Size 377 + } 378 + 379 + // Insert manifest 380 + manifestID, err := db.InsertManifest(w.db, manifest) 381 + if err != nil { 382 + return fmt.Errorf("failed to insert manifest: %w", err) 383 + } 384 + 385 + // Update repository annotations ONLY if manifest has at least one non-empty annotation 386 + if manifestRecord.Annotations != nil { 387 + hasData := false 388 + for _, value := range manifestRecord.Annotations { 389 + if value != "" { 390 + hasData = true 391 + break 392 + } 393 + } 394 + 395 + if hasData { 396 + // Replace all annotations for this repository 397 + err = db.UpsertRepositoryAnnotations(w.db, commit.DID, manifestRecord.Repository, manifestRecord.Annotations) 398 + if err != nil { 399 + return fmt.Errorf("failed to upsert annotations: %w", err) 400 + } 401 + } 402 + } 403 + ``` 404 + 405 + ### 4. Update Database Queries 406 + 407 + **File: `pkg/appview/db/queries.go`** 408 + 409 + Replace `GetRepositoryMetadata()` function: 410 + 411 + ```go 412 + // GetRepositoryMetadata retrieves metadata for a repository from annotations table 413 + func GetRepositoryMetadata(db *sql.DB, did string, repository string) (title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL, version string, err error) { 414 + annotations, err := GetRepositoryAnnotations(db, did, repository) 415 + if err != nil { 416 + return "", "", "", "", "", "", "", "", err 417 + } 418 + 419 + title = annotations["org.opencontainers.image.title"] 420 + description = annotations["org.opencontainers.image.description"] 421 + sourceURL = annotations["org.opencontainers.image.source"] 422 + documentationURL = annotations["org.opencontainers.image.documentation"] 423 + licenses = annotations["org.opencontainers.image.licenses"] 424 + iconURL = annotations["io.atcr.icon"] 425 + readmeURL = annotations["io.atcr.readme"] 426 + version = annotations["org.opencontainers.image.version"] 427 + 428 + return title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL, version, nil 429 + } 430 + ``` 431 + 432 + Update `InsertManifest()` to remove annotation columns: 433 + 434 + ```go 435 + func InsertManifest(db *sql.DB, manifest *Manifest) (int64, error) { 436 + _, err := db.Exec(` 437 + INSERT INTO manifests 438 + (did, repository, digest, hold_endpoint, schema_version, media_type, 439 + config_digest, config_size, created_at) 440 + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 441 + ON CONFLICT(did, repository, digest) DO UPDATE SET 442 + hold_endpoint = excluded.hold_endpoint, 443 + schema_version = excluded.schema_version, 444 + media_type = excluded.media_type, 445 + config_digest = excluded.config_digest, 446 + config_size = excluded.config_size 447 + `, manifest.DID, manifest.Repository, manifest.Digest, manifest.HoldEndpoint, 448 + manifest.SchemaVersion, manifest.MediaType, manifest.ConfigDigest, 449 + manifest.ConfigSize, manifest.CreatedAt) 450 + 451 + if err != nil { 452 + return 0, err 453 + } 454 + 455 + // Query for the ID (works for both insert and update) 456 + var id int64 457 + err = db.QueryRow(` 458 + SELECT id FROM manifests 459 + WHERE did = ? AND repository = ? AND digest = ? 460 + `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&id) 461 + 462 + if err != nil { 463 + return 0, fmt.Errorf("failed to get manifest ID after upsert: %w", err) 464 + } 465 + 466 + return id, nil 467 + } 468 + ``` 469 + 470 + Similar updates needed for: 471 + - `GetUserRepositories()` - fetch annotations separately and populate Repository struct 472 + - `GetRecentPushes()` - join with annotations or fetch separately 473 + - `SearchPushes()` - can now search annotations table directly 474 + 475 + ### 5. Update Models 476 + 477 + **File: `pkg/appview/db/models.go`** 478 + 479 + Remove annotation fields from `Manifest` struct: 480 + 481 + ```go 482 + type Manifest struct { 483 + ID int64 484 + DID string 485 + Repository string 486 + Digest string 487 + HoldEndpoint string 488 + SchemaVersion int 489 + MediaType string 490 + ConfigDigest string 491 + ConfigSize int64 492 + CreatedAt time.Time 493 + // Removed: Title, Description, SourceURL, DocumentationURL, Licenses, IconURL, ReadmeURL 494 + } 495 + ``` 496 + 497 + Keep annotation fields on `Repository` struct (populated from annotations table): 498 + 499 + ```go 500 + type Repository struct { 501 + Name string 502 + TagCount int 503 + ManifestCount int 504 + LastPush time.Time 505 + Tags []Tag 506 + Manifests []Manifest 507 + Title string 508 + Description string 509 + SourceURL string 510 + DocumentationURL string 511 + Licenses string 512 + IconURL string 513 + ReadmeURL string 514 + Version string // NEW 515 + } 516 + ``` 517 + 518 + ### 6. Update Schema.sql 519 + 520 + **File: `pkg/appview/db/schema.sql`** 521 + 522 + Add new table: 523 + 524 + ```sql 525 + CREATE TABLE IF NOT EXISTS repository_annotations ( 526 + did TEXT NOT NULL, 527 + repository TEXT NOT NULL, 528 + key TEXT NOT NULL, 529 + value TEXT NOT NULL, 530 + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 531 + PRIMARY KEY(did, repository, key), 532 + FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 533 + ); 534 + CREATE INDEX IF NOT EXISTS idx_repository_annotations_did_repo ON repository_annotations(did, repository); 535 + CREATE INDEX IF NOT EXISTS idx_repository_annotations_key ON repository_annotations(key); 536 + ``` 537 + 538 + Update manifests table (remove annotation columns): 539 + 540 + ```sql 541 + CREATE TABLE IF NOT EXISTS manifests ( 542 + id INTEGER PRIMARY KEY AUTOINCREMENT, 543 + did TEXT NOT NULL, 544 + repository TEXT NOT NULL, 545 + digest TEXT NOT NULL, 546 + hold_endpoint TEXT NOT NULL, 547 + schema_version INTEGER NOT NULL, 548 + media_type TEXT NOT NULL, 549 + config_digest TEXT, 550 + config_size INTEGER, 551 + created_at TIMESTAMP NOT NULL, 552 + UNIQUE(did, repository, digest), 553 + FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 554 + ); 555 + ``` 556 + 557 + ## Update Logic Summary 558 + 559 + **Key Decision: Only update annotations when manifest has data** 560 + 561 + ``` 562 + For each manifest processed (backfill or jetstream): 563 + 1. Parse manifest.Annotations map 564 + 2. Check if ANY annotation has non-empty value 565 + 3. IF hasData: 566 + DELETE all annotations for (did, repository) 567 + INSERT all annotations from manifest (including empty ones) 568 + ELSE: 569 + SKIP (don't touch existing annotations) 570 + ``` 571 + 572 + **Why this works:** 573 + - Manifest lists have no annotations or all empty → skip, preserve existing 574 + - Platform manifests have real data → replace everything 575 + - Removing annotation from Dockerfile → it's gone (not in new INSERT) 576 + - Can't accidentally clear data (need at least one non-empty value) 577 + 578 + ## UI/Template Changes 579 + 580 + ### Handler Updates 581 + 582 + **File: `pkg/appview/handlers/repository.go`** 583 + 584 + Update the handler to include version: 585 + 586 + ```go 587 + // Fetch repository metadata from annotations 588 + title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL, version, err := db.GetRepositoryMetadata(h.DB, owner.DID, repository) 589 + if err != nil { 590 + log.Printf("Failed to fetch repository metadata: %v", err) 591 + // Continue without metadata on error 592 + } else { 593 + repo.Title = title 594 + repo.Description = description 595 + repo.SourceURL = sourceURL 596 + repo.DocumentationURL = documentationURL 597 + repo.Licenses = licenses 598 + repo.IconURL = iconURL 599 + repo.ReadmeURL = readmeURL 600 + repo.Version = version // NEW 601 + } 602 + ``` 603 + 604 + ### Template Updates 605 + 606 + **File: `pkg/appview/templates/pages/repository.html`** 607 + 608 + Update the metadata section condition to include version: 609 + 610 + ```html 611 + <!-- Metadata Section --> 612 + {{ if or .Repository.Licenses .Repository.SourceURL .Repository.DocumentationURL .Repository.Version }} 613 + <div class="repo-metadata"> 614 + <!-- Version Badge (if present) --> 615 + {{ if .Repository.Version }} 616 + <span class="metadata-badge version-badge" title="Version"> 617 + {{ .Repository.Version }} 618 + </span> 619 + {{ end }} 620 + 621 + <!-- License Badges --> 622 + {{ if .Repository.Licenses }} 623 + {{ range parseLicenses .Repository.Licenses }} 624 + {{ if .IsValid }} 625 + <a href="{{ .URL }}" target="_blank" rel="noopener noreferrer" class="metadata-badge license-badge" title="{{ .Name }}"> 626 + {{ .SPDXID }} 627 + </a> 628 + {{ else }} 629 + <span class="metadata-badge license-badge" title="Custom license: {{ .Name }}"> 630 + {{ .Name }} 631 + </span> 632 + {{ end }} 633 + {{ end }} 634 + {{ end }} 635 + 636 + <!-- Source Link --> 637 + {{ if .Repository.SourceURL }} 638 + <a href="{{ .Repository.SourceURL }}" target="_blank" class="metadata-link"> 639 + Source 640 + </a> 641 + {{ end }} 642 + 643 + <!-- Documentation Link --> 644 + {{ if .Repository.DocumentationURL }} 645 + <a href="{{ .Repository.DocumentationURL }}" target="_blank" class="metadata-link"> 646 + Documentation 647 + </a> 648 + {{ end }} 649 + </div> 650 + {{ end }} 651 + ``` 652 + 653 + ### CSS Updates 654 + 655 + **File: `pkg/appview/static/css/style.css`** 656 + 657 + Add styling for version badge (different color from license badge): 658 + 659 + ```css 660 + .version-badge { 661 + background: #0969da; /* GitHub blue */ 662 + color: white; 663 + padding: 0.25rem 0.5rem; 664 + border-radius: 0.25rem; 665 + font-size: 0.875rem; 666 + font-weight: 500; 667 + display: inline-block; 668 + } 669 + ``` 670 + 671 + ### Data Flow Summary 672 + 673 + **Before refactor:** 674 + ``` 675 + DB columns → GetRepositoryMetadata() → Handler assigns to Repository struct → Template displays 676 + ``` 677 + 678 + **After refactor:** 679 + ``` 680 + annotations table → GetRepositoryAnnotations() → GetRepositoryMetadata() extracts known fields → 681 + Handler assigns to Repository struct → Template displays (same as before) 682 + ``` 683 + 684 + **Key point:** Templates still access `.Repository.Title`, `.Repository.Version`, etc. - the source just changed from DB columns to annotations table. The abstraction layer hides this complexity. 685 + 686 + ## Benefits Recap 687 + 688 + 1. **Flexible**: Support any OCI annotation without code changes 689 + 2. **Clean**: No NULL columns in manifests table 690 + 3. **Simple queries**: `SELECT * FROM repository_annotations WHERE did=? AND repo=?` 691 + 4. **Safe updates**: Only update when manifest has data 692 + 5. **Natural deletion**: Remove annotation from Dockerfile → it's deleted on next push 693 + 6. **Extensible**: Future features (annotation search, filtering) are trivial 694 + 695 + ## Testing Checklist 696 + 697 + After migration: 698 + - [ ] Verify existing repositories show annotations correctly 699 + - [ ] Push new manifest with annotations → updates correctly 700 + - [ ] Push manifest list → doesn't clear annotations 701 + - [ ] Remove annotation from Dockerfile and push → annotation deleted 702 + - [ ] Backfill re-run → annotations repopulated correctly 703 + - [ ] Search still works (if implemented)
+8 -8
pkg/appview/db/hold_store.go
··· 8 8 9 9 // HoldCaptainRecord represents a cached captain record from a hold's PDS 10 10 type HoldCaptainRecord struct { 11 - HoldDID string 12 - OwnerDID string 13 - Public bool 14 - AllowAllCrew bool 15 - DeployedAt string 16 - Region string 17 - Provider string 18 - UpdatedAt time.Time 11 + HoldDID string `json:"-"` // Set manually, not from JSON 12 + OwnerDID string `json:"owner"` 13 + Public bool `json:"public"` 14 + AllowAllCrew bool `json:"allowAllCrew"` 15 + DeployedAt string `json:"deployedAt"` 16 + Region string `json:"region"` 17 + Provider string `json:"provider"` 18 + UpdatedAt time.Time `json:"-"` // Set manually, not from JSON 19 19 } 20 20 21 21 // GetCaptainRecord retrieves a captain record from the cache
+27 -335
pkg/appview/jetstream/backfill.go
··· 8 8 "strings" 9 9 "time" 10 10 11 - "github.com/bluesky-social/indigo/atproto/identity" 12 11 "github.com/bluesky-social/indigo/atproto/syntax" 13 12 13 + "atcr.io/pkg/appview" 14 14 "atcr.io/pkg/appview/db" 15 15 "atcr.io/pkg/atproto" 16 16 ) ··· 19 19 type BackfillWorker struct { 20 20 db *sql.DB 21 21 client *atproto.Client 22 - directory identity.Directory 23 - defaultHoldDID string // Default hold DID from AppView config (e.g., "did:web:hold01.atcr.io") 24 - testMode bool // If true, suppress warnings for external holds 22 + processor *Processor // Shared processor for DB operations 23 + defaultHoldDID string // Default hold DID from AppView config (e.g., "did:web:hold01.atcr.io") 24 + testMode bool // If true, suppress warnings for external holds 25 25 } 26 26 27 27 // BackfillState tracks backfill progress ··· 44 44 45 45 return &BackfillWorker{ 46 46 db: database, 47 - client: client, // This points to the relay 48 - directory: identity.DefaultDirectory(), 47 + client: client, // This points to the relay 48 + processor: NewProcessor(database, false), // No cache for batch processing 49 49 defaultHoldDID: defaultHoldDID, 50 50 testMode: testMode, 51 51 }, nil ··· 132 132 // backfillRepo backfills all records for a single repo/DID 133 133 func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) { 134 134 // Ensure user exists in database and get their PDS endpoint 135 - if err := b.ensureUser(ctx, did); err != nil { 135 + if err := b.processor.EnsureUser(ctx, did); err != nil { 136 136 return 0, fmt.Errorf("failed to ensure user: %w", err) 137 137 } 138 138 ··· 142 142 return 0, fmt.Errorf("invalid DID %s: %w", did, err) 143 143 } 144 144 145 - ident, err := b.directory.LookupDID(ctx, didParsed) 145 + ident, err := b.processor.directory.LookupDID(ctx, didParsed) 146 146 if err != nil { 147 147 return 0, fmt.Errorf("failed to resolve DID to PDS: %w", err) 148 148 } ··· 173 173 // Process each record 174 174 for _, record := range records { 175 175 // Track what we found for deletion reconciliation 176 - if collection == atproto.ManifestCollection { 176 + switch collection { 177 + case atproto.ManifestCollection: 177 178 var manifestRecord atproto.ManifestRecord 178 179 if err := json.Unmarshal(record.Value, &manifestRecord); err == nil { 179 180 foundManifestDigests = append(foundManifestDigests, manifestRecord.Digest) 180 181 } 181 - } else if collection == atproto.TagCollection { 182 + case atproto.TagCollection: 182 183 var tagRecord atproto.TagRecord 183 184 if err := json.Unmarshal(record.Value, &tagRecord); err == nil { 184 185 foundTags = append(foundTags, struct{ Repository, Tag string }{ ··· 186 187 Tag: tagRecord.Tag, 187 188 }) 188 189 } 189 - } else if collection == atproto.StarCollection { 190 + case atproto.StarCollection: 190 191 var starRecord atproto.StarRecord 191 192 if err := json.Unmarshal(record.Value, &starRecord); err == nil { 192 193 key := fmt.Sprintf("%s/%s", starRecord.Subject.DID, starRecord.Subject.Repository) ··· 278 279 func (b *BackfillWorker) processRecord(ctx context.Context, did, collection string, record *atproto.Record) error { 279 280 switch collection { 280 281 case atproto.ManifestCollection: 281 - return b.processManifestRecord(did, record) 282 + _, err := b.processor.ProcessManifest(context.Background(), did, record.Value) 283 + return err 282 284 case atproto.TagCollection: 283 - return b.processTagRecord(did, record) 285 + return b.processor.ProcessTag(context.Background(), did, record.Value) 284 286 case atproto.StarCollection: 285 - return b.processStarRecord(did, record) 287 + return b.processor.ProcessStar(context.Background(), did, record.Value) 286 288 case atproto.SailorProfileCollection: 287 - return b.processSailorProfileRecord(ctx, did, record) 289 + return b.processor.ProcessSailorProfile(ctx, did, record.Value, b.queryCaptainRecordWrapper) 288 290 default: 289 291 return fmt.Errorf("unsupported collection: %s", collection) 290 292 } 291 293 } 292 294 293 - // processManifestRecord processes a manifest record 294 - func (b *BackfillWorker) processManifestRecord(did string, record *atproto.Record) error { 295 - var manifestRecord atproto.ManifestRecord 296 - if err := json.Unmarshal(record.Value, &manifestRecord); err != nil { 297 - return fmt.Errorf("failed to unmarshal manifest: %w", err) 298 - } 299 - 300 - // Extract OCI annotations from manifest 301 - var title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL string 302 - if manifestRecord.Annotations != nil { 303 - title = manifestRecord.Annotations["org.opencontainers.image.title"] 304 - description = manifestRecord.Annotations["org.opencontainers.image.description"] 305 - sourceURL = manifestRecord.Annotations["org.opencontainers.image.source"] 306 - documentationURL = manifestRecord.Annotations["org.opencontainers.image.documentation"] 307 - licenses = manifestRecord.Annotations["org.opencontainers.image.licenses"] 308 - iconURL = manifestRecord.Annotations["io.atcr.icon"] 309 - readmeURL = manifestRecord.Annotations["io.atcr.readme"] 310 - } 311 - 312 - // Detect manifest type 313 - isManifestList := len(manifestRecord.Manifests) > 0 314 - 315 - // Prepare manifest for insertion 316 - manifest := &db.Manifest{ 317 - DID: did, 318 - Repository: manifestRecord.Repository, 319 - Digest: manifestRecord.Digest, 320 - MediaType: manifestRecord.MediaType, 321 - SchemaVersion: manifestRecord.SchemaVersion, 322 - HoldEndpoint: manifestRecord.HoldEndpoint, 323 - CreatedAt: manifestRecord.CreatedAt, 324 - Title: title, 325 - Description: description, 326 - SourceURL: sourceURL, 327 - DocumentationURL: documentationURL, 328 - Licenses: licenses, 329 - IconURL: iconURL, 330 - ReadmeURL: readmeURL, 331 - } 332 - 333 - // Set config fields only for image manifests (not manifest lists) 334 - if !isManifestList && manifestRecord.Config != nil { 335 - manifest.ConfigDigest = manifestRecord.Config.Digest 336 - manifest.ConfigSize = manifestRecord.Config.Size 337 - } 338 - 339 - // Platform info is only stored for multi-arch images in manifest_references table 340 - // Single-arch images don't need platform display (it's obvious) 341 - 342 - // Insert manifest (or get existing ID if already exists) 343 - manifestID, err := db.InsertManifest(b.db, manifest) 344 - if err != nil { 345 - // If manifest already exists, get its ID so we can still insert references/layers 346 - if strings.Contains(err.Error(), "UNIQUE constraint failed") { 347 - // Query for existing manifest ID 348 - var existingID int64 349 - err := b.db.QueryRow(` 350 - SELECT id FROM manifests 351 - WHERE did = ? AND repository = ? AND digest = ? 352 - `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 353 - 354 - if err != nil { 355 - return fmt.Errorf("failed to get existing manifest ID: %w", err) 356 - } 357 - manifestID = existingID 358 - } else { 359 - return fmt.Errorf("failed to insert manifest: %w", err) 360 - } 361 - } 362 - 363 - if isManifestList { 364 - // Insert manifest references (for manifest lists/indexes) 365 - for i, ref := range manifestRecord.Manifests { 366 - platformArch := "" 367 - platformOS := "" 368 - platformVariant := "" 369 - platformOSVersion := "" 370 - 371 - if ref.Platform != nil { 372 - platformArch = ref.Platform.Architecture 373 - platformOS = ref.Platform.OS 374 - platformVariant = ref.Platform.Variant 375 - platformOSVersion = ref.Platform.OSVersion 376 - } 377 - 378 - if err := db.InsertManifestReference(b.db, &db.ManifestReference{ 379 - ManifestID: manifestID, 380 - Digest: ref.Digest, 381 - MediaType: ref.MediaType, 382 - Size: ref.Size, 383 - PlatformArchitecture: platformArch, 384 - PlatformOS: platformOS, 385 - PlatformVariant: platformVariant, 386 - PlatformOSVersion: platformOSVersion, 387 - ReferenceIndex: i, 388 - }); err != nil { 389 - // Continue on error - reference might already exist 390 - continue 391 - } 392 - } 393 - } else { 394 - // Insert layers (for image manifests) 395 - for i, layer := range manifestRecord.Layers { 396 - if err := db.InsertLayer(b.db, &db.Layer{ 397 - ManifestID: manifestID, 398 - Digest: layer.Digest, 399 - MediaType: layer.MediaType, 400 - Size: layer.Size, 401 - LayerIndex: i, 402 - }); err != nil { 403 - // Continue on error - layer might already exist 404 - continue 405 - } 406 - } 407 - } 408 - 409 - return nil 410 - } 411 - 412 - // processTagRecord processes a tag record 413 - func (b *BackfillWorker) processTagRecord(did string, record *atproto.Record) error { 414 - var tagRecord atproto.TagRecord 415 - if err := json.Unmarshal(record.Value, &tagRecord); err != nil { 416 - return fmt.Errorf("failed to unmarshal tag: %w", err) 417 - } 418 - 419 - // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 420 - manifestDigest, err := tagRecord.GetManifestDigest() 421 - if err != nil { 422 - return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 423 - } 424 - 425 - // Insert or update tag 426 - return db.UpsertTag(b.db, &db.Tag{ 427 - DID: did, 428 - Repository: tagRecord.Repository, 429 - Tag: tagRecord.Tag, 430 - Digest: manifestDigest, 431 - CreatedAt: tagRecord.UpdatedAt, 432 - }) 433 - } 434 - 435 - // processStarRecord processes a star record 436 - func (b *BackfillWorker) processStarRecord(did string, record *atproto.Record) error { 437 - var starRecord atproto.StarRecord 438 - if err := json.Unmarshal(record.Value, &starRecord); err != nil { 439 - return fmt.Errorf("failed to unmarshal star: %w", err) 440 - } 441 - 442 - // Upsert the star record (idempotent - won't duplicate) 443 - // The DID here is the starrer (user who starred) 444 - // The subject contains the owner DID and repository 445 - // Star count will be calculated on demand from the stars table 446 - return db.UpsertStar(b.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 447 - } 448 - 449 - // processSailorProfileRecord processes a sailor profile record 450 - // Extracts defaultHold and queries the hold's captain record to cache it 451 - func (b *BackfillWorker) processSailorProfileRecord(ctx context.Context, did string, record *atproto.Record) error { 452 - var profileRecord atproto.SailorProfileRecord 453 - if err := json.Unmarshal(record.Value, &profileRecord); err != nil { 454 - return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 455 - } 456 - 457 - // Skip if no default hold set 458 - if profileRecord.DefaultHold == "" { 459 - return nil 460 - } 461 - 462 - // Convert hold URL/DID to canonical DID 463 - holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold) 464 - if holdDID == "" { 465 - fmt.Printf("WARNING [backfill]: Invalid hold reference in profile for %s: %s\n", did, profileRecord.DefaultHold) 466 - return nil 467 - } 468 - 469 - // Query and cache the captain record 295 + // queryCaptainRecordWrapper wraps queryCaptainRecord with backfill-specific logic 296 + func (b *BackfillWorker) queryCaptainRecordWrapper(ctx context.Context, holdDID string) error { 470 297 if err := b.queryCaptainRecord(ctx, holdDID); err != nil { 471 298 // In test mode, only warn about default hold (local hold) 472 299 // External/production holds may not have captain records yet (dev ahead of prod) ··· 478 305 // Don't fail the whole backfill - just skip this hold 479 306 return nil 480 307 } 481 - 482 308 return nil 483 309 } 484 310 ··· 494 320 } 495 321 496 322 // Resolve hold DID to URL 497 - // For did:web, we need to fetch .well-known/did.json 498 - holdURL, err := resolveHoldDIDToURL(ctx, holdDID) 499 - if err != nil { 500 - return fmt.Errorf("failed to resolve hold DID to URL: %w", err) 501 - } 323 + holdURL := appview.ResolveHoldURL(holdDID) 502 324 503 325 // Create client for hold's PDS 504 326 holdClient := atproto.NewClient(holdURL, holdDID, "") ··· 522 344 return fmt.Errorf("failed to get captain record: %w", err) 523 345 } 524 346 525 - // Parse captain record from the record's Value field 526 - var captainRecord struct { 527 - Owner string `json:"owner"` 528 - Public bool `json:"public"` 529 - AllowAllCrew bool `json:"allowAllCrew"` 530 - DeployedAt string `json:"deployedAt"` 531 - Region string `json:"region"` 532 - Provider string `json:"provider"` 533 - } 534 - 347 + // Parse captain record directly into db struct 348 + var captainRecord db.HoldCaptainRecord 535 349 if err := json.Unmarshal(record.Value, &captainRecord); err != nil { 536 350 return fmt.Errorf("failed to parse captain record: %w", err) 537 351 } 538 352 539 - // Cache in database 540 - dbRecord := &db.HoldCaptainRecord{ 541 - HoldDID: holdDID, 542 - OwnerDID: captainRecord.Owner, 543 - Public: captainRecord.Public, 544 - AllowAllCrew: captainRecord.AllowAllCrew, 545 - DeployedAt: captainRecord.DeployedAt, 546 - Region: captainRecord.Region, 547 - Provider: captainRecord.Provider, 548 - UpdatedAt: time.Now(), 549 - } 353 + // Set fields not from JSON 354 + captainRecord.HoldDID = holdDID 355 + captainRecord.UpdatedAt = time.Now() 550 356 551 - if err := db.UpsertCaptainRecord(b.db, dbRecord); err != nil { 357 + if err := db.UpsertCaptainRecord(b.db, &captainRecord); err != nil { 552 358 return fmt.Errorf("failed to cache captain record: %w", err) 553 359 } 554 360 555 - fmt.Printf("Backfill: Cached captain record for hold %s (owner: %s)\n", holdDID, captainRecord.Owner) 361 + fmt.Printf("Backfill: Cached captain record for hold %s (owner: %s)\n", holdDID, captainRecord.OwnerDID) 556 362 return nil 557 363 } 558 - 559 - // resolveHoldDIDToURL resolves a hold DID to its service endpoint URL 560 - // Fetches the DID document and returns both the canonical DID and service endpoint 561 - func resolveHoldDIDToURL(ctx context.Context, inputDID string) (string, error) { 562 - // For did:web, construct the .well-known URL 563 - if !strings.HasPrefix(inputDID, "did:web:") { 564 - return "", fmt.Errorf("only did:web is supported, got: %s", inputDID) 565 - } 566 - 567 - // Extract hostname from did:web:hostname[:port] 568 - hostname := strings.TrimPrefix(inputDID, "did:web:") 569 - 570 - // Try HTTP first (for local Docker), then HTTPS 571 - var serviceEndpoint string 572 - for _, scheme := range []string{"http", "https"} { 573 - testURL := fmt.Sprintf("%s://%s/.well-known/did.json", scheme, hostname) 574 - 575 - // Fetch DID document (use NewClient to initialize httpClient) 576 - client := atproto.NewClient("", "", "") 577 - didDoc, err := client.FetchDIDDocument(ctx, testURL) 578 - if err == nil && didDoc != nil { 579 - // Extract service endpoint from DID document 580 - for _, service := range didDoc.Service { 581 - if service.Type == "AtprotoPersonalDataServer" || service.Type == "AtcrHoldService" { 582 - serviceEndpoint = service.ServiceEndpoint 583 - break 584 - } 585 - } 586 - 587 - if serviceEndpoint != "" { 588 - fmt.Printf("DEBUG [backfill]: Resolved %s → canonical DID: %s, endpoint: %s\n", 589 - inputDID, didDoc.ID, serviceEndpoint) 590 - return serviceEndpoint, nil 591 - } 592 - } 593 - } 594 - 595 - // Fallback: assume the hold service is at the root of the hostname 596 - // Try HTTP first for local development 597 - url := fmt.Sprintf("http://%s", hostname) 598 - fmt.Printf("WARNING [backfill]: Failed to fetch DID document for %s, using fallback URL: %s\n", inputDID, url) 599 - return url, nil 600 - } 601 - 602 - // ensureUser resolves and upserts a user by DID 603 - func (b *BackfillWorker) ensureUser(ctx context.Context, did string) error { 604 - // Check if user already exists 605 - existingUser, err := db.GetUserByDID(b.db, did) 606 - if err == nil && existingUser != nil { 607 - // Update last seen 608 - existingUser.LastSeen = time.Now() 609 - return db.UpsertUser(b.db, existingUser) 610 - } 611 - 612 - // Resolve DID to get handle and PDS endpoint 613 - didParsed, err := syntax.ParseDID(did) 614 - if err != nil { 615 - // Fallback: use DID as handle 616 - user := &db.User{ 617 - DID: did, 618 - Handle: did, 619 - PDSEndpoint: "https://bsky.social", 620 - LastSeen: time.Now(), 621 - } 622 - return db.UpsertUser(b.db, user) 623 - } 624 - 625 - ident, err := b.directory.LookupDID(ctx, didParsed) 626 - if err != nil { 627 - // Fallback: use DID as handle 628 - user := &db.User{ 629 - DID: did, 630 - Handle: did, 631 - PDSEndpoint: "https://bsky.social", 632 - LastSeen: time.Now(), 633 - } 634 - return db.UpsertUser(b.db, user) 635 - } 636 - 637 - resolvedDID := ident.DID.String() 638 - handle := ident.Handle.String() 639 - pdsEndpoint := ident.PDSEndpoint() 640 - 641 - // If handle is invalid or PDS is missing, use defaults 642 - if handle == "handle.invalid" || handle == "" { 643 - handle = resolvedDID 644 - } 645 - if pdsEndpoint == "" { 646 - pdsEndpoint = "https://bsky.social" 647 - } 648 - 649 - // Fetch user's Bluesky profile (including avatar) 650 - // Use public Bluesky AppView API (doesn't require auth for public profiles) 651 - avatar := "" 652 - publicClient := atproto.NewClient("https://public.api.bsky.app", "", "") 653 - profile, err := publicClient.GetActorProfile(ctx, resolvedDID) 654 - if err != nil { 655 - fmt.Printf("WARNING [backfill]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err) 656 - // Continue without avatar 657 - } else { 658 - avatar = profile.Avatar 659 - } 660 - 661 - // Upsert to database 662 - user := &db.User{ 663 - DID: resolvedDID, 664 - Handle: handle, 665 - PDSEndpoint: pdsEndpoint, 666 - Avatar: avatar, 667 - LastSeen: time.Now(), 668 - } 669 - 670 - return db.UpsertUser(b.db, user) 671 - }
+320
pkg/appview/jetstream/processor.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "strings" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/atproto/identity" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + 14 + "atcr.io/pkg/appview/db" 15 + "atcr.io/pkg/atproto" 16 + ) 17 + 18 + // Processor handles shared database operations for both Worker (live) and Backfill (sync) 19 + // This eliminates code duplication between the two data ingestion paths 20 + type Processor struct { 21 + db *sql.DB 22 + directory identity.Directory 23 + userCache *UserCache // Optional - enabled for Worker, disabled for Backfill 24 + useCache bool 25 + } 26 + 27 + // NewProcessor creates a new shared processor 28 + // useCache: true for Worker (live streaming), false for Backfill (batch processing) 29 + func NewProcessor(database *sql.DB, useCache bool) *Processor { 30 + p := &Processor{ 31 + db: database, 32 + directory: identity.DefaultDirectory(), 33 + useCache: useCache, 34 + } 35 + 36 + if useCache { 37 + p.userCache = &UserCache{ 38 + cache: make(map[string]*db.User), 39 + } 40 + } 41 + 42 + return p 43 + } 44 + 45 + // EnsureUser resolves and upserts a user by DID 46 + // Uses cache if enabled (Worker), queries DB if cache disabled (Backfill) 47 + func (p *Processor) EnsureUser(ctx context.Context, did string) error { 48 + // Check cache first (if enabled) 49 + if p.useCache && p.userCache != nil { 50 + if user, ok := p.userCache.cache[did]; ok { 51 + // Update last seen 52 + user.LastSeen = time.Now() 53 + return db.UpsertUser(p.db, user) 54 + } 55 + } else if !p.useCache { 56 + // No cache - check if user already exists in DB 57 + existingUser, err := db.GetUserByDID(p.db, did) 58 + if err == nil && existingUser != nil { 59 + // Update last seen 60 + existingUser.LastSeen = time.Now() 61 + return db.UpsertUser(p.db, existingUser) 62 + } 63 + } 64 + 65 + // Resolve DID to get handle and PDS endpoint 66 + didParsed, err := syntax.ParseDID(did) 67 + if err != nil { 68 + // Fallback: use DID as handle 69 + user := &db.User{ 70 + DID: did, 71 + Handle: did, 72 + PDSEndpoint: "https://bsky.social", 73 + LastSeen: time.Now(), 74 + } 75 + if p.useCache { 76 + p.userCache.cache[did] = user 77 + } 78 + return db.UpsertUser(p.db, user) 79 + } 80 + 81 + ident, err := p.directory.LookupDID(ctx, didParsed) 82 + if err != nil { 83 + // Fallback: use DID as handle 84 + user := &db.User{ 85 + DID: did, 86 + Handle: did, 87 + PDSEndpoint: "https://bsky.social", 88 + LastSeen: time.Now(), 89 + } 90 + if p.useCache { 91 + p.userCache.cache[did] = user 92 + } 93 + return db.UpsertUser(p.db, user) 94 + } 95 + 96 + resolvedDID := ident.DID.String() 97 + handle := ident.Handle.String() 98 + pdsEndpoint := ident.PDSEndpoint() 99 + 100 + // If handle is invalid or PDS is missing, use defaults 101 + if handle == "handle.invalid" || handle == "" { 102 + handle = resolvedDID 103 + } 104 + if pdsEndpoint == "" { 105 + pdsEndpoint = "https://bsky.social" 106 + } 107 + 108 + // Fetch user's Bluesky profile (including avatar) 109 + // Use public Bluesky AppView API (doesn't require auth for public profiles) 110 + avatar := "" 111 + publicClient := atproto.NewClient("https://public.api.bsky.app", "", "") 112 + profile, err := publicClient.GetActorProfile(ctx, resolvedDID) 113 + if err != nil { 114 + fmt.Printf("WARNING [processor]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err) 115 + // Continue without avatar 116 + } else { 117 + avatar = profile.Avatar 118 + } 119 + 120 + // Create user record 121 + user := &db.User{ 122 + DID: resolvedDID, 123 + Handle: handle, 124 + PDSEndpoint: pdsEndpoint, 125 + Avatar: avatar, 126 + LastSeen: time.Now(), 127 + } 128 + 129 + // Cache if enabled 130 + if p.useCache { 131 + p.userCache.cache[did] = user 132 + } 133 + 134 + // Upsert to database 135 + return db.UpsertUser(p.db, user) 136 + } 137 + 138 + // ProcessManifest processes a manifest record and stores it in the database 139 + // Returns the manifest ID for further processing (layers/references) 140 + func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) { 141 + // Unmarshal manifest record 142 + var manifestRecord atproto.ManifestRecord 143 + if err := json.Unmarshal(recordData, &manifestRecord); err != nil { 144 + return 0, fmt.Errorf("failed to unmarshal manifest: %w", err) 145 + } 146 + // Extract OCI annotations from manifest 147 + var title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL string 148 + if manifestRecord.Annotations != nil { 149 + title = manifestRecord.Annotations["org.opencontainers.image.title"] 150 + description = manifestRecord.Annotations["org.opencontainers.image.description"] 151 + sourceURL = manifestRecord.Annotations["org.opencontainers.image.source"] 152 + documentationURL = manifestRecord.Annotations["org.opencontainers.image.documentation"] 153 + licenses = manifestRecord.Annotations["org.opencontainers.image.licenses"] 154 + iconURL = manifestRecord.Annotations["io.atcr.icon"] 155 + readmeURL = manifestRecord.Annotations["io.atcr.readme"] 156 + } 157 + 158 + // Detect manifest type 159 + isManifestList := len(manifestRecord.Manifests) > 0 160 + 161 + // Prepare manifest for insertion 162 + manifest := &db.Manifest{ 163 + DID: did, 164 + Repository: manifestRecord.Repository, 165 + Digest: manifestRecord.Digest, 166 + MediaType: manifestRecord.MediaType, 167 + SchemaVersion: manifestRecord.SchemaVersion, 168 + HoldEndpoint: manifestRecord.HoldEndpoint, 169 + CreatedAt: manifestRecord.CreatedAt, 170 + Title: title, 171 + Description: description, 172 + SourceURL: sourceURL, 173 + DocumentationURL: documentationURL, 174 + Licenses: licenses, 175 + IconURL: iconURL, 176 + ReadmeURL: readmeURL, 177 + } 178 + 179 + // Set config fields only for image manifests (not manifest lists) 180 + if !isManifestList && manifestRecord.Config != nil { 181 + manifest.ConfigDigest = manifestRecord.Config.Digest 182 + manifest.ConfigSize = manifestRecord.Config.Size 183 + } 184 + 185 + // Insert manifest 186 + manifestID, err := db.InsertManifest(p.db, manifest) 187 + if err != nil { 188 + // For backfill: if manifest already exists, get its ID 189 + if strings.Contains(err.Error(), "UNIQUE constraint failed") { 190 + var existingID int64 191 + err := p.db.QueryRow(` 192 + SELECT id FROM manifests 193 + WHERE did = ? AND repository = ? AND digest = ? 194 + `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 195 + 196 + if err != nil { 197 + return 0, fmt.Errorf("failed to get existing manifest ID: %w", err) 198 + } 199 + manifestID = existingID 200 + } else { 201 + return 0, fmt.Errorf("failed to insert manifest: %w", err) 202 + } 203 + } 204 + 205 + // Insert manifest references or layers 206 + if isManifestList { 207 + // Insert manifest references (for manifest lists/indexes) 208 + for i, ref := range manifestRecord.Manifests { 209 + platformArch := "" 210 + platformOS := "" 211 + platformVariant := "" 212 + platformOSVersion := "" 213 + 214 + if ref.Platform != nil { 215 + platformArch = ref.Platform.Architecture 216 + platformOS = ref.Platform.OS 217 + platformVariant = ref.Platform.Variant 218 + platformOSVersion = ref.Platform.OSVersion 219 + } 220 + 221 + if err := db.InsertManifestReference(p.db, &db.ManifestReference{ 222 + ManifestID: manifestID, 223 + Digest: ref.Digest, 224 + MediaType: ref.MediaType, 225 + Size: ref.Size, 226 + PlatformArchitecture: platformArch, 227 + PlatformOS: platformOS, 228 + PlatformVariant: platformVariant, 229 + PlatformOSVersion: platformOSVersion, 230 + ReferenceIndex: i, 231 + }); err != nil { 232 + // Continue on error - reference might already exist 233 + continue 234 + } 235 + } 236 + } else { 237 + // Insert layers (for image manifests) 238 + for i, layer := range manifestRecord.Layers { 239 + if err := db.InsertLayer(p.db, &db.Layer{ 240 + ManifestID: manifestID, 241 + Digest: layer.Digest, 242 + MediaType: layer.MediaType, 243 + Size: layer.Size, 244 + LayerIndex: i, 245 + }); err != nil { 246 + // Continue on error - layer might already exist 247 + continue 248 + } 249 + } 250 + } 251 + 252 + return manifestID, nil 253 + } 254 + 255 + // ProcessTag processes a tag record and stores it in the database 256 + func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error { 257 + // Unmarshal tag record 258 + var tagRecord atproto.TagRecord 259 + if err := json.Unmarshal(recordData, &tagRecord); err != nil { 260 + return fmt.Errorf("failed to unmarshal tag: %w", err) 261 + } 262 + // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 263 + manifestDigest, err := tagRecord.GetManifestDigest() 264 + if err != nil { 265 + return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 266 + } 267 + 268 + // Insert or update tag 269 + return db.UpsertTag(p.db, &db.Tag{ 270 + DID: did, 271 + Repository: tagRecord.Repository, 272 + Tag: tagRecord.Tag, 273 + Digest: manifestDigest, 274 + CreatedAt: tagRecord.UpdatedAt, 275 + }) 276 + } 277 + 278 + // ProcessStar processes a star record and stores it in the database 279 + func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error { 280 + // Unmarshal star record 281 + var starRecord atproto.StarRecord 282 + if err := json.Unmarshal(recordData, &starRecord); err != nil { 283 + return fmt.Errorf("failed to unmarshal star: %w", err) 284 + } 285 + // Upsert the star record (idempotent - won't duplicate) 286 + // The DID here is the starrer (user who starred) 287 + // The subject contains the owner DID and repository 288 + // Star count will be calculated on demand from the stars table 289 + return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 290 + } 291 + 292 + // ProcessSailorProfile processes a sailor profile record 293 + // This is primarily used by backfill to cache captain records for holds 294 + func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error { 295 + // Unmarshal sailor profile record 296 + var profileRecord atproto.SailorProfileRecord 297 + if err := json.Unmarshal(recordData, &profileRecord); err != nil { 298 + return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 299 + } 300 + 301 + // Skip if no default hold set 302 + if profileRecord.DefaultHold == "" { 303 + return nil 304 + } 305 + 306 + // Convert hold URL/DID to canonical DID 307 + holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold) 308 + if holdDID == "" { 309 + fmt.Printf("WARNING [processor]: Invalid hold reference in profile for %s: %s\n", did, profileRecord.DefaultHold) 310 + return nil 311 + } 312 + 313 + // Query and cache the captain record using provided function 314 + // This allows backfill-specific logic (retries, test mode handling) without duplicating it here 315 + if queryCaptainFn != nil { 316 + return queryCaptainFn(ctx, holdDID) 317 + } 318 + 319 + return nil 320 + }
+540
pkg/appview/jetstream/processor_test.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "testing" 8 + "time" 9 + 10 + "atcr.io/pkg/atproto" 11 + _ "github.com/mattn/go-sqlite3" 12 + ) 13 + 14 + // setupTestDB creates an in-memory SQLite database for testing 15 + func setupTestDB(t *testing.T) *sql.DB { 16 + database, err := sql.Open("sqlite3", ":memory:") 17 + if err != nil { 18 + t.Fatalf("Failed to open test database: %v", err) 19 + } 20 + 21 + // Create schema 22 + schema := ` 23 + CREATE TABLE users ( 24 + did TEXT PRIMARY KEY, 25 + handle TEXT NOT NULL, 26 + pds_endpoint TEXT NOT NULL, 27 + avatar TEXT, 28 + last_seen TIMESTAMP NOT NULL 29 + ); 30 + 31 + CREATE TABLE manifests ( 32 + id INTEGER PRIMARY KEY AUTOINCREMENT, 33 + did TEXT NOT NULL, 34 + repository TEXT NOT NULL, 35 + digest TEXT NOT NULL, 36 + hold_endpoint TEXT NOT NULL, 37 + schema_version INTEGER NOT NULL, 38 + media_type TEXT NOT NULL, 39 + config_digest TEXT, 40 + config_size INTEGER, 41 + created_at TIMESTAMP NOT NULL, 42 + title TEXT, 43 + description TEXT, 44 + source_url TEXT, 45 + documentation_url TEXT, 46 + licenses TEXT, 47 + icon_url TEXT, 48 + readme_url TEXT, 49 + UNIQUE(did, repository, digest) 50 + ); 51 + 52 + CREATE TABLE layers ( 53 + manifest_id INTEGER NOT NULL, 54 + digest TEXT NOT NULL, 55 + size INTEGER NOT NULL, 56 + media_type TEXT NOT NULL, 57 + layer_index INTEGER NOT NULL, 58 + PRIMARY KEY(manifest_id, layer_index) 59 + ); 60 + 61 + CREATE TABLE manifest_references ( 62 + manifest_id INTEGER NOT NULL, 63 + digest TEXT NOT NULL, 64 + media_type TEXT NOT NULL, 65 + size INTEGER NOT NULL, 66 + platform_architecture TEXT, 67 + platform_os TEXT, 68 + platform_variant TEXT, 69 + platform_os_version TEXT, 70 + reference_index INTEGER NOT NULL, 71 + PRIMARY KEY(manifest_id, reference_index) 72 + ); 73 + 74 + CREATE TABLE tags ( 75 + id INTEGER PRIMARY KEY AUTOINCREMENT, 76 + did TEXT NOT NULL, 77 + repository TEXT NOT NULL, 78 + tag TEXT NOT NULL, 79 + digest TEXT NOT NULL, 80 + created_at TIMESTAMP NOT NULL, 81 + UNIQUE(did, repository, tag) 82 + ); 83 + 84 + CREATE TABLE stars ( 85 + starrer_did TEXT NOT NULL, 86 + owner_did TEXT NOT NULL, 87 + repository TEXT NOT NULL, 88 + created_at TIMESTAMP NOT NULL, 89 + PRIMARY KEY(starrer_did, owner_did, repository) 90 + ); 91 + ` 92 + 93 + if _, err := database.Exec(schema); err != nil { 94 + t.Fatalf("Failed to create schema: %v", err) 95 + } 96 + 97 + return database 98 + } 99 + 100 + func TestNewProcessor(t *testing.T) { 101 + database := setupTestDB(t) 102 + defer database.Close() 103 + 104 + tests := []struct { 105 + name string 106 + useCache bool 107 + }{ 108 + {"with cache", true}, 109 + {"without cache", false}, 110 + } 111 + 112 + for _, tt := range tests { 113 + t.Run(tt.name, func(t *testing.T) { 114 + p := NewProcessor(database, tt.useCache) 115 + if p == nil { 116 + t.Fatal("NewProcessor returned nil") 117 + } 118 + if p.db != database { 119 + t.Error("Processor database not set correctly") 120 + } 121 + if p.useCache != tt.useCache { 122 + t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) 123 + } 124 + if tt.useCache && p.userCache == nil { 125 + t.Error("Cache enabled but userCache is nil") 126 + } 127 + if !tt.useCache && p.userCache != nil { 128 + t.Error("Cache disabled but userCache is not nil") 129 + } 130 + }) 131 + } 132 + } 133 + 134 + func TestProcessManifest_ImageManifest(t *testing.T) { 135 + database := setupTestDB(t) 136 + defer database.Close() 137 + 138 + p := NewProcessor(database, false) 139 + ctx := context.Background() 140 + 141 + // Create test manifest record 142 + manifestRecord := &atproto.ManifestRecord{ 143 + Repository: "test-app", 144 + Digest: "sha256:abc123", 145 + MediaType: "application/vnd.oci.image.manifest.v1+json", 146 + SchemaVersion: 2, 147 + HoldEndpoint: "did:web:hold01.atcr.io", 148 + CreatedAt: time.Now(), 149 + Config: &atproto.BlobReference{ 150 + Digest: "sha256:config123", 151 + Size: 1234, 152 + }, 153 + Layers: []atproto.BlobReference{ 154 + {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 155 + {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 156 + }, 157 + Annotations: map[string]string{ 158 + "org.opencontainers.image.title": "Test App", 159 + "org.opencontainers.image.description": "A test application", 160 + "org.opencontainers.image.source": "https://github.com/test/app", 161 + "org.opencontainers.image.licenses": "MIT", 162 + "io.atcr.icon": "https://example.com/icon.png", 163 + }, 164 + } 165 + 166 + // Marshal to bytes for ProcessManifest 167 + recordBytes, err := json.Marshal(manifestRecord) 168 + if err != nil { 169 + t.Fatalf("Failed to marshal manifest: %v", err) 170 + } 171 + 172 + // Process manifest 173 + manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 174 + if err != nil { 175 + t.Fatalf("ProcessManifest failed: %v", err) 176 + } 177 + if manifestID == 0 { 178 + t.Error("Expected non-zero manifest ID") 179 + } 180 + 181 + // Verify manifest was inserted 182 + var count int 183 + err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", 184 + "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) 185 + if err != nil { 186 + t.Fatalf("Failed to query manifests: %v", err) 187 + } 188 + if count != 1 { 189 + t.Errorf("Expected 1 manifest, got %d", count) 190 + } 191 + 192 + // Verify annotations were stored 193 + var title, source string 194 + err = database.QueryRow("SELECT title, source_url FROM manifests WHERE id = ?", manifestID).Scan(&title, &source) 195 + if err != nil { 196 + t.Fatalf("Failed to query manifest fields: %v", err) 197 + } 198 + if title != "Test App" { 199 + t.Errorf("title = %q, want %q", title, "Test App") 200 + } 201 + if source != "https://github.com/test/app" { 202 + t.Errorf("source_url = %q, want %q", source, "https://github.com/test/app") 203 + } 204 + 205 + // Verify layers were inserted 206 + var layerCount int 207 + err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 208 + if err != nil { 209 + t.Fatalf("Failed to query layers: %v", err) 210 + } 211 + if layerCount != 2 { 212 + t.Errorf("Expected 2 layers, got %d", layerCount) 213 + } 214 + 215 + // Verify no manifest references (this is an image, not a list) 216 + var refCount int 217 + err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 218 + if err != nil { 219 + t.Fatalf("Failed to query manifest_references: %v", err) 220 + } 221 + if refCount != 0 { 222 + t.Errorf("Expected 0 manifest references, got %d", refCount) 223 + } 224 + } 225 + 226 + func TestProcessManifest_ManifestList(t *testing.T) { 227 + database := setupTestDB(t) 228 + defer database.Close() 229 + 230 + p := NewProcessor(database, false) 231 + ctx := context.Background() 232 + 233 + // Create test manifest list record 234 + manifestRecord := &atproto.ManifestRecord{ 235 + Repository: "test-app", 236 + Digest: "sha256:list123", 237 + MediaType: "application/vnd.oci.image.index.v1+json", 238 + SchemaVersion: 2, 239 + HoldEndpoint: "did:web:hold01.atcr.io", 240 + CreatedAt: time.Now(), 241 + Manifests: []atproto.ManifestReference{ 242 + { 243 + Digest: "sha256:amd64manifest", 244 + MediaType: "application/vnd.oci.image.manifest.v1+json", 245 + Size: 1000, 246 + Platform: &atproto.Platform{ 247 + Architecture: "amd64", 248 + OS: "linux", 249 + }, 250 + }, 251 + { 252 + Digest: "sha256:arm64manifest", 253 + MediaType: "application/vnd.oci.image.manifest.v1+json", 254 + Size: 1100, 255 + Platform: &atproto.Platform{ 256 + Architecture: "arm64", 257 + OS: "linux", 258 + Variant: "v8", 259 + }, 260 + }, 261 + }, 262 + } 263 + 264 + // Marshal to bytes for ProcessManifest 265 + recordBytes, err := json.Marshal(manifestRecord) 266 + if err != nil { 267 + t.Fatalf("Failed to marshal manifest: %v", err) 268 + } 269 + 270 + // Process manifest list 271 + manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 272 + if err != nil { 273 + t.Fatalf("ProcessManifest failed: %v", err) 274 + } 275 + 276 + // Verify manifest references were inserted 277 + var refCount int 278 + err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 279 + if err != nil { 280 + t.Fatalf("Failed to query manifest_references: %v", err) 281 + } 282 + if refCount != 2 { 283 + t.Errorf("Expected 2 manifest references, got %d", refCount) 284 + } 285 + 286 + // Verify platform info was stored 287 + var arch, os string 288 + err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) 289 + if err != nil { 290 + t.Fatalf("Failed to query platform info: %v", err) 291 + } 292 + if arch != "amd64" { 293 + t.Errorf("platform_architecture = %q, want %q", arch, "amd64") 294 + } 295 + if os != "linux" { 296 + t.Errorf("platform_os = %q, want %q", os, "linux") 297 + } 298 + 299 + // Verify no layers (this is a list, not an image) 300 + var layerCount int 301 + err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 302 + if err != nil { 303 + t.Fatalf("Failed to query layers: %v", err) 304 + } 305 + if layerCount != 0 { 306 + t.Errorf("Expected 0 layers, got %d", layerCount) 307 + } 308 + } 309 + 310 + func TestProcessTag(t *testing.T) { 311 + database := setupTestDB(t) 312 + defer database.Close() 313 + 314 + p := NewProcessor(database, false) 315 + ctx := context.Background() 316 + 317 + // Create test tag record (using ManifestDigest field for simplicity) 318 + tagRecord := &atproto.TagRecord{ 319 + Repository: "test-app", 320 + Tag: "latest", 321 + ManifestDigest: "sha256:abc123", 322 + UpdatedAt: time.Now(), 323 + } 324 + 325 + // Marshal to bytes for ProcessTag 326 + recordBytes, err := json.Marshal(tagRecord) 327 + if err != nil { 328 + t.Fatalf("Failed to marshal tag: %v", err) 329 + } 330 + 331 + // Process tag 332 + err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 333 + if err != nil { 334 + t.Fatalf("ProcessTag failed: %v", err) 335 + } 336 + 337 + // Verify tag was inserted 338 + var count int 339 + err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 340 + "did:plc:test123", "test-app", "latest").Scan(&count) 341 + if err != nil { 342 + t.Fatalf("Failed to query tags: %v", err) 343 + } 344 + if count != 1 { 345 + t.Errorf("Expected 1 tag, got %d", count) 346 + } 347 + 348 + // Verify digest was stored 349 + var digest string 350 + err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 351 + "did:plc:test123", "test-app", "latest").Scan(&digest) 352 + if err != nil { 353 + t.Fatalf("Failed to query tag digest: %v", err) 354 + } 355 + if digest != "sha256:abc123" { 356 + t.Errorf("digest = %q, want %q", digest, "sha256:abc123") 357 + } 358 + 359 + // Test upserting same tag with new digest 360 + tagRecord.ManifestDigest = "sha256:newdigest" 361 + recordBytes, err = json.Marshal(tagRecord) 362 + if err != nil { 363 + t.Fatalf("Failed to marshal tag: %v", err) 364 + } 365 + err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 366 + if err != nil { 367 + t.Fatalf("ProcessTag (upsert) failed: %v", err) 368 + } 369 + 370 + // Verify tag was updated 371 + err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 372 + "did:plc:test123", "test-app", "latest").Scan(&digest) 373 + if err != nil { 374 + t.Fatalf("Failed to query updated tag: %v", err) 375 + } 376 + if digest != "sha256:newdigest" { 377 + t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") 378 + } 379 + 380 + // Verify still only one tag (upsert, not insert) 381 + err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 382 + "did:plc:test123", "test-app", "latest").Scan(&count) 383 + if err != nil { 384 + t.Fatalf("Failed to query tags after upsert: %v", err) 385 + } 386 + if count != 1 { 387 + t.Errorf("Expected 1 tag after upsert, got %d", count) 388 + } 389 + } 390 + 391 + func TestProcessStar(t *testing.T) { 392 + database := setupTestDB(t) 393 + defer database.Close() 394 + 395 + p := NewProcessor(database, false) 396 + ctx := context.Background() 397 + 398 + // Create test star record 399 + starRecord := &atproto.StarRecord{ 400 + Subject: atproto.StarSubject{ 401 + DID: "did:plc:owner123", 402 + Repository: "test-app", 403 + }, 404 + CreatedAt: time.Now(), 405 + } 406 + 407 + // Marshal to bytes for ProcessStar 408 + recordBytes, err := json.Marshal(starRecord) 409 + if err != nil { 410 + t.Fatalf("Failed to marshal star: %v", err) 411 + } 412 + 413 + // Process star 414 + err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 415 + if err != nil { 416 + t.Fatalf("ProcessStar failed: %v", err) 417 + } 418 + 419 + // Verify star was inserted 420 + var count int 421 + err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 422 + "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 423 + if err != nil { 424 + t.Fatalf("Failed to query stars: %v", err) 425 + } 426 + if count != 1 { 427 + t.Errorf("Expected 1 star, got %d", count) 428 + } 429 + 430 + // Test upserting same star (should be idempotent) 431 + recordBytes, err = json.Marshal(starRecord) 432 + if err != nil { 433 + t.Fatalf("Failed to marshal star: %v", err) 434 + } 435 + err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 436 + if err != nil { 437 + t.Fatalf("ProcessStar (upsert) failed: %v", err) 438 + } 439 + 440 + // Verify still only one star 441 + err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 442 + "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 443 + if err != nil { 444 + t.Fatalf("Failed to query stars after upsert: %v", err) 445 + } 446 + if count != 1 { 447 + t.Errorf("Expected 1 star after upsert, got %d", count) 448 + } 449 + } 450 + 451 + func TestProcessManifest_Duplicate(t *testing.T) { 452 + database := setupTestDB(t) 453 + defer database.Close() 454 + 455 + p := NewProcessor(database, false) 456 + ctx := context.Background() 457 + 458 + manifestRecord := &atproto.ManifestRecord{ 459 + Repository: "test-app", 460 + Digest: "sha256:abc123", 461 + MediaType: "application/vnd.oci.image.manifest.v1+json", 462 + SchemaVersion: 2, 463 + HoldEndpoint: "did:web:hold01.atcr.io", 464 + CreatedAt: time.Now(), 465 + } 466 + 467 + // Marshal to bytes for ProcessManifest 468 + recordBytes, err := json.Marshal(manifestRecord) 469 + if err != nil { 470 + t.Fatalf("Failed to marshal manifest: %v", err) 471 + } 472 + 473 + // Insert first time 474 + id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 475 + if err != nil { 476 + t.Fatalf("First ProcessManifest failed: %v", err) 477 + } 478 + 479 + // Insert duplicate 480 + id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 481 + if err != nil { 482 + t.Fatalf("Duplicate ProcessManifest failed: %v", err) 483 + } 484 + 485 + // Should return existing ID 486 + if id1 != id2 { 487 + t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) 488 + } 489 + 490 + // Verify only one manifest exists 491 + var count int 492 + err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", 493 + "did:plc:test123", "sha256:abc123").Scan(&count) 494 + if err != nil { 495 + t.Fatalf("Failed to query manifests: %v", err) 496 + } 497 + if count != 1 { 498 + t.Errorf("Expected 1 manifest, got %d", count) 499 + } 500 + } 501 + 502 + func TestProcessManifest_EmptyAnnotations(t *testing.T) { 503 + database := setupTestDB(t) 504 + defer database.Close() 505 + 506 + p := NewProcessor(database, false) 507 + ctx := context.Background() 508 + 509 + // Manifest with nil annotations 510 + manifestRecord := &atproto.ManifestRecord{ 511 + Repository: "test-app", 512 + Digest: "sha256:abc123", 513 + MediaType: "application/vnd.oci.image.manifest.v1+json", 514 + SchemaVersion: 2, 515 + HoldEndpoint: "did:web:hold01.atcr.io", 516 + CreatedAt: time.Now(), 517 + Annotations: nil, 518 + } 519 + 520 + // Marshal to bytes for ProcessManifest 521 + recordBytes, err := json.Marshal(manifestRecord) 522 + if err != nil { 523 + t.Fatalf("Failed to marshal manifest: %v", err) 524 + } 525 + 526 + manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 527 + if err != nil { 528 + t.Fatalf("ProcessManifest failed: %v", err) 529 + } 530 + 531 + // Verify annotation fields are empty strings (not NULL) 532 + var title string 533 + err = database.QueryRow("SELECT title FROM manifests WHERE id = ?", manifestID).Scan(&title) 534 + if err != nil { 535 + t.Fatalf("Failed to query title: %v", err) 536 + } 537 + if title != "" { 538 + t.Errorf("Expected empty title, got %q", title) 539 + } 540 + }
+28 -222
pkg/appview/jetstream/worker.go
··· 9 9 "sync" 10 10 "time" 11 11 12 - "github.com/bluesky-social/indigo/atproto/identity" 13 - "github.com/bluesky-social/indigo/atproto/syntax" 14 - 15 12 "atcr.io/pkg/appview/db" 16 13 "atcr.io/pkg/atproto" 17 14 "github.com/gorilla/websocket" ··· 33 30 startCursor int64 34 31 wantedCollections []string 35 32 debugCollectionCount int 36 - userCache *UserCache 37 - directory identity.Directory 33 + processor *Processor // Shared processor for DB operations 38 34 eventCallback EventCallback 39 35 connStartTime time.Time // Track when connection started for debugging 40 36 ··· 65 61 atproto.TagCollection, // io.atcr.tag 66 62 atproto.StarCollection, // io.atcr.sailor.star 67 63 }, 68 - userCache: &UserCache{ 69 - cache: make(map[string]*db.User), 70 - }, 71 - directory: identity.DefaultDirectory(), 64 + processor: NewProcessor(database, true), // Use cache for live streaming 72 65 } 73 66 } 74 67 ··· 333 326 } 334 327 } 335 328 336 - // ensureUser resolves and upserts a user by DID 337 - func (w *Worker) ensureUser(ctx context.Context, did string) error { 338 - // Check cache first 339 - if user, ok := w.userCache.cache[did]; ok { 340 - // Update last seen 341 - user.LastSeen = time.Now() 342 - return db.UpsertUser(w.db, user) 343 - } 344 - 345 - // Resolve DID to get handle and PDS endpoint 346 - didParsed, err := syntax.ParseDID(did) 347 - if err != nil { 348 - fmt.Printf("WARNING: Invalid DID %s: %v (using DID as handle)\n", did, err) 349 - // Fallback: use DID as handle 350 - user := &db.User{ 351 - DID: did, 352 - Handle: did, 353 - PDSEndpoint: "https://bsky.social", // Default PDS endpoint as fallback 354 - LastSeen: time.Now(), 355 - } 356 - w.userCache.cache[did] = user 357 - return db.UpsertUser(w.db, user) 358 - } 359 - 360 - ident, err := w.directory.LookupDID(ctx, didParsed) 361 - if err != nil { 362 - fmt.Printf("WARNING: Failed to resolve DID %s: %v (using DID as handle)\n", did, err) 363 - // Fallback: use DID as handle 364 - user := &db.User{ 365 - DID: did, 366 - Handle: did, 367 - PDSEndpoint: "https://bsky.social", // Default PDS endpoint as fallback 368 - LastSeen: time.Now(), 369 - } 370 - w.userCache.cache[did] = user 371 - return db.UpsertUser(w.db, user) 372 - } 373 - 374 - resolvedDID := ident.DID.String() 375 - handle := ident.Handle.String() 376 - pdsEndpoint := ident.PDSEndpoint() 377 - 378 - // If handle is invalid or PDS is missing, use defaults 379 - if handle == "handle.invalid" || handle == "" { 380 - handle = resolvedDID 381 - } 382 - if pdsEndpoint == "" { 383 - pdsEndpoint = "https://bsky.social" 384 - } 385 - 386 - // Fetch user's Bluesky profile (including avatar) 387 - // Use public Bluesky AppView API (doesn't require auth for public profiles) 388 - avatar := "" 389 - publicClient := atproto.NewClient("https://public.api.bsky.app", "", "") 390 - profile, err := publicClient.GetActorProfile(ctx, resolvedDID) 391 - if err != nil { 392 - fmt.Printf("WARNING [worker]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err) 393 - // Continue without avatar 394 - } else { 395 - avatar = profile.Avatar 396 - } 397 - 398 - // Cache the user 399 - user := &db.User{ 400 - DID: resolvedDID, 401 - Handle: handle, 402 - PDSEndpoint: pdsEndpoint, 403 - Avatar: avatar, 404 - LastSeen: time.Now(), 405 - } 406 - w.userCache.cache[did] = user 407 - 408 - // Upsert to database 409 - return db.UpsertUser(w.db, user) 410 - } 411 - 412 329 // processManifest processes a manifest commit event 413 330 func (w *Worker) processManifest(commit *CommitEvent) error { 414 331 // Resolve and upsert user with handle/PDS endpoint 415 - if err := w.ensureUser(context.Background(), commit.DID); err != nil { 332 + if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 416 333 return fmt.Errorf("failed to ensure user: %w", err) 417 334 } 418 335 ··· 427 344 } 428 345 429 346 // Parse manifest record 430 - var manifestRecord atproto.ManifestRecord 431 - if commit.Record != nil { 432 - recordBytes, err := json.Marshal(commit.Record) 433 - if err != nil { 434 - return fmt.Errorf("failed to marshal record: %w", err) 435 - } 436 - if err := json.Unmarshal(recordBytes, &manifestRecord); err != nil { 437 - return fmt.Errorf("failed to unmarshal manifest: %w", err) 438 - } 439 - } else { 440 - // No record data, can't process 441 - return nil 442 - } 443 - 444 - // Extract OCI annotations from manifest 445 - var title, description, sourceURL, documentationURL, licenses, iconURL, readmeURL string 446 - if manifestRecord.Annotations != nil { 447 - title = manifestRecord.Annotations["org.opencontainers.image.title"] 448 - description = manifestRecord.Annotations["org.opencontainers.image.description"] 449 - sourceURL = manifestRecord.Annotations["org.opencontainers.image.source"] 450 - documentationURL = manifestRecord.Annotations["org.opencontainers.image.documentation"] 451 - licenses = manifestRecord.Annotations["org.opencontainers.image.licenses"] 452 - iconURL = manifestRecord.Annotations["io.atcr.icon"] 453 - readmeURL = manifestRecord.Annotations["io.atcr.readme"] 454 - } 455 - 456 - // Detect manifest type 457 - isManifestList := len(manifestRecord.Manifests) > 0 458 - 459 - // Prepare manifest for insertion 460 - manifest := &db.Manifest{ 461 - DID: commit.DID, 462 - Repository: manifestRecord.Repository, 463 - Digest: manifestRecord.Digest, 464 - MediaType: manifestRecord.MediaType, 465 - SchemaVersion: manifestRecord.SchemaVersion, 466 - HoldEndpoint: manifestRecord.HoldEndpoint, 467 - CreatedAt: manifestRecord.CreatedAt, 468 - Title: title, 469 - Description: description, 470 - SourceURL: sourceURL, 471 - DocumentationURL: documentationURL, 472 - Licenses: licenses, 473 - IconURL: iconURL, 474 - ReadmeURL: readmeURL, 475 - } 476 - 477 - // Set config fields only for image manifests (not manifest lists) 478 - if !isManifestList && manifestRecord.Config != nil { 479 - manifest.ConfigDigest = manifestRecord.Config.Digest 480 - manifest.ConfigSize = manifestRecord.Config.Size 347 + if commit.Record == nil { 348 + return nil // No record data, can't process 481 349 } 482 350 483 - // Insert manifest 484 - manifestID, err := db.InsertManifest(w.db, manifest) 351 + // Marshal map to bytes for processing 352 + recordBytes, err := json.Marshal(commit.Record) 485 353 if err != nil { 486 - return fmt.Errorf("failed to insert manifest: %w", err) 354 + return fmt.Errorf("failed to marshal record: %w", err) 487 355 } 488 356 489 - if isManifestList { 490 - // Insert manifest references (for manifest lists/indexes) 491 - for i, ref := range manifestRecord.Manifests { 492 - platformArch := "" 493 - platformOS := "" 494 - platformVariant := "" 495 - platformOSVersion := "" 496 - 497 - if ref.Platform != nil { 498 - platformArch = ref.Platform.Architecture 499 - platformOS = ref.Platform.OS 500 - platformVariant = ref.Platform.Variant 501 - platformOSVersion = ref.Platform.OSVersion 502 - } 503 - 504 - if err := db.InsertManifestReference(w.db, &db.ManifestReference{ 505 - ManifestID: manifestID, 506 - Digest: ref.Digest, 507 - MediaType: ref.MediaType, 508 - Size: ref.Size, 509 - PlatformArchitecture: platformArch, 510 - PlatformOS: platformOS, 511 - PlatformVariant: platformVariant, 512 - PlatformOSVersion: platformOSVersion, 513 - ReferenceIndex: i, 514 - }); err != nil { 515 - // Continue on error - reference might already exist 516 - continue 517 - } 518 - } 519 - } else { 520 - // Insert layers (for image manifests) 521 - for i, layer := range manifestRecord.Layers { 522 - if err := db.InsertLayer(w.db, &db.Layer{ 523 - ManifestID: manifestID, 524 - Digest: layer.Digest, 525 - MediaType: layer.MediaType, 526 - Size: layer.Size, 527 - LayerIndex: i, 528 - }); err != nil { 529 - // Continue on error - layer might already exist 530 - continue 531 - } 532 - } 533 - } 534 - 535 - return nil 357 + // Use shared processor for DB operations 358 + _, err = w.processor.ProcessManifest(context.Background(), commit.DID, recordBytes) 359 + return err 536 360 } 537 361 538 362 // processTag processes a tag commit event 539 363 func (w *Worker) processTag(commit *CommitEvent) error { 540 364 // Resolve and upsert user with handle/PDS endpoint 541 - if err := w.ensureUser(context.Background(), commit.DID); err != nil { 365 + if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 542 366 return fmt.Errorf("failed to ensure user: %w", err) 543 367 } 544 368 ··· 557 381 } 558 382 559 383 // Parse tag record 560 - var tagRecord atproto.TagRecord 561 - if commit.Record != nil { 562 - recordBytes, err := json.Marshal(commit.Record) 563 - if err != nil { 564 - return fmt.Errorf("failed to marshal record: %w", err) 565 - } 566 - if err := json.Unmarshal(recordBytes, &tagRecord); err != nil { 567 - return fmt.Errorf("failed to unmarshal tag: %w", err) 568 - } 569 - } else { 384 + if commit.Record == nil { 570 385 return nil 571 386 } 572 387 573 - // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 574 - manifestDigest, err := tagRecord.GetManifestDigest() 388 + // Marshal map to bytes for processing 389 + recordBytes, err := json.Marshal(commit.Record) 575 390 if err != nil { 576 - return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 391 + return fmt.Errorf("failed to marshal record: %w", err) 577 392 } 578 393 579 - // Insert or update tag 580 - return db.UpsertTag(w.db, &db.Tag{ 581 - DID: commit.DID, 582 - Repository: tagRecord.Repository, 583 - Tag: tagRecord.Tag, 584 - Digest: manifestDigest, 585 - CreatedAt: tagRecord.UpdatedAt, 586 - }) 394 + // Use shared processor for DB operations 395 + return w.processor.ProcessTag(context.Background(), commit.DID, recordBytes) 587 396 } 588 397 589 398 // processStar processes a star commit event 590 399 func (w *Worker) processStar(commit *CommitEvent) error { 591 400 // Resolve and upsert the user who starred (starrer) 592 - if err := w.ensureUser(context.Background(), commit.DID); err != nil { 401 + if err := w.processor.EnsureUser(context.Background(), commit.DID); err != nil { 593 402 return fmt.Errorf("failed to ensure user: %w", err) 594 403 } 595 404 ··· 606 415 } 607 416 608 417 // Parse star record 609 - var starRecord atproto.StarRecord 610 - if commit.Record != nil { 611 - recordBytes, err := json.Marshal(commit.Record) 612 - if err != nil { 613 - return fmt.Errorf("failed to marshal record: %w", err) 614 - } 615 - if err := json.Unmarshal(recordBytes, &starRecord); err != nil { 616 - return fmt.Errorf("failed to unmarshal star: %w", err) 617 - } 618 - } else { 418 + if commit.Record == nil { 619 419 return nil 620 420 } 621 421 622 - // Upsert the star record (idempotent - star count will be calculated on demand) 623 - return db.UpsertStar(w.db, commit.DID, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 422 + // Marshal map to bytes for processing 423 + recordBytes, err := json.Marshal(commit.Record) 424 + if err != nil { 425 + return fmt.Errorf("failed to marshal record: %w", err) 426 + } 427 + 428 + // Use shared processor for DB operations 429 + return w.processor.ProcessStar(context.Background(), commit.DID, recordBytes) 624 430 } 625 431 626 432 // JetstreamEvent represents a Jetstream event
+1 -7
pkg/appview/storage/proxy_blob_store.go
··· 40 40 // NewProxyBlobStore creates a new proxy blob store 41 41 func NewProxyBlobStore(ctx *RegistryContext) *ProxyBlobStore { 42 42 // Resolve DID to URL once at construction time 43 - holdURL := resolveHoldURL(ctx.HoldDID) 43 + holdURL := appview.ResolveHoldURL(ctx.HoldDID) 44 44 45 45 fmt.Printf("DEBUG [proxy_blob_store]: NewProxyBlobStore created with holdDID=%s, holdURL=%s, userDID=%s, repo=%s\n", 46 46 ctx.HoldDID, holdURL, ctx.DID, ctx.Repository) ··· 106 106 return fmt.Errorf("write access denied to hold %s", p.ctx.HoldDID) 107 107 } 108 108 return nil 109 - } 110 - 111 - // resolveHoldURL converts a hold identifier (DID or URL) to an HTTP URL 112 - // Deprecated: Use appview.ResolveHoldURL instead 113 - func resolveHoldURL(holdDID string) string { 114 - return appview.ResolveHoldURL(holdDID) 115 109 } 116 110 117 111 // Stat returns the descriptor for a blob
+2 -1
pkg/appview/storage/proxy_blob_store_test.go
··· 11 11 "testing" 12 12 "time" 13 13 14 + "atcr.io/pkg/appview" 14 15 "atcr.io/pkg/atproto" 15 16 "atcr.io/pkg/auth/token" 16 17 "github.com/opencontainers/go-digest" ··· 218 219 219 220 for _, tt := range tests { 220 221 t.Run(tt.name, func(t *testing.T) { 221 - result := resolveHoldURL(tt.holdDID) 222 + result := appview.ResolveHoldURL(tt.holdDID) 222 223 if result != tt.expected { 223 224 t.Errorf("Expected %s, got %s", tt.expected, result) 224 225 }
+54
scripts/migrate-image.sh
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + # Configuration 5 + SOURCE_REGISTRY="ghcr.io/evanjarrett/hsm-secrets-operator" 6 + TARGET_REGISTRY="atcr.io/evan.jarrett.net/hsm-secrets-operator" 7 + TAG="latest" 8 + 9 + # Image digests 10 + AMD64_DIGEST="sha256:274284a623810cf07c5b4735628832751926b7d192863681d5af1b4137f44254" 11 + ARM64_DIGEST="sha256:b57929fd100033092766aad1c7e747deef9b1e3206756c11d0d7a7af74daedff" 12 + 13 + echo "=== Migrating multi-arch image from GHCR to ATCR ===" 14 + echo "Source: ${SOURCE_REGISTRY}" 15 + echo "Target: ${TARGET_REGISTRY}:${TAG}" 16 + echo "" 17 + 18 + # Tag and push amd64 image 19 + echo ">>> Tagging and pushing amd64 image..." 20 + docker tag "${SOURCE_REGISTRY}@${AMD64_DIGEST}" "${TARGET_REGISTRY}:${TAG}-amd64" 21 + docker push "${TARGET_REGISTRY}:${TAG}-amd64" 22 + echo "" 23 + 24 + # Tag and push arm64 image 25 + echo ">>> Tagging and pushing arm64 image..." 26 + docker tag "${SOURCE_REGISTRY}@${ARM64_DIGEST}" "${TARGET_REGISTRY}:${TAG}-arm64" 27 + docker push "${TARGET_REGISTRY}:${TAG}-arm64" 28 + echo "" 29 + 30 + # Create multi-arch manifest using the pushed tags 31 + echo ">>> Creating multi-arch manifest..." 32 + docker manifest create "${TARGET_REGISTRY}:${TAG}" \ 33 + --amend "${TARGET_REGISTRY}:${TAG}-amd64" \ 34 + --amend "${TARGET_REGISTRY}:${TAG}-arm64" 35 + echo "" 36 + 37 + # Annotate the manifest with platform information 38 + echo ">>> Annotating manifest with platform information..." 39 + docker manifest annotate "${TARGET_REGISTRY}:${TAG}" \ 40 + "${TARGET_REGISTRY}:${TAG}-amd64" \ 41 + --os linux --arch amd64 42 + 43 + docker manifest annotate "${TARGET_REGISTRY}:${TAG}" \ 44 + "${TARGET_REGISTRY}:${TAG}-arm64" \ 45 + --os linux --arch arm64 46 + echo "" 47 + 48 + # Push the manifest list 49 + echo ">>> Pushing multi-arch manifest..." 50 + docker manifest push "${TARGET_REGISTRY}:${TAG}" 51 + echo "" 52 + 53 + echo "=== Migration complete! ===" 54 + echo "You can now pull: docker pull ${TARGET_REGISTRY}:${TAG}"