package jetstream import ( "context" "database/sql" "encoding/json" "strings" "testing" "time" "atcr.io/pkg/atproto" _ "github.com/tursodatabase/go-libsql" ) // execStatements splits a multi-statement SQL string and executes each statement individually. // go-libsql does not support multi-statement Exec like mattn/go-sqlite3. func execStatements(t *testing.T, db *sql.DB, schema string) { t.Helper() for _, stmt := range strings.Split(schema, ";") { stmt = strings.TrimSpace(stmt) if stmt == "" { continue } if _, err := db.Exec(stmt); err != nil { t.Fatalf("Failed to execute statement: %v\nSQL: %s", err, stmt) } } } // setupTestDB creates an in-memory SQLite database for testing func setupTestDB(t *testing.T) *sql.DB { database, err := sql.Open("libsql", ":memory:") if err != nil { t.Fatalf("Failed to open test database: %v", err) } // Create schema schema := ` CREATE TABLE users ( did TEXT PRIMARY KEY, handle TEXT NOT NULL, pds_endpoint TEXT NOT NULL, avatar TEXT, default_hold_did TEXT, last_seen TIMESTAMP NOT NULL ); CREATE TABLE manifests ( id INTEGER PRIMARY KEY AUTOINCREMENT, did TEXT NOT NULL, repository TEXT NOT NULL, digest TEXT NOT NULL, hold_endpoint TEXT NOT NULL, schema_version INTEGER NOT NULL, media_type TEXT NOT NULL, config_digest TEXT, config_size INTEGER, artifact_type TEXT NOT NULL DEFAULT 'container-image', created_at TIMESTAMP NOT NULL, UNIQUE(did, repository, digest) ); CREATE TABLE repository_annotations ( did TEXT NOT NULL, repository TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(did, repository, key), FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE ); CREATE TABLE layers ( manifest_id INTEGER NOT NULL, digest TEXT NOT NULL, size INTEGER NOT NULL, media_type TEXT NOT NULL, layer_index INTEGER NOT NULL, annotations TEXT, PRIMARY KEY(manifest_id, layer_index) ); CREATE TABLE manifest_references ( manifest_id INTEGER NOT NULL, digest TEXT NOT NULL, media_type TEXT NOT NULL, size INTEGER NOT NULL, platform_architecture TEXT, platform_os TEXT, platform_variant TEXT, platform_os_version TEXT, is_attestation BOOLEAN DEFAULT FALSE, reference_index INTEGER NOT NULL, PRIMARY KEY(manifest_id, reference_index) ); CREATE TABLE tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, did TEXT NOT NULL, repository TEXT NOT NULL, tag TEXT NOT NULL, digest TEXT NOT NULL, created_at TIMESTAMP NOT NULL, UNIQUE(did, repository, tag) ); CREATE TABLE stars ( starrer_did TEXT NOT NULL, owner_did TEXT NOT NULL, repository TEXT NOT NULL, created_at TIMESTAMP NOT NULL, PRIMARY KEY(starrer_did, owner_did, repository) ); ` execStatements(t, database, schema) return database } func TestNewProcessor(t *testing.T) { database := setupTestDB(t) defer database.Close() tests := []struct { name string useCache bool }{ {"with cache", true}, {"without cache", false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := NewProcessor(database, tt.useCache, nil) if p == nil { t.Fatal("NewProcessor returned nil") } if p.db != database { t.Error("Processor database not set correctly") } if p.useCache != tt.useCache { t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) } if tt.useCache && p.userCache == nil { t.Error("Cache enabled but userCache is nil") } if !tt.useCache && p.userCache != nil { t.Error("Cache disabled but userCache is not nil") } }) } } func TestProcessManifest_ImageManifest(t *testing.T) { database := setupTestDB(t) defer database.Close() // Insert test user (required for foreign key on repository_annotations) _, err := database.Exec(`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`, "did:plc:test123", "test.bsky.social", "https://pds.example.com", time.Now()) if err != nil { t.Fatalf("Failed to insert test user: %v", err) } p := NewProcessor(database, false, nil) ctx := context.Background() // Create test manifest record manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:abc123", MediaType: "application/vnd.oci.image.manifest.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), Config: &atproto.BlobReference{ Digest: "sha256:config123", Size: 1234, }, Layers: []atproto.BlobReference{ {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, }, Annotations: map[string]string{ "org.opencontainers.image.title": "Test App", "org.opencontainers.image.description": "A test application", "org.opencontainers.image.source": "https://github.com/test/app", "org.opencontainers.image.licenses": "MIT", "io.atcr.icon": "https://example.com/icon.png", }, } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } // Process manifest manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessManifest failed: %v", err) } if manifestID == 0 { t.Error("Expected non-zero manifest ID") } // Verify manifest was inserted var count int err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) if err != nil { t.Fatalf("Failed to query manifests: %v", err) } if count != 1 { t.Errorf("Expected 1 manifest, got %d", count) } // Verify annotations were stored in repository_annotations table var title, source string err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) if err != nil { t.Fatalf("Failed to query title annotation: %v", err) } if title != "Test App" { t.Errorf("title = %q, want %q", title, "Test App") } err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) if err != nil { t.Fatalf("Failed to query source annotation: %v", err) } if source != "https://github.com/test/app" { t.Errorf("source = %q, want %q", source, "https://github.com/test/app") } // Verify layers were inserted var layerCount int err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) if err != nil { t.Fatalf("Failed to query layers: %v", err) } if layerCount != 2 { t.Errorf("Expected 2 layers, got %d", layerCount) } // Verify no manifest references (this is an image, not a list) var refCount int err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) if err != nil { t.Fatalf("Failed to query manifest_references: %v", err) } if refCount != 0 { t.Errorf("Expected 0 manifest references, got %d", refCount) } } func TestProcessManifest_ManifestList(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false, nil) ctx := context.Background() // Create test manifest list record manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:list123", MediaType: "application/vnd.oci.image.index.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), Manifests: []atproto.ManifestReference{ { Digest: "sha256:amd64manifest", MediaType: "application/vnd.oci.image.manifest.v1+json", Size: 1000, Platform: &atproto.Platform{ Architecture: "amd64", OS: "linux", }, }, { Digest: "sha256:arm64manifest", MediaType: "application/vnd.oci.image.manifest.v1+json", Size: 1100, Platform: &atproto.Platform{ Architecture: "arm64", OS: "linux", Variant: "v8", }, }, }, } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } // Process manifest list manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessManifest failed: %v", err) } // Verify manifest references were inserted var refCount int err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) if err != nil { t.Fatalf("Failed to query manifest_references: %v", err) } if refCount != 2 { t.Errorf("Expected 2 manifest references, got %d", refCount) } // Verify platform info was stored var arch, os string err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) if err != nil { t.Fatalf("Failed to query platform info: %v", err) } if arch != "amd64" { t.Errorf("platform_architecture = %q, want %q", arch, "amd64") } if os != "linux" { t.Errorf("platform_os = %q, want %q", os, "linux") } // Verify no layers (this is a list, not an image) var layerCount int err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) if err != nil { t.Fatalf("Failed to query layers: %v", err) } if layerCount != 0 { t.Errorf("Expected 0 layers, got %d", layerCount) } } func TestProcessTag(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false, nil) ctx := context.Background() // Create test tag record (using ManifestDigest field for simplicity) tagRecord := &atproto.TagRecord{ Repository: "test-app", Tag: "latest", ManifestDigest: "sha256:abc123", UpdatedAt: time.Now(), } // Marshal to bytes for ProcessTag recordBytes, err := json.Marshal(tagRecord) if err != nil { t.Fatalf("Failed to marshal tag: %v", err) } // Process tag err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessTag failed: %v", err) } // Verify tag was inserted var count int err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&count) if err != nil { t.Fatalf("Failed to query tags: %v", err) } if count != 1 { t.Errorf("Expected 1 tag, got %d", count) } // Verify digest was stored var digest string err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&digest) if err != nil { t.Fatalf("Failed to query tag digest: %v", err) } if digest != "sha256:abc123" { t.Errorf("digest = %q, want %q", digest, "sha256:abc123") } // Test upserting same tag with new digest tagRecord.ManifestDigest = "sha256:newdigest" recordBytes, err = json.Marshal(tagRecord) if err != nil { t.Fatalf("Failed to marshal tag: %v", err) } err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessTag (upsert) failed: %v", err) } // Verify tag was updated err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&digest) if err != nil { t.Fatalf("Failed to query updated tag: %v", err) } if digest != "sha256:newdigest" { t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") } // Verify still only one tag (upsert, not insert) err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&count) if err != nil { t.Fatalf("Failed to query tags after upsert: %v", err) } if count != 1 { t.Errorf("Expected 1 tag after upsert, got %d", count) } } func TestProcessStar(t *testing.T) { database := setupTestDB(t) defer database.Close() // Insert test users (starrer + owner) so EnsureUser finds them without network calls for _, did := range []string{"did:plc:starrer123", "did:plc:owner123"} { _, err := database.Exec( `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`, did, did+".test", "https://pds.example.com", time.Now()) if err != nil { t.Fatalf("Failed to insert test user %s: %v", did, err) } } p := NewProcessor(database, false, nil) ctx := context.Background() // Create test star record (new AT URI format) starRecord := atproto.NewStarRecord("did:plc:owner123", "test-app") // Marshal to bytes for ProcessStar recordBytes, err := json.Marshal(starRecord) if err != nil { t.Fatalf("Failed to marshal star: %v", err) } // Process star err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) if err != nil { t.Fatalf("ProcessStar failed: %v", err) } // Verify star was inserted var count int err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) if err != nil { t.Fatalf("Failed to query stars: %v", err) } if count != 1 { t.Errorf("Expected 1 star, got %d", count) } // Test upserting same star (should be idempotent) recordBytes, err = json.Marshal(starRecord) if err != nil { t.Fatalf("Failed to marshal star: %v", err) } err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) if err != nil { t.Fatalf("ProcessStar (upsert) failed: %v", err) } // Verify still only one star err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) if err != nil { t.Fatalf("Failed to query stars after upsert: %v", err) } if count != 1 { t.Errorf("Expected 1 star after upsert, got %d", count) } } func TestProcessStar_OldFormat(t *testing.T) { database := setupTestDB(t) defer database.Close() // Insert test users for _, did := range []string{"did:plc:starrer456", "did:plc:owner456"} { _, err := database.Exec( `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`, did, did+".test", "https://pds.example.com", time.Now()) if err != nil { t.Fatalf("Failed to insert test user %s: %v", did, err) } } p := NewProcessor(database, false, nil) ctx := context.Background() // Old format JSON (object subject with did + repository) oldFormatJSON := `{"$type":"io.atcr.sailor.star","subject":{"did":"did:plc:owner456","repository":"legacy-app"},"createdAt":"2025-06-01T00:00:00Z"}` err := p.ProcessStar(ctx, "did:plc:starrer456", []byte(oldFormatJSON)) if err != nil { t.Fatalf("ProcessStar (old format) failed: %v", err) } // Verify star was inserted with correct owner/repo var count int err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", "did:plc:starrer456", "did:plc:owner456", "legacy-app").Scan(&count) if err != nil { t.Fatalf("Failed to query stars: %v", err) } if count != 1 { t.Errorf("Expected 1 star from old format, got %d", count) } } func TestProcessManifest_Duplicate(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false, nil) ctx := context.Background() manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:abc123", MediaType: "application/vnd.oci.image.manifest.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } // Insert first time id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("First ProcessManifest failed: %v", err) } // Insert duplicate id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("Duplicate ProcessManifest failed: %v", err) } // Should return existing ID if id1 != id2 { t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) } // Verify only one manifest exists var count int err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", "did:plc:test123", "sha256:abc123").Scan(&count) if err != nil { t.Fatalf("Failed to query manifests: %v", err) } if count != 1 { t.Errorf("Expected 1 manifest, got %d", count) } } func TestProcessManifest_EmptyAnnotations(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false, nil) ctx := context.Background() // Manifest with nil annotations manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:abc123", MediaType: "application/vnd.oci.image.manifest.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), Annotations: nil, } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessManifest failed: %v", err) } // Verify no annotations were stored (nil annotations should not create entries) var annotationCount int err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", "did:plc:test123", "test-app").Scan(&annotationCount) if err != nil { t.Fatalf("Failed to query annotations: %v", err) } if annotationCount != 0 { t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) } } func TestProcessIdentity(t *testing.T) { db := setupTestDB(t) defer db.Close() processor := NewProcessor(db, false, nil) // Setup: Create test user testDID := "did:plc:alice123" testHandle := "alice.bsky.social" testPDS := "https://bsky.social" _, err := db.Exec(` INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?) `, testDID, testHandle, testPDS, time.Now()) if err != nil { t.Fatalf("Failed to insert test user: %v", err) } // Test 1: Process identity change event newHandle := "alice-new.bsky.social" err = processor.ProcessIdentity(context.Background(), testDID, newHandle) // Note: This will fail to invalidate cache since we don't have a real identity directory, // but we can still verify the database update happened if err != nil { t.Logf("Expected cache invalidation error (no real directory): %v", err) } // Verify handle was updated in database var retrievedHandle string err = db.QueryRow(` SELECT handle FROM users WHERE did = ? `, testDID).Scan(&retrievedHandle) if err != nil { t.Fatalf("Failed to query updated user: %v", err) } if retrievedHandle != newHandle { t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle) } // Test 2: Process identity change for non-existent user // Should not error (UPDATE just affects 0 rows) err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle") if err != nil { t.Logf("Expected cache invalidation error: %v", err) } // Test 3: Process multiple identity changes handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"} for _, handle := range handles { err = processor.ProcessIdentity(context.Background(), testDID, handle) if err != nil { t.Logf("Expected cache invalidation error: %v", err) } err = db.QueryRow(` SELECT handle FROM users WHERE did = ? `, testDID).Scan(&retrievedHandle) if err != nil { t.Fatalf("Failed to query user after handle update: %v", err) } if retrievedHandle != handle { t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle) } } } func TestProcessRecord_RoutesCorrectly(t *testing.T) { db := setupTestDB(t) defer db.Close() // Add missing tables for this test execStatements(t, db, ` CREATE TABLE repo_pages ( did TEXT NOT NULL, repository TEXT NOT NULL, description TEXT, avatar_cid TEXT, created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL, PRIMARY KEY(did, repository) ); CREATE TABLE hold_captain_records ( hold_did TEXT PRIMARY KEY, owner_did TEXT NOT NULL, public BOOLEAN NOT NULL, allow_all_crew BOOLEAN NOT NULL, deployed_at TEXT, region TEXT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE hold_crew_members ( hold_did TEXT NOT NULL, member_did TEXT NOT NULL, rkey TEXT NOT NULL, role TEXT, permissions TEXT, tier TEXT, added_at TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (hold_did, member_did) ); `) processor := NewProcessor(db, false, nil) ctx := context.Background() var err error // Test 1: ProcessRecord routes manifest correctly // Note: Schema validation may fail for io.atcr.manifest since we can't resolve the schema, // but this tests the routing logic manifestRecord := map[string]any{ "$type": "io.atcr.manifest", "repository": "test-app", "digest": "sha256:route123", "mediaType": "application/vnd.oci.image.manifest.v1+json", "schemaVersion": 2, "holdDid": "did:web:hold01.atcr.io", "createdAt": time.Now().Format(time.RFC3339), } recordBytes, _ := json.Marshal(manifestRecord) // Note: ProcessRecord will skip validation if lexicon can't be resolved (expected in tests) // and will skip EnsureUser since we don't have a real PDS to resolve // Just verify the record is processed without panic err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "route123", recordBytes, false, nil) // Error expected since we can't resolve identity - that's fine for this test if err != nil { t.Logf("Expected error (can't resolve identity): %v", err) } // Test 2: ProcessRecord handles captain record without creating user captainRecord := map[string]any{ "$type": "io.atcr.hold.captain", "owner": "did:plc:owner123", "public": true, "allowAllCrew": false, "enableBlueskyPosts": false, "deployedAt": time.Now().Format(time.RFC3339), } captainBytes, _ := json.Marshal(captainRecord) // This should NOT call EnsureUser (captain is a hold collection) err = processor.ProcessRecord(ctx, "did:web:hold.example.com", atproto.CaptainCollection, "self", captainBytes, false, nil) if err != nil { t.Logf("Error processing captain (validation may fail in test): %v", err) } // Verify no user was created for the hold DID var userCount int err = db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, "did:web:hold.example.com").Scan(&userCount) if err != nil { t.Fatalf("Failed to query users: %v", err) } if userCount != 0 { t.Error("Captain record processing should NOT create a user entry for holds") } // Test 3: ProcessRecord handles delete operations err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "sha256:todelete", nil, true, nil) if err != nil { t.Errorf("Delete should not error: %v", err) } } func TestProcessRecord_SkipsInvalidRecords(t *testing.T) { db := setupTestDB(t) defer db.Close() processor := NewProcessor(db, false, nil) ctx := context.Background() // Test: Invalid JSON should be skipped silently (no error returned) invalidJSON := []byte(`{invalid json}`) err := processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "test", invalidJSON, false, nil) // Should return nil (skipped silently) not an error if err != nil { t.Errorf("Invalid record should be skipped silently, got error: %v", err) } } func TestValidateRecord(t *testing.T) { db := setupTestDB(t) defer db.Close() processor := NewProcessor(db, false, nil) ctx := context.Background() // Test 1: Manifest passes (no strict validation) manifestJSON := []byte(`{"$type": "io.atcr.manifest", "repository": "test"}`) err := processor.ValidateRecord(ctx, atproto.ManifestCollection, manifestJSON) if err != nil { t.Errorf("Manifest should pass validation: %v", err) } // Test 2: Invalid JSON returns error invalidJSON := []byte(`{invalid}`) err = processor.ValidateRecord(ctx, atproto.ManifestCollection, invalidJSON) if err == nil { t.Error("Invalid JSON should return error") } // Test 3: Captain with valid owner passes captainValid := []byte(`{"owner": "did:plc:owner123", "public": true}`) err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainValid) if err != nil { t.Errorf("Valid captain should pass: %v", err) } // Test 4: Captain with empty owner is rejected captainEmpty := []byte(`{"owner": "", "public": true}`) err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainEmpty) if err == nil { t.Error("Captain with empty owner should be rejected") } // Test 5: Captain with invalid owner (not a DID) is rejected captainInvalid := []byte(`{"owner": "notadid", "public": true}`) err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainInvalid) if err == nil { t.Error("Captain with invalid owner should be rejected") } // Test 6: Crew with valid member passes crewValid := []byte(`{"member": "did:plc:member123", "role": "write"}`) err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewValid) if err != nil { t.Errorf("Valid crew should pass: %v", err) } // Test 7: Crew with empty member is rejected crewEmpty := []byte(`{"member": "", "role": "write"}`) err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewEmpty) if err == nil { t.Error("Crew with empty member should be rejected") } } func TestProcessAccount(t *testing.T) { db := setupTestDB(t) defer db.Close() processor := NewProcessor(db, false, nil) // Setup: Create test user testDID := "did:plc:bob456" testHandle := "bob.bsky.social" testPDS := "https://bsky.social" _, err := db.Exec(` INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?) `, testDID, testHandle, testPDS, time.Now()) if err != nil { t.Fatalf("Failed to insert test user: %v", err) } // Test 1: Process account deactivation event err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") // Note: Cache invalidation will fail without real directory, but that's expected if err != nil { t.Logf("Expected cache invalidation error (no real directory): %v", err) } // Verify user still exists in database (we don't delete on deactivation) var exists bool err = db.QueryRow(` SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) `, testDID).Scan(&exists) if err != nil { t.Fatalf("Failed to check if user exists: %v", err) } if !exists { t.Error("User should still exist after deactivation event (no deletion)") } // Test 2: Process account with active=true (should be ignored) err = processor.ProcessAccount(context.Background(), testDID, true, "active") if err != nil { t.Errorf("Expected no error for active account, got: %v", err) } // Test 3: Process account with status != "deactivated" (should be ignored) err = processor.ProcessAccount(context.Background(), testDID, false, "suspended") if err != nil { t.Errorf("Expected no error for non-deactivated status, got: %v", err) } // Test 4: Process account deactivation for non-existent user err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated") // Cache invalidation will fail, but that's expected if err != nil { t.Logf("Expected cache invalidation error: %v", err) } // Test 5: Process multiple deactivation events (idempotent) for i := range 3 { err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") if err != nil { t.Logf("Expected cache invalidation error on iteration %d: %v", i, err) } } // User should still exist after multiple deactivations err = db.QueryRow(` SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) `, testDID).Scan(&exists) if err != nil { t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err) } if !exists { t.Error("User should still exist after multiple deactivation events") } // Test 6: Process account deletion - should delete user data err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") if err != nil { t.Logf("Cache invalidation error during deletion (expected): %v", err) } // User should be deleted after "deleted" status err = db.QueryRow(` SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) `, testDID).Scan(&exists) if err != nil { t.Fatalf("Failed to check if user exists after deletion: %v", err) } if exists { t.Error("User should NOT exist after deletion event") } // Test 7: Process deletion for already-deleted user (idempotent) err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") if err != nil { t.Errorf("Deletion of non-existent user should not error, got: %v", err) } }