A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/core/users"
6 "context"
7 "encoding/json"
8 "errors"
9 "testing"
10 "time"
11)
12
13// mockUserService is a test double for users.UserService
14type mockUserService struct {
15 users map[string]*users.User
16 updatedCalls []users.UpdateProfileInput
17 updatedDIDs []string
18 shouldFailGet bool
19 getError error
20 updateError error
21}
22
23func newMockUserService() *mockUserService {
24 return &mockUserService{
25 users: make(map[string]*users.User),
26 updatedCalls: []users.UpdateProfileInput{},
27 updatedDIDs: []string{},
28 }
29}
30
31func (m *mockUserService) CreateUser(ctx context.Context, req users.CreateUserRequest) (*users.User, error) {
32 return nil, nil
33}
34
35func (m *mockUserService) GetUserByDID(ctx context.Context, did string) (*users.User, error) {
36 if m.shouldFailGet {
37 return nil, m.getError
38 }
39 user, exists := m.users[did]
40 if !exists {
41 return nil, users.ErrUserNotFound
42 }
43 return user, nil
44}
45
46func (m *mockUserService) GetUserByHandle(ctx context.Context, handle string) (*users.User, error) {
47 return nil, nil
48}
49
50func (m *mockUserService) UpdateHandle(ctx context.Context, did, newHandle string) (*users.User, error) {
51 return nil, nil
52}
53
54func (m *mockUserService) ResolveHandleToDID(ctx context.Context, handle string) (string, error) {
55 return "", nil
56}
57
58func (m *mockUserService) RegisterAccount(ctx context.Context, req users.RegisterAccountRequest) (*users.RegisterAccountResponse, error) {
59 return nil, nil
60}
61
62func (m *mockUserService) IndexUser(ctx context.Context, did, handle, pdsURL string) error {
63 return nil
64}
65
66func (m *mockUserService) GetProfile(ctx context.Context, did string) (*users.ProfileViewDetailed, error) {
67 return nil, nil
68}
69
70func (m *mockUserService) UpdateProfile(ctx context.Context, did string, input users.UpdateProfileInput) (*users.User, error) {
71 if m.updateError != nil {
72 return nil, m.updateError
73 }
74 m.updatedCalls = append(m.updatedCalls, input)
75 m.updatedDIDs = append(m.updatedDIDs, did)
76 user := m.users[did]
77 if user == nil {
78 return nil, users.ErrUserNotFound
79 }
80 // Apply updates to mock user
81 if input.DisplayName != nil {
82 user.DisplayName = *input.DisplayName
83 }
84 if input.Bio != nil {
85 user.Bio = *input.Bio
86 }
87 if input.AvatarCID != nil {
88 user.AvatarCID = *input.AvatarCID
89 }
90 if input.BannerCID != nil {
91 user.BannerCID = *input.BannerCID
92 }
93 return user, nil
94}
95
96func (m *mockUserService) DeleteAccount(ctx context.Context, did string) error {
97 return nil
98}
99
100// mockIdentityResolverForUser is a test double for identity.Resolver
101type mockIdentityResolverForUser struct{}
102
103func (m *mockIdentityResolverForUser) Resolve(ctx context.Context, identifier string) (*identity.Identity, error) {
104 return nil, nil
105}
106
107func (m *mockIdentityResolverForUser) ResolveHandle(ctx context.Context, handle string) (string, string, error) {
108 return "", "", nil
109}
110
111func (m *mockIdentityResolverForUser) ResolveDID(ctx context.Context, did string) (*identity.DIDDocument, error) {
112 return nil, nil
113}
114
115func (m *mockIdentityResolverForUser) Purge(ctx context.Context, identifier string) error {
116 return nil
117}
118
119func TestUserConsumer_HandleProfileCommit(t *testing.T) {
120 t.Run("ignores commits for unknown collections", func(t *testing.T) {
121 mockService := newMockUserService()
122 mockResolver := &mockIdentityResolverForUser{}
123 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
124 ctx := context.Background()
125
126 // Event with a non-profile collection (e.g., social.coves.post)
127 event := &JetstreamEvent{
128 Did: "did:plc:testuser123",
129 TimeUS: time.Now().UnixMicro(),
130 Kind: "commit",
131 Commit: &CommitEvent{
132 Rev: "rev123",
133 Operation: "create",
134 Collection: "social.coves.post", // Not CovesProfileCollection
135 RKey: "post123",
136 CID: "bafy123",
137 Record: map[string]interface{}{
138 "text": "Hello world",
139 },
140 },
141 }
142
143 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
144 if err != nil {
145 t.Errorf("Expected no error for unknown collection, got: %v", err)
146 }
147
148 // Verify no UpdateProfile calls were made
149 if len(mockService.updatedCalls) != 0 {
150 t.Errorf("Expected 0 UpdateProfile calls, got %d", len(mockService.updatedCalls))
151 }
152 })
153
154 t.Run("ignores commits for users not in database", func(t *testing.T) {
155 mockService := newMockUserService()
156 // Don't add any users - the user lookup will fail
157 mockResolver := &mockIdentityResolverForUser{}
158 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
159 ctx := context.Background()
160
161 event := &JetstreamEvent{
162 Did: "did:plc:unknownuser",
163 TimeUS: time.Now().UnixMicro(),
164 Kind: "commit",
165 Commit: &CommitEvent{
166 Rev: "rev123",
167 Operation: "create",
168 Collection: CovesProfileCollection,
169 RKey: "self",
170 CID: "bafy123",
171 Record: map[string]interface{}{
172 "displayName": "Unknown User",
173 },
174 },
175 }
176
177 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
178 // Should return nil (not an error) for users not in our database
179 if err != nil {
180 t.Errorf("Expected nil error for unknown user, got: %v", err)
181 }
182
183 // Verify no UpdateProfile calls were made
184 if len(mockService.updatedCalls) != 0 {
185 t.Errorf("Expected 0 UpdateProfile calls, got %d", len(mockService.updatedCalls))
186 }
187 })
188
189 t.Run("extracts displayName from record", func(t *testing.T) {
190 mockService := newMockUserService()
191 mockService.users["did:plc:testuser"] = &users.User{
192 DID: "did:plc:testuser",
193 Handle: "testuser.bsky.social",
194 PDSURL: "https://bsky.social",
195 }
196 mockResolver := &mockIdentityResolverForUser{}
197 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
198 ctx := context.Background()
199
200 event := &JetstreamEvent{
201 Did: "did:plc:testuser",
202 TimeUS: time.Now().UnixMicro(),
203 Kind: "commit",
204 Commit: &CommitEvent{
205 Rev: "rev123",
206 Operation: "create",
207 Collection: CovesProfileCollection,
208 RKey: "self",
209 CID: "bafy123",
210 Record: map[string]interface{}{
211 "displayName": "Test Display Name",
212 },
213 },
214 }
215
216 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
217 if err != nil {
218 t.Fatalf("Expected no error, got: %v", err)
219 }
220
221 if len(mockService.updatedCalls) != 1 {
222 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
223 }
224
225 call := mockService.updatedCalls[0]
226 if call.DisplayName == nil || *call.DisplayName != "Test Display Name" {
227 t.Errorf("Expected displayName 'Test Display Name', got %v", call.DisplayName)
228 }
229 })
230
231 t.Run("extracts description (bio) from record", func(t *testing.T) {
232 mockService := newMockUserService()
233 mockService.users["did:plc:testuser"] = &users.User{
234 DID: "did:plc:testuser",
235 Handle: "testuser.bsky.social",
236 PDSURL: "https://bsky.social",
237 }
238 mockResolver := &mockIdentityResolverForUser{}
239 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
240 ctx := context.Background()
241
242 event := &JetstreamEvent{
243 Did: "did:plc:testuser",
244 TimeUS: time.Now().UnixMicro(),
245 Kind: "commit",
246 Commit: &CommitEvent{
247 Rev: "rev123",
248 Operation: "create",
249 Collection: CovesProfileCollection,
250 RKey: "self",
251 CID: "bafy123",
252 Record: map[string]interface{}{
253 "description": "This is my bio",
254 },
255 },
256 }
257
258 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
259 if err != nil {
260 t.Fatalf("Expected no error, got: %v", err)
261 }
262
263 if len(mockService.updatedCalls) != 1 {
264 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
265 }
266
267 call := mockService.updatedCalls[0]
268 if call.Bio == nil || *call.Bio != "This is my bio" {
269 t.Errorf("Expected bio 'This is my bio', got %v", call.Bio)
270 }
271 })
272
273 t.Run("extracts avatar CID from blob ref structure", func(t *testing.T) {
274 mockService := newMockUserService()
275 mockService.users["did:plc:testuser"] = &users.User{
276 DID: "did:plc:testuser",
277 Handle: "testuser.bsky.social",
278 PDSURL: "https://bsky.social",
279 }
280 mockResolver := &mockIdentityResolverForUser{}
281 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
282 ctx := context.Background()
283
284 event := &JetstreamEvent{
285 Did: "did:plc:testuser",
286 TimeUS: time.Now().UnixMicro(),
287 Kind: "commit",
288 Commit: &CommitEvent{
289 Rev: "rev123",
290 Operation: "create",
291 Collection: CovesProfileCollection,
292 RKey: "self",
293 CID: "bafy123",
294 Record: map[string]interface{}{
295 "avatar": map[string]interface{}{
296 "$type": "blob",
297 "ref": map[string]interface{}{"$link": "bafkavatar123"},
298 "mimeType": "image/jpeg",
299 "size": float64(12345),
300 },
301 },
302 },
303 }
304
305 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
306 if err != nil {
307 t.Fatalf("Expected no error, got: %v", err)
308 }
309
310 if len(mockService.updatedCalls) != 1 {
311 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
312 }
313
314 call := mockService.updatedCalls[0]
315 if call.AvatarCID == nil || *call.AvatarCID != "bafkavatar123" {
316 t.Errorf("Expected avatar CID 'bafkavatar123', got %v", call.AvatarCID)
317 }
318 })
319
320 t.Run("extracts banner CID from blob ref structure", func(t *testing.T) {
321 mockService := newMockUserService()
322 mockService.users["did:plc:testuser"] = &users.User{
323 DID: "did:plc:testuser",
324 Handle: "testuser.bsky.social",
325 PDSURL: "https://bsky.social",
326 }
327 mockResolver := &mockIdentityResolverForUser{}
328 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
329 ctx := context.Background()
330
331 event := &JetstreamEvent{
332 Did: "did:plc:testuser",
333 TimeUS: time.Now().UnixMicro(),
334 Kind: "commit",
335 Commit: &CommitEvent{
336 Rev: "rev123",
337 Operation: "create",
338 Collection: CovesProfileCollection,
339 RKey: "self",
340 CID: "bafy123",
341 Record: map[string]interface{}{
342 "banner": map[string]interface{}{
343 "$type": "blob",
344 "ref": map[string]interface{}{"$link": "bafkbanner456"},
345 "mimeType": "image/png",
346 "size": float64(54321),
347 },
348 },
349 },
350 }
351
352 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
353 if err != nil {
354 t.Fatalf("Expected no error, got: %v", err)
355 }
356
357 if len(mockService.updatedCalls) != 1 {
358 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
359 }
360
361 call := mockService.updatedCalls[0]
362 if call.BannerCID == nil || *call.BannerCID != "bafkbanner456" {
363 t.Errorf("Expected banner CID 'bafkbanner456', got %v", call.BannerCID)
364 }
365 })
366
367 t.Run("extracts all profile fields together", func(t *testing.T) {
368 mockService := newMockUserService()
369 mockService.users["did:plc:testuser"] = &users.User{
370 DID: "did:plc:testuser",
371 Handle: "testuser.bsky.social",
372 PDSURL: "https://bsky.social",
373 }
374 mockResolver := &mockIdentityResolverForUser{}
375 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
376 ctx := context.Background()
377
378 event := &JetstreamEvent{
379 Did: "did:plc:testuser",
380 TimeUS: time.Now().UnixMicro(),
381 Kind: "commit",
382 Commit: &CommitEvent{
383 Rev: "rev123",
384 Operation: "create",
385 Collection: CovesProfileCollection,
386 RKey: "self",
387 CID: "bafy123",
388 Record: map[string]interface{}{
389 "displayName": "Full Profile User",
390 "description": "A complete bio",
391 "avatar": map[string]interface{}{
392 "$type": "blob",
393 "ref": map[string]interface{}{"$link": "bafkfullav123"},
394 "mimeType": "image/jpeg",
395 "size": float64(10000),
396 },
397 "banner": map[string]interface{}{
398 "$type": "blob",
399 "ref": map[string]interface{}{"$link": "bafkfullbn456"},
400 "mimeType": "image/png",
401 "size": float64(20000),
402 },
403 },
404 },
405 }
406
407 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
408 if err != nil {
409 t.Fatalf("Expected no error, got: %v", err)
410 }
411
412 if len(mockService.updatedCalls) != 1 {
413 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
414 }
415
416 call := mockService.updatedCalls[0]
417 if call.DisplayName == nil || *call.DisplayName != "Full Profile User" {
418 t.Errorf("Expected displayName 'Full Profile User', got %v", call.DisplayName)
419 }
420 if call.Bio == nil || *call.Bio != "A complete bio" {
421 t.Errorf("Expected bio 'A complete bio', got %v", call.Bio)
422 }
423 if call.AvatarCID == nil || *call.AvatarCID != "bafkfullav123" {
424 t.Errorf("Expected avatar CID 'bafkfullav123', got %v", call.AvatarCID)
425 }
426 if call.BannerCID == nil || *call.BannerCID != "bafkfullbn456" {
427 t.Errorf("Expected banner CID 'bafkfullbn456', got %v", call.BannerCID)
428 }
429 })
430
431 t.Run("handles delete operation by clearing profile fields", func(t *testing.T) {
432 mockService := newMockUserService()
433 mockService.users["did:plc:testuser"] = &users.User{
434 DID: "did:plc:testuser",
435 Handle: "testuser.bsky.social",
436 PDSURL: "https://bsky.social",
437 DisplayName: "Existing Name",
438 Bio: "Existing Bio",
439 AvatarCID: "existingavatar",
440 BannerCID: "existingbanner",
441 }
442 mockResolver := &mockIdentityResolverForUser{}
443 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
444 ctx := context.Background()
445
446 event := &JetstreamEvent{
447 Did: "did:plc:testuser",
448 TimeUS: time.Now().UnixMicro(),
449 Kind: "commit",
450 Commit: &CommitEvent{
451 Rev: "rev123",
452 Operation: "delete",
453 Collection: CovesProfileCollection,
454 RKey: "self",
455 },
456 }
457
458 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
459 if err != nil {
460 t.Fatalf("Expected no error, got: %v", err)
461 }
462
463 if len(mockService.updatedCalls) != 1 {
464 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
465 }
466
467 call := mockService.updatedCalls[0]
468 // Delete should pass empty strings to clear fields
469 if call.DisplayName == nil || *call.DisplayName != "" {
470 t.Errorf("Expected empty displayName for delete, got %v", call.DisplayName)
471 }
472 if call.Bio == nil || *call.Bio != "" {
473 t.Errorf("Expected empty bio for delete, got %v", call.Bio)
474 }
475 if call.AvatarCID == nil || *call.AvatarCID != "" {
476 t.Errorf("Expected empty avatar CID for delete, got %v", call.AvatarCID)
477 }
478 if call.BannerCID == nil || *call.BannerCID != "" {
479 t.Errorf("Expected empty banner CID for delete, got %v", call.BannerCID)
480 }
481 })
482
483 t.Run("handles update operation same as create", func(t *testing.T) {
484 mockService := newMockUserService()
485 mockService.users["did:plc:testuser"] = &users.User{
486 DID: "did:plc:testuser",
487 Handle: "testuser.bsky.social",
488 PDSURL: "https://bsky.social",
489 DisplayName: "Old Name",
490 }
491 mockResolver := &mockIdentityResolverForUser{}
492 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
493 ctx := context.Background()
494
495 event := &JetstreamEvent{
496 Did: "did:plc:testuser",
497 TimeUS: time.Now().UnixMicro(),
498 Kind: "commit",
499 Commit: &CommitEvent{
500 Rev: "rev124",
501 Operation: "update", // Update operation instead of create
502 Collection: CovesProfileCollection,
503 RKey: "self",
504 CID: "bafy456",
505 Record: map[string]interface{}{
506 "displayName": "Updated Name",
507 "description": "Updated bio",
508 },
509 },
510 }
511
512 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
513 if err != nil {
514 t.Fatalf("Expected no error, got: %v", err)
515 }
516
517 if len(mockService.updatedCalls) != 1 {
518 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
519 }
520
521 call := mockService.updatedCalls[0]
522 if call.DisplayName == nil || *call.DisplayName != "Updated Name" {
523 t.Errorf("Expected displayName 'Updated Name', got %v", call.DisplayName)
524 }
525 if call.Bio == nil || *call.Bio != "Updated bio" {
526 t.Errorf("Expected bio 'Updated bio', got %v", call.Bio)
527 }
528 })
529
530 t.Run("propagates database errors from GetUserByDID", func(t *testing.T) {
531 mockService := newMockUserService()
532 mockService.shouldFailGet = true
533 mockService.getError = errors.New("database connection error")
534 mockResolver := &mockIdentityResolverForUser{}
535 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
536 ctx := context.Background()
537
538 event := &JetstreamEvent{
539 Did: "did:plc:testuser",
540 TimeUS: time.Now().UnixMicro(),
541 Kind: "commit",
542 Commit: &CommitEvent{
543 Rev: "rev123",
544 Operation: "create",
545 Collection: CovesProfileCollection,
546 RKey: "self",
547 CID: "bafy123",
548 Record: map[string]interface{}{
549 "displayName": "Test User",
550 },
551 },
552 }
553
554 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
555 if err == nil {
556 t.Fatal("Expected error for database failure, got nil")
557 }
558 if !errors.Is(err, mockService.getError) && err.Error() != "failed to check if user exists: database connection error" {
559 t.Errorf("Expected wrapped database error, got: %v", err)
560 }
561 })
562
563 t.Run("handles nil commit gracefully", func(t *testing.T) {
564 mockService := newMockUserService()
565 mockResolver := &mockIdentityResolverForUser{}
566 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
567 ctx := context.Background()
568
569 event := &JetstreamEvent{
570 Did: "did:plc:testuser",
571 TimeUS: time.Now().UnixMicro(),
572 Kind: "commit",
573 Commit: nil, // No commit data
574 }
575
576 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
577 if err != nil {
578 t.Errorf("Expected no error for nil commit, got: %v", err)
579 }
580 })
581
582 t.Run("handles nil record in commit gracefully", func(t *testing.T) {
583 mockService := newMockUserService()
584 mockService.users["did:plc:testuser"] = &users.User{
585 DID: "did:plc:testuser",
586 Handle: "testuser.bsky.social",
587 PDSURL: "https://bsky.social",
588 }
589 mockResolver := &mockIdentityResolverForUser{}
590 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
591 ctx := context.Background()
592
593 event := &JetstreamEvent{
594 Did: "did:plc:testuser",
595 TimeUS: time.Now().UnixMicro(),
596 Kind: "commit",
597 Commit: &CommitEvent{
598 Rev: "rev123",
599 Operation: "create",
600 Collection: CovesProfileCollection,
601 RKey: "self",
602 CID: "bafy123",
603 Record: nil, // No record data
604 },
605 }
606
607 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
608 if err != nil {
609 t.Errorf("Expected no error for nil record, got: %v", err)
610 }
611 })
612
613 t.Run("handles invalid blob structure gracefully", func(t *testing.T) {
614 mockService := newMockUserService()
615 mockService.users["did:plc:testuser"] = &users.User{
616 DID: "did:plc:testuser",
617 Handle: "testuser.bsky.social",
618 PDSURL: "https://bsky.social",
619 }
620 mockResolver := &mockIdentityResolverForUser{}
621 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
622 ctx := context.Background()
623
624 event := &JetstreamEvent{
625 Did: "did:plc:testuser",
626 TimeUS: time.Now().UnixMicro(),
627 Kind: "commit",
628 Commit: &CommitEvent{
629 Rev: "rev123",
630 Operation: "create",
631 Collection: CovesProfileCollection,
632 RKey: "self",
633 CID: "bafy123",
634 Record: map[string]interface{}{
635 "displayName": "Test User",
636 "avatar": map[string]interface{}{
637 "$type": "not-a-blob", // Invalid type
638 },
639 "banner": "not-a-map", // Invalid structure
640 },
641 },
642 }
643
644 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
645 if err != nil {
646 t.Fatalf("Expected no error, got: %v", err)
647 }
648
649 if len(mockService.updatedCalls) != 1 {
650 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls))
651 }
652
653 call := mockService.updatedCalls[0]
654 // displayName should be extracted
655 if call.DisplayName == nil || *call.DisplayName != "Test User" {
656 t.Errorf("Expected displayName 'Test User', got %v", call.DisplayName)
657 }
658 // Avatar and banner should be nil (not extracted due to invalid structure)
659 if call.AvatarCID != nil {
660 t.Errorf("Expected nil avatar CID for invalid blob, got %v", call.AvatarCID)
661 }
662 if call.BannerCID != nil {
663 t.Errorf("Expected nil banner CID for invalid structure, got %v", call.BannerCID)
664 }
665 })
666}
667
668func TestUserConsumer_PropagatesUpdateProfileError(t *testing.T) {
669 t.Run("propagates_database_errors_from_UpdateProfile", func(t *testing.T) {
670 mockService := newMockUserService()
671 mockService.users["did:plc:testuser"] = &users.User{
672 DID: "did:plc:testuser",
673 Handle: "testuser.bsky.social",
674 PDSURL: "https://bsky.social",
675 }
676 mockService.updateError = errors.New("database write error")
677 mockResolver := &mockIdentityResolverForUser{}
678 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "")
679 ctx := context.Background()
680
681 event := &JetstreamEvent{
682 Did: "did:plc:testuser",
683 TimeUS: time.Now().UnixMicro(),
684 Kind: "commit",
685 Commit: &CommitEvent{
686 Rev: "rev123",
687 Operation: "create",
688 Collection: CovesProfileCollection,
689 RKey: "self",
690 CID: "bafy123",
691 Record: map[string]interface{}{
692 "displayName": "Test User",
693 },
694 },
695 }
696
697 err := consumer.handleEvent(ctx, mustMarshalEvent(event))
698 if err == nil {
699 t.Fatal("Expected error for UpdateProfile failure, got nil")
700 }
701 if !errors.Is(err, mockService.updateError) && err.Error() != "failed to update user profile: database write error" {
702 t.Errorf("Expected wrapped database error, got: %v", err)
703 }
704 })
705}
706
707func TestExtractBlobCID(t *testing.T) {
708 t.Run("extracts CID from valid blob structure", func(t *testing.T) {
709 blob := map[string]interface{}{
710 "$type": "blob",
711 "ref": map[string]interface{}{"$link": "bafktest123"},
712 "mimeType": "image/jpeg",
713 "size": float64(12345),
714 }
715
716 cid, ok := extractBlobCID(blob)
717 if !ok {
718 t.Fatal("Expected successful extraction")
719 }
720 if cid != "bafktest123" {
721 t.Errorf("Expected CID 'bafktest123', got '%s'", cid)
722 }
723 })
724
725 t.Run("returns false for nil blob", func(t *testing.T) {
726 cid, ok := extractBlobCID(nil)
727 if ok {
728 t.Error("Expected false for nil blob")
729 }
730 if cid != "" {
731 t.Errorf("Expected empty CID for nil blob, got '%s'", cid)
732 }
733 })
734
735 t.Run("returns false for wrong $type", func(t *testing.T) {
736 blob := map[string]interface{}{
737 "$type": "image",
738 "ref": map[string]interface{}{"$link": "bafktest123"},
739 }
740
741 cid, ok := extractBlobCID(blob)
742 if ok {
743 t.Error("Expected false for wrong $type")
744 }
745 if cid != "" {
746 t.Errorf("Expected empty CID for wrong type, got '%s'", cid)
747 }
748 })
749
750 t.Run("returns false for missing $type", func(t *testing.T) {
751 blob := map[string]interface{}{
752 "ref": map[string]interface{}{"$link": "bafktest123"},
753 }
754
755 cid, ok := extractBlobCID(blob)
756 if ok {
757 t.Error("Expected false for missing $type")
758 }
759 if cid != "" {
760 t.Errorf("Expected empty CID for missing type, got '%s'", cid)
761 }
762 })
763
764 t.Run("returns false for missing ref", func(t *testing.T) {
765 blob := map[string]interface{}{
766 "$type": "blob",
767 }
768
769 cid, ok := extractBlobCID(blob)
770 if ok {
771 t.Error("Expected false for missing ref")
772 }
773 if cid != "" {
774 t.Errorf("Expected empty CID for missing ref, got '%s'", cid)
775 }
776 })
777
778 t.Run("returns false for missing $link", func(t *testing.T) {
779 blob := map[string]interface{}{
780 "$type": "blob",
781 "ref": map[string]interface{}{},
782 }
783
784 cid, ok := extractBlobCID(blob)
785 if ok {
786 t.Error("Expected false for missing $link")
787 }
788 if cid != "" {
789 t.Errorf("Expected empty CID for missing link, got '%s'", cid)
790 }
791 })
792
793 t.Run("returns false for non-map ref", func(t *testing.T) {
794 blob := map[string]interface{}{
795 "$type": "blob",
796 "ref": "not-a-map",
797 }
798
799 cid, ok := extractBlobCID(blob)
800 if ok {
801 t.Error("Expected false for non-map ref")
802 }
803 if cid != "" {
804 t.Errorf("Expected empty CID for non-map ref, got '%s'", cid)
805 }
806 })
807}
808
809// mustMarshalEvent marshals an event to JSON bytes for testing
810func mustMarshalEvent(event *JetstreamEvent) []byte {
811 data, err := json.Marshal(event)
812 if err != nil {
813 panic(err)
814 }
815 return data
816}