A community-based topic aggregation platform built on atproto
at main 575 lines 19 kB view raw
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/atproto/utils" 6 "Coves/internal/core/userblocks" 7 "Coves/internal/core/users" 8 "context" 9 "encoding/json" 10 "errors" 11 "fmt" 12 "log" 13 "log/slog" 14 "strings" 15 "sync" 16 "time" 17 18 "github.com/gorilla/websocket" 19) 20 21// CovesProfileCollection is the atProto collection for Coves user profiles. 22// NOTE: This constant is intentionally duplicated in internal/api/handlers/user/update_profile.go 23// to avoid circular dependencies between packages. Keep both definitions in sync. 24const CovesProfileCollection = "social.coves.actor.profile" 25 26// CovesActorBlockCollection is the atProto collection for user-to-user blocks. 27// Records live in the blocker's repository at at://blocker_did/social.coves.actor.block/{tid} 28const CovesActorBlockCollection = "social.coves.actor.block" 29 30// SessionHandleUpdater is an interface for updating OAuth session handles 31// when identity changes occur. This keeps active sessions in sync with 32// the user's current handle. 
33type SessionHandleUpdater interface { 34 UpdateHandleByDID(ctx context.Context, did, newHandle string) (int64, error) 35} 36 37// JetstreamEvent represents an event from the Jetstream firehose 38// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream 39type JetstreamEvent struct { 40 Account *AccountEvent `json:"account,omitempty"` 41 Identity *IdentityEvent `json:"identity,omitempty"` 42 Commit *CommitEvent `json:"commit,omitempty"` 43 Did string `json:"did"` 44 Kind string `json:"kind"` 45 TimeUS int64 `json:"time_us"` 46} 47 48type AccountEvent struct { 49 Did string `json:"did"` 50 Time string `json:"time"` 51 Seq int64 `json:"seq"` 52 Active bool `json:"active"` 53} 54 55type IdentityEvent struct { 56 Did string `json:"did"` 57 Handle string `json:"handle"` 58 Time string `json:"time"` 59 Seq int64 `json:"seq"` 60} 61 62// CommitEvent represents a record commit from Jetstream 63type CommitEvent struct { 64 Rev string `json:"rev"` 65 Operation string `json:"operation"` // "create", "update", "delete" 66 Collection string `json:"collection"` 67 RKey string `json:"rkey"` 68 Record map[string]interface{} `json:"record,omitempty"` 69 CID string `json:"cid,omitempty"` 70} 71 72// UserEventConsumer consumes user-related events from Jetstream 73type UserEventConsumer struct { 74 userService users.UserService 75 identityResolver identity.Resolver 76 sessionHandleUpdater SessionHandleUpdater // Optional: updates OAuth sessions on handle change 77 userBlockRepo userblocks.Repository // Optional: indexes user-to-user blocks 78 wsURL string 79 pdsFilter string // Optional: only index users from specific PDS 80} 81 82// ConsumerOption is a functional option for configuring UserEventConsumer 83type ConsumerOption func(*UserEventConsumer) 84 85// WithSessionHandleUpdater sets the session handle updater for syncing OAuth sessions 86// when identity changes occur. If not set, OAuth sessions won't be updated on handle changes. 
87func WithSessionHandleUpdater(updater SessionHandleUpdater) ConsumerOption { 88 return func(c *UserEventConsumer) { 89 c.sessionHandleUpdater = updater 90 } 91} 92 93// WithUserBlockRepo sets the user block repository for indexing user-to-user blocks 94// from the Jetstream firehose. If not set, block events will be ignored. 95func WithUserBlockRepo(repo userblocks.Repository) ConsumerOption { 96 return func(c *UserEventConsumer) { 97 c.userBlockRepo = repo 98 } 99} 100 101// NewUserEventConsumer creates a new Jetstream consumer for user events 102func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string, opts ...ConsumerOption) *UserEventConsumer { 103 c := &UserEventConsumer{ 104 userService: userService, 105 identityResolver: identityResolver, 106 wsURL: wsURL, 107 pdsFilter: pdsFilter, 108 } 109 for _, opt := range opts { 110 opt(c) 111 } 112 return c 113} 114 115// Start begins consuming events from Jetstream 116// Runs indefinitely, reconnecting on errors 117func (c *UserEventConsumer) Start(ctx context.Context) error { 118 log.Printf("Starting Jetstream user consumer: %s", c.wsURL) 119 120 for { 121 select { 122 case <-ctx.Done(): 123 log.Println("Jetstream consumer shutting down") 124 return ctx.Err() 125 default: 126 if err := c.connect(ctx); err != nil { 127 log.Printf("Jetstream connection error: %v. 
Retrying in 5s...", err) 128 time.Sleep(5 * time.Second) 129 continue 130 } 131 } 132 } 133} 134 135// connect establishes WebSocket connection and processes events 136func (c *UserEventConsumer) connect(ctx context.Context) error { 137 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 138 if err != nil { 139 return fmt.Errorf("failed to connect to Jetstream: %w", err) 140 } 141 defer func() { 142 if err := conn.Close(); err != nil { 143 log.Printf("Failed to close WebSocket connection: %v", err) 144 } 145 }() 146 147 log.Println("Connected to Jetstream") 148 149 // Set read deadline to detect connection issues 150 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 151 log.Printf("Failed to set read deadline: %v", err) 152 } 153 154 // Set pong handler to keep connection alive 155 conn.SetPongHandler(func(string) error { 156 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 157 log.Printf("Failed to set read deadline in pong handler: %v", err) 158 } 159 return nil 160 }) 161 162 // Start ping ticker 163 ticker := time.NewTicker(30 * time.Second) 164 defer ticker.Stop() 165 166 done := make(chan struct{}) 167 var closeOnce sync.Once // Ensure done channel is only closed once 168 169 // Goroutine to send pings 170 go func() { 171 for { 172 select { 173 case <-ticker.C: 174 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 175 log.Printf("Ping error: %v", err) 176 closeOnce.Do(func() { close(done) }) 177 return 178 } 179 case <-done: 180 return 181 case <-ctx.Done(): 182 return 183 } 184 } 185 }() 186 187 // Read messages 188 for { 189 select { 190 case <-ctx.Done(): 191 return ctx.Err() 192 case <-done: 193 return fmt.Errorf("connection closed") 194 default: 195 _, message, err := conn.ReadMessage() 196 if err != nil { 197 closeOnce.Do(func() { close(done) }) 198 return fmt.Errorf("read error: %w", err) 199 } 200 201 // Reset read deadline on successful read 202 if err 
:= conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 203 log.Printf("Failed to set read deadline: %v", err) 204 } 205 206 if err := c.handleEvent(ctx, message); err != nil { 207 log.Printf("Error handling event: %v", err) 208 // Continue processing other events 209 } 210 } 211 } 212} 213 214// handleEvent processes a single Jetstream event 215func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error { 216 var event JetstreamEvent 217 if err := json.Unmarshal(data, &event); err != nil { 218 return fmt.Errorf("failed to parse event: %w", err) 219 } 220 221 // We're interested in identity events (handle updates), account events (new users), 222 // and commit events (profile updates from social.coves.actor.profile) 223 switch event.Kind { 224 case "identity": 225 return c.handleIdentityEvent(ctx, &event) 226 case "account": 227 return c.handleAccountEvent(ctx, &event) 228 case "commit": 229 return c.handleCommitEvent(ctx, &event) 230 default: 231 // Ignore other event types 232 return nil 233 } 234} 235 236// HandleEvent processes a Jetstream event for user-related records. 237// This is the public entry point used by tests and external callers. 238func (c *UserEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 239 switch event.Kind { 240 case "identity": 241 return c.handleIdentityEvent(ctx, event) 242 case "account": 243 return c.handleAccountEvent(ctx, event) 244 case "commit": 245 return c.handleCommitEvent(ctx, event) 246 default: 247 return nil 248 } 249} 250 251// Deprecated: HandleIdentityEventPublic is superseded by HandleEvent which routes 252// all event kinds. Use HandleEvent for new code; this remains for existing tests. 
253func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error { 254 return c.handleIdentityEvent(ctx, event) 255} 256 257// handleIdentityEvent processes identity events (handle changes) 258// NOTE: This only UPDATES existing users - it does NOT create new users. 259// Users are created during OAuth login or signup, not from Jetstream events. 260// This prevents indexing millions of Bluesky users who never interact with Coves. 261func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 262 if event.Identity == nil { 263 return fmt.Errorf("identity event missing identity data") 264 } 265 266 did := event.Identity.Did 267 handle := event.Identity.Handle 268 269 if did == "" || handle == "" { 270 return fmt.Errorf("identity event missing did or handle") 271 } 272 273 // Only process users who exist in our database (i.e., have used Coves before) 274 existingUser, err := c.userService.GetUserByDID(ctx, did) 275 if err != nil { 276 if errors.Is(err, users.ErrUserNotFound) { 277 // User doesn't exist in our database - skip this event 278 // They'll be indexed when they actually interact with Coves (OAuth login, signup, etc.) 
279 // This prevents us from indexing millions of Bluesky users we don't care about 280 return nil 281 } 282 // Database error - propagate so it can be retried 283 return fmt.Errorf("failed to check if user exists: %w", err) 284 } 285 286 log.Printf("Identity event for known user: %s (%s)", handle, did) 287 288 // User exists - check if handle changed 289 if existingUser.Handle != handle { 290 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did) 291 292 // CRITICAL: Update database FIRST, then purge cache 293 // This prevents race condition where cache gets refilled with stale data 294 _, updateErr := c.userService.UpdateHandle(ctx, did, handle) 295 if updateErr != nil { 296 return fmt.Errorf("failed to update handle: %w", updateErr) 297 } 298 299 // CRITICAL: Purge BOTH old handle and DID from cache 300 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed) 301 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil { 302 slog.Error("CRITICAL: failed to purge old handle cache", 303 slog.String("handle", existingUser.Handle), 304 slog.String("error", purgeErr.Error())) 305 } 306 307 // DID: did:plc:abc123 → alice.bsky.social (must be removed) 308 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil { 309 slog.Error("CRITICAL: failed to purge DID cache", 310 slog.String("did", did), 311 slog.String("error", purgeErr.Error())) 312 } 313 314 // Update OAuth session handles to keep mobile/web sessions in sync 315 // Failure here causes users to see stale handles in their active sessions 316 if c.sessionHandleUpdater != nil { 317 if sessionsUpdated, updateErr := c.sessionHandleUpdater.UpdateHandleByDID(ctx, did, handle); updateErr != nil { 318 slog.Error("failed to update OAuth session handles (users may see stale handle)", 319 slog.String("did", did), 320 slog.String("new_handle", handle), 321 slog.String("error", updateErr.Error())) 322 } else if sessionsUpdated > 0 { 323 
log.Printf("Updated %d OAuth session(s) with new handle: %s", sessionsUpdated, handle) 324 } 325 } 326 327 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle) 328 } else { 329 log.Printf("Handle unchanged for %s (%s)", handle, did) 330 } 331 332 return nil 333} 334 335// handleAccountEvent processes account events (account creation/updates) 336func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error { 337 if event.Account == nil { 338 return fmt.Errorf("account event missing account data") 339 } 340 341 did := event.Account.Did 342 if did == "" { 343 return fmt.Errorf("account event missing did") 344 } 345 346 // Account events don't include handle, so we skip them. 347 // Users are indexed via OAuth login or signup, not from account events. 348 return nil 349} 350 351// handleCommitEvent processes commit events for user-related collections. 352// Routes to appropriate handler based on collection: 353// - social.coves.actor.profile: Profile updates for users in our database 354// - social.coves.actor.block: User-to-user block create/delete events 355func (c *UserEventConsumer) handleCommitEvent(ctx context.Context, event *JetstreamEvent) error { 356 if event.Commit == nil { 357 slog.Warn("received nil commit in handleCommitEvent (malformed event)", slog.String("did", event.Did)) 358 return nil 359 } 360 361 switch event.Commit.Collection { 362 case CovesProfileCollection: 363 return c.handleProfileCommit(ctx, event) 364 case CovesActorBlockCollection: 365 return c.handleUserBlock(ctx, event.Did, event.Commit) 366 default: 367 return nil 368 } 369} 370 371// handleProfileCommit processes profile commit events for users already in our database. 372// This syncs profile data (displayName, bio, avatar, banner) from Coves profiles. 
373func (c *UserEventConsumer) handleProfileCommit(ctx context.Context, event *JetstreamEvent) error { 374 // Profile handling requires userService 375 if c.userService == nil { 376 return nil 377 } 378 379 // Only process users who exist in our database 380 _, err := c.userService.GetUserByDID(ctx, event.Did) 381 if err != nil { 382 if errors.Is(err, users.ErrUserNotFound) { 383 // User doesn't exist in our database - skip this event 384 // They'll be indexed when they actually interact with Coves 385 return nil 386 } 387 // Database error - propagate so it can be retried 388 return fmt.Errorf("failed to check if user exists: %w", err) 389 } 390 391 switch event.Commit.Operation { 392 case "create", "update": 393 return c.handleProfileUpdate(ctx, event.Did, event.Commit) 394 case "delete": 395 return c.handleProfileDelete(ctx, event.Did) 396 default: 397 return nil 398 } 399} 400 401// handleProfileUpdate processes profile create/update operations 402// Extracts displayName, description (bio), avatar, and banner from the record 403func (c *UserEventConsumer) handleProfileUpdate(ctx context.Context, did string, commit *CommitEvent) error { 404 if commit.Record == nil { 405 slog.Warn("received nil record in profile commit (profile update silently dropped)", 406 slog.String("did", did), 407 slog.String("operation", commit.Operation)) 408 return nil 409 } 410 411 input := users.UpdateProfileInput{} 412 413 // Extract displayName 414 if dn, ok := commit.Record["displayName"].(string); ok { 415 input.DisplayName = &dn 416 } 417 418 // Extract description (bio) 419 if desc, ok := commit.Record["description"].(string); ok { 420 input.Bio = &desc 421 } 422 423 // Extract avatar CID from blob ref structure 424 if avatarMap, ok := commit.Record["avatar"].(map[string]interface{}); ok { 425 if cid, ok := extractBlobCID(avatarMap); ok { 426 input.AvatarCID = &cid 427 } 428 } 429 430 // Extract banner CID from blob ref structure 431 if bannerMap, ok := 
commit.Record["banner"].(map[string]interface{}); ok { 432 if cid, ok := extractBlobCID(bannerMap); ok { 433 input.BannerCID = &cid 434 } 435 } 436 437 _, err := c.userService.UpdateProfile(ctx, did, input) 438 if err != nil { 439 return fmt.Errorf("failed to update user profile: %w", err) 440 } 441 442 log.Printf("Updated profile for user %s", did) 443 return nil 444} 445 446// handleProfileDelete processes profile delete operations 447// Clears all profile fields by passing empty strings 448func (c *UserEventConsumer) handleProfileDelete(ctx context.Context, did string) error { 449 empty := "" 450 input := users.UpdateProfileInput{ 451 DisplayName: &empty, 452 Bio: &empty, 453 AvatarCID: &empty, 454 BannerCID: &empty, 455 } 456 _, err := c.userService.UpdateProfile(ctx, did, input) 457 if err != nil { 458 return fmt.Errorf("failed to clear user profile: %w", err) 459 } 460 log.Printf("Cleared profile for user %s", did) 461 return nil 462} 463 464// handleUserBlock processes user-to-user block create/delete events. 465// CREATE operation = user blocked another user 466// DELETE operation = user unblocked another user 467func (c *UserEventConsumer) handleUserBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 468 if c.userBlockRepo == nil { 469 slog.Warn("user block event ignored: userBlockRepo not configured (WithUserBlockRepo not called)", 470 slog.String("user_did", userDID), 471 slog.String("operation", commit.Operation)) 472 return nil 473 } 474 475 switch commit.Operation { 476 case "create": 477 return c.createUserBlock(ctx, userDID, commit) 478 case "delete": 479 return c.deleteUserBlock(ctx, userDID, commit) 480 default: 481 // Update operations shouldn't happen on blocks, but ignore gracefully 482 log.Printf("Ignoring unexpected operation on user block: %s (userDID=%s, rkey=%s)", 483 commit.Operation, userDID, commit.RKey) 484 return nil 485 } 486} 487 488// createUserBlock indexes a new user-to-user block from the firehose. 
489func (c *UserEventConsumer) createUserBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 490 if commit.Record == nil { 491 return fmt.Errorf("user block create event missing record data") 492 } 493 494 // Validate userDID format (untrusted firehose data) 495 if !strings.HasPrefix(userDID, "did:") { 496 return fmt.Errorf("invalid blocker DID format from firehose: %s", userDID) 497 } 498 499 // Extract blocked user DID from record's subject field 500 blockedDID, ok := commit.Record["subject"].(string) 501 if !ok { 502 return fmt.Errorf("user block record missing subject field") 503 } 504 505 // Validate blockedDID format (untrusted firehose data) 506 if !strings.HasPrefix(blockedDID, "did:") { 507 return fmt.Errorf("invalid blocked DID format from firehose: %s", blockedDID) 508 } 509 510 // Validate rkey is non-empty before building AT-URI 511 if commit.RKey == "" { 512 return fmt.Errorf("user block create event missing rkey") 513 } 514 515 // Build AT-URI for the block record (lives in the blocker's repository) 516 uri := fmt.Sprintf("at://%s/social.coves.actor.block/%s", userDID, commit.RKey) 517 518 // Parse createdAt from record to preserve chronological ordering during replays 519 block := &userblocks.UserBlock{ 520 BlockerDID: userDID, 521 BlockedDID: blockedDID, 522 BlockedAt: utils.ParseCreatedAt(commit.Record), 523 RecordURI: uri, 524 RecordCID: commit.CID, 525 } 526 527 // Index the block (idempotent via ON CONFLICT DO UPDATE) 528 _, err := c.userBlockRepo.BlockUser(ctx, block) 529 if err != nil { 530 if userblocks.IsConflict(err) { 531 log.Printf("User block already indexed: %s -> %s", userDID, blockedDID) 532 return nil 533 } 534 return fmt.Errorf("failed to index user block: %w", err) 535 } 536 537 log.Printf("Indexed user block: %s -> %s", userDID, blockedDID) 538 return nil 539} 540 541// deleteUserBlock removes a user-to-user block from the index. 
542// DELETE operations don't include record data, so we look up the block by its URI. 543func (c *UserEventConsumer) deleteUserBlock(ctx context.Context, userDID string, commit *CommitEvent) error { 544 // Validate rkey is non-empty before building AT-URI 545 if commit.RKey == "" { 546 return fmt.Errorf("user block delete event missing rkey") 547 } 548 549 // Build AT-URI from the rkey 550 uri := fmt.Sprintf("at://%s/social.coves.actor.block/%s", userDID, commit.RKey) 551 552 // Look up the block to get the blocked DID 553 block, err := c.userBlockRepo.GetBlockByURI(ctx, uri) 554 if err != nil { 555 if userblocks.IsNotFound(err) { 556 // Already deleted - this is fine (idempotency) 557 log.Printf("User block already deleted: %s", uri) 558 return nil 559 } 560 return fmt.Errorf("failed to find user block for deletion: %w", err) 561 } 562 563 // Remove the block from the index 564 err = c.userBlockRepo.UnblockUser(ctx, userDID, block.BlockedDID) 565 if err != nil { 566 if userblocks.IsNotFound(err) { 567 log.Printf("User block already removed: %s -> %s", userDID, block.BlockedDID) 568 return nil 569 } 570 return fmt.Errorf("failed to remove user block: %w", err) 571 } 572 573 log.Printf("Removed user block: %s -> %s", userDID, block.BlockedDID) 574 return nil 575}