import "server-only"; import type { NodeSavedSession, NodeSavedSessionStore, NodeSavedState, NodeSavedStateStore, RuntimeLock, } from "@atproto/oauth-client-node"; import pg from "pg"; const { Pool } = pg; let pool: pg.Pool | null = null; function getPool(): pg.Pool { if (!pool) { const connectionString = process.env.DATABASE_URL; if (!connectionString) { throw new Error("DATABASE_URL is required for auth storage"); } pool = new Pool({ connectionString }); } return pool; } /** * Hash a string to a 32-bit integer for use as PostgreSQL advisory lock key. */ function hashStringToInt(str: string): number { let hash = 0; for (let i = 0; i < str.length; i++) { const char = str.charCodeAt(i); hash = (hash << 5) - hash + char; hash = hash & hash; // Convert to 32-bit integer } return hash; } /** * PostgreSQL advisory lock implementation for OAuth token refresh synchronization. * Prevents concurrent token refreshes from causing race conditions. */ export const requestLock: RuntimeLock = async (key, fn) => { const db = getPool(); const lockId = hashStringToInt(key); const client = await db.connect(); try { await client.query("SELECT pg_advisory_lock($1)", [lockId]); try { return await fn(); } finally { await client.query("SELECT pg_advisory_unlock($1)", [lockId]); } } finally { client.release(); } }; export async function initAuthTables(): Promise { const db = getPool(); await db.query(` -- OAuth state storage (for in-flight auth requests) CREATE TABLE IF NOT EXISTS auth_state ( key TEXT PRIMARY KEY, state TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -- OAuth session storage (for authenticated users) CREATE TABLE IF NOT EXISTS auth_session ( key TEXT PRIMARY KEY, session TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_auth_state_created ON auth_state(created_at); `); } export class StateStore implements NodeSavedStateStore { async get(key: string): Promise { const db = getPool(); const result = await db.query<{ state: string }>( "SELECT state FROM auth_state WHERE key = $1", [key], ); if (result.rows.length === 0) return undefined; return JSON.parse(result.rows[0].state) as NodeSavedState; } async set(key: string, val: NodeSavedState): Promise { const db = getPool(); const state = JSON.stringify(val); await db.query( `INSERT INTO auth_state (key, state) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET state = EXCLUDED.state`, [key, state], ); await db.query("DELETE FROM auth_state WHERE created_at < NOW() - INTERVAL '15 minutes'"); } async del(key: string): Promise { const db = getPool(); await db.query("DELETE FROM auth_state WHERE key = $1", [key]); } } export class SessionStore implements NodeSavedSessionStore { async get(key: string): Promise { const db = getPool(); const result = await db.query<{ session: string }>( "SELECT session FROM auth_session WHERE key = $1", [key], ); if (result.rows.length === 0) { console.log(`[auth:session] not found: ${key}`); return undefined; } return JSON.parse(result.rows[0].session) as NodeSavedSession; } async set(key: string, val: NodeSavedSession): Promise { const db = getPool(); const session = JSON.stringify(val); const existing = await db.query("SELECT 1 FROM auth_session WHERE key = $1", [key]); const isNew = existing.rows.length === 0; await db.query( `INSERT INTO auth_session (key, session, updated_at) VALUES ($1, $2, NOW()) ON CONFLICT (key) DO UPDATE SET session = EXCLUDED.session, updated_at = NOW()`, [key, session], ); console.log(`[auth:session] SET ${key} -> ${isNew ? "created" : "updated"}`); } async del(key: string): Promise { const db = getPool(); const result = await db.query("DELETE FROM auth_session WHERE key = $1 RETURNING key", [key]); const deleted = result.rowCount && result.rowCount > 0; console.log(`[auth:session] DEL ${key} -> ${deleted ? "deleted" : "not found"}`); if (deleted) { console.log( `[auth:session] DEL stack:`, new Error().stack?.split("\n").slice(2, 6).join("\n"), ); } } }