ATlast — you'll never need to find your favorites on another platform again. Find your favs in the ATmosphere.
atproto

feat(worker): implement bullmq cleanup worker

- clean up expired auth and notifications
- scheduled to run daily at 2 AM via cleanup_transient_data()

by arielm.fyi edf4c3e4 30190c7a

verified
+276
+43
packages/worker/src/db/client.ts
··· 1 + /** 2 + * Worker Database Client 3 + * Kysely connection for PostgreSQL — uses a smaller pool than the API 4 + * since workers process jobs sequentially rather than handling concurrent requests. 5 + */ 6 + 7 + import { Kysely, PostgresDialect, sql } from "kysely"; 8 + import { Pool } from "pg"; 9 + import type { Database } from "@atlast/shared/types/database"; 10 + 11 + const pool = new Pool({ 12 + connectionString: process.env.DATABASE_URL, 13 + max: 5, // Smaller pool — workers run fewer concurrent queries than the API 14 + idleTimeoutMillis: 30000, 15 + connectionTimeoutMillis: 2000, 16 + }); 17 + 18 + export const db = new Kysely<Database>({ 19 + dialect: new PostgresDialect({ pool }), 20 + }); 21 + 22 + /** 23 + * Verify database connectivity on startup. 24 + * Throws if the connection cannot be established. 25 + */ 26 + export async function testConnection(): Promise<void> { 27 + try { 28 + await sql`SELECT 1`.execute(db); 29 + console.log("✅ [WORKER] Database connection successful"); 30 + } catch (error) { 31 + console.error("❌ [WORKER] Database connection failed:", error); 32 + throw error; 33 + } 34 + } 35 + 36 + /** 37 + * Destroy the connection pool. 38 + * Call during graceful shutdown to avoid hanging processes. 39 + */ 40 + export async function closeConnection(): Promise<void> { 41 + await db.destroy(); 42 + console.log("[WORKER] Database connection pool closed"); 43 + }
+107
packages/worker/src/index.ts
··· 1 + /** 2 + * BullMQ Worker — Main Entry Point 3 + * Connects to Redis and PostgreSQL, registers job handlers, 4 + * and schedules the daily cleanup cron job. 5 + */ 6 + 7 + import { Worker } from "bullmq"; 8 + import type { ConnectionOptions } from "bullmq"; 9 + import { config } from "dotenv"; 10 + import { cleanupQueue, testRedisConnection } from "./queues"; 11 + import { handleCleanupJob } from "./jobs/cleanupJob"; 12 + import { testConnection, closeConnection } from "./db/client"; 13 + 14 + // Load .env before anything reads process.env 15 + config(); 16 + 17 + /** 18 + * Build connection options for the Worker. 19 + * We use the same URL-to-options parser as queues.ts. 20 + * BullMQ requires separate connection objects for Queue vs Worker. 21 + */ 22 + function buildWorkerConnectionOptions(): ConnectionOptions { 23 + const url = process.env.REDIS_URL ?? "redis://localhost:6379"; 24 + const parsed = new URL(url); 25 + return { 26 + host: parsed.hostname, 27 + port: Number(parsed.port) || 6379, 28 + db: Number(parsed.pathname.slice(1)) || 0, 29 + maxRetriesPerRequest: null, 30 + enableReadyCheck: false, 31 + }; 32 + } 33 + 34 + /** 35 + * Cleanup Worker 36 + * concurrency: 1 — cleanup is idempotent but there's no benefit to parallel 37 + * runs; keep it simple for Phase 1. 
38 + */ 39 + const cleanupWorker = new Worker("cleanup", handleCleanupJob, { 40 + connection: buildWorkerConnectionOptions(), 41 + concurrency: 1, 42 + lockDuration: 60000, // 60 s max lock — cleanup shouldn't take longer 43 + }); 44 + 45 + cleanupWorker.on("completed", (job) => { 46 + console.log(`[WORKER] Job ${job.id} completed`); 47 + }); 48 + 49 + cleanupWorker.on("failed", (job, error) => { 50 + console.error(`[WORKER] Job ${job?.id} failed: ${error.message}`); 51 + }); 52 + 53 + cleanupWorker.on("error", (error) => { 54 + console.error("[WORKER] Worker error:", error); 55 + }); 56 + 57 + async function start(): Promise<void> { 58 + console.log("🚀 [WORKER] Starting BullMQ worker..."); 59 + 60 + await testRedisConnection(); 61 + await testConnection(); 62 + 63 + // Add recurring cleanup job — fixed jobId prevents duplicates on restart 64 + await cleanupQueue.add( 65 + "daily-cleanup", 66 + {}, 67 + { 68 + repeat: { pattern: "0 2 * * *" }, // 2 AM daily 69 + jobId: "cleanup-daily", 70 + } 71 + ); 72 + 73 + console.log("✅ [WORKER] Worker started"); 74 + console.log("📅 [WORKER] Cleanup scheduled for 2 AM daily"); 75 + console.log("⏳ [WORKER] Waiting for jobs..."); 76 + } 77 + 78 + async function shutdown(): Promise<void> { 79 + console.log("\n⚠️ [WORKER] Shutting down..."); 80 + try { 81 + await cleanupWorker.close(); 82 + await closeConnection(); 83 + console.log("✅ [WORKER] Shutdown complete"); 84 + process.exit(0); 85 + } catch (error) { 86 + console.error("❌ [WORKER] Error during shutdown:", error); 87 + process.exit(1); 88 + } 89 + } 90 + 91 + process.on("SIGTERM", () => { void shutdown(); }); 92 + process.on("SIGINT", () => { void shutdown(); }); 93 + 94 + process.on("uncaughtException", (error) => { 95 + console.error("❌ [WORKER] Uncaught exception:", error); 96 + void shutdown(); 97 + }); 98 + 99 + process.on("unhandledRejection", (reason) => { 100 + console.error("❌ [WORKER] Unhandled rejection:", reason); 101 + void shutdown(); 102 + }); 103 + 104 + 
start().catch((error: unknown) => { 105 + console.error("❌ [WORKER] Failed to start:", error); 106 + process.exit(1); 107 + });
+48
packages/worker/src/jobs/cleanupJob.ts
··· 1 + /** 2 + * Cleanup Job Handler 3 + * Invokes the PostgreSQL cleanup_transient_data() function which atomically 4 + * removes expired OAuth states, sessions, and old notifications. 5 + */ 6 + 7 + import type { Job } from "bullmq"; 8 + import { sql } from "kysely"; 9 + import { db } from "../db/client"; 10 + 11 + export interface CleanupJobData { 12 + // Scheduled job — no input data required 13 + } 14 + 15 + export interface CleanupJobResult { 16 + cleaned: boolean; 17 + timestamp: string; 18 + } 19 + 20 + /** 21 + * Execute the cleanup job. 22 + * Delegates to the cleanup_transient_data() SQL function defined in init-db.sql. 23 + * Re-throws on failure so BullMQ can mark the job failed and schedule a retry. 24 + */ 25 + export async function handleCleanupJob( 26 + job: Job<CleanupJobData> 27 + ): Promise<CleanupJobResult> { 28 + const startTime = Date.now(); 29 + console.log(`[CLEANUP] Starting cleanup job ${job.id}...`); 30 + 31 + try { 32 + await sql`SELECT cleanup_transient_data()`.execute(db); 33 + 34 + const duration = Date.now() - startTime; 35 + console.log(`[CLEANUP] ✅ Completed in ${duration}ms`); 36 + 37 + return { cleaned: true, timestamp: new Date().toISOString() }; 38 + } catch (error) { 39 + console.error("[CLEANUP] ❌ Failed:", { 40 + message: error instanceof Error ? error.message : String(error), 41 + stack: error instanceof Error ? error.stack : undefined, 42 + jobId: job.id, 43 + attemptsMade: job.attemptsMade, 44 + }); 45 + 46 + throw error; // Let BullMQ handle retry logic 47 + } 48 + }
+78
packages/worker/src/queues.ts
··· 1 + /** 2 + * BullMQ Queue Configuration 3 + * Defines the cleanup queue used by both the API (to add jobs) 4 + * and the worker process (to consume them). 5 + * 6 + * We pass connection options objects (not Redis instances) to avoid a 7 + * TypeScript structural mismatch between the IORedis.Redis namespace type 8 + * used in BullMQ's ConnectionOptions and the default-imported Redis class. 9 + * BullMQ creates and manages its own Redis connections from these options. 10 + */ 11 + 12 + import { Queue } from "bullmq"; 13 + import Redis from "ioredis"; 14 + import type { ConnectionOptions } from "bullmq"; 15 + 16 + /** 17 + * Parse the REDIS_URL environment variable into a ConnectionOptions object. 18 + * Supports: redis://host:port/db (database index via path segment) 19 + */ 20 + function buildConnectionOptions(): ConnectionOptions { 21 + const url = process.env.REDIS_URL ?? "redis://localhost:6379"; 22 + const parsed = new URL(url); 23 + return { 24 + host: parsed.hostname, 25 + port: Number(parsed.port) || 6379, 26 + db: Number(parsed.pathname.slice(1)) || 0, 27 + maxRetriesPerRequest: null, // Required by BullMQ — disables per-command timeout 28 + enableReadyCheck: false, // Skip ready check RTT on startup 29 + }; 30 + } 31 + 32 + /** 33 + * Cleanup Queue 34 + * Handles periodic removal of expired transient data: 35 + * - oauth_states older than 1 hour 36 + * - user_sessions past their expires_at 37 + * - sent notification_queue rows older than 7 days 38 + * - failed notification_queue rows older than 30 days 39 + */ 40 + export const cleanupQueue = new Queue("cleanup", { 41 + connection: buildConnectionOptions(), 42 + defaultJobOptions: { 43 + attempts: 2, // Retry once on failure 44 + backoff: { 45 + type: "exponential", 46 + delay: 5000, // 5 s → 10 s between retries 47 + }, 48 + removeOnComplete: { 49 + age: 86400, // Keep completed jobs for 24 h (debugging) 50 + count: 10, 51 + }, 52 + removeOnFail: { 53 + age: 604800, // Keep failed jobs for 7 days 54 + 
count: 100, 55 + }, 56 + }, 57 + }); 58 + 59 + /** 60 + * Verify Redis connectivity using a short-lived client. 61 + * Throws if the connection cannot be established. 62 + */ 63 + export async function testRedisConnection(): Promise<void> { 64 + const url = process.env.REDIS_URL ?? "redis://localhost:6379"; 65 + const client = new Redis(url, { 66 + maxRetriesPerRequest: null, 67 + enableReadyCheck: false, 68 + }); 69 + try { 70 + await client.ping(); 71 + console.log("✅ [WORKER] Redis connection successful"); 72 + } catch (error) { 73 + console.error("❌ [WORKER] Redis connection failed:", error); 74 + throw error; 75 + } finally { 76 + client.disconnect(); 77 + } 78 + }