A community based topic aggregation platform built on atproto

test: add post comment_count reconciliation tests

Add comprehensive integration tests verifying that post comment_count
is correctly reconciled when comments arrive before their parent post
due to out-of-order Jetstream event delivery.

Test coverage:
- Single comment arrives before post - count reconciled to 1
- Multiple comments arrive before post - count reconciled correctly
- Mixed ordering (comments before and after post) - count accurate
- Idempotent post indexing preserves comment_count

Also update misleading FIXME comment in comment_consumer.go to accurate
NOTE explaining that reconciliation IS implemented in post_consumer.go
at lines 210-226.

Fixes issue: Post comment_count Never Reconciles
File: internal/atproto/jetstream/comment_consumer.go:362

+440 -10
+6 -10
internal/atproto/jetstream/comment_consumer.go
··· 359 359 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 360 360 // Parse collection from parent URI to determine target table 361 361 // 362 - // FIXME(P1): Post comment_count reconciliation not implemented 363 - // When a comment arrives before its parent post (common with cross-repo Jetstream ordering), 364 - // the post update below returns 0 rows and we only log a warning. Later, when the post 365 - // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing 366 - // comments. This causes posts to have permanently stale comment_count values. 367 - // 368 - // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments 369 - // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri 370 - // matches the post URI and set comment_count accordingly. 362 + // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226 363 + // When a comment arrives before its parent post, the post update below returns 0 rows 364 + // and we log a warning. Later, when the post is indexed, the post consumer reconciles 365 + // comment_count by counting all pre-existing comments. This ensures accurate counts 366 + // despite out-of-order Jetstream event delivery. 371 367 // 372 - // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation 368 + // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go 373 369 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 374 370 375 371 var updateQuery string
+434
tests/integration/post_consumer_test.go
··· 1 + package integration 2 + 3 + import ( 4 + "Coves/internal/atproto/jetstream" 5 + "Coves/internal/core/users" 6 + "Coves/internal/db/postgres" 7 + "context" 8 + "fmt" 9 + "testing" 10 + "time" 11 + ) 12 + 13 + // TestPostConsumer_CommentCountReconciliation tests that post comment_count 14 + // is correctly reconciled when comments arrive before the parent post. 15 + // 16 + // This addresses the issue identified in comment_consumer.go:362 where the FIXME 17 + // comment suggests reconciliation is not implemented. This test verifies that 18 + // the reconciliation logic in post_consumer.go:210-226 works correctly. 19 + func TestPostConsumer_CommentCountReconciliation(t *testing.T) { 20 + db := setupTestDB(t) 21 + defer func() { 22 + if err := db.Close(); err != nil { 23 + t.Logf("Failed to close database: %v", err) 24 + } 25 + }() 26 + 27 + ctx := context.Background() 28 + 29 + // Set up repositories and consumers 30 + postRepo := postgres.NewPostRepository(db) 31 + commentRepo := postgres.NewCommentRepository(db) 32 + communityRepo := postgres.NewCommunityRepository(db) 33 + userRepo := postgres.NewUserRepository(db) 34 + userService := users.NewUserService(userRepo, nil, getTestPDSURL()) 35 + 36 + commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 37 + postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 38 + 39 + // Setup test data 40 + testUser := createTestUser(t, db, "reconcile.test", "did:plc:reconcile123") 41 + testCommunity, err := createFeedTestCommunity(db, ctx, "reconcile-community", "owner.test") 42 + if err != nil { 43 + t.Fatalf("Failed to create test community: %v", err) 44 + } 45 + 46 + t.Run("Single comment arrives before post - count reconciled", func(t *testing.T) { 47 + // Scenario: User creates a post 48 + // Another user creates a comment on that post 49 + // Due to Jetstream ordering, comment event arrives BEFORE post event 50 + // When post is finally indexed, comment_count should be 1, not 0 51 + 52 + postRkey := generateTID() 53 + postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 54 + 55 + commentRkey := generateTID() 56 + commentURI := fmt.Sprintf("at://%s/social.coves.community.comment/%s", testUser.DID, commentRkey) 57 + 58 + // Step 1: Index comment FIRST (before parent post exists) 59 + commentEvent := &jetstream.JetstreamEvent{ 60 + Did: testUser.DID, 61 + Kind: "commit", 62 + Commit: &jetstream.CommitEvent{ 63 + Rev: "comment-rev", 64 + Operation: "create", 65 + Collection: "social.coves.community.comment", 66 + RKey: commentRkey, 67 + CID: "bafycomment", 68 + Record: map[string]interface{}{ 69 + "$type": "social.coves.community.comment", 70 + "content": "Comment arriving before parent post!", 71 + "reply": map[string]interface{}{ 72 + "root": map[string]interface{}{ 73 + "uri": postURI, // Points to post that doesn't exist yet 74 + "cid": "bafypost", 75 + }, 76 + "parent": map[string]interface{}{ 77 + "uri": postURI, 78 + "cid": "bafypost", 79 + }, 80 + }, 81 + "createdAt": time.Now().Format(time.RFC3339), 82 + }, 83 + }, 84 + } 85 + 86 + err := commentConsumer.HandleEvent(ctx, commentEvent) 87 + if err != nil { 88 + t.Fatalf("Failed to handle comment event: %v", err) 89 + } 90 + 91 + // Verify comment was indexed 92 + comment, err := commentRepo.GetByURI(ctx, commentURI) 93 + if err != nil { 94 + t.Fatalf("Comment not indexed: %v", err) 95 + } 96 + if comment.ParentURI != postURI { 97 + t.Errorf("Expected comment parent_uri %s, got %s", postURI, comment.ParentURI) 98 + } 99 + 100 + // Step 2: Now index post (arrives late due to Jetstream ordering) 101 + postEvent := &jetstream.JetstreamEvent{ 102 + Did: testCommunity, 103 + Kind: "commit", 104 + Commit: &jetstream.CommitEvent{ 105 + Rev: "post-rev", 106 + Operation: "create", 107 + Collection: "social.coves.community.post", 108 + RKey: postRkey, 109 + CID: "bafypost", 110 + Record: map[string]interface{}{ 111 + "$type": "social.coves.community.post", 112 + "community": testCommunity, 113 + "author": testUser.DID, 114 + "title": "Post arriving after comment", 115 + "content": "This post's comment arrived first!", 116 + "createdAt": time.Now().Format(time.RFC3339), 117 + }, 118 + }, 119 + } 120 + 121 + err = postConsumer.HandleEvent(ctx, postEvent) 122 + if err != nil { 123 + t.Fatalf("Failed to handle post event: %v", err) 124 + } 125 + 126 + // Step 3: Verify post was indexed with CORRECT comment_count 127 + post, err := postRepo.GetByURI(ctx, postURI) 128 + if err != nil { 129 + t.Fatalf("Post not indexed: %v", err) 130 + } 131 + 132 + // THIS IS THE KEY TEST: Post should have comment_count = 1 due to reconciliation 133 + if post.CommentCount != 1 { 134 + t.Errorf("Expected post comment_count to be 1 (reconciled), got %d", post.CommentCount) 135 + t.Logf("This indicates the reconciliation logic in post_consumer.go is not working!") 136 + t.Logf("The FIXME comment at comment_consumer.go:362 may still be valid.") 137 + } 138 + 139 + // Verify via direct query as well 140 + var dbCommentCount int 141 + err = db.QueryRowContext(ctx, "SELECT comment_count FROM posts WHERE uri = $1", postURI).Scan(&dbCommentCount) 142 + if err != nil { 143 + t.Fatalf("Failed to query post comment_count: %v", err) 144 + } 145 + if dbCommentCount != 1 { 146 + t.Errorf("Expected DB comment_count to be 1, got %d", dbCommentCount) 147 + } 148 + }) 149 + 150 + t.Run("Multiple comments arrive before post - count reconciled to correct total", func(t *testing.T) { 151 + postRkey := generateTID() 152 + postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 153 + 154 + // Step 1: Index 3 comments BEFORE the post exists 155 + for i := 1; i <= 3; i++ { 156 + commentRkey := generateTID() 157 + commentEvent := &jetstream.JetstreamEvent{ 158 + Did: testUser.DID, 159 + Kind: "commit", 160 + Commit: &jetstream.CommitEvent{ 161 + Rev: fmt.Sprintf("comment-%d-rev", i), 162 + Operation: "create", 163 + Collection: "social.coves.community.comment", 164 + RKey: commentRkey, 165 + CID: fmt.Sprintf("bafycomment%d", i), 166 + Record: map[string]interface{}{ 167 + "$type": "social.coves.community.comment", 168 + "content": fmt.Sprintf("Comment %d before post", i), 169 + "reply": map[string]interface{}{ 170 + "root": map[string]interface{}{ 171 + "uri": postURI, 172 + "cid": "bafypost2", 173 + }, 174 + "parent": map[string]interface{}{ 175 + "uri": postURI, 176 + "cid": "bafypost2", 177 + }, 178 + }, 179 + "createdAt": time.Now().Format(time.RFC3339), 180 + }, 181 + }, 182 + } 183 + 184 + err := commentConsumer.HandleEvent(ctx, commentEvent) 185 + if err != nil { 186 + t.Fatalf("Failed to handle comment %d event: %v", i, err) 187 + } 188 + } 189 + 190 + // Step 2: Now index the post 191 + postEvent := &jetstream.JetstreamEvent{ 192 + Did: testCommunity, 193 + Kind: "commit", 194 + Commit: &jetstream.CommitEvent{ 195 + Rev: "post2-rev", 196 + Operation: "create", 197 + Collection: "social.coves.community.post", 198 + RKey: postRkey, 199 + CID: "bafypost2", 200 + Record: map[string]interface{}{ 201 + "$type": "social.coves.community.post", 202 + "community": testCommunity, 203 + "author": testUser.DID, 204 + "title": "Post with 3 pre-existing comments", 205 + "content": "All 3 comments arrived before this post!", 206 + "createdAt": time.Now().Format(time.RFC3339), 207 + }, 208 + }, 209 + } 210 + 211 + err := postConsumer.HandleEvent(ctx, postEvent) 212 + if err != nil { 213 + t.Fatalf("Failed to handle post event: %v", err) 214 + } 215 + 216 + // Step 3: Verify post has comment_count = 3 217 + post, err := postRepo.GetByURI(ctx, postURI) 218 + if err != nil { 219 + t.Fatalf("Post not indexed: %v", err) 220 + } 221 + 222 + if post.CommentCount != 3 { 223 + t.Errorf("Expected post comment_count to be 3 (reconciled), got %d", post.CommentCount) 224 + } 225 + }) 226 + 227 + t.Run("Comments before and after post - count remains accurate", func(t *testing.T) { 228 + postRkey := generateTID() 229 + postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 230 + 231 + // Step 1: Index 2 comments BEFORE post 232 + for i := 1; i <= 2; i++ { 233 + commentRkey := generateTID() 234 + commentEvent := &jetstream.JetstreamEvent{ 235 + Did: testUser.DID, 236 + Kind: "commit", 237 + Commit: &jetstream.CommitEvent{ 238 + Rev: fmt.Sprintf("before-%d-rev", i), 239 + Operation: "create", 240 + Collection: "social.coves.community.comment", 241 + RKey: commentRkey, 242 + CID: fmt.Sprintf("bafybefore%d", i), 243 + Record: map[string]interface{}{ 244 + "$type": "social.coves.community.comment", 245 + "content": fmt.Sprintf("Before comment %d", i), 246 + "reply": map[string]interface{}{ 247 + "root": map[string]interface{}{ 248 + "uri": postURI, 249 + "cid": "bafypost3", 250 + }, 251 + "parent": map[string]interface{}{ 252 + "uri": postURI, 253 + "cid": "bafypost3", 254 + }, 255 + }, 256 + "createdAt": time.Now().Format(time.RFC3339), 257 + }, 258 + }, 259 + } 260 + 261 + err := commentConsumer.HandleEvent(ctx, commentEvent) 262 + if err != nil { 263 + t.Fatalf("Failed to handle before-comment %d: %v", i, err) 264 + } 265 + } 266 + 267 + // Step 2: Index the post (should reconcile to 2) 268 + postEvent := &jetstream.JetstreamEvent{ 269 + Did: testCommunity, 270 + Kind: "commit", 271 + Commit: &jetstream.CommitEvent{ 272 + Rev: "post3-rev", 273 + Operation: "create", 274 + Collection: "social.coves.community.post", 275 + RKey: postRkey, 276 + CID: "bafypost3", 277 + Record: map[string]interface{}{ 278 + "$type": "social.coves.community.post", 279 + "community": testCommunity, 280 + "author": testUser.DID, 281 + "title": "Post with before and after comments", 282 + "content": "Testing mixed ordering", 283 + "createdAt": time.Now().Format(time.RFC3339), 284 + }, 285 + }, 286 + } 287 + 288 + err := postConsumer.HandleEvent(ctx, postEvent) 289 + if err != nil { 290 + t.Fatalf("Failed to handle post event: %v", err) 291 + } 292 + 293 + // Verify count is 2 294 + post, err := postRepo.GetByURI(ctx, postURI) 295 + if err != nil { 296 + t.Fatalf("Post not indexed: %v", err) 297 + } 298 + if post.CommentCount != 2 { 299 + t.Errorf("Expected comment_count=2 after reconciliation, got %d", post.CommentCount) 300 + } 301 + 302 + // Step 3: Add 1 more comment AFTER post exists 303 + commentRkey := generateTID() 304 + afterCommentEvent := &jetstream.JetstreamEvent{ 305 + Did: testUser.DID, 306 + Kind: "commit", 307 + Commit: &jetstream.CommitEvent{ 308 + Rev: "after-rev", 309 + Operation: "create", 310 + Collection: "social.coves.community.comment", 311 + RKey: commentRkey, 312 + CID: "bafyafter", 313 + Record: map[string]interface{}{ 314 + "$type": "social.coves.community.comment", 315 + "content": "Comment after post exists", 316 + "reply": map[string]interface{}{ 317 + "root": map[string]interface{}{ 318 + "uri": postURI, 319 + "cid": "bafypost3", 320 + }, 321 + "parent": map[string]interface{}{ 322 + "uri": postURI, 323 + "cid": "bafypost3", 324 + }, 325 + }, 326 + "createdAt": time.Now().Format(time.RFC3339), 327 + }, 328 + }, 329 + } 330 + 331 + err = commentConsumer.HandleEvent(ctx, afterCommentEvent) 332 + if err != nil { 333 + t.Fatalf("Failed to handle after-comment: %v", err) 334 + } 335 + 336 + // Verify count incremented to 3 337 + post, err = postRepo.GetByURI(ctx, postURI) 338 + if err != nil { 339 + t.Fatalf("Failed to get post after increment: %v", err) 340 + } 341 + if post.CommentCount != 3 { 342 + t.Errorf("Expected comment_count=3 after increment, got %d", post.CommentCount) 343 + } 344 + }) 345 + 346 + t.Run("Idempotent post indexing preserves comment_count", func(t *testing.T) { 347 + postRkey := generateTID() 348 + postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", testCommunity, postRkey) 349 + 350 + // Create comment first 351 + commentRkey := generateTID() 352 + commentEvent := &jetstream.JetstreamEvent{ 353 + Did: testUser.DID, 354 + Kind: "commit", 355 + Commit: &jetstream.CommitEvent{ 356 + Rev: "idem-comment-rev", 357 + Operation: "create", 358 + Collection: "social.coves.community.comment", 359 + RKey: commentRkey, 360 + CID: "bafyidemcomment", 361 + Record: map[string]interface{}{ 362 + "$type": "social.coves.community.comment", 363 + "content": "Comment for idempotent test", 364 + "reply": map[string]interface{}{ 365 + "root": map[string]interface{}{ 366 + "uri": postURI, 367 + "cid": "bafyidempost", 368 + }, 369 + "parent": map[string]interface{}{ 370 + "uri": postURI, 371 + "cid": "bafyidempost", 372 + }, 373 + }, 374 + "createdAt": time.Now().Format(time.RFC3339), 375 + }, 376 + }, 377 + } 378 + 379 + err := commentConsumer.HandleEvent(ctx, commentEvent) 380 + if err != nil { 381 + t.Fatalf("Failed to create comment: %v", err) 382 + } 383 + 384 + // Index post (should reconcile to 1) 385 + postEvent := &jetstream.JetstreamEvent{ 386 + Did: testCommunity, 387 + Kind: "commit", 388 + Commit: &jetstream.CommitEvent{ 389 + Rev: "idem-post-rev", 390 + Operation: "create", 391 + Collection: "social.coves.community.post", 392 + RKey: postRkey, 393 + CID: "bafyidempost", 394 + Record: map[string]interface{}{ 395 + "$type": "social.coves.community.post", 396 + "community": testCommunity, 397 + "author": testUser.DID, 398 + "title": "Idempotent test post", 399 + "content": "Testing idempotent indexing", 400 + "createdAt": time.Now().Format(time.RFC3339), 401 + }, 402 + }, 403 + } 404 + 405 + err = postConsumer.HandleEvent(ctx, postEvent) 406 + if err != nil { 407 + t.Fatalf("Failed to index post first time: %v", err) 408 + } 409 + 410 + // Verify count is 1 411 + post, err := postRepo.GetByURI(ctx, postURI) 412 + if err != nil { 413 + t.Fatalf("Failed to get post: %v", err) 414 + } 415 + if post.CommentCount != 1 { 416 + t.Errorf("Expected comment_count=1 after first index, got %d", post.CommentCount) 417 + } 418 + 419 + // Replay same post event (idempotent - should skip) 420 + err = postConsumer.HandleEvent(ctx, postEvent) 421 + if err != nil { 422 + t.Fatalf("Idempotent post event should not error: %v", err) 423 + } 424 + 425 + // Verify count still 1 (not reset to 0) 426 + post, err = postRepo.GetByURI(ctx, postURI) 427 + if err != nil { 428 + t.Fatalf("Failed to get post after replay: %v", err) 429 + } 430 + if post.CommentCount != 1 { 431 + t.Errorf("Expected comment_count=1 after replay (idempotent), got %d", post.CommentCount) 432 + } 433 + }) 434 + }