import type { Database } from "@atbb/db";
import { forums, backfillProgress, backfillErrors, users } from "@atbb/db";
import { eq, asc, gt } from "drizzle-orm";
import { AtpAgent } from "@atproto/api";
import { CursorManager } from "./cursor-manager.js";
import type { AppConfig } from "./config.js";
import type { Indexer } from "./indexer.js";
import { isProgrammingError } from "./errors.js";
import type { Logger } from "@atbb/logger";

/**
 * Collections synced from the Forum DID in Phase 1 of performBackfill, in
 * array order. Forum-owned records are synced before user-owned ones because
 * of FK dependencies.
 */
export const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
] as const;

/**
 * Collections synced from every known user DID in Phase 2 (CatchUp only),
 * in array order.
 */
export const USER_OWNED_COLLECTIONS = [
  "space.atbb.membership",
  "space.atbb.post",
] as const;

/**
 * Maps AT Proto collection NSIDs to the name of the Indexer method that
 * handles a created record of that collection. syncRepoRecords refuses to
 * sync collections that are missing from this map.
 */
const COLLECTION_HANDLER_MAP: Readonly<Record<string, string>> = {
  "space.atbb.post": "handlePostCreate",
  "space.atbb.forum.forum": "handleForumCreate",
  "space.atbb.forum.category": "handleCategoryCreate",
  "space.atbb.forum.board": "handleBoardCreate",
  "space.atbb.forum.role": "handleRoleCreate",
  "space.atbb.membership": "handleMembershipCreate",
  "space.atbb.modAction": "handleModActionCreate",
};

/**
 * Kind of backfill: the outcome of checkIfNeeded, and the type recorded on a
 * backfill run.
 */
export enum BackfillStatus {
  NotNeeded = "not_needed",
  CatchUp = "catch_up",
  FullSync = "full_sync",
}

/** Summary returned by performBackfill and resumeBackfill. */
export interface BackfillResult {
  /** ID of the backfill_progress row for this run. */
  backfillId: bigint;
  type: BackfillStatus;
  /** User DIDs processed in Phase 2; when resumed, includes DIDs done before the interruption. */
  didsProcessed: number;
  /** Records indexed; when resumed, includes the count stored before the interruption. */
  recordsIndexed: number;
  /** Error counts reported by syncRepoRecords, plus one per user whose sync rejected. */
  errors: number;
  /** Wall-clock duration of this run in milliseconds. */
  durationMs: number;
}

/** Counters for one (DID, collection) sync, returned by syncRepoRecords. */
export interface SyncStats {
  /** Records returned by listRecords across all fetched pages. */
  recordsFound: number;
  /** Records whose Indexer handler completed without throwing. */
  recordsIndexed: number;
  /** Failed records plus listRecords failures (1 when the sync was skipped). */
  errors: number;
}

/**
 * Backfills the index from PDS repositories via listRecords, tracking run
 * progress in backfill_progress and per-collection failures in
 * backfill_errors. At most one backfill (fresh or resumed) runs at a time per
 * instance.
 */
export class BackfillManager {
  private readonly cursorManager: CursorManager;
  /** True while performBackfill or resumeBackfill is executing. */
  private isRunning = false;
  /** Injected via setIndexer; syncRepoRecords skips work while this is null. */
  private indexer: Indexer | null = null;

  constructor(
    private readonly db: Database,
    private readonly config: AppConfig,
    private readonly logger: Logger,
  ) {
    this.cursorManager = new CursorManager(db, logger);
  }

  /**
   * Inject the Indexer instance.
Called during AppContext wiring. */ setIndexer(indexer: Indexer): void { this.indexer = indexer; } /** * Sync all records from a single (DID, collection) pair via listRecords. * Feeds each record through the matching Indexer handler. */ async syncRepoRecords( did: string, collection: string, agent: AtpAgent ): Promise { const stats: SyncStats = { recordsFound: 0, recordsIndexed: 0, errors: 0 }; const handlerName = COLLECTION_HANDLER_MAP[collection]; if (!handlerName || !this.indexer) { this.logger.error("backfill.sync_skipped", { event: "backfill.sync_skipped", did, collection, reason: !handlerName ? "unknown_collection" : "indexer_not_set", }); stats.errors = 1; return stats; } const handler = (this.indexer as any)[handlerName].bind(this.indexer); const delayMs = 1000 / this.config.backfillRateLimit; let cursor: string | undefined; try { do { const response = await agent.com.atproto.repo.listRecords({ repo: did, collection, limit: 100, cursor, }); const records = response.data.records; stats.recordsFound += records.length; for (const record of records) { try { const rkey = record.uri.split("/").pop()!; const event = { did, commit: { rkey, cid: record.cid, record: record.value }, }; await handler(event); stats.recordsIndexed++; } catch (error) { if (isProgrammingError(error)) throw error; stats.errors++; this.logger.error("backfill.record_error", { event: "backfill.record_error", did, collection, uri: record.uri, error: error instanceof Error ? error.message : String(error), }); } } cursor = response.data.cursor; // Rate limiting: delay between page fetches if (cursor) { await new Promise((resolve) => setTimeout(resolve, delayMs)); } } while (cursor); } catch (error) { stats.errors++; this.logger.error("backfill.pds_error", { event: "backfill.pds_error", did, collection, error: error instanceof Error ? error.message : String(error), }); } return stats; } /** * Determine if backfill is needed based on cursor state and DB contents. 
*/ async checkIfNeeded(cursor: bigint | null): Promise { // No cursor at all → first startup or wiped cursor if (cursor === null) { this.logger.info("backfill.decision", { event: "backfill.decision", status: BackfillStatus.FullSync, reason: "no_cursor", }); return BackfillStatus.FullSync; } // Check if DB has forum data (consistency check) let forum: { rkey: string } | undefined; try { const results = await this.db .select() .from(forums) .where(eq(forums.rkey, "self")) .limit(1); forum = results[0]; } catch (error) { this.logger.error("backfill.decision", { event: "backfill.decision", status: BackfillStatus.FullSync, reason: "db_query_failed", error: error instanceof Error ? error.message : String(error), }); return BackfillStatus.FullSync; } if (!forum) { this.logger.info("backfill.decision", { event: "backfill.decision", status: BackfillStatus.FullSync, reason: "db_inconsistency", cursorTimestamp: cursor.toString(), }); return BackfillStatus.FullSync; } // Check cursor age const ageHours = this.cursorManager.getCursorAgeHours(cursor)!; if (ageHours > this.config.backfillCursorMaxAgeHours) { this.logger.info("backfill.decision", { event: "backfill.decision", status: BackfillStatus.CatchUp, reason: "cursor_too_old", cursorAgeHours: Math.round(ageHours), thresholdHours: this.config.backfillCursorMaxAgeHours, cursorTimestamp: cursor.toString(), }); return BackfillStatus.CatchUp; } this.logger.info("backfill.decision", { event: "backfill.decision", status: BackfillStatus.NotNeeded, reason: "cursor_fresh", cursorAgeHours: Math.round(ageHours), }); return BackfillStatus.NotNeeded; } /** * Check if a backfill is currently running. */ getIsRunning(): boolean { return this.isRunning; } /** * Create an AtpAgent pointed at the forum's PDS. * Extracted as a private method for test mocking. */ private createAgentForPds(): AtpAgent { return new AtpAgent({ service: this.config.pdsUrl }); } /** * Create a progress row and return its ID. 
* Use this before performBackfill when you need the ID immediately (e.g., for a 202 response). * Pass the returned ID as existingRowId to performBackfill to skip duplicate row creation. */ async prepareBackfillRow(type: BackfillStatus): Promise { const [row] = await this.db .insert(backfillProgress) .values({ status: "in_progress", backfillType: type, startedAt: new Date(), }) .returning({ id: backfillProgress.id }); return row.id; } /** * Query the backfill_progress table for any row with status = 'in_progress'. * Returns the first such row, or null if none exists. */ async checkForInterruptedBackfill() { try { const [row] = await this.db .select() .from(backfillProgress) .where(eq(backfillProgress.status, "in_progress")) .limit(1); return row ?? null; } catch (error) { if (isProgrammingError(error)) throw error; this.logger.error("backfill.check_interrupted.failed", { event: "backfill.check_interrupted.failed", error: error instanceof Error ? error.message : String(error), note: "Could not check for interrupted backfills — assuming none", }); return null; } } /** * Resume a CatchUp backfill from its last checkpoint (lastProcessedDid). * Only processes users with DID > lastProcessedDid. * Does NOT re-run Phase 1 (forum-owned collections). 
*/ async resumeBackfill(interrupted: typeof backfillProgress.$inferSelect): Promise { if (this.isRunning) { throw new Error("Backfill is already in progress"); } this.isRunning = true; const startTime = Date.now(); let totalIndexed = interrupted.recordsIndexed; let totalErrors = 0; let didsProcessed = interrupted.didsProcessed; this.logger.info("backfill.resuming", { event: "backfill.resuming", backfillId: interrupted.id.toString(), lastProcessedDid: interrupted.lastProcessedDid, didsProcessed: interrupted.didsProcessed, didsTotal: interrupted.didsTotal, }); try { const agent = this.createAgentForPds(); if (interrupted.backfillType !== BackfillStatus.CatchUp) { // FullSync cannot be resumed from a checkpoint — it must re-run from scratch throw new Error( "Interrupted FullSync cannot be resumed. Re-trigger via /api/admin/backfill?force=full_sync." ); } if (interrupted.lastProcessedDid) { // Resume: fetch users after lastProcessedDid // TODO(ATB-13): Paginate for large forums const remainingUsers = await this.db .select({ did: users.did }) .from(users) .where(gt(users.did, interrupted.lastProcessedDid)) .orderBy(asc(users.did)); for (let i = 0; i < remainingUsers.length; i += this.config.backfillConcurrency) { const batch = remainingUsers.slice(i, i + this.config.backfillConcurrency); const backfillId = interrupted.id; const batchResults = await Promise.allSettled( batch.map(async (user) => { let userIndexed = 0; let userErrors = 0; for (const collection of USER_OWNED_COLLECTIONS) { const stats = await this.syncRepoRecords(user.did, collection, agent); userIndexed += stats.recordsIndexed; if (stats.errors > 0) { userErrors += stats.errors; await this.db.insert(backfillErrors).values({ backfillId, did: user.did, collection, errorMessage: `${stats.errors} record(s) failed`, createdAt: new Date(), }); } } return { indexed: userIndexed, errors: userErrors }; }) ); // Aggregate results after settlement, including DID for debuggability batchResults.forEach((result, i) => { 
if (result.status === "fulfilled") { totalIndexed += result.value.indexed; totalErrors += result.value.errors; } else { totalErrors++; this.logger.error("backfill.resume.batch_user_failed", { event: "backfill.resume.batch_user_failed", backfillId: backfillId.toString(), did: batch[i].did, error: result.reason instanceof Error ? result.reason.message : String(result.reason), }); } }); didsProcessed += batch.length; try { await this.db .update(backfillProgress) .set({ didsProcessed, recordsIndexed: totalIndexed, lastProcessedDid: batch[batch.length - 1].did, }) .where(eq(backfillProgress.id, backfillId)); } catch (checkpointError) { if (isProgrammingError(checkpointError)) throw checkpointError; this.logger.warn("backfill.resume.checkpoint_failed", { event: "backfill.resume.checkpoint_failed", backfillId: backfillId.toString(), didsProcessed, error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError), note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.", }); } } } // Mark completed await this.db .update(backfillProgress) .set({ status: "completed", didsProcessed, recordsIndexed: totalIndexed, completedAt: new Date(), }) .where(eq(backfillProgress.id, interrupted.id)); const result: BackfillResult = { backfillId: interrupted.id, type: interrupted.backfillType as BackfillStatus, didsProcessed, recordsIndexed: totalIndexed, errors: totalErrors, durationMs: Date.now() - startTime, }; const resumeEvent = totalErrors > 0 ? "backfill.resume.completed_with_errors" : "backfill.resume.completed"; this.logger.info(resumeEvent, { event: resumeEvent, ...result, backfillId: result.backfillId.toString(), }); return result; } catch (error) { // Best-effort: mark as failed try { await this.db .update(backfillProgress) .set({ status: "failed", errorMessage: error instanceof Error ? 
error.message : String(error), completedAt: new Date(), }) .where(eq(backfillProgress.id, interrupted.id)); } catch (updateError) { this.logger.error("backfill.resume.failed_status_update_error", { event: "backfill.resume.failed_status_update_error", backfillId: interrupted.id.toString(), error: updateError instanceof Error ? updateError.message : String(updateError), }); } this.logger.error("backfill.resume.failed", { event: "backfill.resume.failed", backfillId: interrupted.id.toString(), error: error instanceof Error ? error.message : String(error), }); throw error; } finally { this.isRunning = false; } } /** * Execute a backfill operation. * Phase 1: Syncs forum-owned collections from the Forum DID. * Phase 2 (CatchUp only): Syncs user-owned collections from all known users. * * @param existingRowId - If provided (from prepareBackfillRow), skips creating a new progress row. */ async performBackfill(type: BackfillStatus, existingRowId?: bigint): Promise { if (this.isRunning) { throw new Error("Backfill is already in progress"); } this.isRunning = true; const startTime = Date.now(); let backfillId: bigint | undefined = existingRowId; let totalIndexed = 0; let totalErrors = 0; let didsProcessed = 0; try { // Create progress row only if not pre-created by prepareBackfillRow if (backfillId === undefined) { const [row] = await this.db .insert(backfillProgress) .values({ status: "in_progress", backfillType: type, startedAt: new Date(), }) .returning({ id: backfillProgress.id }); backfillId = row.id; } // Capture in const so TypeScript can narrow through async closures const resolvedBackfillId: bigint = backfillId; const agent = this.createAgentForPds(); // Phase 1: Sync forum-owned collections from Forum DID for (const collection of FORUM_OWNED_COLLECTIONS) { const stats = await this.syncRepoRecords( this.config.forumDid, collection, agent ); totalIndexed += stats.recordsIndexed; totalErrors += stats.errors; if (stats.errors > 0) { await 
this.db.insert(backfillErrors).values({ backfillId: resolvedBackfillId, did: this.config.forumDid, collection, errorMessage: `${stats.errors} record(s) failed`, createdAt: new Date(), }); } } // Phase 2: For CatchUp, sync user-owned records from known DIDs if (type === BackfillStatus.CatchUp) { // TODO(ATB-13): Paginate for large forums — currently loads all DIDs into memory const knownUsers = await this.db .select({ did: users.did }) .from(users) .orderBy(asc(users.did)); const didsTotal = knownUsers.length; await this.db .update(backfillProgress) .set({ didsTotal }) .where(eq(backfillProgress.id, backfillId)); // Process in batches of backfillConcurrency for (let i = 0; i < knownUsers.length; i += this.config.backfillConcurrency) { const batch = knownUsers.slice(i, i + this.config.backfillConcurrency); const batchResults = await Promise.allSettled( batch.map(async (user) => { let userIndexed = 0; let userErrors = 0; for (const collection of USER_OWNED_COLLECTIONS) { const stats = await this.syncRepoRecords(user.did, collection, agent); userIndexed += stats.recordsIndexed; if (stats.errors > 0) { userErrors += stats.errors; await this.db.insert(backfillErrors).values({ backfillId: resolvedBackfillId, did: user.did, collection, errorMessage: `${stats.errors} record(s) failed`, createdAt: new Date(), }); } } return { indexed: userIndexed, errors: userErrors }; }) ); // Aggregate results after settlement, including DID for debuggability batchResults.forEach((result, i) => { if (result.status === "fulfilled") { totalIndexed += result.value.indexed; totalErrors += result.value.errors; } else { totalErrors++; this.logger.error("backfill.batch_user_failed", { event: "backfill.batch_user_failed", backfillId: resolvedBackfillId.toString(), did: batch[i].did, error: result.reason instanceof Error ? 
result.reason.message : String(result.reason), }); } }); didsProcessed += batch.length; try { await this.db .update(backfillProgress) .set({ didsProcessed, recordsIndexed: totalIndexed, lastProcessedDid: batch[batch.length - 1].did, }) .where(eq(backfillProgress.id, backfillId)); } catch (checkpointError) { if (isProgrammingError(checkpointError)) throw checkpointError; this.logger.warn("backfill.checkpoint_failed", { event: "backfill.checkpoint_failed", backfillId: resolvedBackfillId.toString(), didsProcessed, error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError), note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.", }); } this.logger.info("backfill.progress", { event: "backfill.progress", backfillId: backfillId.toString(), type, didsProcessed, didsTotal, recordsIndexed: totalIndexed, elapsedMs: Date.now() - startTime, }); } } // Mark completed await this.db .update(backfillProgress) .set({ status: "completed", didsProcessed, recordsIndexed: totalIndexed, completedAt: new Date(), }) .where(eq(backfillProgress.id, backfillId)); const result: BackfillResult = { backfillId: resolvedBackfillId, type, didsProcessed, recordsIndexed: totalIndexed, errors: totalErrors, durationMs: Date.now() - startTime, }; const completedEvent = totalErrors > 0 ? "backfill.completed_with_errors" : "backfill.completed"; this.logger.info(completedEvent, { event: completedEvent, ...result, backfillId: result.backfillId.toString(), }); return result; } catch (error) { // Best-effort: mark progress row as failed (if it was created) if (backfillId !== undefined) { try { await this.db .update(backfillProgress) .set({ status: "failed", errorMessage: error instanceof Error ? 
error.message : String(error), completedAt: new Date(), }) .where(eq(backfillProgress.id, backfillId)); } catch (updateError) { this.logger.error("backfill.failed_status_update_error", { event: "backfill.failed_status_update_error", backfillId: backfillId.toString(), error: updateError instanceof Error ? updateError.message : String(updateError), }); } } this.logger.error("backfill.failed", { event: "backfill.failed", backfillId: backfillId !== undefined ? backfillId.toString() : "not_created", error: error instanceof Error ? error.message : String(error), }); throw error; } finally { this.isRunning = false; } } }