A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "strings"
8 "testing"
9 "time"
10
11 "atcr.io/pkg/atproto"
12 _ "github.com/tursodatabase/go-libsql"
13)
14
// execStatements runs every ";"-separated statement found in schema against db,
// one Exec call per statement. Pieces that are empty after trimming whitespace
// are skipped. The first failing statement aborts the test via t.Fatalf.
// Per the original author, go-libsql does not support multi-statement Exec
// the way mattn/go-sqlite3 does, hence the manual split.
func execStatements(t *testing.T, db *sql.DB, schema string) {
	t.Helper()

	remaining := schema
	for more := true; more; {
		var piece string
		// Peel off everything up to the next ";"; more is false on the last piece.
		piece, remaining, more = strings.Cut(remaining, ";")

		query := strings.TrimSpace(piece)
		if query == "" {
			continue
		}
		if _, err := db.Exec(query); err != nil {
			t.Fatalf("Failed to execute statement: %v\nSQL: %s", err, query)
		}
	}
}
29
30// setupTestDB creates an in-memory SQLite database for testing
31func setupTestDB(t *testing.T) *sql.DB {
32 database, err := sql.Open("libsql", ":memory:")
33 if err != nil {
34 t.Fatalf("Failed to open test database: %v", err)
35 }
36
37 // Create schema
38 schema := `
39 CREATE TABLE users (
40 did TEXT PRIMARY KEY,
41 handle TEXT NOT NULL,
42 pds_endpoint TEXT NOT NULL,
43 avatar TEXT,
44 default_hold_did TEXT,
45 last_seen TIMESTAMP NOT NULL
46 );
47
48 CREATE TABLE manifests (
49 id INTEGER PRIMARY KEY AUTOINCREMENT,
50 did TEXT NOT NULL,
51 repository TEXT NOT NULL,
52 digest TEXT NOT NULL,
53 hold_endpoint TEXT NOT NULL,
54 schema_version INTEGER NOT NULL,
55 media_type TEXT NOT NULL,
56 config_digest TEXT,
57 config_size INTEGER,
58 artifact_type TEXT NOT NULL DEFAULT 'container-image',
59 created_at TIMESTAMP NOT NULL,
60 UNIQUE(did, repository, digest)
61 );
62
63 CREATE TABLE repository_annotations (
64 did TEXT NOT NULL,
65 repository TEXT NOT NULL,
66 key TEXT NOT NULL,
67 value TEXT NOT NULL,
68 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
69 PRIMARY KEY(did, repository, key),
70 FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
71 );
72
73 CREATE TABLE layers (
74 manifest_id INTEGER NOT NULL,
75 digest TEXT NOT NULL,
76 size INTEGER NOT NULL,
77 media_type TEXT NOT NULL,
78 layer_index INTEGER NOT NULL,
79 annotations TEXT,
80 PRIMARY KEY(manifest_id, layer_index)
81 );
82
83 CREATE TABLE manifest_references (
84 manifest_id INTEGER NOT NULL,
85 digest TEXT NOT NULL,
86 media_type TEXT NOT NULL,
87 size INTEGER NOT NULL,
88 platform_architecture TEXT,
89 platform_os TEXT,
90 platform_variant TEXT,
91 platform_os_version TEXT,
92 is_attestation BOOLEAN DEFAULT FALSE,
93 reference_index INTEGER NOT NULL,
94 PRIMARY KEY(manifest_id, reference_index)
95 );
96
97 CREATE TABLE tags (
98 id INTEGER PRIMARY KEY AUTOINCREMENT,
99 did TEXT NOT NULL,
100 repository TEXT NOT NULL,
101 tag TEXT NOT NULL,
102 digest TEXT NOT NULL,
103 created_at TIMESTAMP NOT NULL,
104 UNIQUE(did, repository, tag)
105 );
106
107 CREATE TABLE stars (
108 starrer_did TEXT NOT NULL,
109 owner_did TEXT NOT NULL,
110 repository TEXT NOT NULL,
111 created_at TIMESTAMP NOT NULL,
112 PRIMARY KEY(starrer_did, owner_did, repository)
113 );
114 `
115
116 execStatements(t, database, schema)
117
118 return database
119}
120
121func TestNewProcessor(t *testing.T) {
122 database := setupTestDB(t)
123 defer database.Close()
124
125 tests := []struct {
126 name string
127 useCache bool
128 }{
129 {"with cache", true},
130 {"without cache", false},
131 }
132
133 for _, tt := range tests {
134 t.Run(tt.name, func(t *testing.T) {
135 p := NewProcessor(database, tt.useCache, nil)
136 if p == nil {
137 t.Fatal("NewProcessor returned nil")
138 }
139 if p.db != database {
140 t.Error("Processor database not set correctly")
141 }
142 if p.useCache != tt.useCache {
143 t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache)
144 }
145 if tt.useCache && p.userCache == nil {
146 t.Error("Cache enabled but userCache is nil")
147 }
148 if !tt.useCache && p.userCache != nil {
149 t.Error("Cache disabled but userCache is not nil")
150 }
151 })
152 }
153}
154
155func TestProcessManifest_ImageManifest(t *testing.T) {
156 database := setupTestDB(t)
157 defer database.Close()
158
159 // Insert test user (required for foreign key on repository_annotations)
160 _, err := database.Exec(`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
161 "did:plc:test123", "test.bsky.social", "https://pds.example.com", time.Now())
162 if err != nil {
163 t.Fatalf("Failed to insert test user: %v", err)
164 }
165
166 p := NewProcessor(database, false, nil)
167 ctx := context.Background()
168
169 // Create test manifest record
170 manifestRecord := &atproto.ManifestRecord{
171 Repository: "test-app",
172 Digest: "sha256:abc123",
173 MediaType: "application/vnd.oci.image.manifest.v1+json",
174 SchemaVersion: 2,
175 HoldEndpoint: "did:web:hold01.atcr.io",
176 CreatedAt: time.Now(),
177 Config: &atproto.BlobReference{
178 Digest: "sha256:config123",
179 Size: 1234,
180 },
181 Layers: []atproto.BlobReference{
182 {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
183 {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
184 },
185 Annotations: map[string]string{
186 "org.opencontainers.image.title": "Test App",
187 "org.opencontainers.image.description": "A test application",
188 "org.opencontainers.image.source": "https://github.com/test/app",
189 "org.opencontainers.image.licenses": "MIT",
190 "io.atcr.icon": "https://example.com/icon.png",
191 },
192 }
193
194 // Marshal to bytes for ProcessManifest
195 recordBytes, err := json.Marshal(manifestRecord)
196 if err != nil {
197 t.Fatalf("Failed to marshal manifest: %v", err)
198 }
199
200 // Process manifest
201 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
202 if err != nil {
203 t.Fatalf("ProcessManifest failed: %v", err)
204 }
205 if manifestID == 0 {
206 t.Error("Expected non-zero manifest ID")
207 }
208
209 // Verify manifest was inserted
210 var count int
211 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
212 "did:plc:test123", "test-app", "sha256:abc123").Scan(&count)
213 if err != nil {
214 t.Fatalf("Failed to query manifests: %v", err)
215 }
216 if count != 1 {
217 t.Errorf("Expected 1 manifest, got %d", count)
218 }
219
220 // Verify annotations were stored in repository_annotations table
221 var title, source string
222 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
223 "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title)
224 if err != nil {
225 t.Fatalf("Failed to query title annotation: %v", err)
226 }
227 if title != "Test App" {
228 t.Errorf("title = %q, want %q", title, "Test App")
229 }
230
231 err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
232 "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source)
233 if err != nil {
234 t.Fatalf("Failed to query source annotation: %v", err)
235 }
236 if source != "https://github.com/test/app" {
237 t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
238 }
239
240 // Verify layers were inserted
241 var layerCount int
242 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
243 if err != nil {
244 t.Fatalf("Failed to query layers: %v", err)
245 }
246 if layerCount != 2 {
247 t.Errorf("Expected 2 layers, got %d", layerCount)
248 }
249
250 // Verify no manifest references (this is an image, not a list)
251 var refCount int
252 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
253 if err != nil {
254 t.Fatalf("Failed to query manifest_references: %v", err)
255 }
256 if refCount != 0 {
257 t.Errorf("Expected 0 manifest references, got %d", refCount)
258 }
259}
260
261func TestProcessManifest_ManifestList(t *testing.T) {
262 database := setupTestDB(t)
263 defer database.Close()
264
265 p := NewProcessor(database, false, nil)
266 ctx := context.Background()
267
268 // Create test manifest list record
269 manifestRecord := &atproto.ManifestRecord{
270 Repository: "test-app",
271 Digest: "sha256:list123",
272 MediaType: "application/vnd.oci.image.index.v1+json",
273 SchemaVersion: 2,
274 HoldEndpoint: "did:web:hold01.atcr.io",
275 CreatedAt: time.Now(),
276 Manifests: []atproto.ManifestReference{
277 {
278 Digest: "sha256:amd64manifest",
279 MediaType: "application/vnd.oci.image.manifest.v1+json",
280 Size: 1000,
281 Platform: &atproto.Platform{
282 Architecture: "amd64",
283 OS: "linux",
284 },
285 },
286 {
287 Digest: "sha256:arm64manifest",
288 MediaType: "application/vnd.oci.image.manifest.v1+json",
289 Size: 1100,
290 Platform: &atproto.Platform{
291 Architecture: "arm64",
292 OS: "linux",
293 Variant: "v8",
294 },
295 },
296 },
297 }
298
299 // Marshal to bytes for ProcessManifest
300 recordBytes, err := json.Marshal(manifestRecord)
301 if err != nil {
302 t.Fatalf("Failed to marshal manifest: %v", err)
303 }
304
305 // Process manifest list
306 manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
307 if err != nil {
308 t.Fatalf("ProcessManifest failed: %v", err)
309 }
310
311 // Verify manifest references were inserted
312 var refCount int
313 err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
314 if err != nil {
315 t.Fatalf("Failed to query manifest_references: %v", err)
316 }
317 if refCount != 2 {
318 t.Errorf("Expected 2 manifest references, got %d", refCount)
319 }
320
321 // Verify platform info was stored
322 var arch, os string
323 err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os)
324 if err != nil {
325 t.Fatalf("Failed to query platform info: %v", err)
326 }
327 if arch != "amd64" {
328 t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
329 }
330 if os != "linux" {
331 t.Errorf("platform_os = %q, want %q", os, "linux")
332 }
333
334 // Verify no layers (this is a list, not an image)
335 var layerCount int
336 err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
337 if err != nil {
338 t.Fatalf("Failed to query layers: %v", err)
339 }
340 if layerCount != 0 {
341 t.Errorf("Expected 0 layers, got %d", layerCount)
342 }
343}
344
345func TestProcessTag(t *testing.T) {
346 database := setupTestDB(t)
347 defer database.Close()
348
349 p := NewProcessor(database, false, nil)
350 ctx := context.Background()
351
352 // Create test tag record (using ManifestDigest field for simplicity)
353 tagRecord := &atproto.TagRecord{
354 Repository: "test-app",
355 Tag: "latest",
356 ManifestDigest: "sha256:abc123",
357 UpdatedAt: time.Now(),
358 }
359
360 // Marshal to bytes for ProcessTag
361 recordBytes, err := json.Marshal(tagRecord)
362 if err != nil {
363 t.Fatalf("Failed to marshal tag: %v", err)
364 }
365
366 // Process tag
367 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
368 if err != nil {
369 t.Fatalf("ProcessTag failed: %v", err)
370 }
371
372 // Verify tag was inserted
373 var count int
374 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
375 "did:plc:test123", "test-app", "latest").Scan(&count)
376 if err != nil {
377 t.Fatalf("Failed to query tags: %v", err)
378 }
379 if count != 1 {
380 t.Errorf("Expected 1 tag, got %d", count)
381 }
382
383 // Verify digest was stored
384 var digest string
385 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
386 "did:plc:test123", "test-app", "latest").Scan(&digest)
387 if err != nil {
388 t.Fatalf("Failed to query tag digest: %v", err)
389 }
390 if digest != "sha256:abc123" {
391 t.Errorf("digest = %q, want %q", digest, "sha256:abc123")
392 }
393
394 // Test upserting same tag with new digest
395 tagRecord.ManifestDigest = "sha256:newdigest"
396 recordBytes, err = json.Marshal(tagRecord)
397 if err != nil {
398 t.Fatalf("Failed to marshal tag: %v", err)
399 }
400 err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
401 if err != nil {
402 t.Fatalf("ProcessTag (upsert) failed: %v", err)
403 }
404
405 // Verify tag was updated
406 err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
407 "did:plc:test123", "test-app", "latest").Scan(&digest)
408 if err != nil {
409 t.Fatalf("Failed to query updated tag: %v", err)
410 }
411 if digest != "sha256:newdigest" {
412 t.Errorf("digest = %q, want %q", digest, "sha256:newdigest")
413 }
414
415 // Verify still only one tag (upsert, not insert)
416 err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
417 "did:plc:test123", "test-app", "latest").Scan(&count)
418 if err != nil {
419 t.Fatalf("Failed to query tags after upsert: %v", err)
420 }
421 if count != 1 {
422 t.Errorf("Expected 1 tag after upsert, got %d", count)
423 }
424}
425
426func TestProcessStar(t *testing.T) {
427 database := setupTestDB(t)
428 defer database.Close()
429
430 // Insert test users (starrer + owner) so EnsureUser finds them without network calls
431 for _, did := range []string{"did:plc:starrer123", "did:plc:owner123"} {
432 _, err := database.Exec(
433 `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
434 did, did+".test", "https://pds.example.com", time.Now())
435 if err != nil {
436 t.Fatalf("Failed to insert test user %s: %v", did, err)
437 }
438 }
439
440 p := NewProcessor(database, false, nil)
441 ctx := context.Background()
442
443 // Create test star record (new AT URI format)
444 starRecord := atproto.NewStarRecord("did:plc:owner123", "test-app")
445
446 // Marshal to bytes for ProcessStar
447 recordBytes, err := json.Marshal(starRecord)
448 if err != nil {
449 t.Fatalf("Failed to marshal star: %v", err)
450 }
451
452 // Process star
453 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
454 if err != nil {
455 t.Fatalf("ProcessStar failed: %v", err)
456 }
457
458 // Verify star was inserted
459 var count int
460 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
461 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
462 if err != nil {
463 t.Fatalf("Failed to query stars: %v", err)
464 }
465 if count != 1 {
466 t.Errorf("Expected 1 star, got %d", count)
467 }
468
469 // Test upserting same star (should be idempotent)
470 recordBytes, err = json.Marshal(starRecord)
471 if err != nil {
472 t.Fatalf("Failed to marshal star: %v", err)
473 }
474 err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
475 if err != nil {
476 t.Fatalf("ProcessStar (upsert) failed: %v", err)
477 }
478
479 // Verify still only one star
480 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
481 "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
482 if err != nil {
483 t.Fatalf("Failed to query stars after upsert: %v", err)
484 }
485 if count != 1 {
486 t.Errorf("Expected 1 star after upsert, got %d", count)
487 }
488}
489
490func TestProcessStar_OldFormat(t *testing.T) {
491 database := setupTestDB(t)
492 defer database.Close()
493
494 // Insert test users
495 for _, did := range []string{"did:plc:starrer456", "did:plc:owner456"} {
496 _, err := database.Exec(
497 `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
498 did, did+".test", "https://pds.example.com", time.Now())
499 if err != nil {
500 t.Fatalf("Failed to insert test user %s: %v", did, err)
501 }
502 }
503
504 p := NewProcessor(database, false, nil)
505 ctx := context.Background()
506
507 // Old format JSON (object subject with did + repository)
508 oldFormatJSON := `{"$type":"io.atcr.sailor.star","subject":{"did":"did:plc:owner456","repository":"legacy-app"},"createdAt":"2025-06-01T00:00:00Z"}`
509
510 err := p.ProcessStar(ctx, "did:plc:starrer456", []byte(oldFormatJSON))
511 if err != nil {
512 t.Fatalf("ProcessStar (old format) failed: %v", err)
513 }
514
515 // Verify star was inserted with correct owner/repo
516 var count int
517 err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
518 "did:plc:starrer456", "did:plc:owner456", "legacy-app").Scan(&count)
519 if err != nil {
520 t.Fatalf("Failed to query stars: %v", err)
521 }
522 if count != 1 {
523 t.Errorf("Expected 1 star from old format, got %d", count)
524 }
525}
526
527func TestProcessManifest_Duplicate(t *testing.T) {
528 database := setupTestDB(t)
529 defer database.Close()
530
531 p := NewProcessor(database, false, nil)
532 ctx := context.Background()
533
534 manifestRecord := &atproto.ManifestRecord{
535 Repository: "test-app",
536 Digest: "sha256:abc123",
537 MediaType: "application/vnd.oci.image.manifest.v1+json",
538 SchemaVersion: 2,
539 HoldEndpoint: "did:web:hold01.atcr.io",
540 CreatedAt: time.Now(),
541 }
542
543 // Marshal to bytes for ProcessManifest
544 recordBytes, err := json.Marshal(manifestRecord)
545 if err != nil {
546 t.Fatalf("Failed to marshal manifest: %v", err)
547 }
548
549 // Insert first time
550 id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
551 if err != nil {
552 t.Fatalf("First ProcessManifest failed: %v", err)
553 }
554
555 // Insert duplicate
556 id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
557 if err != nil {
558 t.Fatalf("Duplicate ProcessManifest failed: %v", err)
559 }
560
561 // Should return existing ID
562 if id1 != id2 {
563 t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2)
564 }
565
566 // Verify only one manifest exists
567 var count int
568 err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
569 "did:plc:test123", "sha256:abc123").Scan(&count)
570 if err != nil {
571 t.Fatalf("Failed to query manifests: %v", err)
572 }
573 if count != 1 {
574 t.Errorf("Expected 1 manifest, got %d", count)
575 }
576}
577
578func TestProcessManifest_EmptyAnnotations(t *testing.T) {
579 database := setupTestDB(t)
580 defer database.Close()
581
582 p := NewProcessor(database, false, nil)
583 ctx := context.Background()
584
585 // Manifest with nil annotations
586 manifestRecord := &atproto.ManifestRecord{
587 Repository: "test-app",
588 Digest: "sha256:abc123",
589 MediaType: "application/vnd.oci.image.manifest.v1+json",
590 SchemaVersion: 2,
591 HoldEndpoint: "did:web:hold01.atcr.io",
592 CreatedAt: time.Now(),
593 Annotations: nil,
594 }
595
596 // Marshal to bytes for ProcessManifest
597 recordBytes, err := json.Marshal(manifestRecord)
598 if err != nil {
599 t.Fatalf("Failed to marshal manifest: %v", err)
600 }
601
602 _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
603 if err != nil {
604 t.Fatalf("ProcessManifest failed: %v", err)
605 }
606
607 // Verify no annotations were stored (nil annotations should not create entries)
608 var annotationCount int
609 err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
610 "did:plc:test123", "test-app").Scan(&annotationCount)
611 if err != nil {
612 t.Fatalf("Failed to query annotations: %v", err)
613 }
614 if annotationCount != 0 {
615 t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount)
616 }
617}
618
619func TestProcessIdentity(t *testing.T) {
620 db := setupTestDB(t)
621 defer db.Close()
622
623 processor := NewProcessor(db, false, nil)
624
625 // Setup: Create test user
626 testDID := "did:plc:alice123"
627 testHandle := "alice.bsky.social"
628 testPDS := "https://bsky.social"
629 _, err := db.Exec(`
630 INSERT INTO users (did, handle, pds_endpoint, last_seen)
631 VALUES (?, ?, ?, ?)
632 `, testDID, testHandle, testPDS, time.Now())
633 if err != nil {
634 t.Fatalf("Failed to insert test user: %v", err)
635 }
636
637 // Test 1: Process identity change event
638 newHandle := "alice-new.bsky.social"
639 err = processor.ProcessIdentity(context.Background(), testDID, newHandle)
640 // Note: This will fail to invalidate cache since we don't have a real identity directory,
641 // but we can still verify the database update happened
642 if err != nil {
643 t.Logf("Expected cache invalidation error (no real directory): %v", err)
644 }
645
646 // Verify handle was updated in database
647 var retrievedHandle string
648 err = db.QueryRow(`
649 SELECT handle FROM users WHERE did = ?
650 `, testDID).Scan(&retrievedHandle)
651 if err != nil {
652 t.Fatalf("Failed to query updated user: %v", err)
653 }
654 if retrievedHandle != newHandle {
655 t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle)
656 }
657
658 // Test 2: Process identity change for non-existent user
659 // Should not error (UPDATE just affects 0 rows)
660 err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle")
661 if err != nil {
662 t.Logf("Expected cache invalidation error: %v", err)
663 }
664
665 // Test 3: Process multiple identity changes
666 handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"}
667 for _, handle := range handles {
668 err = processor.ProcessIdentity(context.Background(), testDID, handle)
669 if err != nil {
670 t.Logf("Expected cache invalidation error: %v", err)
671 }
672
673 err = db.QueryRow(`
674 SELECT handle FROM users WHERE did = ?
675 `, testDID).Scan(&retrievedHandle)
676 if err != nil {
677 t.Fatalf("Failed to query user after handle update: %v", err)
678 }
679 if retrievedHandle != handle {
680 t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle)
681 }
682 }
683}
684
685func TestProcessRecord_RoutesCorrectly(t *testing.T) {
686 db := setupTestDB(t)
687 defer db.Close()
688
689 // Add missing tables for this test
690 execStatements(t, db, `
691 CREATE TABLE repo_pages (
692 did TEXT NOT NULL,
693 repository TEXT NOT NULL,
694 description TEXT,
695 avatar_cid TEXT,
696 created_at TIMESTAMP NOT NULL,
697 updated_at TIMESTAMP NOT NULL,
698 PRIMARY KEY(did, repository)
699 );
700 CREATE TABLE hold_captain_records (
701 hold_did TEXT PRIMARY KEY,
702 owner_did TEXT NOT NULL,
703 public BOOLEAN NOT NULL,
704 allow_all_crew BOOLEAN NOT NULL,
705 deployed_at TEXT,
706 region TEXT,
707 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
708 );
709 CREATE TABLE hold_crew_members (
710 hold_did TEXT NOT NULL,
711 member_did TEXT NOT NULL,
712 rkey TEXT NOT NULL,
713 role TEXT,
714 permissions TEXT,
715 tier TEXT,
716 added_at TEXT,
717 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
718 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
719 PRIMARY KEY (hold_did, member_did)
720 );
721 `)
722
723 processor := NewProcessor(db, false, nil)
724 ctx := context.Background()
725 var err error
726
727 // Test 1: ProcessRecord routes manifest correctly
728 // Note: Schema validation may fail for io.atcr.manifest since we can't resolve the schema,
729 // but this tests the routing logic
730 manifestRecord := map[string]any{
731 "$type": "io.atcr.manifest",
732 "repository": "test-app",
733 "digest": "sha256:route123",
734 "mediaType": "application/vnd.oci.image.manifest.v1+json",
735 "schemaVersion": 2,
736 "holdDid": "did:web:hold01.atcr.io",
737 "createdAt": time.Now().Format(time.RFC3339),
738 }
739 recordBytes, _ := json.Marshal(manifestRecord)
740
741 // Note: ProcessRecord will skip validation if lexicon can't be resolved (expected in tests)
742 // and will skip EnsureUser since we don't have a real PDS to resolve
743 // Just verify the record is processed without panic
744 err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "route123", recordBytes, false, nil)
745 // Error expected since we can't resolve identity - that's fine for this test
746 if err != nil {
747 t.Logf("Expected error (can't resolve identity): %v", err)
748 }
749
750 // Test 2: ProcessRecord handles captain record without creating user
751 captainRecord := map[string]any{
752 "$type": "io.atcr.hold.captain",
753 "owner": "did:plc:owner123",
754 "public": true,
755 "allowAllCrew": false,
756 "enableBlueskyPosts": false,
757 "deployedAt": time.Now().Format(time.RFC3339),
758 }
759 captainBytes, _ := json.Marshal(captainRecord)
760
761 // This should NOT call EnsureUser (captain is a hold collection)
762 err = processor.ProcessRecord(ctx, "did:web:hold.example.com", atproto.CaptainCollection, "self", captainBytes, false, nil)
763 if err != nil {
764 t.Logf("Error processing captain (validation may fail in test): %v", err)
765 }
766
767 // Verify no user was created for the hold DID
768 var userCount int
769 err = db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, "did:web:hold.example.com").Scan(&userCount)
770 if err != nil {
771 t.Fatalf("Failed to query users: %v", err)
772 }
773 if userCount != 0 {
774 t.Error("Captain record processing should NOT create a user entry for holds")
775 }
776
777 // Test 3: ProcessRecord handles delete operations
778 err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "sha256:todelete", nil, true, nil)
779 if err != nil {
780 t.Errorf("Delete should not error: %v", err)
781 }
782}
783
784func TestProcessRecord_SkipsInvalidRecords(t *testing.T) {
785 db := setupTestDB(t)
786 defer db.Close()
787
788 processor := NewProcessor(db, false, nil)
789 ctx := context.Background()
790
791 // Test: Invalid JSON should be skipped silently (no error returned)
792 invalidJSON := []byte(`{invalid json}`)
793 err := processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "test", invalidJSON, false, nil)
794 // Should return nil (skipped silently) not an error
795 if err != nil {
796 t.Errorf("Invalid record should be skipped silently, got error: %v", err)
797 }
798}
799
800func TestValidateRecord(t *testing.T) {
801 db := setupTestDB(t)
802 defer db.Close()
803
804 processor := NewProcessor(db, false, nil)
805 ctx := context.Background()
806
807 // Test 1: Manifest passes (no strict validation)
808 manifestJSON := []byte(`{"$type": "io.atcr.manifest", "repository": "test"}`)
809 err := processor.ValidateRecord(ctx, atproto.ManifestCollection, manifestJSON)
810 if err != nil {
811 t.Errorf("Manifest should pass validation: %v", err)
812 }
813
814 // Test 2: Invalid JSON returns error
815 invalidJSON := []byte(`{invalid}`)
816 err = processor.ValidateRecord(ctx, atproto.ManifestCollection, invalidJSON)
817 if err == nil {
818 t.Error("Invalid JSON should return error")
819 }
820
821 // Test 3: Captain with valid owner passes
822 captainValid := []byte(`{"owner": "did:plc:owner123", "public": true}`)
823 err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainValid)
824 if err != nil {
825 t.Errorf("Valid captain should pass: %v", err)
826 }
827
828 // Test 4: Captain with empty owner is rejected
829 captainEmpty := []byte(`{"owner": "", "public": true}`)
830 err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainEmpty)
831 if err == nil {
832 t.Error("Captain with empty owner should be rejected")
833 }
834
835 // Test 5: Captain with invalid owner (not a DID) is rejected
836 captainInvalid := []byte(`{"owner": "notadid", "public": true}`)
837 err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainInvalid)
838 if err == nil {
839 t.Error("Captain with invalid owner should be rejected")
840 }
841
842 // Test 6: Crew with valid member passes
843 crewValid := []byte(`{"member": "did:plc:member123", "role": "write"}`)
844 err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewValid)
845 if err != nil {
846 t.Errorf("Valid crew should pass: %v", err)
847 }
848
849 // Test 7: Crew with empty member is rejected
850 crewEmpty := []byte(`{"member": "", "role": "write"}`)
851 err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewEmpty)
852 if err == nil {
853 t.Error("Crew with empty member should be rejected")
854 }
855}
856
857func TestProcessAccount(t *testing.T) {
858 db := setupTestDB(t)
859 defer db.Close()
860
861 processor := NewProcessor(db, false, nil)
862
863 // Setup: Create test user
864 testDID := "did:plc:bob456"
865 testHandle := "bob.bsky.social"
866 testPDS := "https://bsky.social"
867 _, err := db.Exec(`
868 INSERT INTO users (did, handle, pds_endpoint, last_seen)
869 VALUES (?, ?, ?, ?)
870 `, testDID, testHandle, testPDS, time.Now())
871 if err != nil {
872 t.Fatalf("Failed to insert test user: %v", err)
873 }
874
875 // Test 1: Process account deactivation event
876 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
877 // Note: Cache invalidation will fail without real directory, but that's expected
878 if err != nil {
879 t.Logf("Expected cache invalidation error (no real directory): %v", err)
880 }
881
882 // Verify user still exists in database (we don't delete on deactivation)
883 var exists bool
884 err = db.QueryRow(`
885 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
886 `, testDID).Scan(&exists)
887 if err != nil {
888 t.Fatalf("Failed to check if user exists: %v", err)
889 }
890 if !exists {
891 t.Error("User should still exist after deactivation event (no deletion)")
892 }
893
894 // Test 2: Process account with active=true (should be ignored)
895 err = processor.ProcessAccount(context.Background(), testDID, true, "active")
896 if err != nil {
897 t.Errorf("Expected no error for active account, got: %v", err)
898 }
899
900 // Test 3: Process account with status != "deactivated" (should be ignored)
901 err = processor.ProcessAccount(context.Background(), testDID, false, "suspended")
902 if err != nil {
903 t.Errorf("Expected no error for non-deactivated status, got: %v", err)
904 }
905
906 // Test 4: Process account deactivation for non-existent user
907 err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated")
908 // Cache invalidation will fail, but that's expected
909 if err != nil {
910 t.Logf("Expected cache invalidation error: %v", err)
911 }
912
913 // Test 5: Process multiple deactivation events (idempotent)
914 for i := range 3 {
915 err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
916 if err != nil {
917 t.Logf("Expected cache invalidation error on iteration %d: %v", i, err)
918 }
919 }
920
921 // User should still exist after multiple deactivations
922 err = db.QueryRow(`
923 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
924 `, testDID).Scan(&exists)
925 if err != nil {
926 t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err)
927 }
928 if !exists {
929 t.Error("User should still exist after multiple deactivation events")
930 }
931
932 // Test 6: Process account deletion - should delete user data
933 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
934 if err != nil {
935 t.Logf("Cache invalidation error during deletion (expected): %v", err)
936 }
937
938 // User should be deleted after "deleted" status
939 err = db.QueryRow(`
940 SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
941 `, testDID).Scan(&exists)
942 if err != nil {
943 t.Fatalf("Failed to check if user exists after deletion: %v", err)
944 }
945 if exists {
946 t.Error("User should NOT exist after deletion event")
947 }
948
949 // Test 7: Process deletion for already-deleted user (idempotent)
950 err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
951 if err != nil {
952 t.Errorf("Deletion of non-existent user should not error, got: %v", err)
953 }
954}