WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import type { Database } from "@atbb/db";
2import { forums, backfillProgress, backfillErrors, users } from "@atbb/db";
3import { eq, asc, gt } from "drizzle-orm";
4import { AtpAgent } from "@atproto/api";
5import { CursorManager } from "./cursor-manager.js";
6import type { AppConfig } from "./config.js";
7import type { Indexer } from "./indexer.js";
8import { isProgrammingError } from "./errors.js";
9import type { Logger } from "@atbb/logger";
10
/**
 * Collections owned by the Forum DID, synced in Phase 1 of a backfill
 * (see performBackfill). Array order defines sync order: forum-owned
 * records are synced first so FK dependencies resolve before user records.
 */
export const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
] as const;
23
/**
 * Collections owned by individual user DIDs, synced in Phase 2 of a
 * CatchUp backfill (one listRecords pass per user per collection).
 */
export const USER_OWNED_COLLECTIONS = [
  "space.atbb.membership",
  "space.atbb.post",
] as const;
28
/**
 * Maps AT Proto collection NSIDs to Indexer handler method names.
 * syncRepoRecords() looks up the handler here by collection; collections
 * with no entry are skipped (logged as "unknown_collection").
 */
const COLLECTION_HANDLER_MAP: Record<string, string> = {
  "space.atbb.post": "handlePostCreate",
  "space.atbb.forum.forum": "handleForumCreate",
  "space.atbb.forum.category": "handleCategoryCreate",
  "space.atbb.forum.board": "handleBoardCreate",
  "space.atbb.forum.role": "handleRoleCreate",
  "space.atbb.membership": "handleMembershipCreate",
  "space.atbb.modAction": "handleModActionCreate",
};
38
/**
 * Outcome of checkIfNeeded(): how much backfill work is required.
 * Values are persisted in backfill_progress.backfillType, so they must
 * remain stable.
 */
export enum BackfillStatus {
  /** Cursor is fresh and the DB holds forum data — nothing to do. */
  NotNeeded = "not_needed",
  /** Cursor exists but is older than the configured max age — re-sync forum and user records. */
  CatchUp = "catch_up",
  /** No cursor, or DB inconsistency — rebuild from scratch. */
  FullSync = "full_sync",
}
44
/**
 * Summary returned by performBackfill()/resumeBackfill() on success.
 */
export interface BackfillResult {
  // ID of the backfill_progress row that tracked this run.
  backfillId: bigint;
  // Which kind of backfill ran (CatchUp or FullSync).
  type: BackfillStatus;
  // Number of user DIDs whose repos were processed.
  didsProcessed: number;
  // Total records fed through Indexer handlers.
  recordsIndexed: number;
  // Count of record/PDS/batch failures; the run still completes with errors > 0.
  errors: number;
  // Wall-clock duration of the run in milliseconds.
  durationMs: number;
}
53
/**
 * Counters produced by one syncRepoRecords() call for a (DID, collection) pair.
 */
export interface SyncStats {
  // Records returned by listRecords pagination.
  recordsFound: number;
  // Records successfully handled by the Indexer.
  recordsIndexed: number;
  // Failures: bad records, PDS/pagination errors, or a skipped sync.
  errors: number;
}
59
60export class BackfillManager {
61 private cursorManager: CursorManager;
62 private isRunning = false;
63 private indexer: Indexer | null = null;
64
65 constructor(
66 private db: Database,
67 private config: AppConfig,
68 private logger: Logger,
69 ) {
70 this.cursorManager = new CursorManager(db, logger);
71 }
72
73 /**
74 * Inject the Indexer instance. Called during AppContext wiring.
75 */
76 setIndexer(indexer: Indexer): void {
77 this.indexer = indexer;
78 }
79
80 /**
81 * Sync all records from a single (DID, collection) pair via listRecords.
82 * Feeds each record through the matching Indexer handler.
83 */
84 async syncRepoRecords(
85 did: string,
86 collection: string,
87 agent: AtpAgent
88 ): Promise<SyncStats> {
89 const stats: SyncStats = { recordsFound: 0, recordsIndexed: 0, errors: 0 };
90 const handlerName = COLLECTION_HANDLER_MAP[collection];
91
92 if (!handlerName || !this.indexer) {
93 this.logger.error("backfill.sync_skipped", {
94 event: "backfill.sync_skipped",
95 did,
96 collection,
97 reason: !handlerName ? "unknown_collection" : "indexer_not_set",
98 });
99 stats.errors = 1;
100 return stats;
101 }
102
103 const handler = (this.indexer as any)[handlerName].bind(this.indexer);
104 const delayMs = 1000 / this.config.backfillRateLimit;
105 let cursor: string | undefined;
106
107 try {
108 do {
109 const response = await agent.com.atproto.repo.listRecords({
110 repo: did,
111 collection,
112 limit: 100,
113 cursor,
114 });
115
116 const records = response.data.records;
117 stats.recordsFound += records.length;
118
119 for (const record of records) {
120 try {
121 const rkey = record.uri.split("/").pop()!;
122 const event = {
123 did,
124 commit: { rkey, cid: record.cid, record: record.value },
125 };
126 await handler(event);
127 stats.recordsIndexed++;
128 } catch (error) {
129 if (isProgrammingError(error)) throw error;
130 stats.errors++;
131 this.logger.error("backfill.record_error", {
132 event: "backfill.record_error",
133 did,
134 collection,
135 uri: record.uri,
136 error: error instanceof Error ? error.message : String(error),
137 });
138 }
139 }
140
141 cursor = response.data.cursor;
142
143 // Rate limiting: delay between page fetches
144 if (cursor) {
145 await new Promise((resolve) => setTimeout(resolve, delayMs));
146 }
147 } while (cursor);
148 } catch (error) {
149 stats.errors++;
150 this.logger.error("backfill.pds_error", {
151 event: "backfill.pds_error",
152 did,
153 collection,
154 error: error instanceof Error ? error.message : String(error),
155 });
156 }
157
158 return stats;
159 }
160
161 /**
162 * Determine if backfill is needed based on cursor state and DB contents.
163 */
164 async checkIfNeeded(cursor: bigint | null): Promise<BackfillStatus> {
165 // No cursor at all → first startup or wiped cursor
166 if (cursor === null) {
167 this.logger.info("backfill.decision", {
168 event: "backfill.decision",
169 status: BackfillStatus.FullSync,
170 reason: "no_cursor",
171 });
172 return BackfillStatus.FullSync;
173 }
174
175 // Check if DB has forum data (consistency check)
176 let forum: { rkey: string } | undefined;
177 try {
178 const results = await this.db
179 .select()
180 .from(forums)
181 .where(eq(forums.rkey, "self"))
182 .limit(1);
183 forum = results[0];
184 } catch (error) {
185 this.logger.error("backfill.decision", {
186 event: "backfill.decision",
187 status: BackfillStatus.FullSync,
188 reason: "db_query_failed",
189 error: error instanceof Error ? error.message : String(error),
190 });
191 return BackfillStatus.FullSync;
192 }
193
194 if (!forum) {
195 this.logger.info("backfill.decision", {
196 event: "backfill.decision",
197 status: BackfillStatus.FullSync,
198 reason: "db_inconsistency",
199 cursorTimestamp: cursor.toString(),
200 });
201 return BackfillStatus.FullSync;
202 }
203
204 // Check cursor age
205 const ageHours = this.cursorManager.getCursorAgeHours(cursor)!;
206 if (ageHours > this.config.backfillCursorMaxAgeHours) {
207 this.logger.info("backfill.decision", {
208 event: "backfill.decision",
209 status: BackfillStatus.CatchUp,
210 reason: "cursor_too_old",
211 cursorAgeHours: Math.round(ageHours),
212 thresholdHours: this.config.backfillCursorMaxAgeHours,
213 cursorTimestamp: cursor.toString(),
214 });
215 return BackfillStatus.CatchUp;
216 }
217
218 this.logger.info("backfill.decision", {
219 event: "backfill.decision",
220 status: BackfillStatus.NotNeeded,
221 reason: "cursor_fresh",
222 cursorAgeHours: Math.round(ageHours),
223 });
224 return BackfillStatus.NotNeeded;
225 }
226
227 /**
228 * Check if a backfill is currently running.
229 */
230 getIsRunning(): boolean {
231 return this.isRunning;
232 }
233
234 /**
235 * Create an AtpAgent pointed at the forum's PDS.
236 * Extracted as a private method for test mocking.
237 */
238 private createAgentForPds(): AtpAgent {
239 return new AtpAgent({ service: this.config.pdsUrl });
240 }
241
242 /**
243 * Create a progress row and return its ID.
244 * Use this before performBackfill when you need the ID immediately (e.g., for a 202 response).
245 * Pass the returned ID as existingRowId to performBackfill to skip duplicate row creation.
246 */
247 async prepareBackfillRow(type: BackfillStatus): Promise<bigint> {
248 const [row] = await this.db
249 .insert(backfillProgress)
250 .values({
251 status: "in_progress",
252 backfillType: type,
253 startedAt: new Date(),
254 })
255 .returning({ id: backfillProgress.id });
256 return row.id;
257 }
258
259 /**
260 * Query the backfill_progress table for any row with status = 'in_progress'.
261 * Returns the first such row, or null if none exists.
262 */
263 async checkForInterruptedBackfill() {
264 try {
265 const [row] = await this.db
266 .select()
267 .from(backfillProgress)
268 .where(eq(backfillProgress.status, "in_progress"))
269 .limit(1);
270
271 return row ?? null;
272 } catch (error) {
273 if (isProgrammingError(error)) throw error;
274 this.logger.error("backfill.check_interrupted.failed", {
275 event: "backfill.check_interrupted.failed",
276 error: error instanceof Error ? error.message : String(error),
277 note: "Could not check for interrupted backfills — assuming none",
278 });
279 return null;
280 }
281 }
282
283 /**
284 * Resume a CatchUp backfill from its last checkpoint (lastProcessedDid).
285 * Only processes users with DID > lastProcessedDid.
286 * Does NOT re-run Phase 1 (forum-owned collections).
287 */
288 async resumeBackfill(interrupted: typeof backfillProgress.$inferSelect): Promise<BackfillResult> {
289 if (this.isRunning) {
290 throw new Error("Backfill is already in progress");
291 }
292
293 this.isRunning = true;
294 const startTime = Date.now();
295 let totalIndexed = interrupted.recordsIndexed;
296 let totalErrors = 0;
297 let didsProcessed = interrupted.didsProcessed;
298
299 this.logger.info("backfill.resuming", {
300 event: "backfill.resuming",
301 backfillId: interrupted.id.toString(),
302 lastProcessedDid: interrupted.lastProcessedDid,
303 didsProcessed: interrupted.didsProcessed,
304 didsTotal: interrupted.didsTotal,
305 });
306
307 try {
308 const agent = this.createAgentForPds();
309
310 if (interrupted.backfillType !== BackfillStatus.CatchUp) {
311 // FullSync cannot be resumed from a checkpoint — it must re-run from scratch
312 throw new Error(
313 "Interrupted FullSync cannot be resumed. Re-trigger via /api/admin/backfill?force=full_sync."
314 );
315 }
316
317 if (interrupted.lastProcessedDid) {
318 // Resume: fetch users after lastProcessedDid
319 // TODO(ATB-13): Paginate for large forums
320 const remainingUsers = await this.db
321 .select({ did: users.did })
322 .from(users)
323 .where(gt(users.did, interrupted.lastProcessedDid))
324 .orderBy(asc(users.did));
325
326 for (let i = 0; i < remainingUsers.length; i += this.config.backfillConcurrency) {
327 const batch = remainingUsers.slice(i, i + this.config.backfillConcurrency);
328 const backfillId = interrupted.id;
329
330 const batchResults = await Promise.allSettled(
331 batch.map(async (user) => {
332 let userIndexed = 0;
333 let userErrors = 0;
334 for (const collection of USER_OWNED_COLLECTIONS) {
335 const stats = await this.syncRepoRecords(user.did, collection, agent);
336 userIndexed += stats.recordsIndexed;
337 if (stats.errors > 0) {
338 userErrors += stats.errors;
339 await this.db.insert(backfillErrors).values({
340 backfillId,
341 did: user.did,
342 collection,
343 errorMessage: `${stats.errors} record(s) failed`,
344 createdAt: new Date(),
345 });
346 }
347 }
348 return { indexed: userIndexed, errors: userErrors };
349 })
350 );
351
352 // Aggregate results after settlement, including DID for debuggability
353 batchResults.forEach((result, i) => {
354 if (result.status === "fulfilled") {
355 totalIndexed += result.value.indexed;
356 totalErrors += result.value.errors;
357 } else {
358 totalErrors++;
359 this.logger.error("backfill.resume.batch_user_failed", {
360 event: "backfill.resume.batch_user_failed",
361 backfillId: backfillId.toString(),
362 did: batch[i].did,
363 error: result.reason instanceof Error ? result.reason.message : String(result.reason),
364 });
365 }
366 });
367
368 didsProcessed += batch.length;
369
370 try {
371 await this.db
372 .update(backfillProgress)
373 .set({
374 didsProcessed,
375 recordsIndexed: totalIndexed,
376 lastProcessedDid: batch[batch.length - 1].did,
377 })
378 .where(eq(backfillProgress.id, backfillId));
379 } catch (checkpointError) {
380 if (isProgrammingError(checkpointError)) throw checkpointError;
381 this.logger.warn("backfill.resume.checkpoint_failed", {
382 event: "backfill.resume.checkpoint_failed",
383 backfillId: backfillId.toString(),
384 didsProcessed,
385 error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError),
386 note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
387 });
388 }
389 }
390 }
391
392 // Mark completed
393 await this.db
394 .update(backfillProgress)
395 .set({
396 status: "completed",
397 didsProcessed,
398 recordsIndexed: totalIndexed,
399 completedAt: new Date(),
400 })
401 .where(eq(backfillProgress.id, interrupted.id));
402
403 const result: BackfillResult = {
404 backfillId: interrupted.id,
405 type: interrupted.backfillType as BackfillStatus,
406 didsProcessed,
407 recordsIndexed: totalIndexed,
408 errors: totalErrors,
409 durationMs: Date.now() - startTime,
410 };
411
412 const resumeEvent = totalErrors > 0 ? "backfill.resume.completed_with_errors" : "backfill.resume.completed";
413 this.logger.info(resumeEvent, {
414 event: resumeEvent,
415 ...result,
416 backfillId: result.backfillId.toString(),
417 });
418
419 return result;
420 } catch (error) {
421 // Best-effort: mark as failed
422 try {
423 await this.db
424 .update(backfillProgress)
425 .set({
426 status: "failed",
427 errorMessage: error instanceof Error ? error.message : String(error),
428 completedAt: new Date(),
429 })
430 .where(eq(backfillProgress.id, interrupted.id));
431 } catch (updateError) {
432 this.logger.error("backfill.resume.failed_status_update_error", {
433 event: "backfill.resume.failed_status_update_error",
434 backfillId: interrupted.id.toString(),
435 error: updateError instanceof Error ? updateError.message : String(updateError),
436 });
437 }
438
439 this.logger.error("backfill.resume.failed", {
440 event: "backfill.resume.failed",
441 backfillId: interrupted.id.toString(),
442 error: error instanceof Error ? error.message : String(error),
443 });
444 throw error;
445 } finally {
446 this.isRunning = false;
447 }
448 }
449
450 /**
451 * Execute a backfill operation.
452 * Phase 1: Syncs forum-owned collections from the Forum DID.
453 * Phase 2 (CatchUp only): Syncs user-owned collections from all known users.
454 *
455 * @param existingRowId - If provided (from prepareBackfillRow), skips creating a new progress row.
456 */
457 async performBackfill(type: BackfillStatus, existingRowId?: bigint): Promise<BackfillResult> {
458 if (this.isRunning) {
459 throw new Error("Backfill is already in progress");
460 }
461
462 this.isRunning = true;
463 const startTime = Date.now();
464 let backfillId: bigint | undefined = existingRowId;
465 let totalIndexed = 0;
466 let totalErrors = 0;
467 let didsProcessed = 0;
468
469 try {
470 // Create progress row only if not pre-created by prepareBackfillRow
471 if (backfillId === undefined) {
472 const [row] = await this.db
473 .insert(backfillProgress)
474 .values({
475 status: "in_progress",
476 backfillType: type,
477 startedAt: new Date(),
478 })
479 .returning({ id: backfillProgress.id });
480 backfillId = row.id;
481 }
482 // Capture in const so TypeScript can narrow through async closures
483 const resolvedBackfillId: bigint = backfillId;
484
485 const agent = this.createAgentForPds();
486
487 // Phase 1: Sync forum-owned collections from Forum DID
488 for (const collection of FORUM_OWNED_COLLECTIONS) {
489 const stats = await this.syncRepoRecords(
490 this.config.forumDid,
491 collection,
492 agent
493 );
494 totalIndexed += stats.recordsIndexed;
495 totalErrors += stats.errors;
496 if (stats.errors > 0) {
497 await this.db.insert(backfillErrors).values({
498 backfillId: resolvedBackfillId,
499 did: this.config.forumDid,
500 collection,
501 errorMessage: `${stats.errors} record(s) failed`,
502 createdAt: new Date(),
503 });
504 }
505 }
506
507 // Phase 2: For CatchUp, sync user-owned records from known DIDs
508 if (type === BackfillStatus.CatchUp) {
509 // TODO(ATB-13): Paginate for large forums — currently loads all DIDs into memory
510 const knownUsers = await this.db
511 .select({ did: users.did })
512 .from(users)
513 .orderBy(asc(users.did));
514
515 const didsTotal = knownUsers.length;
516
517 await this.db
518 .update(backfillProgress)
519 .set({ didsTotal })
520 .where(eq(backfillProgress.id, backfillId));
521
522 // Process in batches of backfillConcurrency
523 for (let i = 0; i < knownUsers.length; i += this.config.backfillConcurrency) {
524 const batch = knownUsers.slice(i, i + this.config.backfillConcurrency);
525
526 const batchResults = await Promise.allSettled(
527 batch.map(async (user) => {
528 let userIndexed = 0;
529 let userErrors = 0;
530 for (const collection of USER_OWNED_COLLECTIONS) {
531 const stats = await this.syncRepoRecords(user.did, collection, agent);
532 userIndexed += stats.recordsIndexed;
533 if (stats.errors > 0) {
534 userErrors += stats.errors;
535 await this.db.insert(backfillErrors).values({
536 backfillId: resolvedBackfillId,
537 did: user.did,
538 collection,
539 errorMessage: `${stats.errors} record(s) failed`,
540 createdAt: new Date(),
541 });
542 }
543 }
544 return { indexed: userIndexed, errors: userErrors };
545 })
546 );
547
548 // Aggregate results after settlement, including DID for debuggability
549 batchResults.forEach((result, i) => {
550 if (result.status === "fulfilled") {
551 totalIndexed += result.value.indexed;
552 totalErrors += result.value.errors;
553 } else {
554 totalErrors++;
555 this.logger.error("backfill.batch_user_failed", {
556 event: "backfill.batch_user_failed",
557 backfillId: resolvedBackfillId.toString(),
558 did: batch[i].did,
559 error: result.reason instanceof Error ? result.reason.message : String(result.reason),
560 });
561 }
562 });
563
564 didsProcessed += batch.length;
565
566 try {
567 await this.db
568 .update(backfillProgress)
569 .set({
570 didsProcessed,
571 recordsIndexed: totalIndexed,
572 lastProcessedDid: batch[batch.length - 1].did,
573 })
574 .where(eq(backfillProgress.id, backfillId));
575 } catch (checkpointError) {
576 if (isProgrammingError(checkpointError)) throw checkpointError;
577 this.logger.warn("backfill.checkpoint_failed", {
578 event: "backfill.checkpoint_failed",
579 backfillId: resolvedBackfillId.toString(),
580 didsProcessed,
581 error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError),
582 note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
583 });
584 }
585
586 this.logger.info("backfill.progress", {
587 event: "backfill.progress",
588 backfillId: backfillId.toString(),
589 type,
590 didsProcessed,
591 didsTotal,
592 recordsIndexed: totalIndexed,
593 elapsedMs: Date.now() - startTime,
594 });
595 }
596 }
597
598 // Mark completed
599 await this.db
600 .update(backfillProgress)
601 .set({
602 status: "completed",
603 didsProcessed,
604 recordsIndexed: totalIndexed,
605 completedAt: new Date(),
606 })
607 .where(eq(backfillProgress.id, backfillId));
608
609 const result: BackfillResult = {
610 backfillId: resolvedBackfillId,
611 type,
612 didsProcessed,
613 recordsIndexed: totalIndexed,
614 errors: totalErrors,
615 durationMs: Date.now() - startTime,
616 };
617
618 const completedEvent = totalErrors > 0 ? "backfill.completed_with_errors" : "backfill.completed";
619 this.logger.info(completedEvent, {
620 event: completedEvent,
621 ...result,
622 backfillId: result.backfillId.toString(),
623 });
624
625 return result;
626 } catch (error) {
627 // Best-effort: mark progress row as failed (if it was created)
628 if (backfillId !== undefined) {
629 try {
630 await this.db
631 .update(backfillProgress)
632 .set({
633 status: "failed",
634 errorMessage: error instanceof Error ? error.message : String(error),
635 completedAt: new Date(),
636 })
637 .where(eq(backfillProgress.id, backfillId));
638 } catch (updateError) {
639 this.logger.error("backfill.failed_status_update_error", {
640 event: "backfill.failed_status_update_error",
641 backfillId: backfillId.toString(),
642 error: updateError instanceof Error ? updateError.message : String(updateError),
643 });
644 }
645 }
646
647 this.logger.error("backfill.failed", {
648 event: "backfill.failed",
649 backfillId: backfillId !== undefined ? backfillId.toString() : "not_created",
650 error: error instanceof Error ? error.message : String(error),
651 });
652 throw error;
653 } finally {
654 this.isRunning = false;
655 }
656 }
657}
658