WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at root/atb-56-theme-caching-layer 927 lines 32 kB view raw
1import { describe, it, expect, beforeEach, vi, afterEach } from "vitest"; 2import { BackfillManager, BackfillStatus } from "../backfill-manager.js"; 3import type { Database } from "@atbb/db"; 4import type { AppConfig } from "../config.js"; 5import { AtpAgent } from "@atproto/api"; 6import type { Indexer } from "../indexer.js"; 7import { createMockLogger } from "./mock-logger.js"; 8 9vi.mock("@atproto/api", () => ({ 10 AtpAgent: vi.fn().mockImplementation(() => ({ 11 com: { 12 atproto: { 13 repo: { 14 listRecords: vi.fn(), 15 }, 16 }, 17 }, 18 })), 19})); 20 21// Minimal mock config 22function mockConfig(overrides: Partial<AppConfig> = {}): AppConfig { 23 return { 24 port: 3000, 25 forumDid: "did:plc:testforum", 26 pdsUrl: "https://pds.example.com", 27 databaseUrl: "postgres://test", 28 jetstreamUrl: "wss://jetstream.example.com", 29 oauthPublicUrl: "https://example.com", 30 sessionSecret: "a".repeat(32), 31 sessionTtlDays: 7, 32 backfillRateLimit: 10, 33 backfillConcurrency: 10, 34 backfillCursorMaxAgeHours: 48, 35 ...overrides, 36 } as AppConfig; 37} 38 39describe("BackfillManager", () => { 40 let mockDb: Database; 41 let manager: BackfillManager; 42 let mockLogger: ReturnType<typeof createMockLogger>; 43 44 beforeEach(() => { 45 mockDb = { 46 select: vi.fn().mockReturnValue({ 47 from: vi.fn().mockReturnValue({ 48 where: vi.fn().mockReturnValue({ 49 limit: vi.fn().mockResolvedValue([]), 50 }), 51 }), 52 }), 53 } as unknown as Database; 54 55 mockLogger = createMockLogger(); 56 manager = new BackfillManager(mockDb, mockConfig(), mockLogger); 57 }); 58 59 afterEach(() => { 60 vi.clearAllMocks(); 61 }); 62 63 describe("checkIfNeeded", () => { 64 it("returns FullSync when cursor is null (no cursor)", async () => { 65 const status = await manager.checkIfNeeded(null); 66 expect(status).toBe(BackfillStatus.FullSync); 67 }); 68 69 it("returns FullSync when cursor exists but forums table is empty", async () => { 70 // Forums query returns empty 71 vi.spyOn(mockDb, "select").mockReturnValue({ 72 from: vi.fn().mockReturnValue({ 73 where: vi.fn().mockReturnValue({ 74 limit: vi.fn().mockResolvedValue([]), 75 }), 76 }), 77 } as any); 78 79 // Cursor from 1 hour ago (fresh) 80 const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000); 81 const status = await manager.checkIfNeeded(cursor); 82 expect(status).toBe(BackfillStatus.FullSync); 83 }); 84 85 it("returns CatchUp when cursor age exceeds threshold", async () => { 86 // Forums query returns a forum (DB not empty) 87 vi.spyOn(mockDb, "select").mockReturnValue({ 88 from: vi.fn().mockReturnValue({ 89 where: vi.fn().mockReturnValue({ 90 limit: vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]), 91 }), 92 }), 93 } as any); 94 95 // Cursor from 72 hours ago (stale) 96 const cursor = BigInt((Date.now() - 72 * 60 * 60 * 1000) * 1000); 97 const status = await manager.checkIfNeeded(cursor); 98 expect(status).toBe(BackfillStatus.CatchUp); 99 }); 100 101 it("returns NotNeeded when cursor is fresh and DB has data", async () => { 102 // Forums query returns a forum 103 vi.spyOn(mockDb, "select").mockReturnValue({ 104 from: vi.fn().mockReturnValue({ 105 where: vi.fn().mockReturnValue({ 106 limit: vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]), 107 }), 108 }), 109 } as any); 110 111 // Cursor from 1 hour ago (fresh) 112 const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000); 113 const status = await manager.checkIfNeeded(cursor); 114 expect(status).toBe(BackfillStatus.NotNeeded); 115 }); 116 117 it("returns FullSync when DB query fails (fail safe)", async () => { 118 vi.spyOn(mockDb, "select").mockReturnValue({ 119 from: vi.fn().mockReturnValue({ 120 where: vi.fn().mockReturnValue({ 121 limit: vi.fn().mockRejectedValue(new Error("DB connection lost")), 122 }), 123 }), 124 } as any); 125 126 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 127 const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000); 128 const status = await manager.checkIfNeeded(cursor); 129 expect(status).toBe(BackfillStatus.FullSync); 130 consoleSpy.mockRestore(); 131 }); 132 }); 133 134 describe("syncRepoRecords", () => { 135 let mockIndexer: Indexer; 136 137 beforeEach(() => { 138 mockIndexer = { 139 handlePostCreate: vi.fn().mockResolvedValue(true), 140 handleForumCreate: vi.fn().mockResolvedValue(true), 141 } as unknown as Indexer; 142 }); 143 144 it("fetches records and calls indexer for each one", async () => { 145 const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); 146 (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({ 147 data: { 148 records: [ 149 { 150 uri: "at://did:plc:user1/space.atbb.post/abc123", 151 cid: "bafyabc", 152 value: { $type: "space.atbb.post", text: "Hello", createdAt: "2026-01-01T00:00:00Z" }, 153 }, 154 { 155 uri: "at://did:plc:user1/space.atbb.post/def456", 156 cid: "bafydef", 157 value: { $type: "space.atbb.post", text: "World", createdAt: "2026-01-01T01:00:00Z" }, 158 }, 159 ], 160 cursor: undefined, 161 }, 162 }); 163 164 manager.setIndexer(mockIndexer); 165 const stats = await manager.syncRepoRecords( 166 "did:plc:user1", 167 "space.atbb.post", 168 mockAgent 169 ); 170 171 expect(stats.recordsFound).toBe(2); 172 expect(stats.recordsIndexed).toBe(2); 173 expect(stats.errors).toBe(0); 174 expect(mockIndexer.handlePostCreate).toHaveBeenCalledTimes(2); 175 expect(mockIndexer.handlePostCreate).toHaveBeenCalledWith( 176 expect.objectContaining({ 177 did: "did:plc:user1", 178 commit: expect.objectContaining({ 179 rkey: "abc123", 180 cid: "bafyabc", 181 record: expect.objectContaining({ text: "Hello" }), 182 }), 183 }) 184 ); 185 }); 186 187 it("paginates through multiple pages", async () => { 188 const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); 189 (mockAgent.com.atproto.repo.listRecords as any) 190 .mockResolvedValueOnce({ 191 data: { 192 records: [{ 193 uri: "at://did:plc:user1/space.atbb.post/page1", 194 cid: "bafyp1", 195 value: { $type: "space.atbb.post", text: "Page 1", createdAt: "2026-01-01T00:00:00Z" }, 196 }], 197 cursor: "next_page", 198 }, 199 }) 200 .mockResolvedValueOnce({ 201 data: { 202 records: [{ 203 uri: "at://did:plc:user1/space.atbb.post/page2", 204 cid: "bafyp2", 205 value: { $type: "space.atbb.post", text: "Page 2", createdAt: "2026-01-02T00:00:00Z" }, 206 }], 207 cursor: undefined, 208 }, 209 }); 210 211 manager.setIndexer(mockIndexer); 212 const stats = await manager.syncRepoRecords( 213 "did:plc:user1", 214 "space.atbb.post", 215 mockAgent 216 ); 217 218 expect(stats.recordsFound).toBe(2); 219 expect(stats.recordsIndexed).toBe(2); 220 expect(mockAgent.com.atproto.repo.listRecords).toHaveBeenCalledTimes(2); 221 }); 222 223 it("continues on indexer errors and tracks error count", async () => { 224 const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); 225 (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({ 226 data: { 227 records: [ 228 { 229 uri: "at://did:plc:user1/space.atbb.post/good", 230 cid: "bafygood", 231 value: { $type: "space.atbb.post", text: "Good", createdAt: "2026-01-01T00:00:00Z" }, 232 }, 233 { 234 uri: "at://did:plc:user1/space.atbb.post/bad", 235 cid: "bafybad", 236 value: { $type: "space.atbb.post", text: "Bad", createdAt: "2026-01-01T01:00:00Z" }, 237 }, 238 ], 239 cursor: undefined, 240 }, 241 }); 242 243 (mockIndexer.handlePostCreate as any) 244 .mockResolvedValueOnce(true) 245 .mockRejectedValueOnce(new Error("FK missing")); 246 247 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 248 manager.setIndexer(mockIndexer); 249 const stats = await manager.syncRepoRecords( 250 "did:plc:user1", 251 "space.atbb.post", 252 mockAgent 253 ); 254 255 expect(stats.recordsFound).toBe(2); 256 expect(stats.recordsIndexed).toBe(1); 257 expect(stats.errors).toBe(1); 258 consoleSpy.mockRestore(); 259 }); 260 261 it("returns error stats when indexer is not set", async () => { 262 const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); 263 // No setIndexer call — indexer is null 264 const stats = await manager.syncRepoRecords("did:plc:user", "space.atbb.post", mockAgent); 265 expect(stats.errors).toBe(1); 266 expect(mockLogger.error).toHaveBeenCalledWith( 267 "backfill.sync_skipped", 268 expect.objectContaining({ reason: "indexer_not_set" }) 269 ); 270 }); 271 272 it("handles PDS connection failure gracefully", async () => { 273 const mockAgent = new AtpAgent({ service: "https://pds.example.com" }); 274 (mockAgent.com.atproto.repo.listRecords as any) 275 .mockRejectedValueOnce(new Error("fetch failed")); 276 277 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 278 manager.setIndexer(mockIndexer); 279 const stats = await manager.syncRepoRecords( 280 "did:plc:user1", 281 "space.atbb.post", 282 mockAgent 283 ); 284 285 expect(stats.recordsFound).toBe(0); 286 expect(stats.recordsIndexed).toBe(0); 287 expect(stats.errors).toBe(1); 288 consoleSpy.mockRestore(); 289 }); 290 }); 291 292 describe("performBackfill", () => { 293 let mockIndexer: Indexer; 294 let consoleSpy: any; 295 296 beforeEach(() => { 297 consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {}); 298 vi.spyOn(console, "error").mockImplementation(() => {}); 299 vi.spyOn(console, "warn").mockImplementation(() => {}); 300 301 mockIndexer = { 302 handleForumCreate: vi.fn().mockResolvedValue(true), 303 handleCategoryCreate: vi.fn().mockResolvedValue(true), 304 handleBoardCreate: vi.fn().mockResolvedValue(true), 305 handleRoleCreate: vi.fn().mockResolvedValue(true), 306 handleMembershipCreate: vi.fn().mockResolvedValue(true), 307 handlePostCreate: vi.fn().mockResolvedValue(true), 308 handleModActionCreate: vi.fn().mockResolvedValue(true), 309 } as unknown as Indexer; 310 }); 311 312 afterEach(() => { 313 consoleSpy.mockRestore(); 314 }); 315 316 it("creates a backfill_progress row on start", async () => { 317 const mockInsert = vi.fn().mockReturnValue({ 318 values: vi.fn().mockReturnValue({ 319 returning: vi.fn().mockResolvedValue([{ id: 1n }]), 320 }), 321 }); 322 323 const mockSelectEmpty = vi.fn().mockReturnValue({ 324 from: vi.fn().mockReturnValue({ 325 where: vi.fn().mockReturnValue({ 326 limit: vi.fn().mockResolvedValue([]), 327 orderBy: vi.fn().mockResolvedValue([]), 328 }), 329 orderBy: vi.fn().mockResolvedValue([]), 330 }), 331 }); 332 333 mockDb = { 334 select: mockSelectEmpty, 335 insert: mockInsert, 336 update: vi.fn().mockReturnValue({ 337 set: vi.fn().mockReturnValue({ 338 where: vi.fn().mockResolvedValue(undefined), 339 }), 340 }), 341 } as unknown as Database; 342 343 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 344 manager.setIndexer(mockIndexer); 345 346 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 347 com: { 348 atproto: { 349 repo: { 350 listRecords: vi.fn().mockResolvedValue({ 351 data: { records: [], cursor: undefined }, 352 }), 353 }, 354 }, 355 }, 356 }); 357 358 await manager.performBackfill(BackfillStatus.FullSync); 359 360 expect(mockInsert).toHaveBeenCalled(); 361 }); 362 363 it("sets isRunning flag during backfill", async () => { 364 const mockInsert = vi.fn().mockReturnValue({ 365 values: vi.fn().mockReturnValue({ 366 returning: vi.fn().mockResolvedValue([{ id: 1n }]), 367 }), 368 }); 369 370 mockDb = { 371 select: vi.fn().mockReturnValue({ 372 from: vi.fn().mockReturnValue({ 373 where: vi.fn().mockReturnValue({ 374 limit: vi.fn().mockResolvedValue([]), 375 orderBy: vi.fn().mockResolvedValue([]), 376 }), 377 orderBy: vi.fn().mockResolvedValue([]), 378 }), 379 }), 380 insert: mockInsert, 381 update: vi.fn().mockReturnValue({ 382 set: vi.fn().mockReturnValue({ 383 where: vi.fn().mockResolvedValue(undefined), 384 }), 385 }), 386 } as unknown as Database; 387 388 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 389 manager.setIndexer(mockIndexer); 390 391 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 392 com: { 393 atproto: { 394 repo: { 395 listRecords: vi.fn().mockResolvedValue({ 396 data: { records: [], cursor: undefined }, 397 }), 398 }, 399 }, 400 }, 401 }); 402 403 expect(manager.getIsRunning()).toBe(false); 404 const promise = manager.performBackfill(BackfillStatus.FullSync); 405 expect(manager.getIsRunning()).toBe(true); 406 await promise; 407 expect(manager.getIsRunning()).toBe(false); 408 }); 409 410 it("rejects concurrent backfill attempts", async () => { 411 const mockInsert = vi.fn().mockReturnValue({ 412 values: vi.fn().mockReturnValue({ 413 returning: vi.fn().mockResolvedValue([{ id: 1n }]), 414 }), 415 }); 416 417 mockDb = { 418 select: vi.fn().mockReturnValue({ 419 from: vi.fn().mockReturnValue({ 420 where: vi.fn().mockReturnValue({ 421 limit: vi.fn().mockResolvedValue([]), 422 orderBy: vi.fn().mockResolvedValue([]), 423 }), 424 orderBy: vi.fn().mockResolvedValue([]), 425 }), 426 }), 427 insert: mockInsert, 428 update: vi.fn().mockReturnValue({ 429 set: vi.fn().mockReturnValue({ 430 where: vi.fn().mockResolvedValue(undefined), 431 }), 432 }), 433 } as unknown as Database; 434 435 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 436 manager.setIndexer(mockIndexer); 437 438 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 439 com: { 440 atproto: { 441 repo: { 442 listRecords: vi.fn().mockImplementation( 443 () => new Promise((resolve) => 444 setTimeout(() => resolve({ data: { records: [], cursor: undefined } }), 100) 445 ) 446 ), 447 }, 448 }, 449 }, 450 }); 451 452 const first = manager.performBackfill(BackfillStatus.FullSync); 453 454 await expect(manager.performBackfill(BackfillStatus.FullSync)) 455 .rejects.toThrow("Backfill is already in progress"); 456 457 await first; 458 }); 459 460 it("CatchUp: syncs user-owned collections and aggregates counts", async () => { 461 // Phase 1 (5 FORUM_OWNED_COLLECTIONS) must return empty so its records don't 462 // pollute the count. Phase 2: 2 users × 2 USER_OWNED_COLLECTIONS × 1 record = 4. 463 const emptyPage = { data: { records: [], cursor: undefined } }; 464 const recordPage = { 465 data: { 466 records: [{ 467 uri: "at://did:plc:u/space.atbb.post/r1", 468 cid: "bafyr1", 469 value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" }, 470 }], 471 cursor: undefined, 472 }, 473 }; 474 475 const mockListRecords = vi.fn() 476 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1) 477 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2) 478 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3) 479 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4) 480 .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5) 481 .mockResolvedValue(recordPage); // all Phase 2 user collection calls 482 483 mockDb = { 484 select: vi.fn().mockReturnValue({ 485 from: vi.fn().mockReturnValue({ 486 orderBy: vi.fn().mockResolvedValue([ 487 { did: "did:plc:user1" }, 488 { did: "did:plc:user2" }, 489 ]), 490 }), 491 }), 492 insert: vi.fn().mockReturnValue({ 493 values: vi.fn().mockReturnValue({ 494 returning: vi.fn().mockResolvedValue([{ id: 42n }]), 495 }), 496 }), 497 update: vi.fn().mockReturnValue({ 498 set: vi.fn().mockReturnValue({ 499 where: vi.fn().mockResolvedValue(undefined), 500 }), 501 }), 502 } as unknown as Database; 503 504 manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 5 }), createMockLogger()); 505 manager.setIndexer(mockIndexer); 506 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 507 com: { atproto: { repo: { listRecords: mockListRecords } } }, 508 }); 509 510 const result = await manager.performBackfill(BackfillStatus.CatchUp); 511 512 // Phase 1: 0 records (forum collections empty) 513 // Phase 2: 2 users × 2 collections × 1 record each = 4 records indexed 514 expect(result.recordsIndexed).toBe(4); 515 expect(result.errors).toBe(0); 516 expect(result.didsProcessed).toBe(2); 517 expect(result.backfillId).toBe(42n); 518 }); 519 520 it("CatchUp: rejected user batch increments totalErrors and is not swallowed", async () => { 521 // syncRepoRecords never throws — it catches PDS errors internally and returns errors:1. 522 // For the batch callback to reject (tested by the allSettled handling), the 523 // backfillErrors DB insert must fail, which propagates the rejection out of the callback. 524 const emptyPage = { data: { records: [], cursor: undefined } }; 525 526 const mockListRecords = vi.fn() 527 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1) 528 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2) 529 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3) 530 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4) 531 .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5) 532 // user1: both collections succeed, 1 record each 533 .mockResolvedValueOnce({ data: { records: [{ 534 uri: "at://did:plc:user1/space.atbb.membership/self", 535 cid: "bafymem", 536 value: { $type: "space.atbb.membership", createdAt: "2026-01-01T00:00:00Z" }, 537 }], cursor: undefined } }) 538 .mockResolvedValueOnce({ data: { records: [{ 539 uri: "at://did:plc:user1/space.atbb.post/p1", 540 cid: "bafyp1", 541 value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" }, 542 }], cursor: undefined } }) 543 // user2/membership: PDS error → syncRepoRecords catches → returns errors:1 → 544 // triggers backfillErrors insert (which rejects below) → callback rejects 545 .mockRejectedValueOnce(new Error("PDS unreachable")); 546 547 mockDb = { 548 select: vi.fn().mockReturnValue({ 549 from: vi.fn().mockReturnValue({ 550 orderBy: vi.fn().mockResolvedValue([ 551 { did: "did:plc:user1" }, 552 { did: "did:plc:user2" }, 553 ]), 554 }), 555 }), 556 insert: vi.fn() 557 .mockReturnValueOnce({ // backfillProgress insert — must succeed 558 values: vi.fn().mockReturnValue({ 559 returning: vi.fn().mockResolvedValue([{ id: 7n }]), 560 }), 561 }) 562 .mockReturnValueOnce({ // backfillErrors insert for user2 — rejects to make callback throw 563 values: vi.fn().mockReturnValue({ 564 returning: vi.fn().mockRejectedValue(new Error("backfillErrors insert failed")), 565 }), 566 }), 567 update: vi.fn().mockReturnValue({ 568 set: vi.fn().mockReturnValue({ 569 where: vi.fn().mockResolvedValue(undefined), 570 }), 571 }), 572 } as unknown as Database; 573 574 manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 1 }), createMockLogger()); 575 manager.setIndexer(mockIndexer); 576 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 577 com: { atproto: { repo: { listRecords: mockListRecords } } }, 578 }); 579 580 const result = await manager.performBackfill(BackfillStatus.CatchUp); 581 582 // user1 batch (concurrency=1): fulfilled, 2 records indexed (membership + post) 583 // user2 batch: callback rejects → allSettled rejected branch → totalErrors++ = 1 584 expect(result.recordsIndexed).toBe(2); 585 expect(result.errors).toBe(1); 586 }); 587 588 it("clears isRunning flag even when backfill fails", async () => { 589 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 590 591 mockDb = { 592 insert: vi.fn().mockReturnValue({ 593 values: vi.fn().mockReturnValue({ 594 returning: vi.fn().mockRejectedValue(new Error("DB insert failed")), 595 }), 596 }), 597 update: vi.fn().mockReturnValue({ 598 set: vi.fn().mockReturnValue({ 599 where: vi.fn().mockResolvedValue(undefined), 600 }), 601 }), 602 } as unknown as Database; 603 604 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 605 manager.setIndexer(mockIndexer); 606 607 await expect(manager.performBackfill(BackfillStatus.FullSync)) 608 .rejects.toThrow("DB insert failed"); 609 610 expect(manager.getIsRunning()).toBe(false); 611 consoleSpy.mockRestore(); 612 }); 613 }); 614 615 describe("checkForInterruptedBackfill", () => { 616 it("returns null when no interrupted backfill exists", async () => { 617 vi.spyOn(mockDb, "select").mockReturnValue({ 618 from: vi.fn().mockReturnValue({ 619 where: vi.fn().mockReturnValue({ 620 limit: vi.fn().mockResolvedValue([]), 621 }), 622 }), 623 } as any); 624 625 const result = await manager.checkForInterruptedBackfill(); 626 expect(result).toBeNull(); 627 }); 628 629 it("returns null and logs error when DB query fails", async () => { 630 vi.spyOn(mockDb, "select").mockReturnValue({ 631 from: vi.fn().mockReturnValue({ 632 where: vi.fn().mockReturnValue({ 633 limit: vi.fn().mockRejectedValue(new Error("DB connection lost")), 634 }), 635 }), 636 } as any); 637 638 const result = await manager.checkForInterruptedBackfill(); 639 expect(result).toBeNull(); 640 expect(mockLogger.error).toHaveBeenCalled(); 641 }); 642 643 it("returns interrupted backfill row when one exists", async () => { 644 const interruptedRow = { 645 id: 5n, 646 status: "in_progress", 647 backfillType: "catch_up", 648 lastProcessedDid: "did:plc:halfway", 649 didsTotal: 100, 650 didsProcessed: 50, 651 recordsIndexed: 250, 652 startedAt: new Date(), 653 completedAt: null, 654 errorMessage: null, 655 }; 656 657 vi.spyOn(mockDb, "select").mockReturnValue({ 658 from: vi.fn().mockReturnValue({ 659 where: vi.fn().mockReturnValue({ 660 limit: vi.fn().mockResolvedValue([interruptedRow]), 661 }), 662 }), 663 } as any); 664 665 const result = await manager.checkForInterruptedBackfill(); 666 expect(result).toEqual(interruptedRow); 667 }); 668 }); 669 670 describe("resumeBackfill", () => { 671 let mockIndexer: Indexer; 672 673 beforeEach(() => { 674 vi.spyOn(console, "log").mockImplementation(() => {}); 675 vi.spyOn(console, "error").mockImplementation(() => {}); 676 677 mockIndexer = { 678 handleForumCreate: vi.fn().mockResolvedValue(true), 679 handleCategoryCreate: vi.fn().mockResolvedValue(true), 680 handleBoardCreate: vi.fn().mockResolvedValue(true), 681 handleRoleCreate: vi.fn().mockResolvedValue(true), 682 handleMembershipCreate: vi.fn().mockResolvedValue(true), 683 handlePostCreate: vi.fn().mockResolvedValue(true), 684 handleModActionCreate: vi.fn().mockResolvedValue(true), 685 } as unknown as Indexer; 686 }); 687 688 afterEach(() => { 689 vi.restoreAllMocks(); 690 }); 691 692 it("resumes from lastProcessedDid and processes remaining users", async () => { 693 // Interrupted at user1 (didsProcessed=1), user2 and user3 remain 694 const interrupted = { 695 id: 5n, 696 status: "in_progress" as const, 697 backfillType: "catch_up", 698 lastProcessedDid: "did:plc:user1", 699 didsTotal: 3, 700 didsProcessed: 1, 701 recordsIndexed: 2, 702 startedAt: new Date(), 703 completedAt: null, 704 errorMessage: null, 705 }; 706 707 // user2 and user3: 1 record each per collection (2 collections = 4 total) 708 const recordPage = { 709 data: { 710 records: [{ uri: "at://did:plc:u/space.atbb.post/r1", cid: "bafyr1", 711 value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" } }], 712 cursor: undefined, 713 }, 714 }; 715 716 const mockListRecords = vi.fn().mockResolvedValue(recordPage); 717 718 mockDb = { 719 select: vi.fn().mockReturnValue({ 720 from: vi.fn().mockReturnValue({ 721 where: vi.fn().mockReturnValue({ 722 orderBy: vi.fn().mockResolvedValue([ 723 { did: "did:plc:user2" }, 724 { did: "did:plc:user3" }, 725 ]), 726 }), 727 }), 728 }), 729 insert: vi.fn().mockReturnValue({ 730 values: vi.fn().mockReturnValue({ 731 returning: vi.fn().mockResolvedValue([]), 732 }), 733 }), 734 update: vi.fn().mockReturnValue({ 735 set: vi.fn().mockReturnValue({ 736 where: vi.fn().mockResolvedValue(undefined), 737 }), 738 }), 739 } as unknown as Database; 740 741 manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 5 }), createMockLogger()); 742 manager.setIndexer(mockIndexer); 743 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 744 com: { atproto: { repo: { listRecords: mockListRecords } } }, 745 }); 746 747 const result = await manager.resumeBackfill(interrupted); 748 749 // Starts from interrupted.recordsIndexed=2, adds 2 users × 2 collections × 1 record = 4 750 expect(result.recordsIndexed).toBe(6); 751 expect(result.errors).toBe(0); 752 expect(result.didsProcessed).toBe(3); // 1 (prior) + 2 (resumed) 753 expect(result.backfillId).toBe(5n); 754 }); 755 756 it("marks completed even when no remaining users", async () => { 757 // Interrupted at the last user — no users with DID > lastProcessedDid 758 const interrupted = { 759 id: 3n, 760 status: "in_progress" as const, 761 backfillType: "catch_up", 762 lastProcessedDid: "did:plc:last", 763 didsTotal: 2, 764 didsProcessed: 2, 765 recordsIndexed: 10, 766 startedAt: new Date(), 767 completedAt: null, 768 errorMessage: null, 769 }; 770 771 mockDb = { 772 select: vi.fn().mockReturnValue({ 773 from: vi.fn().mockReturnValue({ 774 where: vi.fn().mockReturnValue({ 775 orderBy: vi.fn().mockResolvedValue([]), // no remaining users 776 }), 777 }), 778 }), 779 update: vi.fn().mockReturnValue({ 780 set: vi.fn().mockReturnValue({ 781 where: vi.fn().mockResolvedValue(undefined), 782 }), 783 }), 784 } as unknown as Database; 785 786 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 787 manager.setIndexer(mockIndexer); 788 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 789 com: { atproto: { repo: { listRecords: vi.fn() } } }, 790 }); 791 792 const result = await manager.resumeBackfill(interrupted); 793 794 // No new records — just marks completed with existing counts 795 expect(result.recordsIndexed).toBe(10); 796 expect(result.didsProcessed).toBe(2); 797 expect(result.backfillId).toBe(3n); 798 799 // DB row should be updated to completed 800 const updateMock = mockDb.update as any; 801 expect(updateMock).toHaveBeenCalled(); 802 }); 803 804 it("clears isRunning flag even when resume fails", async () => { 805 const interrupted = { 806 id: 9n, 807 status: "in_progress" as const, 808 backfillType: "catch_up", 809 lastProcessedDid: "did:plc:checkpoint", 810 didsTotal: 5, 811 didsProcessed: 3, 812 recordsIndexed: 15, 813 startedAt: new Date(), 814 completedAt: null, 815 errorMessage: null, 816 }; 817 818 mockDb = { 819 select: vi.fn().mockReturnValue({ 820 from: vi.fn().mockReturnValue({ 821 where: vi.fn().mockReturnValue({ 822 orderBy: vi.fn().mockRejectedValue(new Error("DB query failed")), 823 }), 824 }), 825 }), 826 update: vi.fn().mockReturnValue({ 827 set: vi.fn().mockReturnValue({ 828 where: vi.fn().mockResolvedValue(undefined), 829 }), 830 }), 831 } as unknown as Database; 832 833 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 834 manager.setIndexer(mockIndexer); 835 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 836 com: { atproto: { repo: { listRecords: vi.fn() } } }, 837 }); 838 839 await expect(manager.resumeBackfill(interrupted)) 840 .rejects.toThrow("DB query failed"); 841 842 expect(manager.getIsRunning()).toBe(false); 843 }); 844 845 it("marks full_sync interrupted backfill as failed (cannot resume FullSync)", async () => { 846 const interrupted = { 847 id: 10n, 848 status: "in_progress" as const, 849 backfillType: "full_sync", 850 lastProcessedDid: null, 851 didsTotal: 0, 852 didsProcessed: 0, 853 recordsIndexed: 0, 854 startedAt: new Date(), 855 completedAt: null, 856 errorMessage: null, 857 }; 858 859 const mockUpdate = vi.fn().mockReturnValue({ 860 set: vi.fn().mockReturnValue({ 861 where: vi.fn().mockResolvedValue(undefined), 862 }), 863 }); 864 mockDb = { 865 update: mockUpdate, 866 } as unknown as Database; 867 868 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 869 manager.setIndexer(mockIndexer); 870 871 await expect(manager.resumeBackfill(interrupted)) 872 .rejects.toThrow("Interrupted FullSync cannot be resumed"); 873 874 // Verify the row was marked as failed 875 expect(mockUpdate).toHaveBeenCalled(); 876 const setCall = mockUpdate.mock.results[0].value.set; 877 expect(setCall).toHaveBeenCalledWith( 878 expect.objectContaining({ status: "failed" }) 879 ); 880 }); 881 882 it("rejects concurrent resume attempts", async () => { 883 const interrupted = { 884 id: 2n, 885 status: "in_progress" as const, 886 backfillType: "catch_up", 887 lastProcessedDid: "did:plc:check", 888 didsTotal: 2, 889 didsProcessed: 1, 890 recordsIndexed: 5, 891 startedAt: new Date(), 892 completedAt: null, 893 errorMessage: null, 894 }; 895 896 mockDb = { 897 select: vi.fn().mockReturnValue({ 898 from: vi.fn().mockReturnValue({ 899 where: vi.fn().mockReturnValue({ 900 orderBy: vi.fn().mockImplementation( 901 () => new Promise((resolve) => setTimeout(() => resolve([]), 200)) 902 ), 903 }), 904 }), 905 }), 906 update: vi.fn().mockReturnValue({ 907 set: vi.fn().mockReturnValue({ 908 where: vi.fn().mockResolvedValue(undefined), 909 }), 910 }), 911 } as unknown as Database; 912 913 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger()); 914 manager.setIndexer(mockIndexer); 915 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({ 916 com: { atproto: { repo: { listRecords: vi.fn() } } }, 917 }); 918 919 const first = manager.resumeBackfill(interrupted); 920 921 await expect(manager.resumeBackfill(interrupted)) 922 .rejects.toThrow("Backfill is already in progress"); 923 924 await first; 925 }); 926 }); 927});