A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/communities"
5 "Coves/internal/core/posts"
6 "Coves/internal/core/users"
7 "context"
8 "database/sql"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "log"
13 "time"
14)
15
16// PostEventConsumer consumes post-related events from Jetstream
17// Handles CREATE and DELETE operations for social.coves.community.post
18// UPDATE handler will be added when that feature is implemented
19type PostEventConsumer struct {
20 postRepo posts.Repository
21 communityRepo communities.Repository
22 userService users.UserService
23 db *sql.DB // Direct DB access for atomic count reconciliation
24}
25
26// NewPostEventConsumer creates a new Jetstream consumer for post events
27func NewPostEventConsumer(
28 postRepo posts.Repository,
29 communityRepo communities.Repository,
30 userService users.UserService,
31 db *sql.DB,
32) *PostEventConsumer {
33 return &PostEventConsumer{
34 postRepo: postRepo,
35 communityRepo: communityRepo,
36 userService: userService,
37 db: db,
38 }
39}
40
41// HandleEvent processes a Jetstream event for post records
42// Handles CREATE and DELETE operations - UPDATE deferred until that feature exists
43func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
44 // We only care about commit events for post records
45 if event.Kind != "commit" || event.Commit == nil {
46 return nil
47 }
48
49 commit := event.Commit
50
51 // Handle post record operations
52 if commit.Collection == "social.coves.community.post" {
53 switch commit.Operation {
54 case "create":
55 return c.createPost(ctx, event.Did, commit)
56 case "delete":
57 return c.deletePost(ctx, event.Did, commit)
58 }
59 }
60
61 // Silently ignore other operations (update) and other collections
62 return nil
63}
64
65// createPost indexes a new post from the firehose
66func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error {
67 if commit.Record == nil {
68 return fmt.Errorf("post create event missing record data")
69 }
70
71 // Parse the post record
72 postRecord, err := parsePostRecord(commit.Record)
73 if err != nil {
74 return fmt.Errorf("failed to parse post record: %w", err)
75 }
76
77 // SECURITY: Validate this is a legitimate post event
78 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil {
79 log.Printf("🚨 SECURITY: Rejecting post event: %v", err)
80 return err
81 }
82
83 // Build AT-URI for this post
84 // Format: at://community_did/social.coves.community.post/rkey
85 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey)
86
87 // Parse timestamp from record
88 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt)
89 if err != nil {
90 // Fallback to current time if parsing fails
91 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
92 createdAt = time.Now()
93 }
94
95 // Build post entity
96 post := &posts.Post{
97 URI: uri,
98 CID: commit.CID,
99 RKey: commit.RKey,
100 AuthorDID: postRecord.Author,
101 CommunityDID: postRecord.Community,
102 Title: postRecord.Title,
103 Content: postRecord.Content,
104 CreatedAt: createdAt,
105 IndexedAt: time.Now(),
106 // Stats remain at 0 (no votes yet)
107 UpvoteCount: 0,
108 DownvoteCount: 0,
109 Score: 0,
110 CommentCount: 0,
111 }
112
113 // Serialize JSON fields (facets, embed, labels)
114 // Return error if any non-empty field fails to serialize (prevents silent data loss)
115 if postRecord.Facets != nil {
116 facetsJSON, marshalErr := json.Marshal(postRecord.Facets)
117 if marshalErr != nil {
118 return fmt.Errorf("failed to serialize facets: %w", marshalErr)
119 }
120 facetsStr := string(facetsJSON)
121 post.ContentFacets = &facetsStr
122 }
123
124 if postRecord.Embed != nil {
125 embedJSON, marshalErr := json.Marshal(postRecord.Embed)
126 if marshalErr != nil {
127 return fmt.Errorf("failed to serialize embed: %w", marshalErr)
128 }
129 embedStr := string(embedJSON)
130 post.Embed = &embedStr
131 }
132
133 if postRecord.Labels != nil {
134 labelsJSON, marshalErr := json.Marshal(postRecord.Labels)
135 if marshalErr != nil {
136 return fmt.Errorf("failed to serialize labels: %w", marshalErr)
137 }
138 labelsStr := string(labelsJSON)
139 post.ContentLabels = &labelsStr
140 }
141
142 // Atomically: Index post + Reconcile comment count for out-of-order arrivals
143 if err := c.indexPostAndReconcileCounts(ctx, post); err != nil {
144 return fmt.Errorf("failed to index post and reconcile counts: %w", err)
145 }
146
147 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)",
148 uri, post.AuthorDID, post.CommunityDID, commit.RKey)
149 return nil
150}
151
152// deletePost handles post deletion events from Jetstream
153// Soft-deletes the post in AppView database by setting deleted_at timestamp
154func (c *PostEventConsumer) deletePost(ctx context.Context, repoDID string, commit *CommitEvent) error {
155 // Build AT-URI for this post
156 // Format: at://community_did/social.coves.community.post/rkey
157 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey)
158
159 // Soft delete the post in AppView
160 if err := c.postRepo.SoftDelete(ctx, uri); err != nil {
161 return fmt.Errorf("failed to soft delete post: %w", err)
162 }
163
164 log.Printf("✓ Deleted post: %s (community: %s, rkey: %s)", uri, repoDID, commit.RKey)
165 return nil
166}
167
168// indexPostAndReconcileCounts atomically indexes a post and reconciles comment counts
169// This fixes the race condition where comments arrive before their parent post
170func (c *PostEventConsumer) indexPostAndReconcileCounts(ctx context.Context, post *posts.Post) error {
171 tx, err := c.db.BeginTx(ctx, nil)
172 if err != nil {
173 return fmt.Errorf("failed to begin transaction: %w", err)
174 }
175 defer func() {
176 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
177 log.Printf("Failed to rollback transaction: %v", rollbackErr)
178 }
179 }()
180
181 // 1. Insert the post (idempotent with RETURNING clause)
182 var facetsJSON, embedJSON, labelsJSON sql.NullString
183
184 if post.ContentFacets != nil {
185 facetsJSON.String = *post.ContentFacets
186 facetsJSON.Valid = true
187 }
188
189 if post.Embed != nil {
190 embedJSON.String = *post.Embed
191 embedJSON.Valid = true
192 }
193
194 if post.ContentLabels != nil {
195 labelsJSON.String = *post.ContentLabels
196 labelsJSON.Valid = true
197 }
198
199 insertQuery := `
200 INSERT INTO posts (
201 uri, cid, rkey, author_did, community_did,
202 title, content, content_facets, embed, content_labels,
203 created_at, indexed_at
204 ) VALUES (
205 $1, $2, $3, $4, $5,
206 $6, $7, $8, $9, $10,
207 $11, NOW()
208 )
209 ON CONFLICT (uri) DO NOTHING
210 RETURNING id
211 `
212
213 var postID int64
214 insertErr := tx.QueryRowContext(
215 ctx, insertQuery,
216 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
217 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON,
218 post.CreatedAt,
219 ).Scan(&postID)
220
221 // If no rows returned, post already exists (idempotent - OK for Jetstream replays)
222 if insertErr == sql.ErrNoRows {
223 log.Printf("Post already indexed: %s (idempotent)", post.URI)
224 if commitErr := tx.Commit(); commitErr != nil {
225 return fmt.Errorf("failed to commit transaction: %w", commitErr)
226 }
227 return nil
228 }
229
230 if insertErr != nil {
231 return fmt.Errorf("failed to insert post: %w", insertErr)
232 }
233
234 // 2. Reconcile comment_count for this newly inserted post
235 // In case any comments arrived out-of-order before this post was indexed
236 // This is the CRITICAL FIX for the race condition identified in the PR review
237 // NOTE: Uses root_uri to count ALL comments in thread (including nested replies)
238 // NOTE: Counts include deleted comments since they're shown as "[deleted]" placeholders
239 //
240 // IMPORTANT: This reconciliation logic and the increment logic in CommentEventConsumer
241 // must stay in sync. Both use the same counting semantics:
242 // - Count ALL comments (including deleted) since deleted comments appear as "[deleted]" placeholders
243 // - This ensures comment_count matches the actual visible thread structure
244 // If you modify one, you must review and potentially modify the other.
245 // See: comment_consumer.go indexCommentAndUpdateCounts()
246 reconcileQuery := `
247 UPDATE posts
248 SET comment_count = (
249 SELECT COUNT(*)
250 FROM comments c
251 WHERE c.root_uri = $1
252 )
253 WHERE id = $2
254 `
255 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, post.URI, postID)
256 if reconcileErr != nil {
257 // Reconciliation failure is a critical error - it means comment_count will be incorrect
258 // This could cause data inconsistency where the displayed count doesn't match reality
259 // Roll back the transaction to maintain consistency
260 return fmt.Errorf("failed to reconcile comment_count for %s: %w", post.URI, reconcileErr)
261 }
262
263 // Commit transaction
264 if err := tx.Commit(); err != nil {
265 return fmt.Errorf("failed to commit transaction: %w", err)
266 }
267
268 return nil
269}
270
271// validatePostEvent performs security validation on post events
272// This prevents malicious actors from indexing fake posts
273func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error {
274 // CRITICAL SECURITY CHECK:
275 // Posts MUST come from community repositories, not user repositories
276 // This prevents users from creating posts that appear to be from communities they don't control
277 //
278 // Example attack prevented:
279 // - User creates post in their own repo (at://user_did/social.coves.community.post/xyz)
280 // - Claims it's for community X (community field = community_did)
281 // - Without this check, fake post would be indexed
282 //
283 // With this check:
284 // - We verify event.Did (repo owner) == post.community (claimed community)
285 // - Reject if mismatch
286 if repoDID != post.Community {
287 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos",
288 repoDID, post.Community)
289 }
290
291 // CRITICAL: Verify community exists in AppView
292 // Posts MUST reference valid communities (enforced by FK constraint)
293 // If community isn't indexed yet, we must reject the post
294 // Jetstream will replay events, so the post will be indexed once community is ready
295 _, err := c.communityRepo.GetByDID(ctx, post.Community)
296 if err != nil {
297 if communities.IsNotFound(err) {
298 // Reject - community must be indexed before posts
299 // This maintains referential integrity and prevents orphaned posts
300 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community)
301 }
302 // Database error or other issue
303 return fmt.Errorf("failed to verify community exists: %w", err)
304 }
305
306 // CRITICAL: Verify author exists in AppView
307 // Every post MUST have a valid author (enforced by FK constraint)
308 // Even though posts live in community repos, they belong to specific authors
309 // If author isn't indexed yet, we must reject the post
310 _, err = c.userService.GetUserByDID(ctx, post.Author)
311 if err != nil {
312 // Use proper error type checking with errors.Is()
313 if errors.Is(err, users.ErrUserNotFound) {
314 // Reject - author must be indexed before posts
315 // This maintains referential integrity and prevents orphaned posts
316 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author)
317 }
318 // Database error or other issue
319 return fmt.Errorf("failed to verify author exists: %w", err)
320 }
321
322 return nil
323}
324
325// PostRecordFromJetstream represents a post record as received from Jetstream
326// Matches the structure written to PDS via social.coves.community.post
327type PostRecordFromJetstream struct {
328 OriginalAuthor interface{} `json:"originalAuthor,omitempty"`
329 FederatedFrom interface{} `json:"federatedFrom,omitempty"`
330 Location interface{} `json:"location,omitempty"`
331 Title *string `json:"title,omitempty"`
332 Content *string `json:"content,omitempty"`
333 Embed map[string]interface{} `json:"embed,omitempty"`
334 Labels *posts.SelfLabels `json:"labels,omitempty"`
335 Type string `json:"$type"`
336 Community string `json:"community"`
337 Author string `json:"author"`
338 CreatedAt string `json:"createdAt"`
339 Facets []interface{} `json:"facets,omitempty"`
340}
341
342// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream
343func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) {
344 // Marshal to JSON and back to ensure proper type conversion
345 recordJSON, err := json.Marshal(record)
346 if err != nil {
347 return nil, fmt.Errorf("failed to marshal record: %w", err)
348 }
349
350 var post PostRecordFromJetstream
351 if err := json.Unmarshal(recordJSON, &post); err != nil {
352 return nil, fmt.Errorf("failed to unmarshal post record: %w", err)
353 }
354
355 // Validate required fields
356 if post.Community == "" {
357 return nil, fmt.Errorf("post record missing community field")
358 }
359 if post.Author == "" {
360 return nil, fmt.Errorf("post record missing author field")
361 }
362 if post.CreatedAt == "" {
363 return nil, fmt.Errorf("post record missing createdAt field")
364 }
365
366 return &post, nil
367}