WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at atb-52-css-token-extraction 658 lines 22 kB view raw
import type { Database } from "@atbb/db";
import { forums, backfillProgress, backfillErrors, users } from "@atbb/db";
import { eq, asc, gt } from "drizzle-orm";
import { AtpAgent } from "@atproto/api";
import { CursorManager } from "./cursor-manager.js";
import type { AppConfig } from "./config.js";
import type { Indexer } from "./indexer.js";
import { isProgrammingError } from "./errors.js";
import type { Logger } from "@atbb/logger";

/**
 * Collections synced from the forum's own DID during Phase 1 of a backfill.
 * Order matters: records are synced in this order so that rows other rows
 * depend on (FK dependencies) are indexed first.
 */
export const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
] as const;

/**
 * Collections synced from each known user's DID during Phase 2 of a
 * CatchUp backfill (and when resuming one).
 */
export const USER_OWNED_COLLECTIONS = [
  "space.atbb.membership",
  "space.atbb.post",
] as const;

/**
 * Maps AT Proto collection NSIDs to Indexer handler method names.
 */
const COLLECTION_HANDLER_MAP: Record<string, string> = {
  "space.atbb.post": "handlePostCreate",
  "space.atbb.forum.forum": "handleForumCreate",
  "space.atbb.forum.category": "handleCategoryCreate",
  "space.atbb.forum.board": "handleBoardCreate",
  "space.atbb.forum.role": "handleRoleCreate",
  "space.atbb.membership": "handleMembershipCreate",
  "space.atbb.modAction": "handleModActionCreate",
};

/** Outcome of the "is a backfill needed?" decision; also stored as the backfill type. */
export enum BackfillStatus {
  NotNeeded = "not_needed",
  CatchUp = "catch_up",
  FullSync = "full_sync",
}

/** Summary returned by a finished backfill run. */
export interface BackfillResult {
  backfillId: bigint;
  type: BackfillStatus;
  didsProcessed: number;
  recordsIndexed: number;
  errors: number;
  durationMs: number;
}

/** Counters produced by syncing one (DID, collection) pair. */
export interface SyncStats {
  recordsFound: number;
  recordsIndexed: number;
  errors: number;
}

/** Log-event namespace used by the shared batch/failure helpers. */
type BackfillEventPrefix = "backfill" | "backfill.resume";

/** Running counters reported after every user batch. */
interface BatchProgress {
  didsProcessed: number;
  recordsIndexed: number;
}

/** Inputs for {@link BackfillManager.syncUserBatches}. */
interface UserBatchRun {
  backfillId: bigint;
  users: ReadonlyArray<{ did: string }>;
  agent: AtpAgent;
  /** Prefix for the `batch_user_failed` / `checkpoint_failed` log events. */
  eventPrefix: BackfillEventPrefix;
  /** Counter values to continue from (non-zero when resuming). */
  initial: BatchProgress;
  /** Invoked after each batch's checkpoint attempt. */
  onBatchComplete?: (progress: BatchProgress) => void;
}

/** Render an unknown thrown value as a log-friendly message. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Replays repository records through the Indexer to (re)build local state,
 * tracking progress in the `backfill_progress` / `backfill_errors` tables.
 */
export class BackfillManager {
  private cursorManager: CursorManager;
  private isRunning = false;
  private indexer: Indexer | null = null;

  constructor(
    private db: Database,
    private config: AppConfig,
    private logger: Logger,
  ) {
    this.cursorManager = new CursorManager(db, logger);
  }

  /**
   * Inject the Indexer instance. Called during AppContext wiring.
   */
  setIndexer(indexer: Indexer): void {
    this.indexer = indexer;
  }

  /**
   * Sync all records from a single (DID, collection) pair via listRecords.
   * Feeds each record through the matching Indexer handler.
   *
   * Failure handling:
   * - Unknown collection or missing indexer: logged, returns `errors = 1`.
   * - A page fetch fails: logged as `backfill.pds_error`, counted as one
   *   error, and pagination stops; stats gathered so far are returned.
   * - A handler fails on one record: logged as `backfill.record_error` and
   *   counted, then the next record is processed.
   *
   * @throws Re-throws any handler error for which `isProgrammingError` is
   *   true, so code bugs are not misreported as PDS failures.
   */
  async syncRepoRecords(
    did: string,
    collection: string,
    agent: AtpAgent
  ): Promise<SyncStats> {
    const stats: SyncStats = { recordsFound: 0, recordsIndexed: 0, errors: 0 };
    const handlerName = COLLECTION_HANDLER_MAP[collection];

    if (!handlerName || !this.indexer) {
      this.logger.error("backfill.sync_skipped", {
        event: "backfill.sync_skipped",
        did,
        collection,
        reason: !handlerName ? "unknown_collection" : "indexer_not_set",
      });
      stats.errors = 1;
      return stats;
    }

    // The handler is looked up by name at runtime, so it cannot be typed statically.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const handler = (this.indexer as any)[handlerName].bind(this.indexer);

    // A non-positive or NaN rate limit would otherwise yield an Infinity/NaN
    // delay; treat it as "no extra delay" instead.
    const rateLimit = this.config.backfillRateLimit;
    const delayMs = rateLimit > 0 ? 1000 / rateLimit : 0;
    let cursor: string | undefined;

    do {
      let page: Awaited<ReturnType<typeof agent.com.atproto.repo.listRecords>>["data"];

      // Only the page fetch is treated as a PDS failure. Keeping the record
      // loop outside this try lets programming errors rethrown by the
      // per-record catch below actually propagate to the caller.
      try {
        const response = await agent.com.atproto.repo.listRecords({
          repo: did,
          collection,
          limit: 100,
          cursor,
        });
        page = response.data;
        stats.recordsFound += page.records.length;
      } catch (error) {
        stats.errors++;
        this.logger.error("backfill.pds_error", {
          event: "backfill.pds_error",
          did,
          collection,
          error: describeError(error),
        });
        break;
      }

      for (const record of page.records) {
        try {
          const rkey = record.uri.split("/").pop()!;
          const event = {
            did,
            commit: { rkey, cid: record.cid, record: record.value },
          };
          await handler(event);
          stats.recordsIndexed++;
        } catch (error) {
          if (isProgrammingError(error)) throw error;
          stats.errors++;
          this.logger.error("backfill.record_error", {
            event: "backfill.record_error",
            did,
            collection,
            uri: record.uri,
            error: describeError(error),
          });
        }
      }

      cursor = page.cursor;

      // Rate limiting: delay between page fetches
      if (cursor) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    } while (cursor);

    return stats;
  }

  /**
   * Determine if backfill is needed based on cursor state and DB contents.
   *
   * - No cursor, a failed forum lookup, or no forum row with rkey "self"
   *   → `FullSync`.
   * - Cursor older than `config.backfillCursorMaxAgeHours` → `CatchUp`.
   * - Otherwise → `NotNeeded`.
   *
   * Every outcome is logged as a `backfill.decision` event.
   */
  async checkIfNeeded(cursor: bigint | null): Promise<BackfillStatus> {
    // No cursor at all → first startup or wiped cursor
    if (cursor === null) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "no_cursor",
      });
      return BackfillStatus.FullSync;
    }

    // Check if DB has forum data (consistency check)
    let forum: { rkey: string } | undefined;
    try {
      const results = await this.db
        .select()
        .from(forums)
        .where(eq(forums.rkey, "self"))
        .limit(1);
      forum = results[0];
    } catch (error) {
      this.logger.error("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "db_query_failed",
        error: describeError(error),
      });
      return BackfillStatus.FullSync;
    }

    if (!forum) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "db_inconsistency",
        cursorTimestamp: cursor.toString(),
      });
      return BackfillStatus.FullSync;
    }

    // Check cursor age
    const ageHours = this.cursorManager.getCursorAgeHours(cursor)!;
    if (ageHours > this.config.backfillCursorMaxAgeHours) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.CatchUp,
        reason: "cursor_too_old",
        cursorAgeHours: Math.round(ageHours),
        thresholdHours: this.config.backfillCursorMaxAgeHours,
        cursorTimestamp: cursor.toString(),
      });
      return BackfillStatus.CatchUp;
    }

    this.logger.info("backfill.decision", {
      event: "backfill.decision",
      status: BackfillStatus.NotNeeded,
      reason: "cursor_fresh",
      cursorAgeHours: Math.round(ageHours),
    });
    return BackfillStatus.NotNeeded;
  }

  /**
   * Check if a backfill is currently running.
   */
  getIsRunning(): boolean {
    return this.isRunning;
  }

  /**
   * Create an AtpAgent pointed at the forum's PDS.
   * Extracted as a private method for test mocking.
   */
  private createAgentForPds(): AtpAgent {
    return new AtpAgent({ service: this.config.pdsUrl });
  }

  /**
   * Create a progress row and return its ID.
   * Use this before performBackfill when you need the ID immediately (e.g., for a 202 response).
   * Pass the returned ID as existingRowId to performBackfill to skip duplicate row creation.
   */
  async prepareBackfillRow(type: BackfillStatus): Promise<bigint> {
    const [row] = await this.db
      .insert(backfillProgress)
      .values({
        status: "in_progress",
        backfillType: type,
        startedAt: new Date(),
      })
      .returning({ id: backfillProgress.id });
    return row.id;
  }

  /**
   * Query the backfill_progress table for any row with status = 'in_progress'.
   * Returns the first such row, or null if none exists.
   *
   * A failed query is logged and treated as "none found" (returns null);
   * programming errors are re-thrown.
   */
  async checkForInterruptedBackfill() {
    try {
      const [row] = await this.db
        .select()
        .from(backfillProgress)
        .where(eq(backfillProgress.status, "in_progress"))
        .limit(1);

      return row ?? null;
    } catch (error) {
      if (isProgrammingError(error)) throw error;
      this.logger.error("backfill.check_interrupted.failed", {
        event: "backfill.check_interrupted.failed",
        error: describeError(error),
        note: "Could not check for interrupted backfills — assuming none",
      });
      return null;
    }
  }

  /**
   * Insert one backfill_errors row summarising the failures for a
   * (DID, collection) pair.
   */
  private async recordSyncErrors(
    backfillId: bigint,
    did: string,
    collection: string,
    errorCount: number
  ): Promise<void> {
    await this.db.insert(backfillErrors).values({
      backfillId,
      did,
      collection,
      errorMessage: `${errorCount} record(s) failed`,
      createdAt: new Date(),
    });
  }

  /**
   * Sync every user-owned collection for one DID, recording an error row for
   * each collection that reported failures.
   */
  private async syncUserCollections(
    did: string,
    backfillId: bigint,
    agent: AtpAgent
  ): Promise<{ indexed: number; errors: number }> {
    let indexed = 0;
    let errors = 0;
    for (const collection of USER_OWNED_COLLECTIONS) {
      const stats = await this.syncRepoRecords(did, collection, agent);
      indexed += stats.recordsIndexed;
      if (stats.errors > 0) {
        errors += stats.errors;
        await this.recordSyncErrors(backfillId, did, collection, stats.errors);
      }
    }
    return { indexed, errors };
  }

  /**
   * Sync user-owned collections for `run.users` in concurrent batches of
   * `config.backfillConcurrency`, checkpointing progress after each batch.
   *
   * A user whose sync rejects is logged and counted as one error; the rest of
   * the batch still completes. A failed checkpoint write is logged as a
   * warning and the run continues (programming errors are re-thrown).
   *
   * @returns Final counters, continuing from `run.initial`, plus the number
   *   of errors seen during this call.
   */
  private async syncUserBatches(
    run: UserBatchRun
  ): Promise<BatchProgress & { errors: number }> {
    const { backfillId, agent, eventPrefix } = run;
    let { didsProcessed, recordsIndexed } = run.initial;
    let errors = 0;

    // Guard against a zero, negative or NaN concurrency setting, which would
    // otherwise produce empty batches and fail on the checkpoint lookup.
    const configuredSize = Math.floor(this.config.backfillConcurrency);
    const batchSize = configuredSize >= 1 ? configuredSize : 1;

    for (let start = 0; start < run.users.length; start += batchSize) {
      const batch = run.users.slice(start, start + batchSize);

      const batchResults = await Promise.allSettled(
        batch.map((user) => this.syncUserCollections(user.did, backfillId, agent))
      );

      // Aggregate results after settlement, including DID for debuggability
      batchResults.forEach((result, i) => {
        if (result.status === "fulfilled") {
          recordsIndexed += result.value.indexed;
          errors += result.value.errors;
        } else {
          errors++;
          this.logger.error(`${eventPrefix}.batch_user_failed`, {
            event: `${eventPrefix}.batch_user_failed`,
            backfillId: backfillId.toString(),
            did: batch[i].did,
            error: describeError(result.reason),
          });
        }
      });

      didsProcessed += batch.length;

      try {
        await this.db
          .update(backfillProgress)
          .set({
            didsProcessed,
            recordsIndexed,
            lastProcessedDid: batch[batch.length - 1].did,
          })
          .where(eq(backfillProgress.id, backfillId));
      } catch (checkpointError) {
        if (isProgrammingError(checkpointError)) throw checkpointError;
        this.logger.warn(`${eventPrefix}.checkpoint_failed`, {
          event: `${eventPrefix}.checkpoint_failed`,
          backfillId: backfillId.toString(),
          didsProcessed,
          error: describeError(checkpointError),
          note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
        });
      }

      run.onBatchComplete?.({ didsProcessed, recordsIndexed });
    }

    return { didsProcessed, recordsIndexed, errors };
  }

  /** Mark a progress row as completed with its final counters. */
  private async markCompleted(
    backfillId: bigint,
    didsProcessed: number,
    recordsIndexed: number
  ): Promise<void> {
    await this.db
      .update(backfillProgress)
      .set({
        status: "completed",
        didsProcessed,
        recordsIndexed,
        completedAt: new Date(),
      })
      .where(eq(backfillProgress.id, backfillId));
  }

  /**
   * Best-effort: mark a progress row as failed. If the update itself fails,
   * that is logged and swallowed so the original error can still be reported.
   */
  private async markFailed(
    backfillId: bigint,
    error: unknown,
    eventPrefix: BackfillEventPrefix
  ): Promise<void> {
    try {
      await this.db
        .update(backfillProgress)
        .set({
          status: "failed",
          errorMessage: describeError(error),
          completedAt: new Date(),
        })
        .where(eq(backfillProgress.id, backfillId));
    } catch (updateError) {
      this.logger.error(`${eventPrefix}.failed_status_update_error`, {
        event: `${eventPrefix}.failed_status_update_error`,
        backfillId: backfillId.toString(),
        error: describeError(updateError),
      });
    }
  }

  /**
   * Resume a CatchUp backfill from its last checkpoint (lastProcessedDid).
   * Only processes users with DID > lastProcessedDid.
   * Does NOT re-run Phase 1 (forum-owned collections).
   *
   * Counters continue from the values stored on the interrupted row.
   *
   * @throws If a backfill is already running, or if the interrupted row is
   *   not a CatchUp (a FullSync must be re-run from scratch). Any failure
   *   during the run marks the row as failed (best effort) and is re-thrown.
   */
  async resumeBackfill(interrupted: typeof backfillProgress.$inferSelect): Promise<BackfillResult> {
    if (this.isRunning) {
      throw new Error("Backfill is already in progress");
    }

    this.isRunning = true;
    const startTime = Date.now();
    let totalIndexed = interrupted.recordsIndexed;
    let totalErrors = 0;
    let didsProcessed = interrupted.didsProcessed;

    this.logger.info("backfill.resuming", {
      event: "backfill.resuming",
      backfillId: interrupted.id.toString(),
      lastProcessedDid: interrupted.lastProcessedDid,
      didsProcessed: interrupted.didsProcessed,
      didsTotal: interrupted.didsTotal,
    });

    try {
      const agent = this.createAgentForPds();

      if (interrupted.backfillType !== BackfillStatus.CatchUp) {
        // FullSync cannot be resumed from a checkpoint — it must re-run from scratch
        throw new Error(
          "Interrupted FullSync cannot be resumed. Re-trigger via /api/admin/backfill?force=full_sync."
        );
      }

      // NOTE(review): when no checkpoint was ever saved (lastProcessedDid is
      // empty) no users are synced and the row is still marked completed
      // below. Confirm callers only resume rows that have a checkpoint.
      if (interrupted.lastProcessedDid) {
        // Resume: fetch users after lastProcessedDid
        // TODO(ATB-13): Paginate for large forums
        const remainingUsers = await this.db
          .select({ did: users.did })
          .from(users)
          .where(gt(users.did, interrupted.lastProcessedDid))
          .orderBy(asc(users.did));

        const totals = await this.syncUserBatches({
          backfillId: interrupted.id,
          users: remainingUsers,
          agent,
          eventPrefix: "backfill.resume",
          initial: { didsProcessed, recordsIndexed: totalIndexed },
        });
        didsProcessed = totals.didsProcessed;
        totalIndexed = totals.recordsIndexed;
        totalErrors += totals.errors;
      }

      // Mark completed
      await this.markCompleted(interrupted.id, didsProcessed, totalIndexed);

      const result: BackfillResult = {
        backfillId: interrupted.id,
        type: interrupted.backfillType as BackfillStatus,
        didsProcessed,
        recordsIndexed: totalIndexed,
        errors: totalErrors,
        durationMs: Date.now() - startTime,
      };

      const resumeEvent = totalErrors > 0 ? "backfill.resume.completed_with_errors" : "backfill.resume.completed";
      this.logger.info(resumeEvent, {
        event: resumeEvent,
        ...result,
        // bigint is logged as a string
        backfillId: result.backfillId.toString(),
      });

      return result;
    } catch (error) {
      // Best-effort: mark as failed
      await this.markFailed(interrupted.id, error, "backfill.resume");

      this.logger.error("backfill.resume.failed", {
        event: "backfill.resume.failed",
        backfillId: interrupted.id.toString(),
        error: describeError(error),
      });
      throw error;
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Execute a backfill operation.
   * Phase 1: Syncs forum-owned collections from the Forum DID.
   * Phase 2 (CatchUp only): Syncs user-owned collections from all known users.
   *
   * @param type - Kind of backfill to run; stored on the progress row.
   * @param existingRowId - If provided (from prepareBackfillRow), skips creating a new progress row.
   * @returns Totals for the run, including the number of errors encountered.
   * @throws If a backfill is already running. Any failure during the run
   *   marks the progress row as failed (best effort) and is re-thrown.
   */
  async performBackfill(type: BackfillStatus, existingRowId?: bigint): Promise<BackfillResult> {
    if (this.isRunning) {
      throw new Error("Backfill is already in progress");
    }

    this.isRunning = true;
    const startTime = Date.now();
    let backfillId: bigint | undefined = existingRowId;
    let totalIndexed = 0;
    let totalErrors = 0;
    let didsProcessed = 0;

    try {
      // Create progress row only if not pre-created by prepareBackfillRow
      if (backfillId === undefined) {
        backfillId = await this.prepareBackfillRow(type);
      }
      // Capture in const so TypeScript can narrow through async closures
      const resolvedBackfillId: bigint = backfillId;

      const agent = this.createAgentForPds();

      // Phase 1: Sync forum-owned collections from Forum DID
      for (const collection of FORUM_OWNED_COLLECTIONS) {
        const stats = await this.syncRepoRecords(
          this.config.forumDid,
          collection,
          agent
        );
        totalIndexed += stats.recordsIndexed;
        totalErrors += stats.errors;
        if (stats.errors > 0) {
          await this.recordSyncErrors(
            resolvedBackfillId,
            this.config.forumDid,
            collection,
            stats.errors
          );
        }
      }

      // Phase 2: For CatchUp, sync user-owned records from known DIDs
      if (type === BackfillStatus.CatchUp) {
        // TODO(ATB-13): Paginate for large forums — currently loads all DIDs into memory
        const knownUsers = await this.db
          .select({ did: users.did })
          .from(users)
          .orderBy(asc(users.did));

        const didsTotal = knownUsers.length;

        await this.db
          .update(backfillProgress)
          .set({ didsTotal })
          .where(eq(backfillProgress.id, resolvedBackfillId));

        // Process in batches of backfillConcurrency
        const totals = await this.syncUserBatches({
          backfillId: resolvedBackfillId,
          users: knownUsers,
          agent,
          eventPrefix: "backfill",
          // Checkpoints include the records already indexed in Phase 1.
          initial: { didsProcessed, recordsIndexed: totalIndexed },
          onBatchComplete: (progress) => {
            this.logger.info("backfill.progress", {
              event: "backfill.progress",
              backfillId: resolvedBackfillId.toString(),
              type,
              didsProcessed: progress.didsProcessed,
              didsTotal,
              recordsIndexed: progress.recordsIndexed,
              elapsedMs: Date.now() - startTime,
            });
          },
        });
        didsProcessed = totals.didsProcessed;
        totalIndexed = totals.recordsIndexed;
        totalErrors += totals.errors;
      }

      // Mark completed
      await this.markCompleted(resolvedBackfillId, didsProcessed, totalIndexed);

      const result: BackfillResult = {
        backfillId: resolvedBackfillId,
        type,
        didsProcessed,
        recordsIndexed: totalIndexed,
        errors: totalErrors,
        durationMs: Date.now() - startTime,
      };

      const completedEvent = totalErrors > 0 ? "backfill.completed_with_errors" : "backfill.completed";
      this.logger.info(completedEvent, {
        event: completedEvent,
        ...result,
        // bigint is logged as a string
        backfillId: result.backfillId.toString(),
      });

      return result;
    } catch (error) {
      // Best-effort: mark progress row as failed (if it was created)
      if (backfillId !== undefined) {
        await this.markFailed(backfillId, error, "backfill");
      }

      this.logger.error("backfill.failed", {
        event: "backfill.failed",
        backfillId: backfillId !== undefined ? backfillId.toString() : "not_created",
        error: describeError(error),
      });
      throw error;
    } finally {
      this.isRunning = false;
    }
  }
}