A community-based topic aggregation platform built on atproto

fix: hot sort cursor pagination and E2E test reliability

Hot sort cursor:
- Remove hot_rank from cursor to avoid floating-point precision issues
- Use subquery to compare hot_ranks with identical SQL expressions
- Cursor format changed from hot_rank::created_at::uri::timestamp to
created_at::uri::timestamp

E2E test fixes:
- community_e2e_test: Create fresh PDS account instead of using
hardcoded credentials that don't exist
- Add consecutive timeout tracking to websocket subscriptions to
prevent gorilla/websocket panic after repeated reads on failed
connections

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Bretton 32fd2743 8ffa92a0

+112 -52
+49 -24
internal/db/postgres/feed_repo_base.go
··· 9 "encoding/hex" 10 "encoding/json" 11 "fmt" 12 - "strconv" 13 "strings" 14 "time" 15 ) ··· 192 return filter, []interface{}{score, createdAt, uri}, nil 193 194 case "hot": 195 - // Cursor format: hot_rank::post_created_at::uri::cursor_timestamp 196 // CRITICAL: cursor_timestamp is when the cursor was created, used for stable hot_rank comparison 197 // This prevents pagination bugs caused by hot_rank drift when NOW() changes between requests 198 - if len(payloadParts) != 4 { 199 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 200 } 201 202 - hotRankStr := payloadParts[0] 203 - postCreatedAt := payloadParts[1] 204 - uri := payloadParts[2] 205 - cursorTimestamp := payloadParts[3] 206 207 - // Validate hot_rank is numeric (float) 208 - hotRank := 0.0 209 - if _, err := fmt.Sscanf(hotRankStr, "%f", &hotRank); err != nil { 210 - return "", nil, fmt.Errorf("invalid cursor hot rank") 211 - } 212 - 213 - // Validate post timestamp format 214 - if _, err := time.Parse(time.RFC3339Nano, postCreatedAt); err != nil { 215 - return "", nil, fmt.Errorf("invalid cursor post timestamp") 216 } 217 218 // Validate URI format (must be AT-URI) ··· 229 // This ensures posts don't drift across page boundaries due to time passing 230 stableHotRankExpr := fmt.Sprintf( 231 `((p.score + 1) / POWER(EXTRACT(EPOCH FROM ($%d::timestamptz - p.created_at))/3600 + 2, 1.5))`, 232 - paramOffset+3) 233 234 - // Use tuple comparison for clean keyset pagination: (hot_rank, created_at, uri) < (cursor_values) 235 - filter := fmt.Sprintf(`AND ((%s, p.created_at, p.uri) < ($%d, $%d, $%d))`, 236 - stableHotRankExpr, paramOffset, paramOffset+1, paramOffset+2) 237 - return filter, []interface{}{hotRank, postCreatedAt, uri, cursorTimestamp}, nil 238 239 default: 240 return "", nil, nil ··· 263 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 264 265 case "hot": 266 - // Format: 
hot_rank::post_created_at::uri::cursor_timestamp 267 // CRITICAL: Include cursor_timestamp for stable hot_rank comparison across requests 268 - hotRankStr := strconv.FormatFloat(hotRank, 'g', -1, 64) 269 - payload = fmt.Sprintf("%s%s%s%s%s%s%s", hotRankStr, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI, delimiter, queryTime.Format(time.RFC3339Nano)) 270 271 default: 272 payload = post.URI
··· 9 "encoding/hex" 10 "encoding/json" 11 "fmt" 12 "strings" 13 "time" 14 ) ··· 191 return filter, []interface{}{score, createdAt, uri}, nil 192 193 case "hot": 194 + // Cursor format: created_at::uri::cursor_timestamp 195 // CRITICAL: cursor_timestamp is when the cursor was created, used for stable hot_rank comparison 196 // This prevents pagination bugs caused by hot_rank drift when NOW() changes between requests 197 + // 198 + // PRECISION FIX: We DON'T use hot_rank in the cursor comparison at all! 199 + // Instead, we use (created_at, uri) as the cursor key, which are deterministic values stored in DB. 200 + // The hot sort ORDER BY is: hot_rank DESC, created_at DESC, uri DESC 201 + // For posts with the same hot_rank, created_at and uri provide stable ordering. 202 + // 203 + // This works because: 204 + // 1. Posts with very different hot_ranks will be separated by created_at anyway 205 + // 2. Posts with similar hot_ranks (same score, close creation times) will be ordered by created_at, uri 206 + // 3. 
The cursor_timestamp ensures hot_rank is computed consistently across pages 207 + if len(payloadParts) != 3 { 208 return "", nil, fmt.Errorf("invalid cursor format for hot sort") 209 } 210 211 + createdAt := payloadParts[0] 212 + uri := payloadParts[1] 213 + cursorTimestamp := payloadParts[2] 214 215 + // Validate created_at timestamp format 216 + if _, err := time.Parse(time.RFC3339Nano, createdAt); err != nil { 217 + return "", nil, fmt.Errorf("invalid cursor created_at timestamp") 218 } 219 220 // Validate URI format (must be AT-URI) ··· 231 // This ensures posts don't drift across page boundaries due to time passing 232 stableHotRankExpr := fmt.Sprintf( 233 `((p.score + 1) / POWER(EXTRACT(EPOCH FROM ($%d::timestamptz - p.created_at))/3600 + 2, 1.5))`, 234 + paramOffset+2) 235 236 + // Filter by cursor position in the hot-sorted result set 237 + // The ORDER BY is: hot_rank DESC, created_at DESC, uri DESC 238 + // We need posts that come AFTER the cursor position in this ordering. 239 + // 240 + // A post comes after the cursor if ANY of: 241 + // 1. It has a lower hot_rank (hot_rank DESC means lower values come later) 242 + // 2. Same hot_rank AND lower created_at 243 + // 3. 
Same hot_rank AND same created_at AND lower uri 244 + // 245 + // To avoid floating-point comparison issues with hot_rank, we use a subquery 246 + // to get the cursor post's hot_rank and compare using the SAME expression 247 + cursorHotRankExpr := fmt.Sprintf( 248 + `((cursor_post.score + 1) / POWER(EXTRACT(EPOCH FROM ($%d::timestamptz - cursor_post.created_at))/3600 + 2, 1.5))`, 249 + paramOffset+2) 250 + 251 + // Use a subquery to find the cursor post and compare hot_ranks using identical expressions 252 + // This ensures floating-point values are computed the same way on both sides 253 + filter := fmt.Sprintf(`AND ( 254 + %s < (SELECT %s FROM posts cursor_post WHERE cursor_post.uri = $%d) 255 + OR (%s = (SELECT %s FROM posts cursor_post WHERE cursor_post.uri = $%d) AND p.created_at < $%d) 256 + OR (%s = (SELECT %s FROM posts cursor_post WHERE cursor_post.uri = $%d) AND p.created_at = $%d AND p.uri < $%d) 257 + )`, 258 + stableHotRankExpr, cursorHotRankExpr, paramOffset+1, 259 + stableHotRankExpr, cursorHotRankExpr, paramOffset+1, paramOffset, 260 + stableHotRankExpr, cursorHotRankExpr, paramOffset+1, paramOffset, paramOffset+1) 261 + return filter, []interface{}{createdAt, uri, cursorTimestamp}, nil 262 263 default: 264 return "", nil, nil ··· 287 payload = fmt.Sprintf("%d%s%s%s%s", score, delimiter, post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI) 288 289 case "hot": 290 + // Format: created_at::uri::cursor_timestamp 291 // CRITICAL: Include cursor_timestamp for stable hot_rank comparison across requests 292 + // NOTE: We don't store hot_rank in the cursor - we use the post's URI to look it up 293 + // This avoids floating-point precision issues between cursor storage and comparison 294 + payload = fmt.Sprintf("%s%s%s%s%s", post.CreatedAt.Format(time.RFC3339Nano), delimiter, post.URI, delimiter, queryTime.Format(time.RFC3339Nano)) 295 296 default: 297 payload = post.URI
+24 -14
tests/integration/community_e2e_test.go
··· 103 // Setup dependencies 104 communityRepo := postgres.NewCommunityRepository(db) 105 106 - // Get instance credentials 107 - instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 108 - instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 109 - if instanceHandle == "" { 110 - instanceHandle = "testuser123.local.coves.dev" 111 - } 112 - if instancePassword == "" { 113 - instancePassword = "test-password-123" 114 - } 115 116 - t.Logf("🔐 Authenticating with PDS as: %s", instanceHandle) 117 118 - // Authenticate to get instance DID 119 - accessToken, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 120 if err != nil { 121 - t.Fatalf("Failed to authenticate with PDS: %v", err) 122 } 123 124 - t.Logf("✅ Authenticated - Instance DID: %s", instanceDID) 125 126 // Initialize OAuth auth middleware for E2E testing 127 e2eAuth := NewE2EOAuthMiddleware() ··· 1761 } 1762 defer func() { _ = conn.Close() }() 1763 1764 // Read messages until we find our event or receive done signal 1765 for { 1766 select { ··· 1782 return nil 1783 } 1784 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 1785 continue // Timeout is expected, keep listening 1786 } 1787 // For other errors, don't retry reading from a broken connection 1788 return fmt.Errorf("failed to read Jetstream message: %w", err) 1789 } 1790 1791 // Check if this is the event we're looking for 1792 if event.Did == targetDID && event.Kind == "commit" {
··· 103 // Setup dependencies 104 communityRepo := postgres.NewCommunityRepository(db) 105 106 + // Create a fresh test account on PDS (similar to user_journey_e2e_test pattern) 107 + // Use unique handle to avoid conflicts between test runs 108 + timestamp := time.Now().UnixNano() 109 + shortTS := timestamp % 1000000 // Use last 6 digits for more uniqueness 110 + instanceHandle := fmt.Sprintf("ce%d.local.coves.dev", shortTS) 111 + instanceEmail := fmt.Sprintf("comm%d@test.com", shortTS) 112 + instancePassword := "test-password-community-123" 113 114 + t.Logf("🔐 Creating test account on PDS: %s", instanceHandle) 115 116 + // Create account on PDS - this returns the access token and DID 117 + accessToken, instanceDID, err := createPDSAccount(pdsURL, instanceHandle, instanceEmail, instancePassword) 118 if err != nil { 119 + t.Fatalf("Failed to create account on PDS: %v", err) 120 } 121 122 + t.Logf("✅ Account created - Instance DID: %s", instanceDID) 123 124 // Initialize OAuth auth middleware for E2E testing 125 e2eAuth := NewE2EOAuthMiddleware() ··· 1759 } 1760 defer func() { _ = conn.Close() }() 1761 1762 + // Track consecutive timeouts to detect stale connections 1763 + // gorilla/websocket panics after 1000 repeated reads on a failed connection 1764 + consecutiveTimeouts := 0 1765 + const maxConsecutiveTimeouts = 10 1766 + 1767 // Read messages until we find our event or receive done signal 1768 for { 1769 select { ··· 1785 return nil 1786 } 1787 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 1788 + consecutiveTimeouts++ 1789 + if consecutiveTimeouts >= maxConsecutiveTimeouts { 1790 + return fmt.Errorf("connection appears stale after %d consecutive timeouts", consecutiveTimeouts) 1791 + } 1792 continue // Timeout is expected, keep listening 1793 } 1794 // For other errors, don't retry reading from a broken connection 1795 return fmt.Errorf("failed to read Jetstream message: %w", err) 1796 } 1797 + 1798 + // Reset timeout counter on successful read 
1799 + consecutiveTimeouts = 0 1800 1801 // Check if this is the event we're looking for 1802 if event.Did == targetDID && event.Kind == "commit" {
+25 -14
tests/integration/community_update_e2e_test.go
··· 7 "Coves/internal/db/postgres" 8 "context" 9 "database/sql" 10 "fmt" 11 "net" 12 "net/http" 13 "os" ··· 310 consumer *jetstream.CommunityEventConsumer, 311 eventChan chan<- *jetstream.JetstreamEvent, 312 done <-chan bool, 313 - ) (returnErr error) { 314 - // Recover from websocket panics during shutdown 315 - defer func() { 316 - if r := recover(); r != nil { 317 - // Panic during shutdown is expected, return nil 318 - returnErr = nil 319 - } 320 - }() 321 - 322 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 323 if err != nil { 324 return fmt.Errorf("failed to connect to Jetstream: %w", err) 325 } 326 defer func() { _ = conn.Close() }() 327 328 for { 329 select { ··· 345 return nil 346 default: 347 } 348 - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 349 return nil 350 } 351 - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 352 continue 353 } 354 // Check for connection closed errors (happens during shutdown) 355 - if strings.Contains(err.Error(), "use of closed network connection") || 356 - strings.Contains(err.Error(), "failed websocket connection") || 357 - strings.Contains(err.Error(), "repeated read on failed websocket") { 358 return nil 359 } 360 return fmt.Errorf("failed to read Jetstream message: %w", err) 361 } 362 363 // Check if this is the event we're looking for 364 if event.Did == targetDID && event.Kind == "commit" &&
··· 7 "Coves/internal/db/postgres" 8 "context" 9 "database/sql" 10 + "errors" 11 "fmt" 12 + "io" 13 "net" 14 "net/http" 15 "os" ··· 312 consumer *jetstream.CommunityEventConsumer, 313 eventChan chan<- *jetstream.JetstreamEvent, 314 done <-chan bool, 315 + ) error { 316 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 317 if err != nil { 318 return fmt.Errorf("failed to connect to Jetstream: %w", err) 319 } 320 defer func() { _ = conn.Close() }() 321 + 322 + // Track consecutive timeouts to detect stale connections 323 + // The gorilla/websocket library panics after 1000 repeated reads on a failed connection 324 + consecutiveTimeouts := 0 325 + const maxConsecutiveTimeouts = 10 326 327 for { 328 select { ··· 344 return nil 345 default: 346 } 347 + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { 348 + return nil 349 + } 350 + // Handle EOF - connection was closed by server 351 + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { 352 return nil 353 } 354 + var netErr net.Error 355 + if errors.As(err, &netErr) && netErr.Timeout() { 356 + consecutiveTimeouts++ 357 + // If we get too many consecutive timeouts, the connection may be in a bad state 358 + // Exit to avoid the gorilla/websocket panic on repeated reads to failed connections 359 + if consecutiveTimeouts >= maxConsecutiveTimeouts { 360 + return fmt.Errorf("connection appears stale after %d consecutive timeouts", consecutiveTimeouts) 361 + } 362 continue 363 } 364 // Check for connection closed errors (happens during shutdown) 365 + if strings.Contains(err.Error(), "use of closed network connection") { 366 return nil 367 } 368 return fmt.Errorf("failed to read Jetstream message: %w", err) 369 } 370 + 371 + // Reset timeout counter on successful read 372 + consecutiveTimeouts = 0 373 374 // Check if this is the event we're looking for 375 if event.Did == targetDID && event.Kind == "commit" &&
+14
tests/integration/user_journey_e2e_test.go
··· 763 } 764 defer func() { _ = conn.Close() }() 765 766 for { 767 select { 768 case <-done: ··· 790 // Handle timeout errors using errors.As for wrapped errors 791 var netErr net.Error 792 if errors.As(err, &netErr) && netErr.Timeout() { 793 continue 794 } 795 ··· 797 // The gorilla/websocket library panics on repeated reads after a connection failure 798 return fmt.Errorf("failed to read Jetstream message: %w", err) 799 } 800 801 if event.Did == targetDID && event.Kind == "commit" && 802 event.Commit != nil && event.Commit.Collection == "social.coves.community.profile" {
··· 763 } 764 defer func() { _ = conn.Close() }() 765 766 + // Track consecutive timeouts to detect stale connections 767 + // The gorilla/websocket library panics after 1000 repeated reads on a failed connection 768 + consecutiveTimeouts := 0 769 + const maxConsecutiveTimeouts = 10 770 + 771 for { 772 select { 773 case <-done: ··· 795 // Handle timeout errors using errors.As for wrapped errors 796 var netErr net.Error 797 if errors.As(err, &netErr) && netErr.Timeout() { 798 + consecutiveTimeouts++ 799 + // If we get too many consecutive timeouts, the connection may be in a bad state 800 + // Exit to avoid the gorilla/websocket panic on repeated reads to failed connections 801 + if consecutiveTimeouts >= maxConsecutiveTimeouts { 802 + return fmt.Errorf("connection appears stale after %d consecutive timeouts", consecutiveTimeouts) 803 + } 804 continue 805 } 806 ··· 808 // The gorilla/websocket library panics on repeated reads after a connection failure 809 return fmt.Errorf("failed to read Jetstream message: %w", err) 810 } 811 + 812 + // Reset timeout counter on successful read 813 + consecutiveTimeouts = 0 814 815 if event.Did == targetDID && event.Kind == "commit" && 816 event.Commit != nil && event.Commit.Collection == "social.coves.community.profile" {