// WIP! A BB-style forum, on the ATmosphere!
// We're still working... we'll be back soon when we have something to show off!
// node
// typescript
// hono
// htmx
// atproto
import { describe, it, expect, beforeEach, vi, afterEach, type Mock } from "vitest";
import { BackfillManager, BackfillStatus } from "../backfill-manager.js";
import type { Database } from "@atbb/db";
import type { AppConfig } from "../config.js";
import { AtpAgent } from "@atproto/api";
import type { Indexer } from "../indexer.js";
import { createMockLogger } from "./mock-logger.js";

// Swap the real "@atproto/api" module for a stub: every `new AtpAgent(...)`
// yields an object whose only member is `com.atproto.repo.listRecords`, a bare
// `vi.fn()` that individual tests program with the pages they need.
vi.mock("@atproto/api", () => ({
  AtpAgent: vi.fn().mockImplementation(() => ({
    com: {
      atproto: {
        repo: {
          listRecords: vi.fn(),
        },
      },
    },
  })),
}));

/**
 * Builds the minimal config object used throughout this suite.
 *
 * @param overrides - Fields that replace the defaults below (spread last, so
 *   they always win).
 * @returns The defaults merged with `overrides`, asserted to `AppConfig`. The
 *   assertion is deliberate: only the fields listed here are populated.
 */
function mockConfig(overrides: Partial<AppConfig> = {}): AppConfig {
  return {
    port: 3000,
    forumDid: "did:plc:testforum",
    pdsUrl: "https://pds.example.com",
    databaseUrl: "postgres://test",
    jetstreamUrl: "wss://jetstream.example.com",
    oauthPublicUrl: "https://example.com",
    sessionSecret: "a".repeat(32),
    sessionTtlDays: 7,
    backfillRateLimit: 10,
    backfillConcurrency: 10,
    backfillCursorMaxAgeHours: 48,
    ...overrides,
  } as AppConfig;
}

// ---------------------------------------------------------------------------
// Shared fixtures for the BackfillManager suite.
//
// Each builder reproduces exactly one stub shape; a test picks the shape that
// matches the query it expects the manager to issue, and nothing more.
// ---------------------------------------------------------------------------

/** Console methods a test may silence while exercising failure paths. */
type ConsoleMethod = "log" | "warn" | "error";

/**
 * Console spies created by `silenceConsole`. The suite-level `afterEach`
 * restores and drains them, so a failing assertion cannot leave `console`
 * muted for the tests that follow.
 */
const consoleSpies: Array<{ mockRestore: () => unknown }> = [];

/** Replaces the given `console` methods with no-ops until the current test ends. */
function silenceConsole(...methods: ConsoleMethod[]): void {
  for (const method of methods) {
    consoleSpies.push(vi.spyOn(console, method).mockImplementation(() => {}));
  }
}

/** Views a `vi.fn()`-backed member as a `Mock` so results can be queued on it. */
function asMock(fn: unknown): Mock {
  return fn as Mock;
}

/** Casts a partial stub to `Database`; tests supply only the members they expect to be called. */
function fakeDb(members: Record<string, unknown>): Database {
  return members as unknown as Database;
}

/** What `select()` returns for a `from().where().limit()` query ending in `limit`. */
function whereLimitChain(limit: Mock) {
  return {
    from: vi.fn().mockReturnValue({
      where: vi.fn().mockReturnValue({ limit }),
    }),
  };
}

/** What `select()` returns for a `from().where().orderBy()` query ending in `orderBy`. */
function whereOrderByChain(orderBy: Mock) {
  return {
    from: vi.fn().mockReturnValue({
      where: vi.fn().mockReturnValue({ orderBy }),
    }),
  };
}

/** What `select()` returns for a `from().orderBy()` query ending in `orderBy`. */
function fromOrderByChain(orderBy: Mock) {
  return {
    from: vi.fn().mockReturnValue({ orderBy }),
  };
}

/**
 * What `select()` returns when every supported terminal
 * (`where().limit()`, `where().orderBy()`, `orderBy()`) resolves to no rows.
 */
function emptySelectChain() {
  return {
    from: vi.fn().mockReturnValue({
      where: vi.fn().mockReturnValue({
        limit: vi.fn().mockResolvedValue([]),
        orderBy: vi.fn().mockResolvedValue([]),
      }),
      orderBy: vi.fn().mockResolvedValue([]),
    }),
  };
}

/** Makes `db.select()` return `chain` for the rest of the test. */
function stubSelect(db: Database, chain: unknown): void {
  // The chain is a partial query builder, so it is asserted past the real return type.
  vi.spyOn(db, "select").mockReturnValue(chain as never);
}

/** What `insert()` returns for an `insert().values().returning()` call ending in `returning`. */
function insertChain(returning: Mock) {
  return {
    values: vi.fn().mockReturnValue({ returning }),
  };
}

/**
 * Builds an `update().set().where()` stub whose `where` resolves to `undefined`.
 * `set` is returned alongside `update` so a test can assert on the values written.
 */
function updateChain() {
  const set = vi.fn().mockReturnValue({
    where: vi.fn().mockResolvedValue(undefined),
  });
  return { update: vi.fn().mockReturnValue({ set }), set };
}

/** Makes `target.createAgentForPds()` return an agent whose `listRecords` is the given mock. */
function stubAgent(target: BackfillManager, listRecords: Mock): void {
  // Reached through a structural cast because the tests treat this as an internal method.
  const internals = target as unknown as {
    createAgentForPds: (...args: unknown[]) => unknown;
  };
  vi.spyOn(internals, "createAgentForPds").mockReturnValue({
    com: { atproto: { repo: { listRecords } } },
  });
}

/** A `listRecords` response page; omit `cursor` for the last page. */
function page(records: unknown[], cursor?: string) {
  return { data: { records, cursor } };
}

/** A `space.atbb.post` record entry as it appears inside a `listRecords` page. */
function postRecord(uri: string, cid: string, text: string, createdAt: string) {
  return { uri, cid, value: { $type: "space.atbb.post", text, createdAt } };
}

/** Handler names stubbed on the indexer used by the backfill and resume tests. */
const INDEXER_HANDLERS = [
  "handleForumCreate",
  "handleCategoryCreate",
  "handleBoardCreate",
  "handleRoleCreate",
  "handleMembershipCreate",
  "handlePostCreate",
  "handleModActionCreate",
] as const;

/** Builds an `Indexer` stub in which each named handler resolves to `true`. */
function fakeIndexer(handlers: readonly string[] = INDEXER_HANDLERS): Indexer {
  const stub: Record<string, Mock> = {};
  for (const name of handlers) {
    stub[name] = vi.fn().mockResolvedValue(true);
  }
  return stub as unknown as Indexer;
}

/**
 * Builds an in-progress backfill row. `backfillType` defaults to `"catch_up"`;
 * the remaining bookkeeping fields are supplied by the caller.
 */
function interruptedBackfill(fields: {
  id: bigint;
  lastProcessedDid: string | null;
  didsTotal: number;
  didsProcessed: number;
  recordsIndexed: number;
  backfillType?: string;
}) {
  return {
    status: "in_progress" as const,
    backfillType: "catch_up",
    startedAt: new Date(),
    completedAt: null,
    errorMessage: null,
    ...fields,
  };
}

/** A Jetstream-style cursor for `hours` ago: epoch milliseconds × 1000, as a bigint. */
function cursorHoursAgo(hours: number): bigint {
  return BigInt((Date.now() - hours * 60 * 60 * 1000) * 1000);
}

describe("BackfillManager", () => {
  let mockDb: Database;
  let manager: BackfillManager;
  let mockLogger: ReturnType<typeof createMockLogger>;

  /**
   * Rebuilds `manager` around `db` with a fresh logger and `indexer` attached.
   * When `listRecords` is given, the manager's PDS agent is stubbed to use it.
   */
  function arrange(
    db: Database,
    indexer: Indexer,
    options: { listRecords?: Mock; config?: Partial<AppConfig> } = {}
  ): void {
    mockDb = db;
    manager = new BackfillManager(mockDb, mockConfig(options.config), createMockLogger());
    manager.setIndexer(indexer);
    if (options.listRecords) {
      stubAgent(manager, options.listRecords);
    }
  }

  beforeEach(() => {
    mockDb = fakeDb({
      select: vi.fn().mockReturnValue(whereLimitChain(vi.fn().mockResolvedValue([]))),
    });

    mockLogger = createMockLogger();
    manager = new BackfillManager(mockDb, mockConfig(), mockLogger);
  });

  afterEach(() => {
    // Restore console first so a failed test cannot mute output for the next one.
    for (const spy of consoleSpies.splice(0)) {
      spy.mockRestore();
    }
    vi.clearAllMocks();
  });

  describe("checkIfNeeded", () => {
    it("returns FullSync when cursor is null (no cursor)", async () => {
      const status = await manager.checkIfNeeded(null);
      expect(status).toBe(BackfillStatus.FullSync);
    });

    it("returns FullSync when cursor exists but forums table is empty", async () => {
      // Forums query returns empty
      stubSelect(mockDb, whereLimitChain(vi.fn().mockResolvedValue([])));

      // Cursor from 1 hour ago (fresh)
      const status = await manager.checkIfNeeded(cursorHoursAgo(1));
      expect(status).toBe(BackfillStatus.FullSync);
    });

    it("returns CatchUp when cursor age exceeds threshold", async () => {
      // Forums query returns a forum (DB not empty)
      stubSelect(
        mockDb,
        whereLimitChain(vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]))
      );

      // Cursor from 72 hours ago (stale: config threshold is 48 hours)
      const status = await manager.checkIfNeeded(cursorHoursAgo(72));
      expect(status).toBe(BackfillStatus.CatchUp);
    });

    it("returns NotNeeded when cursor is fresh and DB has data", async () => {
      // Forums query returns a forum
      stubSelect(
        mockDb,
        whereLimitChain(vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]))
      );

      // Cursor from 1 hour ago (fresh)
      const status = await manager.checkIfNeeded(cursorHoursAgo(1));
      expect(status).toBe(BackfillStatus.NotNeeded);
    });

    it("returns FullSync when DB query fails (fail safe)", async () => {
      stubSelect(
        mockDb,
        whereLimitChain(vi.fn().mockRejectedValue(new Error("DB connection lost")))
      );
      silenceConsole("error");

      const status = await manager.checkIfNeeded(cursorHoursAgo(1));
      expect(status).toBe(BackfillStatus.FullSync);
    });
  });

  describe("syncRepoRecords", () => {
    let mockIndexer: Indexer;

    beforeEach(() => {
      mockIndexer = fakeIndexer(["handlePostCreate", "handleForumCreate"]);
    });

    it("fetches records and calls indexer for each one", async () => {
      const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
      asMock(mockAgent.com.atproto.repo.listRecords).mockResolvedValueOnce(
        page([
          postRecord(
            "at://did:plc:user1/space.atbb.post/abc123",
            "bafyabc",
            "Hello",
            "2026-01-01T00:00:00Z"
          ),
          postRecord(
            "at://did:plc:user1/space.atbb.post/def456",
            "bafydef",
            "World",
            "2026-01-01T01:00:00Z"
          ),
        ])
      );

      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(2);
      expect(stats.recordsIndexed).toBe(2);
      expect(stats.errors).toBe(0);
      expect(mockIndexer.handlePostCreate).toHaveBeenCalledTimes(2);
      expect(mockIndexer.handlePostCreate).toHaveBeenCalledWith(
        expect.objectContaining({
          did: "did:plc:user1",
          commit: expect.objectContaining({
            rkey: "abc123",
            cid: "bafyabc",
            record: expect.objectContaining({ text: "Hello" }),
          }),
        })
      );
    });

    it("paginates through multiple pages", async () => {
      const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
      asMock(mockAgent.com.atproto.repo.listRecords)
        .mockResolvedValueOnce(
          page(
            [
              postRecord(
                "at://did:plc:user1/space.atbb.post/page1",
                "bafyp1",
                "Page 1",
                "2026-01-01T00:00:00Z"
              ),
            ],
            "next_page"
          )
        )
        .mockResolvedValueOnce(
          page([
            postRecord(
              "at://did:plc:user1/space.atbb.post/page2",
              "bafyp2",
              "Page 2",
              "2026-01-02T00:00:00Z"
            ),
          ])
        );

      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(2);
      expect(stats.recordsIndexed).toBe(2);
      expect(mockAgent.com.atproto.repo.listRecords).toHaveBeenCalledTimes(2);
    });

    it("continues on indexer errors and tracks error count", async () => {
      const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
      asMock(mockAgent.com.atproto.repo.listRecords).mockResolvedValueOnce(
        page([
          postRecord(
            "at://did:plc:user1/space.atbb.post/good",
            "bafygood",
            "Good",
            "2026-01-01T00:00:00Z"
          ),
          postRecord(
            "at://did:plc:user1/space.atbb.post/bad",
            "bafybad",
            "Bad",
            "2026-01-01T01:00:00Z"
          ),
        ])
      );

      // First record indexes fine, second one fails.
      asMock(mockIndexer.handlePostCreate)
        .mockResolvedValueOnce(true)
        .mockRejectedValueOnce(new Error("FK missing"));

      silenceConsole("error");
      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(2);
      expect(stats.recordsIndexed).toBe(1);
      expect(stats.errors).toBe(1);
    });

    it("returns error stats when indexer is not set", async () => {
      const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
      // No setIndexer call — indexer is null
      const stats = await manager.syncRepoRecords("did:plc:user", "space.atbb.post", mockAgent);
      expect(stats.errors).toBe(1);
      expect(mockLogger.error).toHaveBeenCalledWith(
        "backfill.sync_skipped",
        expect.objectContaining({ reason: "indexer_not_set" })
      );
    });

    it("handles PDS connection failure gracefully", async () => {
      const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
      asMock(mockAgent.com.atproto.repo.listRecords).mockRejectedValueOnce(
        new Error("fetch failed")
      );

      silenceConsole("error");
      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(0);
      expect(stats.recordsIndexed).toBe(0);
      expect(stats.errors).toBe(1);
    });
  });

  describe("performBackfill", () => {
    let mockIndexer: Indexer;

    /** Database stub for FullSync runs: every select is empty, `insert` is caller-supplied. */
    function fullSyncDb(insert: Mock): Database {
      return fakeDb({
        select: vi.fn().mockReturnValue(emptySelectChain()),
        insert,
        update: updateChain().update,
      });
    }

    /** `insert` stub whose `returning()` resolves to a single progress row with id 1. */
    function progressInsert(): Mock {
      return vi
        .fn()
        .mockReturnValue(insertChain(vi.fn().mockResolvedValue([{ id: 1n }])));
    }

    beforeEach(() => {
      silenceConsole("log", "error", "warn");
      mockIndexer = fakeIndexer();
    });

    it("creates a backfill_progress row on start", async () => {
      const mockInsert = progressInsert();

      arrange(fullSyncDb(mockInsert), mockIndexer, {
        listRecords: vi.fn().mockResolvedValue(page([])),
      });

      await manager.performBackfill(BackfillStatus.FullSync);

      expect(mockInsert).toHaveBeenCalled();
    });

    it("sets isRunning flag during backfill", async () => {
      arrange(fullSyncDb(progressInsert()), mockIndexer, {
        listRecords: vi.fn().mockResolvedValue(page([])),
      });

      expect(manager.getIsRunning()).toBe(false);
      const promise = manager.performBackfill(BackfillStatus.FullSync);
      // Checked before awaiting: the flag must already be set once the call returns.
      expect(manager.getIsRunning()).toBe(true);
      await promise;
      expect(manager.getIsRunning()).toBe(false);
    });

    it("rejects concurrent backfill attempts", async () => {
      arrange(fullSyncDb(progressInsert()), mockIndexer, {
        // Delayed pages keep the first run in flight while the second is attempted.
        listRecords: vi.fn().mockImplementation(
          () =>
            new Promise((resolve) =>
              setTimeout(() => resolve({ data: { records: [], cursor: undefined } }), 100)
            )
        ),
      });

      const first = manager.performBackfill(BackfillStatus.FullSync);

      await expect(manager.performBackfill(BackfillStatus.FullSync))
        .rejects.toThrow("Backfill is already in progress");

      await first;
    });

    it("CatchUp: syncs user-owned collections and aggregates counts", async () => {
      // Phase 1 (5 FORUM_OWNED_COLLECTIONS) must return empty so its records don't
      // pollute the count. Phase 2: 2 users × 2 USER_OWNED_COLLECTIONS × 1 record = 4.
      const emptyPage = page([]);
      const recordPage = page([
        postRecord("at://did:plc:u/space.atbb.post/r1", "bafyr1", "hi", "2026-01-01T00:00:00Z"),
      ]);

      const mockListRecords = vi.fn()
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1)
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2)
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3)
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4)
        .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5)
        .mockResolvedValue(recordPage); // all Phase 2 user collection calls

      const db = fakeDb({
        select: vi.fn().mockReturnValue(
          fromOrderByChain(
            vi.fn().mockResolvedValue([{ did: "did:plc:user1" }, { did: "did:plc:user2" }])
          )
        ),
        insert: vi.fn().mockReturnValue(insertChain(vi.fn().mockResolvedValue([{ id: 42n }]))),
        update: updateChain().update,
      });

      arrange(db, mockIndexer, {
        listRecords: mockListRecords,
        config: { backfillConcurrency: 5 },
      });

      const result = await manager.performBackfill(BackfillStatus.CatchUp);

      // Phase 1: 0 records (forum collections empty)
      // Phase 2: 2 users × 2 collections × 1 record each = 4 records indexed
      expect(result.recordsIndexed).toBe(4);
      expect(result.errors).toBe(0);
      expect(result.didsProcessed).toBe(2);
      expect(result.backfillId).toBe(42n);
    });

    it("CatchUp: rejected user batch increments totalErrors and is not swallowed", async () => {
      // syncRepoRecords never throws — it catches PDS errors internally and returns errors:1.
      // For the batch callback to reject (tested by the allSettled handling), the
      // backfillErrors DB insert must fail, which propagates the rejection out of the callback.
      const emptyPage = page([]);

      const mockListRecords = vi.fn()
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1)
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2)
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3)
        .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4)
        .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5)
        // user1: both collections succeed, 1 record each
        .mockResolvedValueOnce(
          page([
            {
              uri: "at://did:plc:user1/space.atbb.membership/self",
              cid: "bafymem",
              value: { $type: "space.atbb.membership", createdAt: "2026-01-01T00:00:00Z" },
            },
          ])
        )
        .mockResolvedValueOnce(
          page([
            postRecord(
              "at://did:plc:user1/space.atbb.post/p1",
              "bafyp1",
              "hi",
              "2026-01-01T00:00:00Z"
            ),
          ])
        )
        // user2/membership: PDS error → syncRepoRecords catches → returns errors:1 →
        // triggers backfillErrors insert (which rejects below) → callback rejects
        .mockRejectedValueOnce(new Error("PDS unreachable"));

      const db = fakeDb({
        select: vi.fn().mockReturnValue(
          fromOrderByChain(
            vi.fn().mockResolvedValue([{ did: "did:plc:user1" }, { did: "did:plc:user2" }])
          )
        ),
        insert: vi.fn()
          // backfillProgress insert — must succeed
          .mockReturnValueOnce(insertChain(vi.fn().mockResolvedValue([{ id: 7n }])))
          // backfillErrors insert for user2 — rejects to make callback throw
          .mockReturnValueOnce(
            insertChain(vi.fn().mockRejectedValue(new Error("backfillErrors insert failed")))
          ),
        update: updateChain().update,
      });

      arrange(db, mockIndexer, {
        listRecords: mockListRecords,
        config: { backfillConcurrency: 1 },
      });

      const result = await manager.performBackfill(BackfillStatus.CatchUp);

      // user1 batch (concurrency=1): fulfilled, 2 records indexed (membership + post)
      // user2 batch: callback rejects → allSettled rejected branch → totalErrors++ = 1
      expect(result.recordsIndexed).toBe(2);
      expect(result.errors).toBe(1);
    });

    it("clears isRunning flag even when backfill fails", async () => {
      // No select stub and no agent: the run is expected to fail at the first insert.
      const db = fakeDb({
        insert: vi.fn().mockReturnValue(
          insertChain(vi.fn().mockRejectedValue(new Error("DB insert failed")))
        ),
        update: updateChain().update,
      });

      arrange(db, mockIndexer);

      await expect(manager.performBackfill(BackfillStatus.FullSync))
        .rejects.toThrow("DB insert failed");

      expect(manager.getIsRunning()).toBe(false);
    });
  });

  describe("checkForInterruptedBackfill", () => {
    it("returns null when no interrupted backfill exists", async () => {
      stubSelect(mockDb, whereLimitChain(vi.fn().mockResolvedValue([])));

      const result = await manager.checkForInterruptedBackfill();
      expect(result).toBeNull();
    });

    it("returns null and logs error when DB query fails", async () => {
      stubSelect(
        mockDb,
        whereLimitChain(vi.fn().mockRejectedValue(new Error("DB connection lost")))
      );

      const result = await manager.checkForInterruptedBackfill();
      expect(result).toBeNull();
      expect(mockLogger.error).toHaveBeenCalled();
    });

    it("returns interrupted backfill row when one exists", async () => {
      const interruptedRow = interruptedBackfill({
        id: 5n,
        lastProcessedDid: "did:plc:halfway",
        didsTotal: 100,
        didsProcessed: 50,
        recordsIndexed: 250,
      });

      stubSelect(mockDb, whereLimitChain(vi.fn().mockResolvedValue([interruptedRow])));

      const result = await manager.checkForInterruptedBackfill();
      expect(result).toEqual(interruptedRow);
    });
  });

  describe("resumeBackfill", () => {
    let mockIndexer: Indexer;

    beforeEach(() => {
      silenceConsole("log", "error");
      mockIndexer = fakeIndexer();
    });

    it("resumes from lastProcessedDid and processes remaining users", async () => {
      // Interrupted at user1 (didsProcessed=1), user2 and user3 remain
      const interrupted = interruptedBackfill({
        id: 5n,
        lastProcessedDid: "did:plc:user1",
        didsTotal: 3,
        didsProcessed: 1,
        recordsIndexed: 2,
      });

      // user2 and user3: 1 record each per collection (2 collections = 4 total)
      const recordPage = page([
        postRecord("at://did:plc:u/space.atbb.post/r1", "bafyr1", "hi", "2026-01-01T00:00:00Z"),
      ]);

      const db = fakeDb({
        select: vi.fn().mockReturnValue(
          whereOrderByChain(
            vi.fn().mockResolvedValue([{ did: "did:plc:user2" }, { did: "did:plc:user3" }])
          )
        ),
        insert: vi.fn().mockReturnValue(insertChain(vi.fn().mockResolvedValue([]))),
        update: updateChain().update,
      });

      arrange(db, mockIndexer, {
        listRecords: vi.fn().mockResolvedValue(recordPage),
        config: { backfillConcurrency: 5 },
      });

      const result = await manager.resumeBackfill(interrupted);

      // Starts from interrupted.recordsIndexed=2, adds 2 users × 2 collections × 1 record = 4
      expect(result.recordsIndexed).toBe(6);
      expect(result.errors).toBe(0);
      expect(result.didsProcessed).toBe(3); // 1 (prior) + 2 (resumed)
      expect(result.backfillId).toBe(5n);
    });

    it("marks completed even when no remaining users", async () => {
      // Interrupted at the last user — no users with DID > lastProcessedDid
      const interrupted = interruptedBackfill({
        id: 3n,
        lastProcessedDid: "did:plc:last",
        didsTotal: 2,
        didsProcessed: 2,
        recordsIndexed: 10,
      });

      const { update } = updateChain();
      const db = fakeDb({
        // no remaining users
        select: vi.fn().mockReturnValue(whereOrderByChain(vi.fn().mockResolvedValue([]))),
        update,
      });

      arrange(db, mockIndexer, { listRecords: vi.fn() });

      const result = await manager.resumeBackfill(interrupted);

      // No new records — just marks completed with existing counts
      expect(result.recordsIndexed).toBe(10);
      expect(result.didsProcessed).toBe(2);
      expect(result.backfillId).toBe(3n);

      // The progress row must have been written to
      expect(update).toHaveBeenCalled();
    });

    it("clears isRunning flag even when resume fails", async () => {
      const interrupted = interruptedBackfill({
        id: 9n,
        lastProcessedDid: "did:plc:checkpoint",
        didsTotal: 5,
        didsProcessed: 3,
        recordsIndexed: 15,
      });

      const db = fakeDb({
        select: vi.fn().mockReturnValue(
          whereOrderByChain(vi.fn().mockRejectedValue(new Error("DB query failed")))
        ),
        update: updateChain().update,
      });

      arrange(db, mockIndexer, { listRecords: vi.fn() });

      await expect(manager.resumeBackfill(interrupted))
        .rejects.toThrow("DB query failed");

      expect(manager.getIsRunning()).toBe(false);
    });

    it("marks full_sync interrupted backfill as failed (cannot resume FullSync)", async () => {
      const interrupted = interruptedBackfill({
        id: 10n,
        backfillType: "full_sync",
        lastProcessedDid: null,
        didsTotal: 0,
        didsProcessed: 0,
        recordsIndexed: 0,
      });

      const { update: mockUpdate, set: mockSet } = updateChain();
      arrange(fakeDb({ update: mockUpdate }), mockIndexer);

      await expect(manager.resumeBackfill(interrupted))
        .rejects.toThrow("Interrupted FullSync cannot be resumed");

      // Verify the row was marked as failed
      expect(mockUpdate).toHaveBeenCalled();
      expect(mockSet).toHaveBeenCalledWith(
        expect.objectContaining({ status: "failed" })
      );
    });

    it("rejects concurrent resume attempts", async () => {
      const interrupted = interruptedBackfill({
        id: 2n,
        lastProcessedDid: "did:plc:check",
        didsTotal: 2,
        didsProcessed: 1,
        recordsIndexed: 5,
      });

      const db = fakeDb({
        select: vi.fn().mockReturnValue(
          whereOrderByChain(
            // Delayed result keeps the first resume in flight while the second is attempted.
            vi.fn().mockImplementation(
              () => new Promise((resolve) => setTimeout(() => resolve([]), 200))
            )
          )
        ),
        update: updateChain().update,
      });

      arrange(db, mockIndexer, { listRecords: vi.fn() });

      const first = manager.resumeBackfill(interrupted);

      await expect(manager.resumeBackfill(interrupted))
        .rejects.toThrow("Backfill is already in progress");

      await first;
    });
  });
});