A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/identity"
5 "Coves/internal/atproto/utils"
6 "Coves/internal/core/userblocks"
7 "Coves/internal/core/users"
8 "context"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "log"
13 "log/slog"
14 "strings"
15 "sync"
16 "time"
17
18 "github.com/gorilla/websocket"
19)
20
// atProto collection NSIDs that this consumer routes on.
const (
	// CovesProfileCollection is the atProto collection for Coves user profiles.
	// NOTE: The same value is deliberately duplicated in
	// internal/api/handlers/user/update_profile.go to avoid a circular
	// dependency between packages. Keep both definitions in sync.
	CovesProfileCollection = "social.coves.actor.profile"

	// CovesActorBlockCollection is the atProto collection for user-to-user blocks.
	// Records live in the blocker's repository at
	// at://blocker_did/social.coves.actor.block/{tid}
	CovesActorBlockCollection = "social.coves.actor.block"
)
29
// SessionHandleUpdater is implemented by anything that can rewrite the handle
// stored on OAuth sessions. The consumer calls it after an identity change so
// that active sessions keep showing the user's current handle.
type SessionHandleUpdater interface {
	// UpdateHandleByDID sets newHandle on the sessions belonging to did.
	// The consumer logs the returned int64 as the number of sessions updated.
	UpdateHandleByDID(ctx context.Context, did string, newHandle string) (int64, error)
}
36
37// JetstreamEvent represents an event from the Jetstream firehose
38// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream
39type JetstreamEvent struct {
40 Account *AccountEvent `json:"account,omitempty"`
41 Identity *IdentityEvent `json:"identity,omitempty"`
42 Commit *CommitEvent `json:"commit,omitempty"`
43 Did string `json:"did"`
44 Kind string `json:"kind"`
45 TimeUS int64 `json:"time_us"`
46}
47
// AccountEvent is the payload carried by Jetstream events of kind "account".
// It carries no handle, which is why the consumer does not index users from it.
type AccountEvent struct {
	Did    string `json:"did"`
	Time   string `json:"time"`
	Seq    int64  `json:"seq"`
	Active bool   `json:"active"`
}
54
// IdentityEvent is the payload carried by Jetstream events of kind "identity".
// The consumer compares Handle against the stored handle for Did to detect
// handle changes.
type IdentityEvent struct {
	Did    string `json:"did"`
	Handle string `json:"handle"`
	Time   string `json:"time"`
	Seq    int64  `json:"seq"`
}
61
// CommitEvent describes one record commit delivered by Jetstream.
// Record holds the decoded record body as generic JSON and CID its content
// identifier; both are omitted from the encoded form when empty.
type CommitEvent struct {
	Rev string `json:"rev"`
	// Operation is one of "create", "update", "delete".
	Operation  string         `json:"operation"`
	Collection string         `json:"collection"`
	RKey       string         `json:"rkey"`
	Record     map[string]any `json:"record,omitempty"`
	CID        string         `json:"cid,omitempty"`
}
71
72// UserEventConsumer consumes user-related events from Jetstream
73type UserEventConsumer struct {
74 userService users.UserService
75 identityResolver identity.Resolver
76 sessionHandleUpdater SessionHandleUpdater // Optional: updates OAuth sessions on handle change
77 userBlockRepo userblocks.Repository // Optional: indexes user-to-user blocks
78 wsURL string
79 pdsFilter string // Optional: only index users from specific PDS
80}
81
82// ConsumerOption is a functional option for configuring UserEventConsumer
83type ConsumerOption func(*UserEventConsumer)
84
85// WithSessionHandleUpdater sets the session handle updater for syncing OAuth sessions
86// when identity changes occur. If not set, OAuth sessions won't be updated on handle changes.
87func WithSessionHandleUpdater(updater SessionHandleUpdater) ConsumerOption {
88 return func(c *UserEventConsumer) {
89 c.sessionHandleUpdater = updater
90 }
91}
92
93// WithUserBlockRepo sets the user block repository for indexing user-to-user blocks
94// from the Jetstream firehose. If not set, block events will be ignored.
95func WithUserBlockRepo(repo userblocks.Repository) ConsumerOption {
96 return func(c *UserEventConsumer) {
97 c.userBlockRepo = repo
98 }
99}
100
101// NewUserEventConsumer creates a new Jetstream consumer for user events
102func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string, opts ...ConsumerOption) *UserEventConsumer {
103 c := &UserEventConsumer{
104 userService: userService,
105 identityResolver: identityResolver,
106 wsURL: wsURL,
107 pdsFilter: pdsFilter,
108 }
109 for _, opt := range opts {
110 opt(c)
111 }
112 return c
113}
114
115// Start begins consuming events from Jetstream
116// Runs indefinitely, reconnecting on errors
117func (c *UserEventConsumer) Start(ctx context.Context) error {
118 log.Printf("Starting Jetstream user consumer: %s", c.wsURL)
119
120 for {
121 select {
122 case <-ctx.Done():
123 log.Println("Jetstream consumer shutting down")
124 return ctx.Err()
125 default:
126 if err := c.connect(ctx); err != nil {
127 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err)
128 time.Sleep(5 * time.Second)
129 continue
130 }
131 }
132 }
133}
134
135// connect establishes WebSocket connection and processes events
136func (c *UserEventConsumer) connect(ctx context.Context) error {
137 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
138 if err != nil {
139 return fmt.Errorf("failed to connect to Jetstream: %w", err)
140 }
141 defer func() {
142 if err := conn.Close(); err != nil {
143 log.Printf("Failed to close WebSocket connection: %v", err)
144 }
145 }()
146
147 log.Println("Connected to Jetstream")
148
149 // Set read deadline to detect connection issues
150 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
151 log.Printf("Failed to set read deadline: %v", err)
152 }
153
154 // Set pong handler to keep connection alive
155 conn.SetPongHandler(func(string) error {
156 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
157 log.Printf("Failed to set read deadline in pong handler: %v", err)
158 }
159 return nil
160 })
161
162 // Start ping ticker
163 ticker := time.NewTicker(30 * time.Second)
164 defer ticker.Stop()
165
166 done := make(chan struct{})
167 var closeOnce sync.Once // Ensure done channel is only closed once
168
169 // Goroutine to send pings
170 go func() {
171 for {
172 select {
173 case <-ticker.C:
174 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
175 log.Printf("Ping error: %v", err)
176 closeOnce.Do(func() { close(done) })
177 return
178 }
179 case <-done:
180 return
181 case <-ctx.Done():
182 return
183 }
184 }
185 }()
186
187 // Read messages
188 for {
189 select {
190 case <-ctx.Done():
191 return ctx.Err()
192 case <-done:
193 return fmt.Errorf("connection closed")
194 default:
195 _, message, err := conn.ReadMessage()
196 if err != nil {
197 closeOnce.Do(func() { close(done) })
198 return fmt.Errorf("read error: %w", err)
199 }
200
201 // Reset read deadline on successful read
202 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
203 log.Printf("Failed to set read deadline: %v", err)
204 }
205
206 if err := c.handleEvent(ctx, message); err != nil {
207 log.Printf("Error handling event: %v", err)
208 // Continue processing other events
209 }
210 }
211 }
212}
213
214// handleEvent processes a single Jetstream event
215func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error {
216 var event JetstreamEvent
217 if err := json.Unmarshal(data, &event); err != nil {
218 return fmt.Errorf("failed to parse event: %w", err)
219 }
220
221 // We're interested in identity events (handle updates), account events (new users),
222 // and commit events (profile updates from social.coves.actor.profile)
223 switch event.Kind {
224 case "identity":
225 return c.handleIdentityEvent(ctx, &event)
226 case "account":
227 return c.handleAccountEvent(ctx, &event)
228 case "commit":
229 return c.handleCommitEvent(ctx, &event)
230 default:
231 // Ignore other event types
232 return nil
233 }
234}
235
236// HandleEvent processes a Jetstream event for user-related records.
237// This is the public entry point used by tests and external callers.
238func (c *UserEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
239 switch event.Kind {
240 case "identity":
241 return c.handleIdentityEvent(ctx, event)
242 case "account":
243 return c.handleAccountEvent(ctx, event)
244 case "commit":
245 return c.handleCommitEvent(ctx, event)
246 default:
247 return nil
248 }
249}
250
251// Deprecated: HandleIdentityEventPublic is superseded by HandleEvent which routes
252// all event kinds. Use HandleEvent for new code; this remains for existing tests.
253func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error {
254 return c.handleIdentityEvent(ctx, event)
255}
256
257// handleIdentityEvent processes identity events (handle changes)
258// NOTE: This only UPDATES existing users - it does NOT create new users.
259// Users are created during OAuth login or signup, not from Jetstream events.
260// This prevents indexing millions of Bluesky users who never interact with Coves.
261func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error {
262 if event.Identity == nil {
263 return fmt.Errorf("identity event missing identity data")
264 }
265
266 did := event.Identity.Did
267 handle := event.Identity.Handle
268
269 if did == "" || handle == "" {
270 return fmt.Errorf("identity event missing did or handle")
271 }
272
273 // Only process users who exist in our database (i.e., have used Coves before)
274 existingUser, err := c.userService.GetUserByDID(ctx, did)
275 if err != nil {
276 if errors.Is(err, users.ErrUserNotFound) {
277 // User doesn't exist in our database - skip this event
278 // They'll be indexed when they actually interact with Coves (OAuth login, signup, etc.)
279 // This prevents us from indexing millions of Bluesky users we don't care about
280 return nil
281 }
282 // Database error - propagate so it can be retried
283 return fmt.Errorf("failed to check if user exists: %w", err)
284 }
285
286 log.Printf("Identity event for known user: %s (%s)", handle, did)
287
288 // User exists - check if handle changed
289 if existingUser.Handle != handle {
290 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did)
291
292 // CRITICAL: Update database FIRST, then purge cache
293 // This prevents race condition where cache gets refilled with stale data
294 _, updateErr := c.userService.UpdateHandle(ctx, did, handle)
295 if updateErr != nil {
296 return fmt.Errorf("failed to update handle: %w", updateErr)
297 }
298
299 // CRITICAL: Purge BOTH old handle and DID from cache
300 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed)
301 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil {
302 slog.Error("CRITICAL: failed to purge old handle cache",
303 slog.String("handle", existingUser.Handle),
304 slog.String("error", purgeErr.Error()))
305 }
306
307 // DID: did:plc:abc123 → alice.bsky.social (must be removed)
308 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil {
309 slog.Error("CRITICAL: failed to purge DID cache",
310 slog.String("did", did),
311 slog.String("error", purgeErr.Error()))
312 }
313
314 // Update OAuth session handles to keep mobile/web sessions in sync
315 // Failure here causes users to see stale handles in their active sessions
316 if c.sessionHandleUpdater != nil {
317 if sessionsUpdated, updateErr := c.sessionHandleUpdater.UpdateHandleByDID(ctx, did, handle); updateErr != nil {
318 slog.Error("failed to update OAuth session handles (users may see stale handle)",
319 slog.String("did", did),
320 slog.String("new_handle", handle),
321 slog.String("error", updateErr.Error()))
322 } else if sessionsUpdated > 0 {
323 log.Printf("Updated %d OAuth session(s) with new handle: %s", sessionsUpdated, handle)
324 }
325 }
326
327 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle)
328 } else {
329 log.Printf("Handle unchanged for %s (%s)", handle, did)
330 }
331
332 return nil
333}
334
335// handleAccountEvent processes account events (account creation/updates)
336func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error {
337 if event.Account == nil {
338 return fmt.Errorf("account event missing account data")
339 }
340
341 did := event.Account.Did
342 if did == "" {
343 return fmt.Errorf("account event missing did")
344 }
345
346 // Account events don't include handle, so we skip them.
347 // Users are indexed via OAuth login or signup, not from account events.
348 return nil
349}
350
351// handleCommitEvent processes commit events for user-related collections.
352// Routes to appropriate handler based on collection:
353// - social.coves.actor.profile: Profile updates for users in our database
354// - social.coves.actor.block: User-to-user block create/delete events
355func (c *UserEventConsumer) handleCommitEvent(ctx context.Context, event *JetstreamEvent) error {
356 if event.Commit == nil {
357 slog.Warn("received nil commit in handleCommitEvent (malformed event)", slog.String("did", event.Did))
358 return nil
359 }
360
361 switch event.Commit.Collection {
362 case CovesProfileCollection:
363 return c.handleProfileCommit(ctx, event)
364 case CovesActorBlockCollection:
365 return c.handleUserBlock(ctx, event.Did, event.Commit)
366 default:
367 return nil
368 }
369}
370
371// handleProfileCommit processes profile commit events for users already in our database.
372// This syncs profile data (displayName, bio, avatar, banner) from Coves profiles.
373func (c *UserEventConsumer) handleProfileCommit(ctx context.Context, event *JetstreamEvent) error {
374 // Profile handling requires userService
375 if c.userService == nil {
376 return nil
377 }
378
379 // Only process users who exist in our database
380 _, err := c.userService.GetUserByDID(ctx, event.Did)
381 if err != nil {
382 if errors.Is(err, users.ErrUserNotFound) {
383 // User doesn't exist in our database - skip this event
384 // They'll be indexed when they actually interact with Coves
385 return nil
386 }
387 // Database error - propagate so it can be retried
388 return fmt.Errorf("failed to check if user exists: %w", err)
389 }
390
391 switch event.Commit.Operation {
392 case "create", "update":
393 return c.handleProfileUpdate(ctx, event.Did, event.Commit)
394 case "delete":
395 return c.handleProfileDelete(ctx, event.Did)
396 default:
397 return nil
398 }
399}
400
401// handleProfileUpdate processes profile create/update operations
402// Extracts displayName, description (bio), avatar, and banner from the record
403func (c *UserEventConsumer) handleProfileUpdate(ctx context.Context, did string, commit *CommitEvent) error {
404 if commit.Record == nil {
405 slog.Warn("received nil record in profile commit (profile update silently dropped)",
406 slog.String("did", did),
407 slog.String("operation", commit.Operation))
408 return nil
409 }
410
411 input := users.UpdateProfileInput{}
412
413 // Extract displayName
414 if dn, ok := commit.Record["displayName"].(string); ok {
415 input.DisplayName = &dn
416 }
417
418 // Extract description (bio)
419 if desc, ok := commit.Record["description"].(string); ok {
420 input.Bio = &desc
421 }
422
423 // Extract avatar CID from blob ref structure
424 if avatarMap, ok := commit.Record["avatar"].(map[string]interface{}); ok {
425 if cid, ok := extractBlobCID(avatarMap); ok {
426 input.AvatarCID = &cid
427 }
428 }
429
430 // Extract banner CID from blob ref structure
431 if bannerMap, ok := commit.Record["banner"].(map[string]interface{}); ok {
432 if cid, ok := extractBlobCID(bannerMap); ok {
433 input.BannerCID = &cid
434 }
435 }
436
437 _, err := c.userService.UpdateProfile(ctx, did, input)
438 if err != nil {
439 return fmt.Errorf("failed to update user profile: %w", err)
440 }
441
442 log.Printf("Updated profile for user %s", did)
443 return nil
444}
445
446// handleProfileDelete processes profile delete operations
447// Clears all profile fields by passing empty strings
448func (c *UserEventConsumer) handleProfileDelete(ctx context.Context, did string) error {
449 empty := ""
450 input := users.UpdateProfileInput{
451 DisplayName: &empty,
452 Bio: &empty,
453 AvatarCID: &empty,
454 BannerCID: &empty,
455 }
456 _, err := c.userService.UpdateProfile(ctx, did, input)
457 if err != nil {
458 return fmt.Errorf("failed to clear user profile: %w", err)
459 }
460 log.Printf("Cleared profile for user %s", did)
461 return nil
462}
463
464// handleUserBlock processes user-to-user block create/delete events.
465// CREATE operation = user blocked another user
466// DELETE operation = user unblocked another user
467func (c *UserEventConsumer) handleUserBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
468 if c.userBlockRepo == nil {
469 slog.Warn("user block event ignored: userBlockRepo not configured (WithUserBlockRepo not called)",
470 slog.String("user_did", userDID),
471 slog.String("operation", commit.Operation))
472 return nil
473 }
474
475 switch commit.Operation {
476 case "create":
477 return c.createUserBlock(ctx, userDID, commit)
478 case "delete":
479 return c.deleteUserBlock(ctx, userDID, commit)
480 default:
481 // Update operations shouldn't happen on blocks, but ignore gracefully
482 log.Printf("Ignoring unexpected operation on user block: %s (userDID=%s, rkey=%s)",
483 commit.Operation, userDID, commit.RKey)
484 return nil
485 }
486}
487
488// createUserBlock indexes a new user-to-user block from the firehose.
489func (c *UserEventConsumer) createUserBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
490 if commit.Record == nil {
491 return fmt.Errorf("user block create event missing record data")
492 }
493
494 // Validate userDID format (untrusted firehose data)
495 if !strings.HasPrefix(userDID, "did:") {
496 return fmt.Errorf("invalid blocker DID format from firehose: %s", userDID)
497 }
498
499 // Extract blocked user DID from record's subject field
500 blockedDID, ok := commit.Record["subject"].(string)
501 if !ok {
502 return fmt.Errorf("user block record missing subject field")
503 }
504
505 // Validate blockedDID format (untrusted firehose data)
506 if !strings.HasPrefix(blockedDID, "did:") {
507 return fmt.Errorf("invalid blocked DID format from firehose: %s", blockedDID)
508 }
509
510 // Validate rkey is non-empty before building AT-URI
511 if commit.RKey == "" {
512 return fmt.Errorf("user block create event missing rkey")
513 }
514
515 // Build AT-URI for the block record (lives in the blocker's repository)
516 uri := fmt.Sprintf("at://%s/social.coves.actor.block/%s", userDID, commit.RKey)
517
518 // Parse createdAt from record to preserve chronological ordering during replays
519 block := &userblocks.UserBlock{
520 BlockerDID: userDID,
521 BlockedDID: blockedDID,
522 BlockedAt: utils.ParseCreatedAt(commit.Record),
523 RecordURI: uri,
524 RecordCID: commit.CID,
525 }
526
527 // Index the block (idempotent via ON CONFLICT DO UPDATE)
528 _, err := c.userBlockRepo.BlockUser(ctx, block)
529 if err != nil {
530 if userblocks.IsConflict(err) {
531 log.Printf("User block already indexed: %s -> %s", userDID, blockedDID)
532 return nil
533 }
534 return fmt.Errorf("failed to index user block: %w", err)
535 }
536
537 log.Printf("Indexed user block: %s -> %s", userDID, blockedDID)
538 return nil
539}
540
541// deleteUserBlock removes a user-to-user block from the index.
542// DELETE operations don't include record data, so we look up the block by its URI.
543func (c *UserEventConsumer) deleteUserBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
544 // Validate rkey is non-empty before building AT-URI
545 if commit.RKey == "" {
546 return fmt.Errorf("user block delete event missing rkey")
547 }
548
549 // Build AT-URI from the rkey
550 uri := fmt.Sprintf("at://%s/social.coves.actor.block/%s", userDID, commit.RKey)
551
552 // Look up the block to get the blocked DID
553 block, err := c.userBlockRepo.GetBlockByURI(ctx, uri)
554 if err != nil {
555 if userblocks.IsNotFound(err) {
556 // Already deleted - this is fine (idempotency)
557 log.Printf("User block already deleted: %s", uri)
558 return nil
559 }
560 return fmt.Errorf("failed to find user block for deletion: %w", err)
561 }
562
563 // Remove the block from the index
564 err = c.userBlockRepo.UnblockUser(ctx, userDID, block.BlockedDID)
565 if err != nil {
566 if userblocks.IsNotFound(err) {
567 log.Printf("User block already removed: %s -> %s", userDID, block.BlockedDID)
568 return nil
569 }
570 return fmt.Errorf("failed to remove user block: %w", err)
571 }
572
573 log.Printf("Removed user block: %s -> %s", userDID, block.BlockedDID)
574 return nil
575}