A community based topic aggregation platform built on atproto

feat(users): index users on OAuth login, improve error handling

- Add UserIndexer interface and IndexUser method to index users after
successful OAuth authentication
- Change Jetstream consumer to only UPDATE existing users, not create new
ones (prevents indexing millions of unrelated Bluesky users)
- Add sentinel errors ErrUserNotFound and ErrHandleAlreadyTaken for
proper error handling instead of string matching
- Fix silent failure: Jetstream now propagates database errors instead
of silently dropping events during outages
- Remove redundant DID lookup in OAuth callback by reusing verifiedIdent
- Remove bsky.social fallback - not all users are on Bluesky
- Add compile-time interface satisfaction check for UserIndexer

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+135 -55
+9 -5
cmd/server/main.go
··· 48 48 postgresRepo "Coves/internal/db/postgres" 49 49 ) 50 50 51 + // Compile-time interface satisfaction checks 52 + var _ oauth.UserIndexer = (users.UserService)(nil) 53 + 51 54 func main() { 52 55 // Database configuration (AppView database) 53 56 dbURL := os.Getenv("DATABASE_URL") ··· 196 199 log.Fatalf("Failed to initialize OAuth client: %v", err) 197 200 } 198 201 202 + // Initialize user repository and service early (needed for OAuth user indexing) 203 + userRepo := postgresRepo.NewUserRepository(db) 204 + userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 205 + 199 206 // Create OAuth handler for HTTP endpoints 200 - oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore) 207 + // WithUserIndexer ensures users are indexed into local database after OAuth login 208 + oauthHandler := oauth.NewOAuthHandler(oauthClient, oauthStore, oauth.WithUserIndexer(userService)) 201 209 202 210 // Create OAuth auth middleware 203 211 // Validates sealed session tokens and loads OAuth sessions from database ··· 213 221 PLCURL: plcURL, 214 222 HTTPClient: http.Client{Timeout: 10 * time.Second}, 215 223 } 216 - 217 - // Initialize repositories and services 218 - userRepo := postgresRepo.NewUserRepository(db) 219 - userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 220 224 221 225 communityRepo := postgresRepo.NewCommunityRepository(db) 222 226
+16 -43
internal/atproto/jetstream/user_consumer.go
··· 5 5 "Coves/internal/core/users" 6 6 "context" 7 7 "encoding/json" 8 + "errors" 8 9 "fmt" 9 10 "log" 10 11 "sync" ··· 213 214 } 214 215 215 216 // handleIdentityEvent processes identity events (handle changes) 217 + // NOTE: This only UPDATES existing users - it does NOT create new users. 218 + // Users are created during OAuth login or signup, not from Jetstream events. 219 + // This prevents indexing millions of Bluesky users who never interact with Coves. 216 220 func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 217 221 if event.Identity == nil { 218 222 return fmt.Errorf("identity event missing identity data") ··· 225 229 return fmt.Errorf("identity event missing did or handle") 226 230 } 227 231 228 - log.Printf("Identity event: %s → %s", did, handle) 229 - 230 - // Get existing user to check if handle changed 232 + // Only process users who exist in our database (i.e., have used Coves before) 231 233 existingUser, err := c.userService.GetUserByDID(ctx, did) 232 234 if err != nil { 233 - // User doesn't exist - create new user 234 - pdsURL := "https://bsky.social" // Default Bluesky PDS 235 - // TODO: Resolve PDS URL from DID document via PLC directory 236 - 237 - _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{ 238 - DID: did, 239 - Handle: handle, 240 - PDSURL: pdsURL, 241 - }) 242 - 243 - if createErr != nil && !isDuplicateError(createErr) { 244 - return fmt.Errorf("failed to create user: %w", createErr) 235 + if errors.Is(err, users.ErrUserNotFound) { 236 + // User doesn't exist in our database - skip this event 237 + // They'll be indexed when they actually interact with Coves (OAuth login, signup, etc.) 238 + // This prevents us from indexing millions of Bluesky users we don't care about 239 + return nil 245 240 } 246 - 247 - log.Printf("Indexed new user: %s (%s)", handle, did) 248 - return nil 241 + // Database error - propagate so it can be retried 242 + return fmt.Errorf("failed to check if user exists: %w", err) 249 243 } 250 244 245 + log.Printf("Identity event for known user: %s (%s)", handle, did) 246 + 251 247 // User exists - check if handle changed 252 248 if existingUser.Handle != handle { 253 249 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did) ··· 298 294 return fmt.Errorf("account event missing did") 299 295 } 300 296 301 - // Account events don't include handle, so we can't index yet 302 - // We'll wait for the corresponding identity event 303 - log.Printf("Account event for %s (waiting for identity event)", did) 297 + // Account events don't include handle, so we skip them. 298 + // Users are indexed via OAuth login or signup, not from account events. 304 299 return nil 305 300 } 306 - 307 - // isDuplicateError checks if error is due to duplicate DID/handle 308 - func isDuplicateError(err error) bool { 309 - if err == nil { 310 - return false 311 - } 312 - errStr := err.Error() 313 - return contains(errStr, "already exists") || contains(errStr, "already taken") || contains(errStr, "duplicate") 314 - } 315 - 316 - func contains(s, substr string) bool { 317 - return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && anySubstring(s, substr)) 318 - } 319 - 320 - func anySubstring(s, substr string) bool { 321 - for i := 0; i <= len(s)-len(substr); i++ { 322 - if s[i:i+len(substr)] == substr { 323 - return true 324 - } 325 - } 326 - return false 327 - }
+50 -1
internal/atproto/oauth/handlers.go
··· 11 11 "strings" 12 12 13 13 "github.com/bluesky-social/indigo/atproto/auth/oauth" 14 + "github.com/bluesky-social/indigo/atproto/identity" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 15 16 ) 16 17 ··· 145 146 GetMobileOAuthData(ctx context.Context, state string) (*MobileOAuthData, error) 146 147 } 147 148 149 + // UserIndexer is the minimal interface for indexing users after OAuth login. 150 + // This decouples the OAuth handler from the full UserService. 151 + type UserIndexer interface { 152 + // IndexUser creates or updates a user in the local database. 153 + // This is idempotent - calling it multiple times with the same DID is safe. 154 + IndexUser(ctx context.Context, did, handle, pdsURL string) error 155 + } 156 + 148 157 // OAuthHandler handles OAuth-related HTTP endpoints 149 158 type OAuthHandler struct { 150 159 client *OAuthClient 151 160 store oauth.ClientAuthStore 152 161 mobileStore MobileOAuthStore // For server-side CSRF validation 162 + userIndexer UserIndexer // For indexing users after OAuth login 153 163 devResolver *DevHandleResolver // For dev mode: resolve handles via local PDS 154 164 devAuthResolver *DevAuthResolver // For dev mode: bypass HTTPS validation for localhost OAuth 155 165 } 156 166 167 + // OAuthHandlerOption is a functional option for configuring OAuthHandler 168 + type OAuthHandlerOption func(*OAuthHandler) 169 + 170 + // WithUserIndexer sets the user indexer for indexing users after OAuth login. 171 + // When set, users are automatically indexed into the local database after successful authentication. 172 + func WithUserIndexer(indexer UserIndexer) OAuthHandlerOption { 173 + return func(h *OAuthHandler) { 174 + h.userIndexer = indexer 175 + } 176 + } 177 + 157 178 // NewOAuthHandler creates a new OAuth handler 158 - func NewOAuthHandler(client *OAuthClient, store oauth.ClientAuthStore) *OAuthHandler { 179 + func NewOAuthHandler(client *OAuthClient, store oauth.ClientAuthStore, opts ...OAuthHandlerOption) *OAuthHandler { 159 180 handler := &OAuthHandler{ 160 181 client: client, 161 182 store: store, 183 + } 184 + 185 + // Apply functional options 186 + for _, opt := range opts { 187 + opt(handler) 162 188 } 163 189 164 190 // Check if the store implements MobileOAuthStore for server-side CSRF ··· 434 460 // This prevents impersonation via compromised PDS that issues tokens with invalid handle mappings 435 461 // Per AT Protocol spec: "Bidirectional verification required; confirm DID document claims handle" 436 462 // verifiedHandle stores the successfully verified handle for use in mobile callback 463 + // verifiedIdent stores the identity for reuse (PDS URL extraction, etc.) 437 464 verifiedHandle := "" 465 + var verifiedIdent *identity.Identity 438 466 if h.client.ClientApp.Dir != nil { 439 467 ident, err := h.client.ClientApp.Dir.LookupDID(ctx, sessData.AccountDID) 440 468 if err != nil { ··· 470 498 slog.Info("OAuth callback successful (dev mode: handle verified via PDS)", 471 499 "did", sessData.AccountDID, "handle", declaredHandle) 472 500 verifiedHandle = declaredHandle 501 + verifiedIdent = ident // Reuse the identity for PDS URL extraction 473 502 goto handleVerificationPassed 474 503 } 475 504 slog.Warn("dev mode: PDS handle verification failed", ··· 489 518 // Success: handle is valid and bidirectionally verified 490 519 slog.Info("OAuth callback successful", "did", sessData.AccountDID, "handle", ident.Handle) 491 520 verifiedHandle = ident.Handle.String() 521 + verifiedIdent = ident 492 522 } else { 493 523 // No directory client available - log warning but proceed 494 524 // This should only happen in testing scenarios ··· 497 527 slog.Info("OAuth callback successful (no handle verification)", "did", sessData.AccountDID) 498 528 } 499 529 handleVerificationPassed: 530 + 531 + // Index user in local database after successful OAuth login 532 + // This ensures users are available for profile lookups immediately after authentication 533 + if h.userIndexer != nil && verifiedHandle != "" && verifiedIdent != nil { 534 + pdsURL := verifiedIdent.PDSEndpoint() 535 + if pdsURL == "" { 536 + // No PDS URL available - skip indexing, user will be indexed on next login 537 + // We don't fallback to bsky.social since not all users are on Bluesky 538 + slog.Warn("skipping user indexing: no PDS URL in identity", 539 + "did", sessData.AccountDID, "handle", verifiedHandle) 540 + } else if indexErr := h.userIndexer.IndexUser(ctx, sessData.AccountDID.String(), verifiedHandle, pdsURL); indexErr != nil { 541 + // Log but don't fail - user can still proceed with their session 542 + // They'll be indexed on next login or via Jetstream identity event 543 + slog.Warn("failed to index user after OAuth login", 544 + "did", sessData.AccountDID, "handle", verifiedHandle, "error", indexErr) 545 + } else { 546 + slog.Info("indexed user after OAuth login", "did", sessData.AccountDID, "handle", verifiedHandle) 547 + } 548 + } 500 549 501 550 // Check if this is a mobile callback (check for mobile_redirect_uri cookie) 502 551 mobileRedirect, err := r.Cookie("mobile_redirect_uri")
+13 -1
internal/core/users/errors.go
··· 1 1 package users 2 2 3 - import "fmt" 3 + import ( 4 + "errors" 5 + "fmt" 6 + ) 7 + 8 + // Sentinel errors for common user operations 9 + var ( 10 + // ErrUserNotFound is returned when a user lookup finds no matching record 11 + ErrUserNotFound = errors.New("user not found") 12 + 13 + // ErrHandleAlreadyTaken is returned when attempting to use a handle that belongs to another user 14 + ErrHandleAlreadyTaken = errors.New("handle already taken") 15 + ) 4 16 5 17 // Domain errors for user service operations 6 18 // These map to lexicon error types defined in social.coves.actor.signup
+5
internal/core/users/interfaces.go
··· 39 39 UpdateHandle(ctx context.Context, did, newHandle string) (*User, error) 40 40 ResolveHandleToDID(ctx context.Context, handle string) (string, error) 41 41 RegisterAccount(ctx context.Context, req RegisterAccountRequest) (*RegisterAccountResponse, error) 42 + 43 + // IndexUser creates or updates a user in the local database. 44 + // This is idempotent - calling it multiple times with the same DID is safe. 45 + // Used after OAuth login to ensure users are immediately available for profile lookups. 46 + IndexUser(ctx context.Context, did, handle, pdsURL string) error 42 47 }
+37
internal/core/users/service.go
··· 5 5 "bytes" 6 6 "context" 7 7 "encoding/json" 8 + "errors" 8 9 "fmt" 9 10 "io" 10 11 "log" ··· 210 211 // Set the PDS URL in the response (PDS doesn't return this) 211 212 pdsResp.PDSURL = s.defaultPDS 212 213 214 + // Index the new user in local database so they're immediately available for profile lookups 215 + // This is idempotent - safe to call even if user somehow already exists 216 + if indexErr := s.IndexUser(ctx, pdsResp.DID, pdsResp.Handle, s.defaultPDS); indexErr != nil { 217 + // Log but don't fail - the account was created successfully on PDS 218 + // They'll be indexed on first OAuth login if this fails 219 + log.Printf("Warning: failed to index new user after signup (DID: %s): %v", pdsResp.DID, indexErr) 220 + } 221 + 213 222 return &pdsResp, nil 223 + } 224 + 225 + // IndexUser creates or updates a user in the local database. 226 + // This is idempotent and safe to call multiple times for the same user. 227 + // If the user exists, their handle is updated if it changed. 228 + func (s *userService) IndexUser(ctx context.Context, did, handle, pdsURL string) error { 229 + // Try to create the user (idempotent - CreateUser returns existing user if DID exists) 230 + _, err := s.CreateUser(ctx, CreateUserRequest{ 231 + DID: did, 232 + Handle: handle, 233 + PDSURL: pdsURL, 234 + }) 235 + 236 + if err != nil { 237 + // Check if it's a handle conflict (user exists with different handle) 238 + // In this case, update the handle instead 239 + if errors.Is(err, ErrHandleAlreadyTaken) { 240 + // User exists but handle changed - update it 241 + _, updateErr := s.UpdateHandle(ctx, did, handle) 242 + if updateErr != nil { 243 + return fmt.Errorf("failed to update handle for existing user: %w", updateErr) 244 + } 245 + return nil 246 + } 247 + return err 248 + } 249 + 250 + return nil 214 251 } 215 252 216 253 func (s *userService) validateCreateRequest(req CreateUserRequest) error {
+5 -5
internal/db/postgres/user_repo.go
··· 36 36 return nil, fmt.Errorf("user with DID already exists") 37 37 } 38 38 if strings.Contains(err.Error(), "users_handle_key") { 39 - return nil, fmt.Errorf("handle already taken") 39 + return nil, users.ErrHandleAlreadyTaken 40 40 } 41 41 } 42 42 return nil, fmt.Errorf("failed to create user: %w", err) ··· 54 54 Scan(&user.DID, &user.Handle, &user.PDSURL, &user.CreatedAt, &user.UpdatedAt) 55 55 56 56 if err == sql.ErrNoRows { 57 - return nil, fmt.Errorf("user not found") 57 + return nil, users.ErrUserNotFound 58 58 } 59 59 if err != nil { 60 60 return nil, fmt.Errorf("failed to get user by DID: %w", err) ··· 72 72 Scan(&user.DID, &user.Handle, &user.PDSURL, &user.CreatedAt, &user.UpdatedAt) 73 73 74 74 if err == sql.ErrNoRows { 75 - return nil, fmt.Errorf("user not found") 75 + return nil, users.ErrUserNotFound 76 76 } 77 77 if err != nil { 78 78 return nil, fmt.Errorf("failed to get user by handle: %w", err) ··· 94 94 Scan(&user.DID, &user.Handle, &user.PDSURL, &user.CreatedAt, &user.UpdatedAt) 95 95 96 96 if err == sql.ErrNoRows { 97 - return nil, fmt.Errorf("user not found") 97 + return nil, users.ErrUserNotFound 98 98 } 99 99 if err != nil { 100 100 // Check for unique constraint violation on handle 101 101 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "users_handle_key") { 102 - return nil, fmt.Errorf("handle already taken") 102 + return nil, users.ErrHandleAlreadyTaken 103 103 } 104 104 return nil, fmt.Errorf("failed to update handle: %w", err) 105 105 }