a tool for shared writing and social publishing

update oauth cleanup workflow

+74 -41
+74 -41
app/api/inngest/functions/cleanup_expired_oauth_sessions.ts
··· 14 14 errors: [] as string[], 15 15 }; 16 16 17 - // Step 1: Get all identities with an atp_did (OAuth users) 17 + // Step 1: Get all identities with an atp_did (OAuth users) that have at least one auth token 18 18 const identities = await step.run("fetch-oauth-identities", async () => { 19 19 const { data, error } = await supabaseServerClient 20 20 .from("identities") 21 - .select("id, atp_did") 21 + .select("id, atp_did, email_auth_tokens(count)") 22 22 .not("atp_did", "is", null); 23 23 24 24 if (error) { 25 25 throw new Error(`Failed to fetch identities: ${error.message}`); 26 26 } 27 - return data || []; 27 + 28 + // Filter to only include identities with at least one auth token 29 + return (data || []) 30 + .filter((identity) => { 31 + const tokenCount = identity.email_auth_tokens?.[0]?.count ?? 0; 32 + return tokenCount > 0; 33 + }) 34 + .map((identity) => ({ 35 + id: identity.id, 36 + atp_did: identity.atp_did, 37 + tokenCount: identity.email_auth_tokens?.[0]?.count ?? 0, 38 + })); 28 39 }); 29 40 30 41 stats.totalIdentities = identities.length; 31 - console.log(`Found ${identities.length} OAuth identities to check`); 42 + console.log(`Found ${identities.length} OAuth identities with active sessions to check`); 32 43 33 - // Step 2: Check each identity's OAuth session and cleanup if expired 34 - for (const identity of identities) { 35 - if (!identity.atp_did) continue; 44 + // Step 2: Check identities' OAuth sessions in batched parallel and cleanup if expired 45 + const BATCH_SIZE = 150; 46 + const allResults: { 47 + identityId: string; 48 + valid: boolean; 49 + tokensDeleted: number; 50 + error?: string; 51 + }[] = []; 36 52 37 - const result = await step.run( 38 - `check-session-${identity.id}`, 39 - async () => { 40 - console.log(`Checking OAuth session for DID: ${identity.atp_did}`); 53 + for (let i = 0; i < identities.length; i += BATCH_SIZE) { 54 + const batch = identities.slice(i, i + BATCH_SIZE); 55 + const batchNum = Math.floor(i / BATCH_SIZE) + 1; 56 + const totalBatches = Math.ceil(identities.length / BATCH_SIZE); 57 + 58 + console.log( 59 + `Processing batch ${batchNum}/${totalBatches} (${batch.length} identities)`, 60 + ); 61 + 62 + const batchResults = await Promise.all( 63 + batch.map((identity) => 64 + step.run(`check-session-${identity.id}`, async () => { 65 + console.log( 66 + `Checking OAuth session for DID: ${identity.atp_did} (${identity.tokenCount} tokens)`, 67 + ); 41 68 42 - const sessionResult = await restoreOAuthSession(identity.atp_did!); 69 + const sessionResult = await restoreOAuthSession(identity.atp_did!); 43 70 44 - if (sessionResult.ok) { 45 - console.log(` Session valid for ${identity.atp_did}`); 46 - return { valid: true, tokensDeleted: 0 }; 47 - } 71 + if (sessionResult.ok) { 72 + console.log(` Session valid for ${identity.atp_did}`); 73 + return { identityId: identity.id, valid: true, tokensDeleted: 0 }; 74 + } 48 75 49 - // Session is expired/invalid - delete associated auth tokens 50 - console.log( 51 - ` Session expired for ${identity.atp_did}: ${sessionResult.error.message}`, 52 - ); 76 + // Session is expired/invalid - delete associated auth tokens 77 + console.log( 78 + ` Session expired for ${identity.atp_did}: ${sessionResult.error.message}`, 79 + ); 53 80 54 - const { data: deletedTokens, error: deleteError } = 55 - await supabaseServerClient 81 + const { error: deleteError } = await supabaseServerClient 56 82 .from("email_auth_tokens") 57 83 .delete() 58 - .eq("identity", identity.id) 59 - .select("id"); 84 + .eq("identity", identity.id); 85 + 86 + if (deleteError) { 87 + console.error( 88 + ` Error deleting tokens for identity ${identity.id}: ${deleteError.message}`, 89 + ); 90 + return { 91 + identityId: identity.id, 92 + valid: false, 93 + tokensDeleted: 0, 94 + error: deleteError.message, 95 + }; 96 + } 60 97 61 - if (deleteError) { 62 - console.error( 63 - ` Error deleting tokens for identity ${identity.id}: ${deleteError.message}`, 98 + console.log( 99 + ` Deleted ${identity.tokenCount} auth tokens for identity ${identity.id}`, 64 100 ); 101 + 65 102 return { 103 + identityId: identity.id, 66 104 valid: false, 67 - tokensDeleted: 0, 68 - error: deleteError.message, 105 + tokensDeleted: identity.tokenCount, 69 106 }; 70 - } 71 - 72 - const deletedCount = deletedTokens?.length || 0; 73 - console.log( 74 - ` Deleted ${deletedCount} auth tokens for identity ${identity.id}`, 75 - ); 76 - 77 - return { valid: false, tokensDeleted: deletedCount }; 78 - }, 107 + }), 108 + ), 79 109 ); 80 110 111 + allResults.push(...batchResults); 112 + } 113 + 114 + // Aggregate results 115 + for (const result of allResults) { 81 116 if (result.valid) { 82 117 stats.validSessions++; 83 118 } else { 84 119 stats.expiredSessions++; 85 120 stats.tokensDeleted += result.tokensDeleted; 86 121 if ("error" in result && result.error) { 87 - stats.errors.push( 88 - `Identity ${identity.id}: ${result.error}`, 89 - ); 122 + stats.errors.push(`Identity ${result.identityId}: ${result.error}`); 90 123 } 91 124 } 92 125 }