A community-based topic aggregation platform built on atproto
at main 367 lines 13 kB view raw
1package jetstream 2 3import ( 4 "Coves/internal/core/communities" 5 "Coves/internal/core/posts" 6 "Coves/internal/core/users" 7 "context" 8 "database/sql" 9 "encoding/json" 10 "errors" 11 "fmt" 12 "log" 13 "time" 14) 15 16// PostEventConsumer consumes post-related events from Jetstream 17// Handles CREATE and DELETE operations for social.coves.community.post 18// UPDATE handler will be added when that feature is implemented 19type PostEventConsumer struct { 20 postRepo posts.Repository 21 communityRepo communities.Repository 22 userService users.UserService 23 db *sql.DB // Direct DB access for atomic count reconciliation 24} 25 26// NewPostEventConsumer creates a new Jetstream consumer for post events 27func NewPostEventConsumer( 28 postRepo posts.Repository, 29 communityRepo communities.Repository, 30 userService users.UserService, 31 db *sql.DB, 32) *PostEventConsumer { 33 return &PostEventConsumer{ 34 postRepo: postRepo, 35 communityRepo: communityRepo, 36 userService: userService, 37 db: db, 38 } 39} 40 41// HandleEvent processes a Jetstream event for post records 42// Handles CREATE and DELETE operations - UPDATE deferred until that feature exists 43func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 44 // We only care about commit events for post records 45 if event.Kind != "commit" || event.Commit == nil { 46 return nil 47 } 48 49 commit := event.Commit 50 51 // Handle post record operations 52 if commit.Collection == "social.coves.community.post" { 53 switch commit.Operation { 54 case "create": 55 return c.createPost(ctx, event.Did, commit) 56 case "delete": 57 return c.deletePost(ctx, event.Did, commit) 58 } 59 } 60 61 // Silently ignore other operations (update) and other collections 62 return nil 63} 64 65// createPost indexes a new post from the firehose 66func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error { 67 if commit.Record == nil { 68 return 
fmt.Errorf("post create event missing record data") 69 } 70 71 // Parse the post record 72 postRecord, err := parsePostRecord(commit.Record) 73 if err != nil { 74 return fmt.Errorf("failed to parse post record: %w", err) 75 } 76 77 // SECURITY: Validate this is a legitimate post event 78 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil { 79 log.Printf("🚨 SECURITY: Rejecting post event: %v", err) 80 return err 81 } 82 83 // Build AT-URI for this post 84 // Format: at://community_did/social.coves.community.post/rkey 85 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey) 86 87 // Parse timestamp from record 88 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt) 89 if err != nil { 90 // Fallback to current time if parsing fails 91 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 92 createdAt = time.Now() 93 } 94 95 // Build post entity 96 post := &posts.Post{ 97 URI: uri, 98 CID: commit.CID, 99 RKey: commit.RKey, 100 AuthorDID: postRecord.Author, 101 CommunityDID: postRecord.Community, 102 Title: postRecord.Title, 103 Content: postRecord.Content, 104 CreatedAt: createdAt, 105 IndexedAt: time.Now(), 106 // Stats remain at 0 (no votes yet) 107 UpvoteCount: 0, 108 DownvoteCount: 0, 109 Score: 0, 110 CommentCount: 0, 111 } 112 113 // Serialize JSON fields (facets, embed, labels) 114 // Return error if any non-empty field fails to serialize (prevents silent data loss) 115 if postRecord.Facets != nil { 116 facetsJSON, marshalErr := json.Marshal(postRecord.Facets) 117 if marshalErr != nil { 118 return fmt.Errorf("failed to serialize facets: %w", marshalErr) 119 } 120 facetsStr := string(facetsJSON) 121 post.ContentFacets = &facetsStr 122 } 123 124 if postRecord.Embed != nil { 125 embedJSON, marshalErr := json.Marshal(postRecord.Embed) 126 if marshalErr != nil { 127 return fmt.Errorf("failed to serialize embed: %w", marshalErr) 128 } 129 embedStr := string(embedJSON) 130 
post.Embed = &embedStr 131 } 132 133 if postRecord.Labels != nil { 134 labelsJSON, marshalErr := json.Marshal(postRecord.Labels) 135 if marshalErr != nil { 136 return fmt.Errorf("failed to serialize labels: %w", marshalErr) 137 } 138 labelsStr := string(labelsJSON) 139 post.ContentLabels = &labelsStr 140 } 141 142 // Atomically: Index post + Reconcile comment count for out-of-order arrivals 143 if err := c.indexPostAndReconcileCounts(ctx, post); err != nil { 144 return fmt.Errorf("failed to index post and reconcile counts: %w", err) 145 } 146 147 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)", 148 uri, post.AuthorDID, post.CommunityDID, commit.RKey) 149 return nil 150} 151 152// deletePost handles post deletion events from Jetstream 153// Soft-deletes the post in AppView database by setting deleted_at timestamp 154func (c *PostEventConsumer) deletePost(ctx context.Context, repoDID string, commit *CommitEvent) error { 155 // Build AT-URI for this post 156 // Format: at://community_did/social.coves.community.post/rkey 157 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey) 158 159 // Soft delete the post in AppView 160 if err := c.postRepo.SoftDelete(ctx, uri); err != nil { 161 return fmt.Errorf("failed to soft delete post: %w", err) 162 } 163 164 log.Printf("✓ Deleted post: %s (community: %s, rkey: %s)", uri, repoDID, commit.RKey) 165 return nil 166} 167 168// indexPostAndReconcileCounts atomically indexes a post and reconciles comment counts 169// This fixes the race condition where comments arrive before their parent post 170func (c *PostEventConsumer) indexPostAndReconcileCounts(ctx context.Context, post *posts.Post) error { 171 tx, err := c.db.BeginTx(ctx, nil) 172 if err != nil { 173 return fmt.Errorf("failed to begin transaction: %w", err) 174 } 175 defer func() { 176 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 177 log.Printf("Failed to rollback transaction: %v", 
rollbackErr) 178 } 179 }() 180 181 // 1. Insert the post (idempotent with RETURNING clause) 182 var facetsJSON, embedJSON, labelsJSON sql.NullString 183 184 if post.ContentFacets != nil { 185 facetsJSON.String = *post.ContentFacets 186 facetsJSON.Valid = true 187 } 188 189 if post.Embed != nil { 190 embedJSON.String = *post.Embed 191 embedJSON.Valid = true 192 } 193 194 if post.ContentLabels != nil { 195 labelsJSON.String = *post.ContentLabels 196 labelsJSON.Valid = true 197 } 198 199 insertQuery := ` 200 INSERT INTO posts ( 201 uri, cid, rkey, author_did, community_did, 202 title, content, content_facets, embed, content_labels, 203 created_at, indexed_at 204 ) VALUES ( 205 $1, $2, $3, $4, $5, 206 $6, $7, $8, $9, $10, 207 $11, NOW() 208 ) 209 ON CONFLICT (uri) DO NOTHING 210 RETURNING id 211 ` 212 213 var postID int64 214 insertErr := tx.QueryRowContext( 215 ctx, insertQuery, 216 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID, 217 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON, 218 post.CreatedAt, 219 ).Scan(&postID) 220 221 // If no rows returned, post already exists (idempotent - OK for Jetstream replays) 222 if insertErr == sql.ErrNoRows { 223 log.Printf("Post already indexed: %s (idempotent)", post.URI) 224 if commitErr := tx.Commit(); commitErr != nil { 225 return fmt.Errorf("failed to commit transaction: %w", commitErr) 226 } 227 return nil 228 } 229 230 if insertErr != nil { 231 return fmt.Errorf("failed to insert post: %w", insertErr) 232 } 233 234 // 2. 
Reconcile comment_count for this newly inserted post 235 // In case any comments arrived out-of-order before this post was indexed 236 // This is the CRITICAL FIX for the race condition identified in the PR review 237 // NOTE: Uses root_uri to count ALL comments in thread (including nested replies) 238 // NOTE: Counts include deleted comments since they're shown as "[deleted]" placeholders 239 // 240 // IMPORTANT: This reconciliation logic and the increment logic in CommentEventConsumer 241 // must stay in sync. Both use the same counting semantics: 242 // - Count ALL comments (including deleted) since deleted comments appear as "[deleted]" placeholders 243 // - This ensures comment_count matches the actual visible thread structure 244 // If you modify one, you must review and potentially modify the other. 245 // See: comment_consumer.go indexCommentAndUpdateCounts() 246 reconcileQuery := ` 247 UPDATE posts 248 SET comment_count = ( 249 SELECT COUNT(*) 250 FROM comments c 251 WHERE c.root_uri = $1 252 ) 253 WHERE id = $2 254 ` 255 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, post.URI, postID) 256 if reconcileErr != nil { 257 // Reconciliation failure is a critical error - it means comment_count will be incorrect 258 // This could cause data inconsistency where the displayed count doesn't match reality 259 // Roll back the transaction to maintain consistency 260 return fmt.Errorf("failed to reconcile comment_count for %s: %w", post.URI, reconcileErr) 261 } 262 263 // Commit transaction 264 if err := tx.Commit(); err != nil { 265 return fmt.Errorf("failed to commit transaction: %w", err) 266 } 267 268 return nil 269} 270 271// validatePostEvent performs security validation on post events 272// This prevents malicious actors from indexing fake posts 273func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error { 274 // CRITICAL SECURITY CHECK: 275 // Posts MUST come from community repositories, not 
user repositories 276 // This prevents users from creating posts that appear to be from communities they don't control 277 // 278 // Example attack prevented: 279 // - User creates post in their own repo (at://user_did/social.coves.community.post/xyz) 280 // - Claims it's for community X (community field = community_did) 281 // - Without this check, fake post would be indexed 282 // 283 // With this check: 284 // - We verify event.Did (repo owner) == post.community (claimed community) 285 // - Reject if mismatch 286 if repoDID != post.Community { 287 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos", 288 repoDID, post.Community) 289 } 290 291 // CRITICAL: Verify community exists in AppView 292 // Posts MUST reference valid communities (enforced by FK constraint) 293 // If community isn't indexed yet, we must reject the post 294 // Jetstream will replay events, so the post will be indexed once community is ready 295 _, err := c.communityRepo.GetByDID(ctx, post.Community) 296 if err != nil { 297 if communities.IsNotFound(err) { 298 // Reject - community must be indexed before posts 299 // This maintains referential integrity and prevents orphaned posts 300 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community) 301 } 302 // Database error or other issue 303 return fmt.Errorf("failed to verify community exists: %w", err) 304 } 305 306 // CRITICAL: Verify author exists in AppView 307 // Every post MUST have a valid author (enforced by FK constraint) 308 // Even though posts live in community repos, they belong to specific authors 309 // If author isn't indexed yet, we must reject the post 310 _, err = c.userService.GetUserByDID(ctx, post.Author) 311 if err != nil { 312 // Use proper error type checking with errors.Is() 313 if errors.Is(err, users.ErrUserNotFound) { 314 // Reject - author must be indexed before posts 315 // This maintains referential integrity and 
prevents orphaned posts 316 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author) 317 } 318 // Database error or other issue 319 return fmt.Errorf("failed to verify author exists: %w", err) 320 } 321 322 return nil 323} 324 325// PostRecordFromJetstream represents a post record as received from Jetstream 326// Matches the structure written to PDS via social.coves.community.post 327type PostRecordFromJetstream struct { 328 OriginalAuthor interface{} `json:"originalAuthor,omitempty"` 329 FederatedFrom interface{} `json:"federatedFrom,omitempty"` 330 Location interface{} `json:"location,omitempty"` 331 Title *string `json:"title,omitempty"` 332 Content *string `json:"content,omitempty"` 333 Embed map[string]interface{} `json:"embed,omitempty"` 334 Labels *posts.SelfLabels `json:"labels,omitempty"` 335 Type string `json:"$type"` 336 Community string `json:"community"` 337 Author string `json:"author"` 338 CreatedAt string `json:"createdAt"` 339 Facets []interface{} `json:"facets,omitempty"` 340} 341 342// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream 343func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) { 344 // Marshal to JSON and back to ensure proper type conversion 345 recordJSON, err := json.Marshal(record) 346 if err != nil { 347 return nil, fmt.Errorf("failed to marshal record: %w", err) 348 } 349 350 var post PostRecordFromJetstream 351 if err := json.Unmarshal(recordJSON, &post); err != nil { 352 return nil, fmt.Errorf("failed to unmarshal post record: %w", err) 353 } 354 355 // Validate required fields 356 if post.Community == "" { 357 return nil, fmt.Errorf("post record missing community field") 358 } 359 if post.Author == "" { 360 return nil, fmt.Errorf("post record missing author field") 361 } 362 if post.CreatedAt == "" { 363 return nil, fmt.Errorf("post record missing createdAt field") 364 } 365 366 return &post, nil 367}