import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; import { BackfillManager, BackfillStatus } from "../backfill-manager.js"; import type { Database } from "@atbb/db"; import type { AppConfig } from "../config.js"; import { AtpAgent } from "@atproto/api"; import type { Indexer } from "../indexer.js"; import { createMockLogger } from "./mock-logger.js"; vi.mock("@atproto/api", () => ({ AtpAgent: vi.fn().mockImplementation(() => ({ com: { atproto: { repo: { listRecords: vi.fn(), }, }, }, })), })); // Minimal mock config function mockConfig(overrides: Partial = {}): AppConfig { return { port: 3000, forumDid: "did:plc:testforum", pdsUrl: "https://pds.example.com", databaseUrl: "postgres://test", jetstreamUrl: "wss://jetstream.example.com", oauthPublicUrl: "https://example.com", sessionSecret: "a".repeat(32), sessionTtlDays: 7, backfillRateLimit: 10, backfillConcurrency: 10, backfillCursorMaxAgeHours: 48, ...overrides, } as AppConfig; } describe("BackfillManager", () => { let mockDb: Database; let manager: BackfillManager; let mockLogger: ReturnType; beforeEach(() => { mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([]), }), }), }), } as unknown as Database; mockLogger = createMockLogger(); manager = new BackfillManager(mockDb, mockConfig(), mockLogger); }); afterEach(() => { vi.clearAllMocks(); }); describe("checkIfNeeded", () => { it("returns FullSync when cursor is null (no cursor)", async () => { const status = await manager.checkIfNeeded(null); expect(status).toBe(BackfillStatus.FullSync); }); it("returns FullSync when cursor exists but forums table is empty", async () => { // Forums query returns empty vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([]), }), }), } as any); // Cursor from 1 hour ago (fresh) const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000); const status = await manager.checkIfNeeded(cursor); expect(status).toBe(BackfillStatus.FullSync); }); it("returns CatchUp when cursor age exceeds threshold", async () => { // Forums query returns a forum (DB not empty) vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]), }), }), } as any); // Cursor from 72 hours ago (stale) const cursor = BigInt((Date.now() - 72 * 60 * 60 * 1000) * 1000); const status = await manager.checkIfNeeded(cursor); expect(status).toBe(BackfillStatus.CatchUp); }); it("returns NotNeeded when cursor is fresh and DB has data", async () => { // Forums query returns a forum vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]), }), }), } as any); // Cursor from 1 hour ago (fresh) const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000); const status = await manager.checkIfNeeded(cursor); expect(status).toBe(BackfillStatus.NotNeeded); }); it("returns FullSync when DB query fails (fail safe)", async () => { vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockRejectedValue(new Error("DB connection lost")), }), }), } as any); const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000); const status = await manager.checkIfNeeded(cursor); expect(status).toBe(BackfillStatus.FullSync); consoleSpy.mockRestore(); }); }); describe("syncRepoRecords", () => { let mockIndexer: Indexer; beforeEach(() => { mockIndexer = { handlePostCreate: vi.fn().mockResolvedValue(true), handleForumCreate: vi.fn().mockResolvedValue(true), } as unknown as Indexer; }); it("fetches records and calls indexer for each one", async () => { const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({ data: { records: [ { uri: "at://did:plc:user1/space.atbb.post/abc123", cid: "bafyabc", value: { $type: "space.atbb.post", text: "Hello", createdAt: "2026-01-01T00:00:00Z" }, }, { uri: "at://did:plc:user1/space.atbb.post/def456", cid: "bafydef", value: { $type: "space.atbb.post", text: "World", createdAt: "2026-01-01T01:00:00Z" }, }, ], cursor: undefined, }, }); manager.setIndexer(mockIndexer); const stats = await manager.syncRepoRecords( "did:plc:user1", "space.atbb.post", mockAgent ); expect(stats.recordsFound).toBe(2); expect(stats.recordsIndexed).toBe(2); expect(stats.errors).toBe(0); expect(mockIndexer.handlePostCreate).toHaveBeenCalledTimes(2); expect(mockIndexer.handlePostCreate).toHaveBeenCalledWith( expect.objectContaining({ did: "did:plc:user1", commit: expect.objectContaining({ rkey: "abc123", cid: "bafyabc", record: expect.objectContaining({ text: "Hello" }), }), }) ); }); it("paginates through multiple pages", async () => { const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); (mockAgent.com.atproto.repo.listRecords as any) .mockResolvedValueOnce({ data: { records: [{ uri: "at://did:plc:user1/space.atbb.post/page1", cid: "bafyp1", value: { $type: "space.atbb.post", text: "Page 1", createdAt: "2026-01-01T00:00:00Z" }, }], cursor: "next_page", }, }) .mockResolvedValueOnce({ data: { records: [{ uri: "at://did:plc:user1/space.atbb.post/page2", cid: "bafyp2", value: { $type: "space.atbb.post", text: "Page 2", createdAt: "2026-01-02T00:00:00Z" }, }], cursor: undefined, }, }); manager.setIndexer(mockIndexer); const stats = await manager.syncRepoRecords( "did:plc:user1", "space.atbb.post", mockAgent ); expect(stats.recordsFound).toBe(2); expect(stats.recordsIndexed).toBe(2); expect(mockAgent.com.atproto.repo.listRecords).toHaveBeenCalledTimes(2); }); it("continues on indexer errors and tracks error count", async () => { const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({ data: { records: [ { uri: "at://did:plc:user1/space.atbb.post/good", cid: "bafygood", value: { $type: "space.atbb.post", text: "Good", createdAt: "2026-01-01T00:00:00Z" }, }, { uri: "at://did:plc:user1/space.atbb.post/bad", cid: "bafybad", value: { $type: "space.atbb.post", text: "Bad", createdAt: "2026-01-01T01:00:00Z" }, }, ], cursor: undefined, }, }); (mockIndexer.handlePostCreate as any) .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error("FK missing")); const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); manager.setIndexer(mockIndexer); const stats = await manager.syncRepoRecords( "did:plc:user1", "space.atbb.post", mockAgent ); expect(stats.recordsFound).toBe(2); expect(stats.recordsIndexed).toBe(1); expect(stats.errors).toBe(1); consoleSpy.mockRestore(); }); it("returns error stats when indexer is not set", async () => { const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); // No setIndexer call — indexer is null const stats = await manager.syncRepoRecords("did:plc:user", "space.atbb.post", mockAgent); expect(stats.errors).toBe(1); expect(mockLogger.error).toHaveBeenCalledWith( "backfill.sync_skipped", expect.objectContaining({ reason: "indexer_not_set" }) ); }); it("handles PDS connection failure gracefully", async () => { const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); (mockAgent.com.atproto.repo.listRecords as any) .mockRejectedValueOnce(new Error("fetch failed")); const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); manager.setIndexer(mockIndexer); const stats = await manager.syncRepoRecords( "did:plc:user1", "space.atbb.post", mockAgent ); expect(stats.recordsFound).toBe(0); expect(stats.recordsIndexed).toBe(0); expect(stats.errors).toBe(1); consoleSpy.mockRestore(); }); }); describe("performBackfill", () => { let mockIndexer: Indexer; let consoleSpy: any; beforeEach(() => { consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {}); vi.spyOn(console, "error").mockImplementation(() => {}); vi.spyOn(console, "warn").mockImplementation(() => {}); mockIndexer = { handleForumCreate: vi.fn().mockResolvedValue(true), handleCategoryCreate: vi.fn().mockResolvedValue(true), handleBoardCreate: vi.fn().mockResolvedValue(true), handleRoleCreate: vi.fn().mockResolvedValue(true), handleMembershipCreate: vi.fn().mockResolvedValue(true), handlePostCreate: vi.fn().mockResolvedValue(true), handleModActionCreate: vi.fn().mockResolvedValue(true), } as unknown as Indexer; }); afterEach(() => { consoleSpy.mockRestore(); }); it("creates a backfill_progress row on start", async () => { const mockInsert = vi.fn().mockReturnValue({ values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([{ id: 1n }]), }), }); const mockSelectEmpty = vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([]), orderBy: vi.fn().mockResolvedValue([]), }), orderBy: vi.fn().mockResolvedValue([]), }), }); mockDb = { select: mockSelectEmpty, insert: mockInsert, update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: vi.fn().mockResolvedValue({ data: { records: [], cursor: undefined }, }), }, }, }, }); await manager.performBackfill(BackfillStatus.FullSync); expect(mockInsert).toHaveBeenCalled(); }); it("sets isRunning flag during backfill", async () => { const mockInsert = vi.fn().mockReturnValue({ values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([{ id: 1n }]), }), }); mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([]), orderBy: vi.fn().mockResolvedValue([]), }), orderBy: vi.fn().mockResolvedValue([]), }), }), insert: mockInsert, update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: vi.fn().mockResolvedValue({ data: { records: [], cursor: undefined }, }), }, }, }, }); expect(manager.getIsRunning()).toBe(false); const promise = manager.performBackfill(BackfillStatus.FullSync); expect(manager.getIsRunning()).toBe(true); await promise; expect(manager.getIsRunning()).toBe(false); }); it("rejects concurrent backfill attempts", async () => { const mockInsert = vi.fn().mockReturnValue({ values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([{ id: 1n }]), }), }); mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([]), orderBy: vi.fn().mockResolvedValue([]), }), orderBy: vi.fn().mockResolvedValue([]), }), }), insert: mockInsert, update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: vi.fn().mockImplementation( () => new Promise((resolve) => setTimeout(() => resolve({ data: { records: [], cursor: undefined } }), 100) ) ), }, }, }, }); const first = manager.performBackfill(BackfillStatus.FullSync); await expect(manager.performBackfill(BackfillStatus.FullSync)) .rejects.toThrow("Backfill is already in progress"); await first; }); it("CatchUp: syncs user-owned collections and aggregates counts", async () => { // Phase 1 (5 FORUM_OWNED_COLLECTIONS) must return empty so its records don't // pollute the count. Phase 2: 2 users × 2 USER_OWNED_COLLECTIONS × 1 record = 4. const emptyPage = { data: { records: [], cursor: undefined } }; const recordPage = { data: { records: [{ uri: "at://did:plc:u/space.atbb.post/r1", cid: "bafyr1", value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" }, }], cursor: undefined, }, }; const mockListRecords = vi.fn() .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1) .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2) .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3) .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4) .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5) .mockResolvedValue(recordPage); // all Phase 2 user collection calls mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ orderBy: vi.fn().mockResolvedValue([ { did: "did:plc:user1" }, { did: "did:plc:user2" }, ]), }), }), insert: vi.fn().mockReturnValue({ values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([{ id: 42n }]), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 5 }), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: mockListRecords } } }, }); const result = await manager.performBackfill(BackfillStatus.CatchUp); // Phase 1: 0 records (forum collections empty) // Phase 2: 2 users × 2 collections × 1 record each = 4 records indexed expect(result.recordsIndexed).toBe(4); expect(result.errors).toBe(0); expect(result.didsProcessed).toBe(2); expect(result.backfillId).toBe(42n); }); it("CatchUp: rejected user batch increments totalErrors and is not swallowed", async () => { // syncRepoRecords never throws — it catches PDS errors internally and returns errors:1. // For the batch callback to reject (tested by the allSettled handling), the // backfillErrors DB insert must fail, which propagates the rejection out of the callback. const emptyPage = { data: { records: [], cursor: undefined } }; const mockListRecords = vi.fn() .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1) .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2) .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3) .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4) .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5) // user1: both collections succeed, 1 record each .mockResolvedValueOnce({ data: { records: [{ uri: "at://did:plc:user1/space.atbb.membership/self", cid: "bafymem", value: { $type: "space.atbb.membership", createdAt: "2026-01-01T00:00:00Z" }, }], cursor: undefined } }) .mockResolvedValueOnce({ data: { records: [{ uri: "at://did:plc:user1/space.atbb.post/p1", cid: "bafyp1", value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" }, }], cursor: undefined } }) // user2/membership: PDS error → syncRepoRecords catches → returns errors:1 → // triggers backfillErrors insert (which rejects below) → callback rejects .mockRejectedValueOnce(new Error("PDS unreachable")); mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ orderBy: vi.fn().mockResolvedValue([ { did: "did:plc:user1" }, { did: "did:plc:user2" }, ]), }), }), insert: vi.fn() .mockReturnValueOnce({ // backfillProgress insert — must succeed values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([{ id: 7n }]), }), }) .mockReturnValueOnce({ // backfillErrors insert for user2 — rejects to make callback throw values: vi.fn().mockReturnValue({ returning: vi.fn().mockRejectedValue(new Error("backfillErrors insert failed")), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 1 }), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: mockListRecords } } }, }); const result = await manager.performBackfill(BackfillStatus.CatchUp); // user1 batch (concurrency=1): fulfilled, 2 records indexed (membership + post) // user2 batch: callback rejects → allSettled rejected branch → totalErrors++ = 1 expect(result.recordsIndexed).toBe(2); expect(result.errors).toBe(1); }); it("clears isRunning flag even when backfill fails", async () => { const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); mockDb = { insert: vi.fn().mockReturnValue({ values: vi.fn().mockReturnValue({ returning: vi.fn().mockRejectedValue(new Error("DB insert failed")), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); await expect(manager.performBackfill(BackfillStatus.FullSync)) .rejects.toThrow("DB insert failed"); expect(manager.getIsRunning()).toBe(false); consoleSpy.mockRestore(); }); }); describe("checkForInterruptedBackfill", () => { it("returns null when no interrupted backfill exists", async () => { vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([]), }), }), } as any); const result = await manager.checkForInterruptedBackfill(); expect(result).toBeNull(); }); it("returns null and logs error when DB query fails", async () => { vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockRejectedValue(new Error("DB connection lost")), }), }), } as any); const result = await manager.checkForInterruptedBackfill(); expect(result).toBeNull(); expect(mockLogger.error).toHaveBeenCalled(); }); it("returns interrupted backfill row when one exists", async () => { const interruptedRow = { id: 5n, status: "in_progress", backfillType: "catch_up", lastProcessedDid: "did:plc:halfway", didsTotal: 100, didsProcessed: 50, recordsIndexed: 250, startedAt: new Date(), completedAt: null, errorMessage: null, }; vi.spyOn(mockDb, "select").mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ limit: vi.fn().mockResolvedValue([interruptedRow]), }), }), } as any); const result = await manager.checkForInterruptedBackfill(); expect(result).toEqual(interruptedRow); }); }); describe("resumeBackfill", () => { let mockIndexer: Indexer; beforeEach(() => { vi.spyOn(console, "log").mockImplementation(() => {}); vi.spyOn(console, "error").mockImplementation(() => {}); mockIndexer = { handleForumCreate: vi.fn().mockResolvedValue(true), handleCategoryCreate: vi.fn().mockResolvedValue(true), handleBoardCreate: vi.fn().mockResolvedValue(true), handleRoleCreate: vi.fn().mockResolvedValue(true), handleMembershipCreate: vi.fn().mockResolvedValue(true), handlePostCreate: vi.fn().mockResolvedValue(true), handleModActionCreate: vi.fn().mockResolvedValue(true), } as unknown as Indexer; }); afterEach(() => { vi.restoreAllMocks(); }); it("resumes from lastProcessedDid and processes remaining users", async () => { // Interrupted at user1 (didsProcessed=1), user2 and user3 remain const interrupted = { id: 5n, status: "in_progress" as const, backfillType: "catch_up", lastProcessedDid: "did:plc:user1", didsTotal: 3, didsProcessed: 1, recordsIndexed: 2, startedAt: new Date(), completedAt: null, errorMessage: null, }; // user2 and user3: 1 record each per collection (2 collections = 4 total) const recordPage = { data: { records: [{ uri: "at://did:plc:u/space.atbb.post/r1", cid: "bafyr1", value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" } }], cursor: undefined, }, }; const mockListRecords = vi.fn().mockResolvedValue(recordPage); mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ orderBy: vi.fn().mockResolvedValue([ { did: "did:plc:user2" }, { did: "did:plc:user3" }, ]), }), }), }), insert: vi.fn().mockReturnValue({ values: vi.fn().mockReturnValue({ returning: vi.fn().mockResolvedValue([]), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 5 }), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: mockListRecords } } }, }); const result = await manager.resumeBackfill(interrupted); // Starts from interrupted.recordsIndexed=2, adds 2 users × 2 collections × 1 record = 4 expect(result.recordsIndexed).toBe(6); expect(result.errors).toBe(0); expect(result.didsProcessed).toBe(3); // 1 (prior) + 2 (resumed) expect(result.backfillId).toBe(5n); }); it("marks completed even when no remaining users", async () => { // Interrupted at the last user — no users with DID > lastProcessedDid const interrupted = { id: 3n, status: "in_progress" as const, backfillType: "catch_up", lastProcessedDid: "did:plc:last", didsTotal: 2, didsProcessed: 2, recordsIndexed: 10, startedAt: new Date(), completedAt: null, errorMessage: null, }; mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ orderBy: vi.fn().mockResolvedValue([]), // no remaining users }), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: vi.fn() } } }, }); const result = await manager.resumeBackfill(interrupted); // No new records — just marks completed with existing counts expect(result.recordsIndexed).toBe(10); expect(result.didsProcessed).toBe(2); expect(result.backfillId).toBe(3n); // DB row should be updated to completed const updateMock = mockDb.update as any; expect(updateMock).toHaveBeenCalled(); }); it("clears isRunning flag even when resume fails", async () => { const interrupted = { id: 9n, status: "in_progress" as const, backfillType: "catch_up", lastProcessedDid: "did:plc:checkpoint", didsTotal: 5, didsProcessed: 3, recordsIndexed: 15, startedAt: new Date(), completedAt: null, errorMessage: null, }; mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ orderBy: vi.fn().mockRejectedValue(new Error("DB query failed")), }), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: vi.fn() } } }, }); await expect(manager.resumeBackfill(interrupted)) .rejects.toThrow("DB query failed"); expect(manager.getIsRunning()).toBe(false); }); it("marks full_sync interrupted backfill as failed (cannot resume FullSync)", async () => { const interrupted = { id: 10n, status: "in_progress" as const, backfillType: "full_sync", lastProcessedDid: null, didsTotal: 0, didsProcessed: 0, recordsIndexed: 0, startedAt: new Date(), completedAt: null, errorMessage: null, }; const mockUpdate = vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }); mockDb = { update: mockUpdate, } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); await expect(manager.resumeBackfill(interrupted)) .rejects.toThrow("Interrupted FullSync cannot be resumed"); // Verify the row was marked as failed expect(mockUpdate).toHaveBeenCalled(); const setCall = mockUpdate.mock.results[0].value.set; expect(setCall).toHaveBeenCalledWith( expect.objectContaining({ status: "failed" }) ); }); it("rejects concurrent resume attempts", async () => { const interrupted = { id: 2n, status: "in_progress" as const, backfillType: "catch_up", lastProcessedDid: "did:plc:check", didsTotal: 2, didsProcessed: 1, recordsIndexed: 5, startedAt: new Date(), completedAt: null, errorMessage: null, }; mockDb = { select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockReturnValue({ orderBy: vi.fn().mockImplementation( () => new Promise((resolve) => setTimeout(() => resolve([]), 200)) ), }), }), }), update: vi.fn().mockReturnValue({ set: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue(undefined), }), }), } as unknown as Database; manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); manager.setIndexer(mockIndexer); vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ com: { atproto: { repo: { listRecords: vi.fn() } } }, }); const first = manager.resumeBackfill(interrupted); await expect(manager.resumeBackfill(interrupted)) .rejects.toThrow("Backfill is already in progress"); await first; }); }); });