A community-based topic aggregation platform built on atproto
at main 816 lines 24 kB view raw
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/core/users" 6 "context" 7 "encoding/json" 8 "errors" 9 "testing" 10 "time" 11) 12 13// mockUserService is a test double for users.UserService 14type mockUserService struct { 15 users map[string]*users.User 16 updatedCalls []users.UpdateProfileInput 17 updatedDIDs []string 18 shouldFailGet bool 19 getError error 20 updateError error 21} 22 23func newMockUserService() *mockUserService { 24 return &mockUserService{ 25 users: make(map[string]*users.User), 26 updatedCalls: []users.UpdateProfileInput{}, 27 updatedDIDs: []string{}, 28 } 29} 30 31func (m *mockUserService) CreateUser(ctx context.Context, req users.CreateUserRequest) (*users.User, error) { 32 return nil, nil 33} 34 35func (m *mockUserService) GetUserByDID(ctx context.Context, did string) (*users.User, error) { 36 if m.shouldFailGet { 37 return nil, m.getError 38 } 39 user, exists := m.users[did] 40 if !exists { 41 return nil, users.ErrUserNotFound 42 } 43 return user, nil 44} 45 46func (m *mockUserService) GetUserByHandle(ctx context.Context, handle string) (*users.User, error) { 47 return nil, nil 48} 49 50func (m *mockUserService) UpdateHandle(ctx context.Context, did, newHandle string) (*users.User, error) { 51 return nil, nil 52} 53 54func (m *mockUserService) ResolveHandleToDID(ctx context.Context, handle string) (string, error) { 55 return "", nil 56} 57 58func (m *mockUserService) RegisterAccount(ctx context.Context, req users.RegisterAccountRequest) (*users.RegisterAccountResponse, error) { 59 return nil, nil 60} 61 62func (m *mockUserService) IndexUser(ctx context.Context, did, handle, pdsURL string) error { 63 return nil 64} 65 66func (m *mockUserService) GetProfile(ctx context.Context, did string) (*users.ProfileViewDetailed, error) { 67 return nil, nil 68} 69 70func (m *mockUserService) UpdateProfile(ctx context.Context, did string, input users.UpdateProfileInput) (*users.User, error) { 71 if m.updateError != nil 
{ 72 return nil, m.updateError 73 } 74 m.updatedCalls = append(m.updatedCalls, input) 75 m.updatedDIDs = append(m.updatedDIDs, did) 76 user := m.users[did] 77 if user == nil { 78 return nil, users.ErrUserNotFound 79 } 80 // Apply updates to mock user 81 if input.DisplayName != nil { 82 user.DisplayName = *input.DisplayName 83 } 84 if input.Bio != nil { 85 user.Bio = *input.Bio 86 } 87 if input.AvatarCID != nil { 88 user.AvatarCID = *input.AvatarCID 89 } 90 if input.BannerCID != nil { 91 user.BannerCID = *input.BannerCID 92 } 93 return user, nil 94} 95 96func (m *mockUserService) DeleteAccount(ctx context.Context, did string) error { 97 return nil 98} 99 100// mockIdentityResolverForUser is a test double for identity.Resolver 101type mockIdentityResolverForUser struct{} 102 103func (m *mockIdentityResolverForUser) Resolve(ctx context.Context, identifier string) (*identity.Identity, error) { 104 return nil, nil 105} 106 107func (m *mockIdentityResolverForUser) ResolveHandle(ctx context.Context, handle string) (string, string, error) { 108 return "", "", nil 109} 110 111func (m *mockIdentityResolverForUser) ResolveDID(ctx context.Context, did string) (*identity.DIDDocument, error) { 112 return nil, nil 113} 114 115func (m *mockIdentityResolverForUser) Purge(ctx context.Context, identifier string) error { 116 return nil 117} 118 119func TestUserConsumer_HandleProfileCommit(t *testing.T) { 120 t.Run("ignores commits for unknown collections", func(t *testing.T) { 121 mockService := newMockUserService() 122 mockResolver := &mockIdentityResolverForUser{} 123 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 124 ctx := context.Background() 125 126 // Event with a non-profile collection (e.g., social.coves.post) 127 event := &JetstreamEvent{ 128 Did: "did:plc:testuser123", 129 TimeUS: time.Now().UnixMicro(), 130 Kind: "commit", 131 Commit: &CommitEvent{ 132 Rev: "rev123", 133 Operation: "create", 134 Collection: 
"social.coves.post", // Not CovesProfileCollection 135 RKey: "post123", 136 CID: "bafy123", 137 Record: map[string]interface{}{ 138 "text": "Hello world", 139 }, 140 }, 141 } 142 143 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 144 if err != nil { 145 t.Errorf("Expected no error for unknown collection, got: %v", err) 146 } 147 148 // Verify no UpdateProfile calls were made 149 if len(mockService.updatedCalls) != 0 { 150 t.Errorf("Expected 0 UpdateProfile calls, got %d", len(mockService.updatedCalls)) 151 } 152 }) 153 154 t.Run("ignores commits for users not in database", func(t *testing.T) { 155 mockService := newMockUserService() 156 // Don't add any users - the user lookup will fail 157 mockResolver := &mockIdentityResolverForUser{} 158 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 159 ctx := context.Background() 160 161 event := &JetstreamEvent{ 162 Did: "did:plc:unknownuser", 163 TimeUS: time.Now().UnixMicro(), 164 Kind: "commit", 165 Commit: &CommitEvent{ 166 Rev: "rev123", 167 Operation: "create", 168 Collection: CovesProfileCollection, 169 RKey: "self", 170 CID: "bafy123", 171 Record: map[string]interface{}{ 172 "displayName": "Unknown User", 173 }, 174 }, 175 } 176 177 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 178 // Should return nil (not an error) for users not in our database 179 if err != nil { 180 t.Errorf("Expected nil error for unknown user, got: %v", err) 181 } 182 183 // Verify no UpdateProfile calls were made 184 if len(mockService.updatedCalls) != 0 { 185 t.Errorf("Expected 0 UpdateProfile calls, got %d", len(mockService.updatedCalls)) 186 } 187 }) 188 189 t.Run("extracts displayName from record", func(t *testing.T) { 190 mockService := newMockUserService() 191 mockService.users["did:plc:testuser"] = &users.User{ 192 DID: "did:plc:testuser", 193 Handle: "testuser.bsky.social", 194 PDSURL: "https://bsky.social", 195 } 196 mockResolver := &mockIdentityResolverForUser{} 
197 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 198 ctx := context.Background() 199 200 event := &JetstreamEvent{ 201 Did: "did:plc:testuser", 202 TimeUS: time.Now().UnixMicro(), 203 Kind: "commit", 204 Commit: &CommitEvent{ 205 Rev: "rev123", 206 Operation: "create", 207 Collection: CovesProfileCollection, 208 RKey: "self", 209 CID: "bafy123", 210 Record: map[string]interface{}{ 211 "displayName": "Test Display Name", 212 }, 213 }, 214 } 215 216 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 217 if err != nil { 218 t.Fatalf("Expected no error, got: %v", err) 219 } 220 221 if len(mockService.updatedCalls) != 1 { 222 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 223 } 224 225 call := mockService.updatedCalls[0] 226 if call.DisplayName == nil || *call.DisplayName != "Test Display Name" { 227 t.Errorf("Expected displayName 'Test Display Name', got %v", call.DisplayName) 228 } 229 }) 230 231 t.Run("extracts description (bio) from record", func(t *testing.T) { 232 mockService := newMockUserService() 233 mockService.users["did:plc:testuser"] = &users.User{ 234 DID: "did:plc:testuser", 235 Handle: "testuser.bsky.social", 236 PDSURL: "https://bsky.social", 237 } 238 mockResolver := &mockIdentityResolverForUser{} 239 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 240 ctx := context.Background() 241 242 event := &JetstreamEvent{ 243 Did: "did:plc:testuser", 244 TimeUS: time.Now().UnixMicro(), 245 Kind: "commit", 246 Commit: &CommitEvent{ 247 Rev: "rev123", 248 Operation: "create", 249 Collection: CovesProfileCollection, 250 RKey: "self", 251 CID: "bafy123", 252 Record: map[string]interface{}{ 253 "description": "This is my bio", 254 }, 255 }, 256 } 257 258 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 259 if err != nil { 260 t.Fatalf("Expected no error, got: %v", err) 261 } 262 263 if len(mockService.updatedCalls) != 1 { 
264 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 265 } 266 267 call := mockService.updatedCalls[0] 268 if call.Bio == nil || *call.Bio != "This is my bio" { 269 t.Errorf("Expected bio 'This is my bio', got %v", call.Bio) 270 } 271 }) 272 273 t.Run("extracts avatar CID from blob ref structure", func(t *testing.T) { 274 mockService := newMockUserService() 275 mockService.users["did:plc:testuser"] = &users.User{ 276 DID: "did:plc:testuser", 277 Handle: "testuser.bsky.social", 278 PDSURL: "https://bsky.social", 279 } 280 mockResolver := &mockIdentityResolverForUser{} 281 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 282 ctx := context.Background() 283 284 event := &JetstreamEvent{ 285 Did: "did:plc:testuser", 286 TimeUS: time.Now().UnixMicro(), 287 Kind: "commit", 288 Commit: &CommitEvent{ 289 Rev: "rev123", 290 Operation: "create", 291 Collection: CovesProfileCollection, 292 RKey: "self", 293 CID: "bafy123", 294 Record: map[string]interface{}{ 295 "avatar": map[string]interface{}{ 296 "$type": "blob", 297 "ref": map[string]interface{}{"$link": "bafkavatar123"}, 298 "mimeType": "image/jpeg", 299 "size": float64(12345), 300 }, 301 }, 302 }, 303 } 304 305 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 306 if err != nil { 307 t.Fatalf("Expected no error, got: %v", err) 308 } 309 310 if len(mockService.updatedCalls) != 1 { 311 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 312 } 313 314 call := mockService.updatedCalls[0] 315 if call.AvatarCID == nil || *call.AvatarCID != "bafkavatar123" { 316 t.Errorf("Expected avatar CID 'bafkavatar123', got %v", call.AvatarCID) 317 } 318 }) 319 320 t.Run("extracts banner CID from blob ref structure", func(t *testing.T) { 321 mockService := newMockUserService() 322 mockService.users["did:plc:testuser"] = &users.User{ 323 DID: "did:plc:testuser", 324 Handle: "testuser.bsky.social", 325 PDSURL: 
"https://bsky.social", 326 } 327 mockResolver := &mockIdentityResolverForUser{} 328 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 329 ctx := context.Background() 330 331 event := &JetstreamEvent{ 332 Did: "did:plc:testuser", 333 TimeUS: time.Now().UnixMicro(), 334 Kind: "commit", 335 Commit: &CommitEvent{ 336 Rev: "rev123", 337 Operation: "create", 338 Collection: CovesProfileCollection, 339 RKey: "self", 340 CID: "bafy123", 341 Record: map[string]interface{}{ 342 "banner": map[string]interface{}{ 343 "$type": "blob", 344 "ref": map[string]interface{}{"$link": "bafkbanner456"}, 345 "mimeType": "image/png", 346 "size": float64(54321), 347 }, 348 }, 349 }, 350 } 351 352 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 353 if err != nil { 354 t.Fatalf("Expected no error, got: %v", err) 355 } 356 357 if len(mockService.updatedCalls) != 1 { 358 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 359 } 360 361 call := mockService.updatedCalls[0] 362 if call.BannerCID == nil || *call.BannerCID != "bafkbanner456" { 363 t.Errorf("Expected banner CID 'bafkbanner456', got %v", call.BannerCID) 364 } 365 }) 366 367 t.Run("extracts all profile fields together", func(t *testing.T) { 368 mockService := newMockUserService() 369 mockService.users["did:plc:testuser"] = &users.User{ 370 DID: "did:plc:testuser", 371 Handle: "testuser.bsky.social", 372 PDSURL: "https://bsky.social", 373 } 374 mockResolver := &mockIdentityResolverForUser{} 375 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 376 ctx := context.Background() 377 378 event := &JetstreamEvent{ 379 Did: "did:plc:testuser", 380 TimeUS: time.Now().UnixMicro(), 381 Kind: "commit", 382 Commit: &CommitEvent{ 383 Rev: "rev123", 384 Operation: "create", 385 Collection: CovesProfileCollection, 386 RKey: "self", 387 CID: "bafy123", 388 Record: map[string]interface{}{ 389 "displayName": "Full Profile 
User", 390 "description": "A complete bio", 391 "avatar": map[string]interface{}{ 392 "$type": "blob", 393 "ref": map[string]interface{}{"$link": "bafkfullav123"}, 394 "mimeType": "image/jpeg", 395 "size": float64(10000), 396 }, 397 "banner": map[string]interface{}{ 398 "$type": "blob", 399 "ref": map[string]interface{}{"$link": "bafkfullbn456"}, 400 "mimeType": "image/png", 401 "size": float64(20000), 402 }, 403 }, 404 }, 405 } 406 407 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 408 if err != nil { 409 t.Fatalf("Expected no error, got: %v", err) 410 } 411 412 if len(mockService.updatedCalls) != 1 { 413 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 414 } 415 416 call := mockService.updatedCalls[0] 417 if call.DisplayName == nil || *call.DisplayName != "Full Profile User" { 418 t.Errorf("Expected displayName 'Full Profile User', got %v", call.DisplayName) 419 } 420 if call.Bio == nil || *call.Bio != "A complete bio" { 421 t.Errorf("Expected bio 'A complete bio', got %v", call.Bio) 422 } 423 if call.AvatarCID == nil || *call.AvatarCID != "bafkfullav123" { 424 t.Errorf("Expected avatar CID 'bafkfullav123', got %v", call.AvatarCID) 425 } 426 if call.BannerCID == nil || *call.BannerCID != "bafkfullbn456" { 427 t.Errorf("Expected banner CID 'bafkfullbn456', got %v", call.BannerCID) 428 } 429 }) 430 431 t.Run("handles delete operation by clearing profile fields", func(t *testing.T) { 432 mockService := newMockUserService() 433 mockService.users["did:plc:testuser"] = &users.User{ 434 DID: "did:plc:testuser", 435 Handle: "testuser.bsky.social", 436 PDSURL: "https://bsky.social", 437 DisplayName: "Existing Name", 438 Bio: "Existing Bio", 439 AvatarCID: "existingavatar", 440 BannerCID: "existingbanner", 441 } 442 mockResolver := &mockIdentityResolverForUser{} 443 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 444 ctx := context.Background() 445 446 event := &JetstreamEvent{ 447 
Did: "did:plc:testuser", 448 TimeUS: time.Now().UnixMicro(), 449 Kind: "commit", 450 Commit: &CommitEvent{ 451 Rev: "rev123", 452 Operation: "delete", 453 Collection: CovesProfileCollection, 454 RKey: "self", 455 }, 456 } 457 458 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 459 if err != nil { 460 t.Fatalf("Expected no error, got: %v", err) 461 } 462 463 if len(mockService.updatedCalls) != 1 { 464 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 465 } 466 467 call := mockService.updatedCalls[0] 468 // Delete should pass empty strings to clear fields 469 if call.DisplayName == nil || *call.DisplayName != "" { 470 t.Errorf("Expected empty displayName for delete, got %v", call.DisplayName) 471 } 472 if call.Bio == nil || *call.Bio != "" { 473 t.Errorf("Expected empty bio for delete, got %v", call.Bio) 474 } 475 if call.AvatarCID == nil || *call.AvatarCID != "" { 476 t.Errorf("Expected empty avatar CID for delete, got %v", call.AvatarCID) 477 } 478 if call.BannerCID == nil || *call.BannerCID != "" { 479 t.Errorf("Expected empty banner CID for delete, got %v", call.BannerCID) 480 } 481 }) 482 483 t.Run("handles update operation same as create", func(t *testing.T) { 484 mockService := newMockUserService() 485 mockService.users["did:plc:testuser"] = &users.User{ 486 DID: "did:plc:testuser", 487 Handle: "testuser.bsky.social", 488 PDSURL: "https://bsky.social", 489 DisplayName: "Old Name", 490 } 491 mockResolver := &mockIdentityResolverForUser{} 492 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 493 ctx := context.Background() 494 495 event := &JetstreamEvent{ 496 Did: "did:plc:testuser", 497 TimeUS: time.Now().UnixMicro(), 498 Kind: "commit", 499 Commit: &CommitEvent{ 500 Rev: "rev124", 501 Operation: "update", // Update operation instead of create 502 Collection: CovesProfileCollection, 503 RKey: "self", 504 CID: "bafy456", 505 Record: map[string]interface{}{ 506 "displayName": 
"Updated Name", 507 "description": "Updated bio", 508 }, 509 }, 510 } 511 512 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 513 if err != nil { 514 t.Fatalf("Expected no error, got: %v", err) 515 } 516 517 if len(mockService.updatedCalls) != 1 { 518 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 519 } 520 521 call := mockService.updatedCalls[0] 522 if call.DisplayName == nil || *call.DisplayName != "Updated Name" { 523 t.Errorf("Expected displayName 'Updated Name', got %v", call.DisplayName) 524 } 525 if call.Bio == nil || *call.Bio != "Updated bio" { 526 t.Errorf("Expected bio 'Updated bio', got %v", call.Bio) 527 } 528 }) 529 530 t.Run("propagates database errors from GetUserByDID", func(t *testing.T) { 531 mockService := newMockUserService() 532 mockService.shouldFailGet = true 533 mockService.getError = errors.New("database connection error") 534 mockResolver := &mockIdentityResolverForUser{} 535 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 536 ctx := context.Background() 537 538 event := &JetstreamEvent{ 539 Did: "did:plc:testuser", 540 TimeUS: time.Now().UnixMicro(), 541 Kind: "commit", 542 Commit: &CommitEvent{ 543 Rev: "rev123", 544 Operation: "create", 545 Collection: CovesProfileCollection, 546 RKey: "self", 547 CID: "bafy123", 548 Record: map[string]interface{}{ 549 "displayName": "Test User", 550 }, 551 }, 552 } 553 554 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 555 if err == nil { 556 t.Fatal("Expected error for database failure, got nil") 557 } 558 if !errors.Is(err, mockService.getError) && err.Error() != "failed to check if user exists: database connection error" { 559 t.Errorf("Expected wrapped database error, got: %v", err) 560 } 561 }) 562 563 t.Run("handles nil commit gracefully", func(t *testing.T) { 564 mockService := newMockUserService() 565 mockResolver := &mockIdentityResolverForUser{} 566 consumer := 
NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 567 ctx := context.Background() 568 569 event := &JetstreamEvent{ 570 Did: "did:plc:testuser", 571 TimeUS: time.Now().UnixMicro(), 572 Kind: "commit", 573 Commit: nil, // No commit data 574 } 575 576 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 577 if err != nil { 578 t.Errorf("Expected no error for nil commit, got: %v", err) 579 } 580 }) 581 582 t.Run("handles nil record in commit gracefully", func(t *testing.T) { 583 mockService := newMockUserService() 584 mockService.users["did:plc:testuser"] = &users.User{ 585 DID: "did:plc:testuser", 586 Handle: "testuser.bsky.social", 587 PDSURL: "https://bsky.social", 588 } 589 mockResolver := &mockIdentityResolverForUser{} 590 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 591 ctx := context.Background() 592 593 event := &JetstreamEvent{ 594 Did: "did:plc:testuser", 595 TimeUS: time.Now().UnixMicro(), 596 Kind: "commit", 597 Commit: &CommitEvent{ 598 Rev: "rev123", 599 Operation: "create", 600 Collection: CovesProfileCollection, 601 RKey: "self", 602 CID: "bafy123", 603 Record: nil, // No record data 604 }, 605 } 606 607 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 608 if err != nil { 609 t.Errorf("Expected no error for nil record, got: %v", err) 610 } 611 }) 612 613 t.Run("handles invalid blob structure gracefully", func(t *testing.T) { 614 mockService := newMockUserService() 615 mockService.users["did:plc:testuser"] = &users.User{ 616 DID: "did:plc:testuser", 617 Handle: "testuser.bsky.social", 618 PDSURL: "https://bsky.social", 619 } 620 mockResolver := &mockIdentityResolverForUser{} 621 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 622 ctx := context.Background() 623 624 event := &JetstreamEvent{ 625 Did: "did:plc:testuser", 626 TimeUS: time.Now().UnixMicro(), 627 Kind: "commit", 628 Commit: &CommitEvent{ 629 Rev: 
"rev123", 630 Operation: "create", 631 Collection: CovesProfileCollection, 632 RKey: "self", 633 CID: "bafy123", 634 Record: map[string]interface{}{ 635 "displayName": "Test User", 636 "avatar": map[string]interface{}{ 637 "$type": "not-a-blob", // Invalid type 638 }, 639 "banner": "not-a-map", // Invalid structure 640 }, 641 }, 642 } 643 644 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 645 if err != nil { 646 t.Fatalf("Expected no error, got: %v", err) 647 } 648 649 if len(mockService.updatedCalls) != 1 { 650 t.Fatalf("Expected 1 UpdateProfile call, got %d", len(mockService.updatedCalls)) 651 } 652 653 call := mockService.updatedCalls[0] 654 // displayName should be extracted 655 if call.DisplayName == nil || *call.DisplayName != "Test User" { 656 t.Errorf("Expected displayName 'Test User', got %v", call.DisplayName) 657 } 658 // Avatar and banner should be nil (not extracted due to invalid structure) 659 if call.AvatarCID != nil { 660 t.Errorf("Expected nil avatar CID for invalid blob, got %v", call.AvatarCID) 661 } 662 if call.BannerCID != nil { 663 t.Errorf("Expected nil banner CID for invalid structure, got %v", call.BannerCID) 664 } 665 }) 666} 667 668func TestUserConsumer_PropagatesUpdateProfileError(t *testing.T) { 669 t.Run("propagates_database_errors_from_UpdateProfile", func(t *testing.T) { 670 mockService := newMockUserService() 671 mockService.users["did:plc:testuser"] = &users.User{ 672 DID: "did:plc:testuser", 673 Handle: "testuser.bsky.social", 674 PDSURL: "https://bsky.social", 675 } 676 mockService.updateError = errors.New("database write error") 677 mockResolver := &mockIdentityResolverForUser{} 678 consumer := NewUserEventConsumer(mockService, mockResolver, "wss://jetstream.example.com", "") 679 ctx := context.Background() 680 681 event := &JetstreamEvent{ 682 Did: "did:plc:testuser", 683 TimeUS: time.Now().UnixMicro(), 684 Kind: "commit", 685 Commit: &CommitEvent{ 686 Rev: "rev123", 687 Operation: "create", 688 Collection: 
CovesProfileCollection, 689 RKey: "self", 690 CID: "bafy123", 691 Record: map[string]interface{}{ 692 "displayName": "Test User", 693 }, 694 }, 695 } 696 697 err := consumer.handleEvent(ctx, mustMarshalEvent(event)) 698 if err == nil { 699 t.Fatal("Expected error for UpdateProfile failure, got nil") 700 } 701 if !errors.Is(err, mockService.updateError) && err.Error() != "failed to update user profile: database write error" { 702 t.Errorf("Expected wrapped database error, got: %v", err) 703 } 704 }) 705} 706 707func TestExtractBlobCID(t *testing.T) { 708 t.Run("extracts CID from valid blob structure", func(t *testing.T) { 709 blob := map[string]interface{}{ 710 "$type": "blob", 711 "ref": map[string]interface{}{"$link": "bafktest123"}, 712 "mimeType": "image/jpeg", 713 "size": float64(12345), 714 } 715 716 cid, ok := extractBlobCID(blob) 717 if !ok { 718 t.Fatal("Expected successful extraction") 719 } 720 if cid != "bafktest123" { 721 t.Errorf("Expected CID 'bafktest123', got '%s'", cid) 722 } 723 }) 724 725 t.Run("returns false for nil blob", func(t *testing.T) { 726 cid, ok := extractBlobCID(nil) 727 if ok { 728 t.Error("Expected false for nil blob") 729 } 730 if cid != "" { 731 t.Errorf("Expected empty CID for nil blob, got '%s'", cid) 732 } 733 }) 734 735 t.Run("returns false for wrong $type", func(t *testing.T) { 736 blob := map[string]interface{}{ 737 "$type": "image", 738 "ref": map[string]interface{}{"$link": "bafktest123"}, 739 } 740 741 cid, ok := extractBlobCID(blob) 742 if ok { 743 t.Error("Expected false for wrong $type") 744 } 745 if cid != "" { 746 t.Errorf("Expected empty CID for wrong type, got '%s'", cid) 747 } 748 }) 749 750 t.Run("returns false for missing $type", func(t *testing.T) { 751 blob := map[string]interface{}{ 752 "ref": map[string]interface{}{"$link": "bafktest123"}, 753 } 754 755 cid, ok := extractBlobCID(blob) 756 if ok { 757 t.Error("Expected false for missing $type") 758 } 759 if cid != "" { 760 t.Errorf("Expected empty CID for 
missing type, got '%s'", cid) 761 } 762 }) 763 764 t.Run("returns false for missing ref", func(t *testing.T) { 765 blob := map[string]interface{}{ 766 "$type": "blob", 767 } 768 769 cid, ok := extractBlobCID(blob) 770 if ok { 771 t.Error("Expected false for missing ref") 772 } 773 if cid != "" { 774 t.Errorf("Expected empty CID for missing ref, got '%s'", cid) 775 } 776 }) 777 778 t.Run("returns false for missing $link", func(t *testing.T) { 779 blob := map[string]interface{}{ 780 "$type": "blob", 781 "ref": map[string]interface{}{}, 782 } 783 784 cid, ok := extractBlobCID(blob) 785 if ok { 786 t.Error("Expected false for missing $link") 787 } 788 if cid != "" { 789 t.Errorf("Expected empty CID for missing link, got '%s'", cid) 790 } 791 }) 792 793 t.Run("returns false for non-map ref", func(t *testing.T) { 794 blob := map[string]interface{}{ 795 "$type": "blob", 796 "ref": "not-a-map", 797 } 798 799 cid, ok := extractBlobCID(blob) 800 if ok { 801 t.Error("Expected false for non-map ref") 802 } 803 if cid != "" { 804 t.Errorf("Expected empty CID for non-map ref, got '%s'", cid) 805 } 806 }) 807} 808 809// mustMarshalEvent marshals an event to JSON bytes for testing 810func mustMarshalEvent(event *JetstreamEvent) []byte { 811 data, err := json.Marshal(event) 812 if err != nil { 813 panic(err) 814 } 815 return data 816}