A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at main 954 lines 30 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "strings" 8 "testing" 9 "time" 10 11 "atcr.io/pkg/atproto" 12 _ "github.com/tursodatabase/go-libsql" 13) 14 15// execStatements splits a multi-statement SQL string and executes each statement individually. 16// go-libsql does not support multi-statement Exec like mattn/go-sqlite3. 17func execStatements(t *testing.T, db *sql.DB, schema string) { 18 t.Helper() 19 for _, stmt := range strings.Split(schema, ";") { 20 stmt = strings.TrimSpace(stmt) 21 if stmt == "" { 22 continue 23 } 24 if _, err := db.Exec(stmt); err != nil { 25 t.Fatalf("Failed to execute statement: %v\nSQL: %s", err, stmt) 26 } 27 } 28} 29 30// setupTestDB creates an in-memory SQLite database for testing 31func setupTestDB(t *testing.T) *sql.DB { 32 database, err := sql.Open("libsql", ":memory:") 33 if err != nil { 34 t.Fatalf("Failed to open test database: %v", err) 35 } 36 37 // Create schema 38 schema := ` 39 CREATE TABLE users ( 40 did TEXT PRIMARY KEY, 41 handle TEXT NOT NULL, 42 pds_endpoint TEXT NOT NULL, 43 avatar TEXT, 44 default_hold_did TEXT, 45 last_seen TIMESTAMP NOT NULL 46 ); 47 48 CREATE TABLE manifests ( 49 id INTEGER PRIMARY KEY AUTOINCREMENT, 50 did TEXT NOT NULL, 51 repository TEXT NOT NULL, 52 digest TEXT NOT NULL, 53 hold_endpoint TEXT NOT NULL, 54 schema_version INTEGER NOT NULL, 55 media_type TEXT NOT NULL, 56 config_digest TEXT, 57 config_size INTEGER, 58 artifact_type TEXT NOT NULL DEFAULT 'container-image', 59 created_at TIMESTAMP NOT NULL, 60 UNIQUE(did, repository, digest) 61 ); 62 63 CREATE TABLE repository_annotations ( 64 did TEXT NOT NULL, 65 repository TEXT NOT NULL, 66 key TEXT NOT NULL, 67 value TEXT NOT NULL, 68 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 69 PRIMARY KEY(did, repository, key), 70 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 71 ); 72 73 CREATE TABLE layers ( 74 manifest_id INTEGER NOT NULL, 75 digest TEXT NOT NULL, 76 size INTEGER NOT NULL, 77 media_type TEXT NOT NULL, 78 layer_index INTEGER NOT NULL, 79 annotations TEXT, 80 PRIMARY KEY(manifest_id, layer_index) 81 ); 82 83 CREATE TABLE manifest_references ( 84 manifest_id INTEGER NOT NULL, 85 digest TEXT NOT NULL, 86 media_type TEXT NOT NULL, 87 size INTEGER NOT NULL, 88 platform_architecture TEXT, 89 platform_os TEXT, 90 platform_variant TEXT, 91 platform_os_version TEXT, 92 is_attestation BOOLEAN DEFAULT FALSE, 93 reference_index INTEGER NOT NULL, 94 PRIMARY KEY(manifest_id, reference_index) 95 ); 96 97 CREATE TABLE tags ( 98 id INTEGER PRIMARY KEY AUTOINCREMENT, 99 did TEXT NOT NULL, 100 repository TEXT NOT NULL, 101 tag TEXT NOT NULL, 102 digest TEXT NOT NULL, 103 created_at TIMESTAMP NOT NULL, 104 UNIQUE(did, repository, tag) 105 ); 106 107 CREATE TABLE stars ( 108 starrer_did TEXT NOT NULL, 109 owner_did TEXT NOT NULL, 110 repository TEXT NOT NULL, 111 created_at TIMESTAMP NOT NULL, 112 PRIMARY KEY(starrer_did, owner_did, repository) 113 ); 114 ` 115 116 execStatements(t, database, schema) 117 118 return database 119} 120 121func TestNewProcessor(t *testing.T) { 122 database := setupTestDB(t) 123 defer database.Close() 124 125 tests := []struct { 126 name string 127 useCache bool 128 }{ 129 {"with cache", true}, 130 {"without cache", false}, 131 } 132 133 for _, tt := range tests { 134 t.Run(tt.name, func(t *testing.T) { 135 p := NewProcessor(database, tt.useCache, nil) 136 if p == nil { 137 t.Fatal("NewProcessor returned nil") 138 } 139 if p.db != database { 140 t.Error("Processor database not set correctly") 141 } 142 if p.useCache != tt.useCache { 143 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) 144 } 145 if tt.useCache && p.userCache == nil { 146 t.Error("Cache enabled but userCache is nil") 147 } 148 if !tt.useCache && p.userCache != nil { 149 t.Error("Cache disabled but userCache is not nil") 150 } 151 }) 152 } 153} 154 155func TestProcessManifest_ImageManifest(t *testing.T) { 156 database := setupTestDB(t) 157 defer database.Close() 158 159 // Insert test user (required for foreign key on repository_annotations) 160 _, err := database.Exec(`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`, 161 "did:plc:test123", "test.bsky.social", "https://pds.example.com", time.Now()) 162 if err != nil { 163 t.Fatalf("Failed to insert test user: %v", err) 164 } 165 166 p := NewProcessor(database, false, nil) 167 ctx := context.Background() 168 169 // Create test manifest record 170 manifestRecord := &atproto.ManifestRecord{ 171 Repository: "test-app", 172 Digest: "sha256:abc123", 173 MediaType: "application/vnd.oci.image.manifest.v1+json", 174 SchemaVersion: 2, 175 HoldEndpoint: "did:web:hold01.atcr.io", 176 CreatedAt: time.Now(), 177 Config: &atproto.BlobReference{ 178 Digest: "sha256:config123", 179 Size: 1234, 180 }, 181 Layers: []atproto.BlobReference{ 182 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 183 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 184 }, 185 Annotations: map[string]string{ 186 "org.opencontainers.image.title": "Test App", 187 "org.opencontainers.image.description": "A test application", 188 "org.opencontainers.image.source": "https://github.com/test/app", 189 "org.opencontainers.image.licenses": "MIT", 190 "io.atcr.icon": "https://example.com/icon.png", 191 }, 192 } 193 194 // Marshal to bytes for ProcessManifest 195 recordBytes, err := json.Marshal(manifestRecord) 196 if err != nil { 197 t.Fatalf("Failed to marshal manifest: %v", err) 198 } 199 200 // Process manifest 201 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 202 if err != nil { 203 t.Fatalf("ProcessManifest failed: %v", err) 204 } 205 if manifestID == 0 { 206 t.Error("Expected non-zero manifest ID") 207 } 208 209 // Verify manifest was inserted 210 var count int 211 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", 212 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) 213 if err != nil { 214 t.Fatalf("Failed to query manifests: %v", err) 215 } 216 if count != 1 { 217 t.Errorf("Expected 1 manifest, got %d", count) 218 } 219 220 // Verify annotations were stored in repository_annotations table 221 var title, source string 222 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 223 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) 224 if err != nil { 225 t.Fatalf("Failed to query title annotation: %v", err) 226 } 227 if title != "Test App" { 228 t.Errorf("title = %q, want %q", title, "Test App") 229 } 230 231 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 232 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) 233 if err != nil { 234 t.Fatalf("Failed to query source annotation: %v", err) 235 } 236 if source != "https://github.com/test/app" { 237 t.Errorf("source = %q, want %q", source, "https://github.com/test/app") 238 } 239 240 // Verify layers were inserted 241 var layerCount int 242 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 243 if err != nil { 244 t.Fatalf("Failed to query layers: %v", err) 245 } 246 if layerCount != 2 { 247 t.Errorf("Expected 2 layers, got %d", layerCount) 248 } 249 250 // Verify no manifest references (this is an image, not a list) 251 var refCount int 252 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 253 if err != nil { 254 t.Fatalf("Failed to query manifest_references: %v", err) 255 } 256 if refCount != 0 { 257 t.Errorf("Expected 0 manifest references, got %d", refCount) 258 } 259} 260 261func TestProcessManifest_ManifestList(t *testing.T) { 262 database := setupTestDB(t) 263 defer database.Close() 264 265 p := NewProcessor(database, false, nil) 266 ctx := context.Background() 267 268 // Create test manifest list record 269 manifestRecord := &atproto.ManifestRecord{ 270 Repository: "test-app", 271 Digest: "sha256:list123", 272 MediaType: "application/vnd.oci.image.index.v1+json", 273 SchemaVersion: 2, 274 HoldEndpoint: "did:web:hold01.atcr.io", 275 CreatedAt: time.Now(), 276 Manifests: []atproto.ManifestReference{ 277 { 278 Digest: "sha256:amd64manifest", 279 MediaType: "application/vnd.oci.image.manifest.v1+json", 280 Size: 1000, 281 Platform: &atproto.Platform{ 282 Architecture: "amd64", 283 OS: "linux", 284 }, 285 }, 286 { 287 Digest: "sha256:arm64manifest", 288 MediaType: "application/vnd.oci.image.manifest.v1+json", 289 Size: 1100, 290 Platform: &atproto.Platform{ 291 Architecture: "arm64", 292 OS: "linux", 293 Variant: "v8", 294 }, 295 }, 296 }, 297 } 298 299 // Marshal to bytes for ProcessManifest 300 recordBytes, err := json.Marshal(manifestRecord) 301 if err != nil { 302 t.Fatalf("Failed to marshal manifest: %v", err) 303 } 304 305 // Process manifest list 306 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 307 if err != nil { 308 t.Fatalf("ProcessManifest failed: %v", err) 309 } 310 311 // Verify manifest references were inserted 312 var refCount int 313 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 314 if err != nil { 315 t.Fatalf("Failed to query manifest_references: %v", err) 316 } 317 if refCount != 2 { 318 t.Errorf("Expected 2 manifest references, got %d", refCount) 319 } 320 321 // Verify platform info was stored 322 var arch, os string 323 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) 324 if err != nil { 325 t.Fatalf("Failed to query platform info: %v", err) 326 } 327 if arch != "amd64" { 328 t.Errorf("platform_architecture = %q, want %q", arch, "amd64") 329 } 330 if os != "linux" { 331 t.Errorf("platform_os = %q, want %q", os, "linux") 332 } 333 334 // Verify no layers (this is a list, not an image) 335 var layerCount int 336 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 337 if err != nil { 338 t.Fatalf("Failed to query layers: %v", err) 339 } 340 if layerCount != 0 { 341 t.Errorf("Expected 0 layers, got %d", layerCount) 342 } 343} 344 345func TestProcessTag(t *testing.T) { 346 database := setupTestDB(t) 347 defer database.Close() 348 349 p := NewProcessor(database, false, nil) 350 ctx := context.Background() 351 352 // Create test tag record (using ManifestDigest field for simplicity) 353 tagRecord := &atproto.TagRecord{ 354 Repository: "test-app", 355 Tag: "latest", 356 ManifestDigest: "sha256:abc123", 357 UpdatedAt: time.Now(), 358 } 359 360 // Marshal to bytes for ProcessTag 361 recordBytes, err := json.Marshal(tagRecord) 362 if err != nil { 363 t.Fatalf("Failed to marshal tag: %v", err) 364 } 365 366 // Process tag 367 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 368 if err != nil { 369 t.Fatalf("ProcessTag failed: %v", err) 370 } 371 372 // Verify tag was inserted 373 var count int 374 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 375 "did:plc:test123", "test-app", "latest").Scan(&count) 376 if err != nil { 377 t.Fatalf("Failed to query tags: %v", err) 378 } 379 if count != 1 { 380 t.Errorf("Expected 1 tag, got %d", count) 381 } 382 383 // Verify digest was stored 384 var digest string 385 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 386 "did:plc:test123", "test-app", "latest").Scan(&digest) 387 if err != nil { 388 t.Fatalf("Failed to query tag digest: %v", err) 389 } 390 if digest != "sha256:abc123" { 391 t.Errorf("digest = %q, want %q", digest, "sha256:abc123") 392 } 393 394 // Test upserting same tag with new digest 395 tagRecord.ManifestDigest = "sha256:newdigest" 396 recordBytes, err = json.Marshal(tagRecord) 397 if err != nil { 398 t.Fatalf("Failed to marshal tag: %v", err) 399 } 400 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 401 if err != nil { 402 t.Fatalf("ProcessTag (upsert) failed: %v", err) 403 } 404 405 // Verify tag was updated 406 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 407 "did:plc:test123", "test-app", "latest").Scan(&digest) 408 if err != nil { 409 t.Fatalf("Failed to query updated tag: %v", err) 410 } 411 if digest != "sha256:newdigest" { 412 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") 413 } 414 415 // Verify still only one tag (upsert, not insert) 416 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 417 "did:plc:test123", "test-app", "latest").Scan(&count) 418 if err != nil { 419 t.Fatalf("Failed to query tags after upsert: %v", err) 420 } 421 if count != 1 { 422 t.Errorf("Expected 1 tag after upsert, got %d", count) 423 } 424} 425 426func TestProcessStar(t *testing.T) { 427 database := setupTestDB(t) 428 defer database.Close() 429 430 // Insert test users (starrer + owner) so EnsureUser finds them without network calls 431 for _, did := range []string{"did:plc:starrer123", "did:plc:owner123"} { 432 _, err := database.Exec( 433 `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`, 434 did, did+".test", "https://pds.example.com", time.Now()) 435 if err != nil { 436 t.Fatalf("Failed to insert test user %s: %v", did, err) 437 } 438 } 439 440 p := NewProcessor(database, false, nil) 441 ctx := context.Background() 442 443 // Create test star record (new AT URI format) 444 starRecord := atproto.NewStarRecord("did:plc:owner123", "test-app") 445 446 // Marshal to bytes for ProcessStar 447 recordBytes, err := json.Marshal(starRecord) 448 if err != nil { 449 t.Fatalf("Failed to marshal star: %v", err) 450 } 451 452 // Process star 453 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 454 if err != nil { 455 t.Fatalf("ProcessStar failed: %v", err) 456 } 457 458 // Verify star was inserted 459 var count int 460 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 461 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 462 if err != nil { 463 t.Fatalf("Failed to query stars: %v", err) 464 } 465 if count != 1 { 466 t.Errorf("Expected 1 star, got %d", count) 467 } 468 469 // Test upserting same star (should be idempotent) 470 recordBytes, err = json.Marshal(starRecord) 471 if err != nil { 472 t.Fatalf("Failed to marshal star: %v", err) 473 } 474 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 475 if err != nil { 476 t.Fatalf("ProcessStar (upsert) failed: %v", err) 477 } 478 479 // Verify still only one star 480 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 481 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 482 if err != nil { 483 t.Fatalf("Failed to query stars after upsert: %v", err) 484 } 485 if count != 1 { 486 t.Errorf("Expected 1 star after upsert, got %d", count) 487 } 488} 489 490func TestProcessStar_OldFormat(t *testing.T) { 491 database := setupTestDB(t) 492 defer database.Close() 493 494 // Insert test users 495 for _, did := range []string{"did:plc:starrer456", "did:plc:owner456"} { 496 _, err := database.Exec( 497 `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`, 498 did, did+".test", "https://pds.example.com", time.Now()) 499 if err != nil { 500 t.Fatalf("Failed to insert test user %s: %v", did, err) 501 } 502 } 503 504 p := NewProcessor(database, false, nil) 505 ctx := context.Background() 506 507 // Old format JSON (object subject with did + repository) 508 oldFormatJSON := `{"$type":"io.atcr.sailor.star","subject":{"did":"did:plc:owner456","repository":"legacy-app"},"createdAt":"2025-06-01T00:00:00Z"}` 509 510 err := p.ProcessStar(ctx, "did:plc:starrer456", []byte(oldFormatJSON)) 511 if err != nil { 512 t.Fatalf("ProcessStar (old format) failed: %v", err) 513 } 514 515 // Verify star was inserted with correct owner/repo 516 var count int 517 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 518 "did:plc:starrer456", "did:plc:owner456", "legacy-app").Scan(&count) 519 if err != nil { 520 t.Fatalf("Failed to query stars: %v", err) 521 } 522 if count != 1 { 523 t.Errorf("Expected 1 star from old format, got %d", count) 524 } 525} 526 527func TestProcessManifest_Duplicate(t *testing.T) { 528 database := setupTestDB(t) 529 defer database.Close() 530 531 p := NewProcessor(database, false, nil) 532 ctx := context.Background() 533 534 manifestRecord := &atproto.ManifestRecord{ 535 Repository: "test-app", 536 Digest: "sha256:abc123", 537 MediaType: "application/vnd.oci.image.manifest.v1+json", 538 SchemaVersion: 2, 539 HoldEndpoint: "did:web:hold01.atcr.io", 540 CreatedAt: time.Now(), 541 } 542 543 // Marshal to bytes for ProcessManifest 544 recordBytes, err := json.Marshal(manifestRecord) 545 if err != nil { 546 t.Fatalf("Failed to marshal manifest: %v", err) 547 } 548 549 // Insert first time 550 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 551 if err != nil { 552 t.Fatalf("First ProcessManifest failed: %v", err) 553 } 554 555 // Insert duplicate 556 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 557 if err != nil { 558 t.Fatalf("Duplicate ProcessManifest failed: %v", err) 559 } 560 561 // Should return existing ID 562 if id1 != id2 { 563 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) 564 } 565 566 // Verify only one manifest exists 567 var count int 568 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", 569 "did:plc:test123", "sha256:abc123").Scan(&count) 570 if err != nil { 571 t.Fatalf("Failed to query manifests: %v", err) 572 } 573 if count != 1 { 574 t.Errorf("Expected 1 manifest, got %d", count) 575 } 576} 577 578func TestProcessManifest_EmptyAnnotations(t *testing.T) { 579 database := setupTestDB(t) 580 defer database.Close() 581 582 p := NewProcessor(database, false, nil) 583 ctx := context.Background() 584 585 // Manifest with nil annotations 586 manifestRecord := &atproto.ManifestRecord{ 587 Repository: "test-app", 588 Digest: "sha256:abc123", 589 MediaType: "application/vnd.oci.image.manifest.v1+json", 590 SchemaVersion: 2, 591 HoldEndpoint: "did:web:hold01.atcr.io", 592 CreatedAt: time.Now(), 593 Annotations: nil, 594 } 595 596 // Marshal to bytes for ProcessManifest 597 recordBytes, err := json.Marshal(manifestRecord) 598 if err != nil { 599 t.Fatalf("Failed to marshal manifest: %v", err) 600 } 601 602 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 603 if err != nil { 604 t.Fatalf("ProcessManifest failed: %v", err) 605 } 606 607 // Verify no annotations were stored (nil annotations should not create entries) 608 var annotationCount int 609 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", 610 "did:plc:test123", "test-app").Scan(&annotationCount) 611 if err != nil { 612 t.Fatalf("Failed to query annotations: %v", err) 613 } 614 if annotationCount != 0 { 615 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) 616 } 617} 618 619func TestProcessIdentity(t *testing.T) { 620 db := setupTestDB(t) 621 defer db.Close() 622 623 processor := NewProcessor(db, false, nil) 624 625 // Setup: Create test user 626 testDID := "did:plc:alice123" 627 testHandle := "alice.bsky.social" 628 testPDS := "https://bsky.social" 629 _, err := db.Exec(` 630 INSERT INTO users (did, handle, pds_endpoint, last_seen) 631 VALUES (?, ?, ?, ?) 632 `, testDID, testHandle, testPDS, time.Now()) 633 if err != nil { 634 t.Fatalf("Failed to insert test user: %v", err) 635 } 636 637 // Test 1: Process identity change event 638 newHandle := "alice-new.bsky.social" 639 err = processor.ProcessIdentity(context.Background(), testDID, newHandle) 640 // Note: This will fail to invalidate cache since we don't have a real identity directory, 641 // but we can still verify the database update happened 642 if err != nil { 643 t.Logf("Expected cache invalidation error (no real directory): %v", err) 644 } 645 646 // Verify handle was updated in database 647 var retrievedHandle string 648 err = db.QueryRow(` 649 SELECT handle FROM users WHERE did = ? 650 `, testDID).Scan(&retrievedHandle) 651 if err != nil { 652 t.Fatalf("Failed to query updated user: %v", err) 653 } 654 if retrievedHandle != newHandle { 655 t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle) 656 } 657 658 // Test 2: Process identity change for non-existent user 659 // Should not error (UPDATE just affects 0 rows) 660 err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle") 661 if err != nil { 662 t.Logf("Expected cache invalidation error: %v", err) 663 } 664 665 // Test 3: Process multiple identity changes 666 handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"} 667 for _, handle := range handles { 668 err = processor.ProcessIdentity(context.Background(), testDID, handle) 669 if err != nil { 670 t.Logf("Expected cache invalidation error: %v", err) 671 } 672 673 err = db.QueryRow(` 674 SELECT handle FROM users WHERE did = ? 675 `, testDID).Scan(&retrievedHandle) 676 if err != nil { 677 t.Fatalf("Failed to query user after handle update: %v", err) 678 } 679 if retrievedHandle != handle { 680 t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle) 681 } 682 } 683} 684 685func TestProcessRecord_RoutesCorrectly(t *testing.T) { 686 db := setupTestDB(t) 687 defer db.Close() 688 689 // Add missing tables for this test 690 execStatements(t, db, ` 691 CREATE TABLE repo_pages ( 692 did TEXT NOT NULL, 693 repository TEXT NOT NULL, 694 description TEXT, 695 avatar_cid TEXT, 696 created_at TIMESTAMP NOT NULL, 697 updated_at TIMESTAMP NOT NULL, 698 PRIMARY KEY(did, repository) 699 ); 700 CREATE TABLE hold_captain_records ( 701 hold_did TEXT PRIMARY KEY, 702 owner_did TEXT NOT NULL, 703 public BOOLEAN NOT NULL, 704 allow_all_crew BOOLEAN NOT NULL, 705 deployed_at TEXT, 706 region TEXT, 707 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 708 ); 709 CREATE TABLE hold_crew_members ( 710 hold_did TEXT NOT NULL, 711 member_did TEXT NOT NULL, 712 rkey TEXT NOT NULL, 713 role TEXT, 714 permissions TEXT, 715 tier TEXT, 716 added_at TEXT, 717 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 718 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 719 PRIMARY KEY (hold_did, member_did) 720 ); 721 `) 722 723 processor := NewProcessor(db, false, nil) 724 ctx := context.Background() 725 var err error 726 727 // Test 1: ProcessRecord routes manifest correctly 728 // Note: Schema validation may fail for io.atcr.manifest since we can't resolve the schema, 729 // but this tests the routing logic 730 manifestRecord := map[string]any{ 731 "$type": "io.atcr.manifest", 732 "repository": "test-app", 733 "digest": "sha256:route123", 734 "mediaType": "application/vnd.oci.image.manifest.v1+json", 735 "schemaVersion": 2, 736 "holdDid": "did:web:hold01.atcr.io", 737 "createdAt": time.Now().Format(time.RFC3339), 738 } 739 recordBytes, _ := json.Marshal(manifestRecord) 740 741 // Note: ProcessRecord will skip validation if lexicon can't be resolved (expected in tests) 742 // and will skip EnsureUser since we don't have a real PDS to resolve 743 // Just verify the record is processed without panic 744 err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "route123", recordBytes, false, nil) 745 // Error expected since we can't resolve identity - that's fine for this test 746 if err != nil { 747 t.Logf("Expected error (can't resolve identity): %v", err) 748 } 749 750 // Test 2: ProcessRecord handles captain record without creating user 751 captainRecord := map[string]any{ 752 "$type": "io.atcr.hold.captain", 753 "owner": "did:plc:owner123", 754 "public": true, 755 "allowAllCrew": false, 756 "enableBlueskyPosts": false, 757 "deployedAt": time.Now().Format(time.RFC3339), 758 } 759 captainBytes, _ := json.Marshal(captainRecord) 760 761 // This should NOT call EnsureUser (captain is a hold collection) 762 err = processor.ProcessRecord(ctx, "did:web:hold.example.com", atproto.CaptainCollection, "self", captainBytes, false, nil) 763 if err != nil { 764 t.Logf("Error processing captain (validation may fail in test): %v", err) 765 } 766 767 // Verify no user was created for the hold DID 768 var userCount int 769 err = db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, "did:web:hold.example.com").Scan(&userCount) 770 if err != nil { 771 t.Fatalf("Failed to query users: %v", err) 772 } 773 if userCount != 0 { 774 t.Error("Captain record processing should NOT create a user entry for holds") 775 } 776 777 // Test 3: ProcessRecord handles delete operations 778 err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "sha256:todelete", nil, true, nil) 779 if err != nil { 780 t.Errorf("Delete should not error: %v", err) 781 } 782} 783 784func TestProcessRecord_SkipsInvalidRecords(t *testing.T) { 785 db := setupTestDB(t) 786 defer db.Close() 787 788 processor := NewProcessor(db, false, nil) 789 ctx := context.Background() 790 791 // Test: Invalid JSON should be skipped silently (no error returned) 792 invalidJSON := []byte(`{invalid json}`) 793 err := processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "test", invalidJSON, false, nil) 794 // Should return nil (skipped silently) not an error 795 if err != nil { 796 t.Errorf("Invalid record should be skipped silently, got error: %v", err) 797 } 798} 799 800func TestValidateRecord(t *testing.T) { 801 db := setupTestDB(t) 802 defer db.Close() 803 804 processor := NewProcessor(db, false, nil) 805 ctx := context.Background() 806 807 // Test 1: Manifest passes (no strict validation) 808 manifestJSON := []byte(`{"$type": "io.atcr.manifest", "repository": "test"}`) 809 err := processor.ValidateRecord(ctx, atproto.ManifestCollection, manifestJSON) 810 if err != nil { 811 t.Errorf("Manifest should pass validation: %v", err) 812 } 813 814 // Test 2: Invalid JSON returns error 815 invalidJSON := []byte(`{invalid}`) 816 err = processor.ValidateRecord(ctx, atproto.ManifestCollection, invalidJSON) 817 if err == nil { 818 t.Error("Invalid JSON should return error") 819 } 820 821 // Test 3: Captain with valid owner passes 822 captainValid := []byte(`{"owner": "did:plc:owner123", "public": true}`) 823 err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainValid) 824 if err != nil { 825 t.Errorf("Valid captain should pass: %v", err) 826 } 827 828 // Test 4: Captain with empty owner is rejected 829 captainEmpty := []byte(`{"owner": "", "public": true}`) 830 err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainEmpty) 831 if err == nil { 832 t.Error("Captain with empty owner should be rejected") 833 } 834 835 // Test 5: Captain with invalid owner (not a DID) is rejected 836 captainInvalid := []byte(`{"owner": "notadid", "public": true}`) 837 err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainInvalid) 838 if err == nil { 839 t.Error("Captain with invalid owner should be rejected") 840 } 841 842 // Test 6: Crew with valid member passes 843 crewValid := []byte(`{"member": "did:plc:member123", "role": "write"}`) 844 err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewValid) 845 if err != nil { 846 t.Errorf("Valid crew should pass: %v", err) 847 } 848 849 // Test 7: Crew with empty member is rejected 850 crewEmpty := []byte(`{"member": "", "role": "write"}`) 851 err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewEmpty) 852 if err == nil { 853 t.Error("Crew with empty member should be rejected") 854 } 855} 856 857func TestProcessAccount(t *testing.T) { 858 db := setupTestDB(t) 859 defer db.Close() 860 861 processor := NewProcessor(db, false, nil) 862 863 // Setup: Create test user 864 testDID := "did:plc:bob456" 865 testHandle := "bob.bsky.social" 866 testPDS := "https://bsky.social" 867 _, err := db.Exec(` 868 INSERT INTO users (did, handle, pds_endpoint, last_seen) 869 VALUES (?, ?, ?, ?) 870 `, testDID, testHandle, testPDS, time.Now()) 871 if err != nil { 872 t.Fatalf("Failed to insert test user: %v", err) 873 } 874 875 // Test 1: Process account deactivation event 876 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") 877 // Note: Cache invalidation will fail without real directory, but that's expected 878 if err != nil { 879 t.Logf("Expected cache invalidation error (no real directory): %v", err) 880 } 881 882 // Verify user still exists in database (we don't delete on deactivation) 883 var exists bool 884 err = db.QueryRow(` 885 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 886 `, testDID).Scan(&exists) 887 if err != nil { 888 t.Fatalf("Failed to check if user exists: %v", err) 889 } 890 if !exists { 891 t.Error("User should still exist after deactivation event (no deletion)") 892 } 893 894 // Test 2: Process account with active=true (should be ignored) 895 err = processor.ProcessAccount(context.Background(), testDID, true, "active") 896 if err != nil { 897 t.Errorf("Expected no error for active account, got: %v", err) 898 } 899 900 // Test 3: Process account with status != "deactivated" (should be ignored) 901 err = processor.ProcessAccount(context.Background(), testDID, false, "suspended") 902 if err != nil { 903 t.Errorf("Expected no error for non-deactivated status, got: %v", err) 904 } 905 906 // Test 4: Process account deactivation for non-existent user 907 err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated") 908 // Cache invalidation will fail, but that's expected 909 if err != nil { 910 t.Logf("Expected cache invalidation error: %v", err) 911 } 912 913 // Test 5: Process multiple deactivation events (idempotent) 914 for i := range 3 { 915 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") 916 if err != nil { 917 t.Logf("Expected cache invalidation error on iteration %d: %v", i, err) 918 } 919 } 920 921 // User should still exist after multiple deactivations 922 err = db.QueryRow(` 923 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 924 `, testDID).Scan(&exists) 925 if err != nil { 926 t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err) 927 } 928 if !exists { 929 t.Error("User should still exist after multiple deactivation events") 930 } 931 932 // Test 6: Process account deletion - should delete user data 933 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") 934 if err != nil { 935 t.Logf("Cache invalidation error during deletion (expected): %v", err) 936 } 937 938 // User should be deleted after "deleted" status 939 err = db.QueryRow(` 940 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 941 `, testDID).Scan(&exists) 942 if err != nil { 943 t.Fatalf("Failed to check if user exists after deletion: %v", err) 944 } 945 if exists { 946 t.Error("User should NOT exist after deletion event") 947 } 948 949 // Test 7: Process deletion for already-deleted user (idempotent) 950 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") 951 if err != nil { 952 t.Errorf("Deletion of non-existent user should not error, got: %v", err) 953 } 954}