A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at refactor 694 lines 20 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "testing" 8 "time" 9 10 "atcr.io/pkg/atproto" 11 _ "github.com/mattn/go-sqlite3" 12) 13 14// setupTestDB creates an in-memory SQLite database for testing 15func setupTestDB(t *testing.T) *sql.DB { 16 database, err := sql.Open("sqlite3", ":memory:") 17 if err != nil { 18 t.Fatalf("Failed to open test database: %v", err) 19 } 20 21 // Create schema 22 schema := ` 23 CREATE TABLE users ( 24 did TEXT PRIMARY KEY, 25 handle TEXT NOT NULL, 26 pds_endpoint TEXT NOT NULL, 27 avatar TEXT, 28 last_seen TIMESTAMP NOT NULL 29 ); 30 31 CREATE TABLE manifests ( 32 id INTEGER PRIMARY KEY AUTOINCREMENT, 33 did TEXT NOT NULL, 34 repository TEXT NOT NULL, 35 digest TEXT NOT NULL, 36 hold_endpoint TEXT NOT NULL, 37 schema_version INTEGER NOT NULL, 38 media_type TEXT NOT NULL, 39 config_digest TEXT, 40 config_size INTEGER, 41 created_at TIMESTAMP NOT NULL, 42 UNIQUE(did, repository, digest) 43 ); 44 45 CREATE TABLE repository_annotations ( 46 did TEXT NOT NULL, 47 repository TEXT NOT NULL, 48 key TEXT NOT NULL, 49 value TEXT NOT NULL, 50 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 51 PRIMARY KEY(did, repository, key), 52 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE 53 ); 54 55 CREATE TABLE layers ( 56 manifest_id INTEGER NOT NULL, 57 digest TEXT NOT NULL, 58 size INTEGER NOT NULL, 59 media_type TEXT NOT NULL, 60 layer_index INTEGER NOT NULL, 61 PRIMARY KEY(manifest_id, layer_index) 62 ); 63 64 CREATE TABLE manifest_references ( 65 manifest_id INTEGER NOT NULL, 66 digest TEXT NOT NULL, 67 media_type TEXT NOT NULL, 68 size INTEGER NOT NULL, 69 platform_architecture TEXT, 70 platform_os TEXT, 71 platform_variant TEXT, 72 platform_os_version TEXT, 73 is_attestation BOOLEAN DEFAULT FALSE, 74 reference_index INTEGER NOT NULL, 75 PRIMARY KEY(manifest_id, reference_index) 76 ); 77 78 CREATE TABLE tags ( 79 id INTEGER PRIMARY KEY AUTOINCREMENT, 80 did TEXT NOT NULL, 81 repository TEXT NOT NULL, 82 tag TEXT NOT NULL, 83 digest TEXT NOT NULL, 84 created_at TIMESTAMP NOT NULL, 85 UNIQUE(did, repository, tag) 86 ); 87 88 CREATE TABLE stars ( 89 starrer_did TEXT NOT NULL, 90 owner_did TEXT NOT NULL, 91 repository TEXT NOT NULL, 92 created_at TIMESTAMP NOT NULL, 93 PRIMARY KEY(starrer_did, owner_did, repository) 94 ); 95 ` 96 97 if _, err := database.Exec(schema); err != nil { 98 t.Fatalf("Failed to create schema: %v", err) 99 } 100 101 return database 102} 103 104func TestNewProcessor(t *testing.T) { 105 database := setupTestDB(t) 106 defer database.Close() 107 108 tests := []struct { 109 name string 110 useCache bool 111 }{ 112 {"with cache", true}, 113 {"without cache", false}, 114 } 115 116 for _, tt := range tests { 117 t.Run(tt.name, func(t *testing.T) { 118 p := NewProcessor(database, tt.useCache) 119 if p == nil { 120 t.Fatal("NewProcessor returned nil") 121 } 122 if p.db != database { 123 t.Error("Processor database not set correctly") 124 } 125 if p.useCache != tt.useCache { 126 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) 127 } 128 if tt.useCache && p.userCache == nil { 129 t.Error("Cache enabled but userCache is nil") 130 } 131 if !tt.useCache && p.userCache != nil { 132 t.Error("Cache disabled but userCache is not nil") 133 } 134 }) 135 } 136} 137 138func TestProcessManifest_ImageManifest(t *testing.T) { 139 database := setupTestDB(t) 140 defer database.Close() 141 142 p := NewProcessor(database, false) 143 ctx := context.Background() 144 145 // Create test manifest record 146 manifestRecord := &atproto.ManifestRecord{ 147 Repository: "test-app", 148 Digest: "sha256:abc123", 149 MediaType: "application/vnd.oci.image.manifest.v1+json", 150 SchemaVersion: 2, 151 HoldEndpoint: "did:web:hold01.atcr.io", 152 CreatedAt: time.Now(), 153 Config: &atproto.BlobReference{ 154 Digest: "sha256:config123", 155 Size: 1234, 156 }, 157 Layers: []atproto.BlobReference{ 158 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 159 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, 160 }, 161 Annotations: map[string]string{ 162 "org.opencontainers.image.title": "Test App", 163 "org.opencontainers.image.description": "A test application", 164 "org.opencontainers.image.source": "https://github.com/test/app", 165 "org.opencontainers.image.licenses": "MIT", 166 "io.atcr.icon": "https://example.com/icon.png", 167 }, 168 } 169 170 // Marshal to bytes for ProcessManifest 171 recordBytes, err := json.Marshal(manifestRecord) 172 if err != nil { 173 t.Fatalf("Failed to marshal manifest: %v", err) 174 } 175 176 // Process manifest 177 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 178 if err != nil { 179 t.Fatalf("ProcessManifest failed: %v", err) 180 } 181 if manifestID == 0 { 182 t.Error("Expected non-zero manifest ID") 183 } 184 185 // Verify manifest was inserted 186 var count int 187 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", 188 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) 189 if err != nil { 190 t.Fatalf("Failed to query manifests: %v", err) 191 } 192 if count != 1 { 193 t.Errorf("Expected 1 manifest, got %d", count) 194 } 195 196 // Verify annotations were stored in repository_annotations table 197 var title, source string 198 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 199 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) 200 if err != nil { 201 t.Fatalf("Failed to query title annotation: %v", err) 202 } 203 if title != "Test App" { 204 t.Errorf("title = %q, want %q", title, "Test App") 205 } 206 207 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", 208 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) 209 if err != nil { 210 t.Fatalf("Failed to query source annotation: %v", err) 211 } 212 if source != "https://github.com/test/app" { 213 t.Errorf("source = %q, want %q", source, "https://github.com/test/app") 214 } 215 216 // Verify layers were inserted 217 var layerCount int 218 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 219 if err != nil { 220 t.Fatalf("Failed to query layers: %v", err) 221 } 222 if layerCount != 2 { 223 t.Errorf("Expected 2 layers, got %d", layerCount) 224 } 225 226 // Verify no manifest references (this is an image, not a list) 227 var refCount int 228 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 229 if err != nil { 230 t.Fatalf("Failed to query manifest_references: %v", err) 231 } 232 if refCount != 0 { 233 t.Errorf("Expected 0 manifest references, got %d", refCount) 234 } 235} 236 237func TestProcessManifest_ManifestList(t *testing.T) { 238 database := setupTestDB(t) 239 defer database.Close() 240 241 p := NewProcessor(database, false) 242 ctx := context.Background() 243 244 // Create test manifest list record 245 manifestRecord := &atproto.ManifestRecord{ 246 Repository: "test-app", 247 Digest: "sha256:list123", 248 MediaType: "application/vnd.oci.image.index.v1+json", 249 SchemaVersion: 2, 250 HoldEndpoint: "did:web:hold01.atcr.io", 251 CreatedAt: time.Now(), 252 Manifests: []atproto.ManifestReference{ 253 { 254 Digest: "sha256:amd64manifest", 255 MediaType: "application/vnd.oci.image.manifest.v1+json", 256 Size: 1000, 257 Platform: &atproto.Platform{ 258 Architecture: "amd64", 259 OS: "linux", 260 }, 261 }, 262 { 263 Digest: "sha256:arm64manifest", 264 MediaType: "application/vnd.oci.image.manifest.v1+json", 265 Size: 1100, 266 Platform: &atproto.Platform{ 267 Architecture: "arm64", 268 OS: "linux", 269 Variant: "v8", 270 }, 271 }, 272 }, 273 } 274 275 // Marshal to bytes for ProcessManifest 276 recordBytes, err := json.Marshal(manifestRecord) 277 if err != nil { 278 t.Fatalf("Failed to marshal manifest: %v", err) 279 } 280 281 // Process manifest list 282 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 283 if err != nil { 284 t.Fatalf("ProcessManifest failed: %v", err) 285 } 286 287 // Verify manifest references were inserted 288 var refCount int 289 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) 290 if err != nil { 291 t.Fatalf("Failed to query manifest_references: %v", err) 292 } 293 if refCount != 2 { 294 t.Errorf("Expected 2 manifest references, got %d", refCount) 295 } 296 297 // Verify platform info was stored 298 var arch, os string 299 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) 300 if err != nil { 301 t.Fatalf("Failed to query platform info: %v", err) 302 } 303 if arch != "amd64" { 304 t.Errorf("platform_architecture = %q, want %q", arch, "amd64") 305 } 306 if os != "linux" { 307 t.Errorf("platform_os = %q, want %q", os, "linux") 308 } 309 310 // Verify no layers (this is a list, not an image) 311 var layerCount int 312 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) 313 if err != nil { 314 t.Fatalf("Failed to query layers: %v", err) 315 } 316 if layerCount != 0 { 317 t.Errorf("Expected 0 layers, got %d", layerCount) 318 } 319} 320 321func TestProcessTag(t *testing.T) { 322 database := setupTestDB(t) 323 defer database.Close() 324 325 p := NewProcessor(database, false) 326 ctx := context.Background() 327 328 // Create test tag record (using ManifestDigest field for simplicity) 329 tagRecord := &atproto.TagRecord{ 330 Repository: "test-app", 331 Tag: "latest", 332 ManifestDigest: "sha256:abc123", 333 UpdatedAt: time.Now(), 334 } 335 336 // Marshal to bytes for ProcessTag 337 recordBytes, err := json.Marshal(tagRecord) 338 if err != nil { 339 t.Fatalf("Failed to marshal tag: %v", err) 340 } 341 342 // Process tag 343 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 344 if err != nil { 345 t.Fatalf("ProcessTag failed: %v", err) 346 } 347 348 // Verify tag was inserted 349 var count int 350 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 351 "did:plc:test123", "test-app", "latest").Scan(&count) 352 if err != nil { 353 t.Fatalf("Failed to query tags: %v", err) 354 } 355 if count != 1 { 356 t.Errorf("Expected 1 tag, got %d", count) 357 } 358 359 // Verify digest was stored 360 var digest string 361 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 362 "did:plc:test123", "test-app", "latest").Scan(&digest) 363 if err != nil { 364 t.Fatalf("Failed to query tag digest: %v", err) 365 } 366 if digest != "sha256:abc123" { 367 t.Errorf("digest = %q, want %q", digest, "sha256:abc123") 368 } 369 370 // Test upserting same tag with new digest 371 tagRecord.ManifestDigest = "sha256:newdigest" 372 recordBytes, err = json.Marshal(tagRecord) 373 if err != nil { 374 t.Fatalf("Failed to marshal tag: %v", err) 375 } 376 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) 377 if err != nil { 378 t.Fatalf("ProcessTag (upsert) failed: %v", err) 379 } 380 381 // Verify tag was updated 382 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", 383 "did:plc:test123", "test-app", "latest").Scan(&digest) 384 if err != nil { 385 t.Fatalf("Failed to query updated tag: %v", err) 386 } 387 if digest != "sha256:newdigest" { 388 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") 389 } 390 391 // Verify still only one tag (upsert, not insert) 392 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", 393 "did:plc:test123", "test-app", "latest").Scan(&count) 394 if err != nil { 395 t.Fatalf("Failed to query tags after upsert: %v", err) 396 } 397 if count != 1 { 398 t.Errorf("Expected 1 tag after upsert, got %d", count) 399 } 400} 401 402func TestProcessStar(t *testing.T) { 403 database := setupTestDB(t) 404 defer database.Close() 405 406 p := NewProcessor(database, false) 407 ctx := context.Background() 408 409 // Create test star record 410 starRecord := &atproto.StarRecord{ 411 Subject: atproto.StarSubject{ 412 DID: "did:plc:owner123", 413 Repository: "test-app", 414 }, 415 CreatedAt: time.Now(), 416 } 417 418 // Marshal to bytes for ProcessStar 419 recordBytes, err := json.Marshal(starRecord) 420 if err != nil { 421 t.Fatalf("Failed to marshal star: %v", err) 422 } 423 424 // Process star 425 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 426 if err != nil { 427 t.Fatalf("ProcessStar failed: %v", err) 428 } 429 430 // Verify star was inserted 431 var count int 432 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 433 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 434 if err != nil { 435 t.Fatalf("Failed to query stars: %v", err) 436 } 437 if count != 1 { 438 t.Errorf("Expected 1 star, got %d", count) 439 } 440 441 // Test upserting same star (should be idempotent) 442 recordBytes, err = json.Marshal(starRecord) 443 if err != nil { 444 t.Fatalf("Failed to marshal star: %v", err) 445 } 446 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) 447 if err != nil { 448 t.Fatalf("ProcessStar (upsert) failed: %v", err) 449 } 450 451 // Verify still only one star 452 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", 453 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) 454 if err != nil { 455 t.Fatalf("Failed to query stars after upsert: %v", err) 456 } 457 if count != 1 { 458 t.Errorf("Expected 1 star after upsert, got %d", count) 459 } 460} 461 462func TestProcessManifest_Duplicate(t *testing.T) { 463 database := setupTestDB(t) 464 defer database.Close() 465 466 p := NewProcessor(database, false) 467 ctx := context.Background() 468 469 manifestRecord := &atproto.ManifestRecord{ 470 Repository: "test-app", 471 Digest: "sha256:abc123", 472 MediaType: "application/vnd.oci.image.manifest.v1+json", 473 SchemaVersion: 2, 474 HoldEndpoint: "did:web:hold01.atcr.io", 475 CreatedAt: time.Now(), 476 } 477 478 // Marshal to bytes for ProcessManifest 479 recordBytes, err := json.Marshal(manifestRecord) 480 if err != nil { 481 t.Fatalf("Failed to marshal manifest: %v", err) 482 } 483 484 // Insert first time 485 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 486 if err != nil { 487 t.Fatalf("First ProcessManifest failed: %v", err) 488 } 489 490 // Insert duplicate 491 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 492 if err != nil { 493 t.Fatalf("Duplicate ProcessManifest failed: %v", err) 494 } 495 496 // Should return existing ID 497 if id1 != id2 { 498 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) 499 } 500 501 // Verify only one manifest exists 502 var count int 503 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", 504 "did:plc:test123", "sha256:abc123").Scan(&count) 505 if err != nil { 506 t.Fatalf("Failed to query manifests: %v", err) 507 } 508 if count != 1 { 509 t.Errorf("Expected 1 manifest, got %d", count) 510 } 511} 512 513func TestProcessManifest_EmptyAnnotations(t *testing.T) { 514 database := setupTestDB(t) 515 defer database.Close() 516 517 p := NewProcessor(database, false) 518 ctx := context.Background() 519 520 // Manifest with nil annotations 521 manifestRecord := &atproto.ManifestRecord{ 522 Repository: "test-app", 523 Digest: "sha256:abc123", 524 MediaType: "application/vnd.oci.image.manifest.v1+json", 525 SchemaVersion: 2, 526 HoldEndpoint: "did:web:hold01.atcr.io", 527 CreatedAt: time.Now(), 528 Annotations: nil, 529 } 530 531 // Marshal to bytes for ProcessManifest 532 recordBytes, err := json.Marshal(manifestRecord) 533 if err != nil { 534 t.Fatalf("Failed to marshal manifest: %v", err) 535 } 536 537 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) 538 if err != nil { 539 t.Fatalf("ProcessManifest failed: %v", err) 540 } 541 542 // Verify no annotations were stored (nil annotations should not create entries) 543 var annotationCount int 544 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", 545 "did:plc:test123", "test-app").Scan(&annotationCount) 546 if err != nil { 547 t.Fatalf("Failed to query annotations: %v", err) 548 } 549 if annotationCount != 0 { 550 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) 551 } 552} 553 554func TestProcessIdentity(t *testing.T) { 555 db := setupTestDB(t) 556 defer db.Close() 557 558 processor := NewProcessor(db, false) 559 560 // Setup: Create test user 561 testDID := "did:plc:alice123" 562 testHandle := "alice.bsky.social" 563 testPDS := "https://bsky.social" 564 _, err := db.Exec(` 565 INSERT INTO users (did, handle, pds_endpoint, last_seen) 566 VALUES (?, ?, ?, ?) 567 `, testDID, testHandle, testPDS, time.Now()) 568 if err != nil { 569 t.Fatalf("Failed to insert test user: %v", err) 570 } 571 572 // Test 1: Process identity change event 573 newHandle := "alice-new.bsky.social" 574 err = processor.ProcessIdentity(context.Background(), testDID, newHandle) 575 // Note: This will fail to invalidate cache since we don't have a real identity directory, 576 // but we can still verify the database update happened 577 if err != nil { 578 t.Logf("Expected cache invalidation error (no real directory): %v", err) 579 } 580 581 // Verify handle was updated in database 582 var retrievedHandle string 583 err = db.QueryRow(` 584 SELECT handle FROM users WHERE did = ? 585 `, testDID).Scan(&retrievedHandle) 586 if err != nil { 587 t.Fatalf("Failed to query updated user: %v", err) 588 } 589 if retrievedHandle != newHandle { 590 t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle) 591 } 592 593 // Test 2: Process identity change for non-existent user 594 // Should not error (UPDATE just affects 0 rows) 595 err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle") 596 if err != nil { 597 t.Logf("Expected cache invalidation error: %v", err) 598 } 599 600 // Test 3: Process multiple identity changes 601 handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"} 602 for _, handle := range handles { 603 err = processor.ProcessIdentity(context.Background(), testDID, handle) 604 if err != nil { 605 t.Logf("Expected cache invalidation error: %v", err) 606 } 607 608 err = db.QueryRow(` 609 SELECT handle FROM users WHERE did = ? 610 `, testDID).Scan(&retrievedHandle) 611 if err != nil { 612 t.Fatalf("Failed to query user after handle update: %v", err) 613 } 614 if retrievedHandle != handle { 615 t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle) 616 } 617 } 618} 619 620func TestProcessAccount(t *testing.T) { 621 db := setupTestDB(t) 622 defer db.Close() 623 624 processor := NewProcessor(db, false) 625 626 // Setup: Create test user 627 testDID := "did:plc:bob456" 628 testHandle := "bob.bsky.social" 629 testPDS := "https://bsky.social" 630 _, err := db.Exec(` 631 INSERT INTO users (did, handle, pds_endpoint, last_seen) 632 VALUES (?, ?, ?, ?) 633 `, testDID, testHandle, testPDS, time.Now()) 634 if err != nil { 635 t.Fatalf("Failed to insert test user: %v", err) 636 } 637 638 // Test 1: Process account deactivation event 639 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") 640 // Note: Cache invalidation will fail without real directory, but that's expected 641 if err != nil { 642 t.Logf("Expected cache invalidation error (no real directory): %v", err) 643 } 644 645 // Verify user still exists in database (we don't delete on deactivation) 646 var exists bool 647 err = db.QueryRow(` 648 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 649 `, testDID).Scan(&exists) 650 if err != nil { 651 t.Fatalf("Failed to check if user exists: %v", err) 652 } 653 if !exists { 654 t.Error("User should still exist after deactivation event (no deletion)") 655 } 656 657 // Test 2: Process account with active=true (should be ignored) 658 err = processor.ProcessAccount(context.Background(), testDID, true, "active") 659 if err != nil { 660 t.Errorf("Expected no error for active account, got: %v", err) 661 } 662 663 // Test 3: Process account with status != "deactivated" (should be ignored) 664 err = processor.ProcessAccount(context.Background(), testDID, false, "suspended") 665 if err != nil { 666 t.Errorf("Expected no error for non-deactivated status, got: %v", err) 667 } 668 669 // Test 4: Process account deactivation for non-existent user 670 err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated") 671 // Cache invalidation will fail, but that's expected 672 if err != nil { 673 t.Logf("Expected cache invalidation error: %v", err) 674 } 675 676 // Test 5: Process multiple deactivation events (idempotent) 677 for i := 0; i < 3; i++ { 678 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") 679 if err != nil { 680 t.Logf("Expected cache invalidation error on iteration %d: %v", i, err) 681 } 682 } 683 684 // User should still exist after multiple deactivations 685 err = db.QueryRow(` 686 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) 687 `, testDID).Scan(&exists) 688 if err != nil { 689 t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err) 690 } 691 if !exists { 692 t.Error("User should still exist after multiple deactivation events") 693 } 694}