A community-based topic aggregation platform built on atproto

fix(feeds): resolve hot sort pagination duplicates caused by time drift

The hot_rank cursor was using NOW() for comparison, but time passes between
page requests. This caused posts to drift across cursor boundaries, resulting
in duplicates appearing on subsequent pages.

Fix: Store cursor creation timestamp in the cursor and use it for hot_rank
computation in subsequent queries, ensuring stable comparisons.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+153 -20
+5 -1
internal/db/postgres/discover_repo.go
··· 5 5 "context" 6 6 "database/sql" 7 7 "fmt" 8 + "time" 8 9 ) 9 10 10 11 type postgresDiscoverRepo struct { ··· 33 34 34 35 // GetDiscover retrieves posts from ALL communities (public feed) 35 36 func (r *postgresDiscoverRepo) GetDiscover(ctx context.Context, req discover.GetDiscoverRequest) ([]*discover.FeedViewPost, *string, error) { 37 + // Capture query time for stable cursor generation (used for hot sort pagination) 38 + queryTime := time.Now() 39 + 36 40 // Build ORDER BY clause based on sort type 37 41 orderBy, timeFilter := r.buildSortClause(req.Sort, req.Timeframe) 38 42 ··· 119 123 hotRanks = hotRanks[:req.Limit] 120 124 lastPost := feedPosts[len(feedPosts)-1].Post 121 125 lastHotRank := hotRanks[len(hotRanks)-1] 122 - cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank) 126 + cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank, queryTime) 123 127 cursor = &cursorStr 124 128 } 125 129
+5 -1
internal/db/postgres/feed_repo.go
··· 5 5 "context" 6 6 "database/sql" 7 7 "fmt" 8 + "time" 8 9 ) 9 10 10 11 type postgresFeedRepo struct { ··· 37 38 // GetCommunityFeed retrieves posts from a community with sorting and pagination 38 39 // Single query with JOINs for optimal performance 39 40 func (r *postgresFeedRepo) GetCommunityFeed(ctx context.Context, req communityFeeds.GetCommunityFeedRequest) ([]*communityFeeds.FeedViewPost, *string, error) { 41 + // Capture query time for stable cursor generation (used for hot sort pagination) 42 + queryTime := time.Now() 43 + 40 44 // Build ORDER BY clause based on sort type 41 45 orderBy, timeFilter := r.feedRepoBase.buildSortClause(req.Sort, req.Timeframe) 42 46 ··· 125 129 hotRanks = hotRanks[:req.Limit] 126 130 lastPost := feedPosts[len(feedPosts)-1].Post 127 131 lastHotRank := hotRanks[len(hotRanks)-1] 128 - cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank) 132 + cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank, queryTime) 129 133 cursor = &cursorStr 130 134 } 131 135
+28 -17
internal/db/postgres/feed_repo_base.go
··· 192 192 return filter, []interface{}{score, createdAt, uri}, nil 193 193 194 194 case "hot": 195 - // Cursor format: hot_rank::timestamp::uri 196 - // CRITICAL: Must use computed hot_rank, not raw score, to prevent pagination bugs 197 - if len(payloadParts) != 3 { 195 + // Cursor format: hot_rank::post_created_at::uri::cursor_timestamp 196 + // CRITICAL: cursor_timestamp is when the cursor was created, used for stable hot_rank comparison 197 + // This prevents pagination bugs caused by hot_rank drift when NOW() changes between requests 198 + if len(payloadParts) != 4 { 198 199 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 199 200 } 200 201 201 202 hotRankStr := payloadParts[0] 202 - createdAt := payloadParts[1] 203 + postCreatedAt := payloadParts[1] 203 204 uri := payloadParts[2] 205 + cursorTimestamp := payloadParts[3] 204 206 205 207 // Validate hot_rank is numeric (float) 206 208 hotRank := 0.0 ··· 208 210 return "", nil, fmt.Errorf("invalid cursor hot rank") 209 211 } 210 212 211 - // Validate timestamp format 212 - if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 213 - return "", nil, fmt.Errorf("invalid cursor timestamp") 213 + // Validate post timestamp format 214 + if _, err := time.Parse(time.RFC3339Nano, postCreatedAt); err != nil { 215 + return "", nil, fmt.Errorf("invalid cursor post timestamp") 214 216 } 215 217 216 218 // Validate URI format (must be AT-URI) ··· 218 220 return "", nil, fmt.Errorf("invalid cursor URI") 219 221 } 220 222 221 - // CRITICAL: Compare against the computed hot_rank expression, not p.score 222 - filter := fmt.Sprintf(`AND ((%s < $%d OR (%s = $%d AND p.created_at < $%d) OR (%s = $%d AND p.created_at = $%d AND p.uri < $%d)) AND p.uri != $%d)`, 223 - r.hotRankExpression, paramOffset, 224 - r.hotRankExpression, paramOffset, paramOffset+1, 225 - r.hotRankExpression, paramOffset, paramOffset+1, paramOffset+2, 223 + // Validate cursor timestamp format 224 + if _, err := 
time.Parse(time.RFC3339Nano, cursorTimestamp); err != nil { 225 + return "", nil, fmt.Errorf("invalid cursor timestamp") 226 + } 227 + 228 + // CRITICAL: Use cursor_timestamp instead of NOW() for stable hot_rank comparison 229 + // This ensures posts don't drift across page boundaries due to time passing 230 + stableHotRankExpr := fmt.Sprintf( 231 + `((p.score + 1) / POWER(EXTRACT(EPOCH FROM ($%d::timestamptz - p.created_at))/3600 + 2, 1.5))`, 226 232 paramOffset+3) 227 - return filter, []interface{}{hotRank, createdAt, uri, uri}, nil 233 + 234 + // Use tuple comparison for clean keyset pagination: (hot_rank, created_at, uri) < (cursor_values) 235 + filter := fmt.Sprintf(`AND ((%s, p.created_at, p.uri) < ($%d, $%d, $%d))`, 236 + stableHotRankExpr, paramOffset, paramOffset+1, paramOffset+2) 237 + return filter, []interface{}{hotRank, postCreatedAt, uri, cursorTimestamp}, nil 228 238 229 239 default: 230 240 return "", nil, nil ··· 233 243 234 244 // buildCursor creates HMAC-signed pagination cursor from last post 235 245 // SECURITY: Cursor is signed with HMAC-SHA256 to prevent manipulation 236 - func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64) string { 246 + // queryTime is the timestamp when the query was executed, used for stable hot_rank comparison 247 + func (r *feedRepoBase) buildCursor(post *posts.PostView, sort string, hotRank float64, queryTime time.Time) string { 237 248 var payload string 238 249 // Use :: as delimiter following Bluesky convention 239 250 const delimiter = "::" ··· 252 263 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 253 264 254 265 case "hot": 255 - // Format: hot_rank::timestamp::uri 256 - // CRITICAL: Use computed hot_rank with full precision 266 + // Format: hot_rank::post_created_at::uri::cursor_timestamp 267 + // CRITICAL: Include cursor_timestamp for stable hot_rank comparison across requests 257 268 hotRankStr := 
strconv.FormatFloat(hotRank, 'g', -1, 64) 258 - payload = fmt.Sprintf("%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 269 + payload = fmt.Sprintf("%s%s%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI, delimiter, queryTime.Format(time.RFC3339Nano)) 259 270 260 271 default: 261 272 payload = post.URI
+5 -1
internal/db/postgres/timeline_repo.go
··· 5 5 "context" 6 6 "database/sql" 7 7 "fmt" 8 + "time" 8 9 ) 9 10 10 11 type postgresTimelineRepo struct { ··· 35 36 // GetTimeline retrieves posts from all communities the user subscribes to 36 37 // Single query with JOINs for optimal performance 37 38 func (r *postgresTimelineRepo) GetTimeline(ctx context.Context, req timeline.GetTimelineRequest) ([]*timeline.FeedViewPost, *string, error) { 39 + // Capture query time for stable cursor generation (used for hot sort pagination) 40 + queryTime := time.Now() 41 + 38 42 // Build ORDER BY clause based on sort type 39 43 orderBy, timeFilter := r.buildSortClause(req.Sort, req.Timeframe) 40 44 ··· 125 129 hotRanks = hotRanks[:req.Limit] 126 130 lastPost := feedPosts[len(feedPosts)-1].Post 127 131 lastHotRank := hotRanks[len(hotRanks)-1] 128 - cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank) 132 + cursorStr := r.feedRepoBase.buildCursor(lastPost, req.Sort, lastHotRank, queryTime) 129 133 cursor = &cursorStr 130 134 } 131 135
+110
tests/integration/feed_test.go
··· 700 700 t.Logf("SUCCESS: All posts with similar hot ranks preserved (precision bug fixed)") 701 701 } 702 702 703 + // TestGetCommunityFeed_HotCursorTimeDrift tests that hot sort pagination is stable across time drift. 704 + // Regression test for a bug where posts would appear multiple times or be skipped when: 705 + // 1. Time passes between page 1 and page 2 requests 706 + // 2. Many posts have similar hot ranks 707 + // 708 + // Root cause: The cursor stored a hot_rank computed with NOW(), but the next query 709 + // also used NOW() (which had advanced). This caused posts to drift across the cursor boundary. 710 + // 711 + // Fix: Store the cursor creation timestamp in the cursor and use it for subsequent comparisons, 712 + // ensuring stable hot_rank computation across pagination requests. 713 + func TestGetCommunityFeed_HotCursorTimeDrift(t *testing.T) { 714 + if testing.Short() { 715 + t.Skip("Skipping integration test in short mode") 716 + } 717 + 718 + db := setupTestDB(t) 719 + t.Cleanup(func() { _ = db.Close() }) 720 + 721 + // Setup services 722 + feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 723 + communityRepo := postgres.NewCommunityRepository(db) 724 + communityService := communities.NewCommunityService( 725 + communityRepo, 726 + "http://localhost:3001", 727 + "did:web:test.coves.social", 728 + "test.coves.social", 729 + nil, 730 + ) 731 + feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 732 + handler := communityFeed.NewGetCommunityHandler(feedService, nil, nil) 733 + 734 + // Setup test data 735 + ctx := context.Background() 736 + testID := time.Now().UnixNano() 737 + communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("timedrift-%d", testID), fmt.Sprintf("timedrift-%d.test", testID)) 738 + require.NoError(t, err) 739 + 740 + // Create 15 posts all with the SAME score and created at the SAME time 741 + // This maximizes the chance of time drift causing duplicates: 742 
+ // - All posts have nearly identical hot ranks 743 + // - Any small change in NOW() could cause posts to swap order 744 + baseTime := time.Now().Add(-1 * time.Hour) 745 + var allPostURIs []string 746 + for i := 0; i < 15; i++ { 747 + // Add tiny offsets (1ms) to created_at for deterministic ordering 748 + postURI := createTestPost(t, db, communityDID, fmt.Sprintf("did:plc:user%d", i), 749 + fmt.Sprintf("Post %d", i), 10, baseTime.Add(time.Duration(i)*time.Millisecond)) 750 + allPostURIs = append(allPostURIs, postURI) 751 + } 752 + 753 + // Paginate through all posts with limit=5 754 + seenURIs := make(map[string]int) 755 + var cursor *string 756 + pageNum := 0 757 + 758 + for { 759 + pageNum++ 760 + url := fmt.Sprintf("/xrpc/social.coves.communityFeed.getCommunity?community=%s&sort=hot&limit=5", communityDID) 761 + if cursor != nil { 762 + url += "&cursor=" + *cursor 763 + } 764 + 765 + req := httptest.NewRequest(http.MethodGet, url, nil) 766 + rec := httptest.NewRecorder() 767 + handler.HandleGetCommunity(rec, req) 768 + 769 + require.Equal(t, http.StatusOK, rec.Code, "Page %d failed: %s", pageNum, rec.Body.String()) 770 + 771 + var page communityFeeds.FeedResponse 772 + err = json.Unmarshal(rec.Body.Bytes(), &page) 773 + require.NoError(t, err) 774 + 775 + if len(page.Feed) == 0 { 776 + break 777 + } 778 + 779 + for _, p := range page.Feed { 780 + seenURIs[p.Post.URI]++ 781 + if seenURIs[p.Post.URI] > 1 { 782 + t.Errorf("DUPLICATE on page %d: %s (seen %d times)", pageNum, p.Post.URI, seenURIs[p.Post.URI]) 783 + } 784 + } 785 + 786 + cursor = page.Cursor 787 + if cursor == nil { 788 + break 789 + } 790 + 791 + // Prevent infinite loops 792 + if pageNum > 10 { 793 + t.Fatal("Too many pages - possible infinite loop") 794 + } 795 + } 796 + 797 + // Verify we saw all posts exactly once 798 + assert.Equal(t, 15, len(seenURIs), "Should see all 15 posts") 799 + for uri, count := range seenURIs { 800 + if count != 1 { 801 + t.Errorf("Post %s seen %d times (expected 1)", 
uri, count) 802 + } 803 + } 804 + 805 + // Verify we saw all the posts we created 806 + for _, uri := range allPostURIs { 807 + assert.Contains(t, seenURIs, uri, "Missing post: %s", uri) 808 + } 809 + 810 + t.Logf("SUCCESS: All 15 posts seen exactly once across %d pages (time drift bug fixed)", pageNum) 811 + } 812 + 703 813 // TestGetCommunityFeed_BlobURLTransformation tests that blob refs are transformed to URLs 704 814 func TestGetCommunityFeed_BlobURLTransformation(t *testing.T) { 705 815 if testing.Short() {