WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at root/atb-56-theme-caching-layer 1228 lines 40 kB view raw
1import type { 2 CommitCreateEvent, 3 CommitDeleteEvent, 4 CommitUpdateEvent, 5} from "@skyware/jetstream"; 6import type { Database, DbOrTransaction } from "@atbb/db"; 7import type { Logger } from "@atbb/logger"; 8import { 9 posts, 10 forums, 11 categories, 12 boards, 13 users, 14 memberships, 15 modActions, 16 roles, 17 rolePermissions, 18 themes, 19 themePolicies, 20 themePolicyAvailableThemes, 21} from "@atbb/db"; 22import { eq, and } from "drizzle-orm"; 23import { parseAtUri } from "./at-uri.js"; 24import { BanEnforcer } from "./ban-enforcer.js"; 25import { 26 SpaceAtbbPost as Post, 27 SpaceAtbbForumForum as Forum, 28 SpaceAtbbForumCategory as Category, 29 SpaceAtbbForumBoard as Board, 30 SpaceAtbbMembership as Membership, 31 SpaceAtbbModAction as ModAction, 32 SpaceAtbbForumRole as Role, 33 SpaceAtbbForumTheme as Theme, 34 SpaceAtbbForumThemePolicy as ThemePolicy, 35} from "@atbb/lexicon"; 36 37// ── Collection Config Types ───────────────────────────── 38 39/** 40 * Configuration for a data-driven collection handler. 41 * Encodes the per-collection logic that differs across the 5 indexed types, 42 * while the generic handler methods supply the shared try/catch/log/throw scaffolding. 43 */ 44interface CollectionConfig<TRecord> { 45 /** Human-readable name for logging (e.g. "Post", "Forum") */ 46 name: string; 47 /** Drizzle table reference */ 48 table: any; 49 /** "hard" = DELETE FROM (all non-post collections) */ 50 deleteStrategy: "hard"; 51 /** Call ensureUser(event.did) before insert? (user-owned records) */ 52 ensureUserOnCreate?: boolean; 53 /** 54 * Transform event+record into DB insert values. 55 * Return null to skip the insert (e.g. when a required foreign key is missing). 56 */ 57 toInsertValues: ( 58 event: any, 59 record: TRecord, 60 tx: DbOrTransaction 61 ) => Promise<Record<string, any> | null>; 62 /** 63 * Transform event+record into DB update set values. 64 * Runs inside a transaction. Return null to skip the update. 65 */ 66 toUpdateValues: ( 67 event: any, 68 record: TRecord, 69 tx: DbOrTransaction 70 ) => Promise<Record<string, any> | null>; 71 /** 72 * Optional hook called after a row is inserted or updated, within the same 73 * transaction. Receives the row's numeric id (bigint) so callers can write 74 * to child tables (e.g. role_permissions). 75 */ 76 afterUpsert?: ( 77 event: any, 78 record: TRecord, 79 rowId: bigint, 80 tx: DbOrTransaction 81 ) => Promise<void>; 82} 83 84 85/** 86 * Indexer class for processing AT Proto firehose events 87 * Converts events into database records for the atBB AppView 88 */ 89export class Indexer { 90 private banEnforcer: BanEnforcer; 91 92 constructor(private db: Database, private logger: Logger) { 93 this.banEnforcer = new BanEnforcer(db, logger); 94 } 95 96 // ── Collection Configs ────────────────────────────────── 97 98 private postConfig: CollectionConfig<Post.Record> = { 99 name: "Post", 100 table: posts, 101 deleteStrategy: "hard", 102 ensureUserOnCreate: true, 103 toInsertValues: async (event, record, tx) => { 104 // Look up parent/root for replies 105 let rootId: bigint | null = null; 106 let parentId: bigint | null = null; 107 108 if (Post.isReplyRef(record.reply)) { 109 rootId = await this.getPostIdByUri(record.reply.root.uri, tx); 110 parentId = await this.getPostIdByUri(record.reply.parent.uri, tx); 111 } else if (record.reply) { 112 // reply ref present but $type omitted — rootPostId/parentPostId will be null, 113 // making this reply unreachable in thread navigation (data corruption). 114 this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", { 115 operation: "Post CREATE", 116 postDid: event.did, 117 postRkey: event.commit.rkey, 118 errorId: "POST_REPLY_REF_MISSING_TYPE", 119 }); 120 } 121 122 // Look up board ID if board reference exists 123 let boardId: bigint | null = null; 124 if (record.board?.board.uri) { 125 boardId = await this.getBoardIdByUri(record.board.board.uri, tx); 126 if (!boardId) { 127 this.logger.error("Failed to index post: board not found", { 128 operation: "Post CREATE", 129 postDid: event.did, 130 postRkey: event.commit.rkey, 131 boardUri: record.board.board.uri, 132 errorId: "POST_BOARD_MISSING", 133 }); 134 throw new Error(`Board not found: ${record.board.board.uri}`); 135 } 136 } 137 138 return { 139 did: event.did, 140 rkey: event.commit.rkey, 141 cid: event.commit.cid, 142 title: record.reply ? null : (record.title ?? null), 143 text: record.text, 144 forumUri: record.forum?.forum.uri ?? null, 145 boardUri: record.board?.board.uri ?? null, 146 boardId, 147 rootPostId: rootId, 148 rootUri: record.reply?.root.uri ?? null, 149 parentPostId: parentId, 150 parentUri: record.reply?.parent.uri ?? null, 151 createdAt: new Date(record.createdAt), 152 indexedAt: new Date(), 153 }; 154 }, 155 toUpdateValues: async (event, record, tx) => { 156 // Look up board ID if board reference exists 157 let boardId: bigint | null = null; 158 if (record.board?.board.uri) { 159 boardId = await this.getBoardIdByUri(record.board.board.uri, tx); 160 if (!boardId) { 161 this.logger.error("Failed to index post: board not found", { 162 operation: "Post UPDATE", 163 postDid: event.did, 164 postRkey: event.commit.rkey, 165 boardUri: record.board.board.uri, 166 errorId: "POST_BOARD_MISSING", 167 }); 168 throw new Error(`Board not found: ${record.board.board.uri}`); 169 } 170 } 171 172 return { 173 cid: event.commit.cid, 174 title: record.reply ? null : (record.title ?? null), 175 text: record.text, 176 forumUri: record.forum?.forum.uri ?? null, 177 boardUri: record.board?.board.uri ?? null, 178 boardId, 179 indexedAt: new Date(), 180 }; 181 }, 182 }; 183 184 private forumConfig: CollectionConfig<Forum.Record> = { 185 name: "Forum", 186 table: forums, 187 deleteStrategy: "hard", 188 ensureUserOnCreate: true, 189 toInsertValues: async (event, record) => ({ 190 did: event.did, 191 rkey: event.commit.rkey, 192 cid: event.commit.cid, 193 name: record.name, 194 description: record.description ?? null, 195 indexedAt: new Date(), 196 }), 197 toUpdateValues: async (event, record) => ({ 198 cid: event.commit.cid, 199 name: record.name, 200 description: record.description ?? null, 201 indexedAt: new Date(), 202 }), 203 }; 204 205 private categoryConfig: CollectionConfig<Category.Record> = { 206 name: "Category", 207 table: categories, 208 deleteStrategy: "hard", 209 toInsertValues: async (event, record, tx) => { 210 // Categories are owned by the Forum DID, so event.did IS the forum DID 211 const forumId = await this.getForumIdByDid(event.did, tx); 212 213 if (!forumId) { 214 this.logger.warn("Category: Forum not found for DID", { 215 operation: "Category CREATE", 216 did: event.did, 217 }); 218 return null; 219 } 220 221 return { 222 did: event.did, 223 rkey: event.commit.rkey, 224 cid: event.commit.cid, 225 forumId, 226 name: record.name, 227 description: record.description ?? null, 228 slug: record.slug ?? null, 229 sortOrder: record.sortOrder ?? 0, 230 createdAt: new Date(record.createdAt), 231 indexedAt: new Date(), 232 }; 233 }, 234 toUpdateValues: async (event, record, tx) => { 235 // Categories are owned by the Forum DID, so event.did IS the forum DID 236 const forumId = await this.getForumIdByDid(event.did, tx); 237 238 if (!forumId) { 239 this.logger.warn("Category: Forum not found for DID", { 240 operation: "Category UPDATE", 241 did: event.did, 242 }); 243 return null; 244 } 245 246 return { 247 cid: event.commit.cid, 248 forumId, 249 name: record.name, 250 description: record.description ?? null, 251 slug: record.slug ?? null, 252 sortOrder: record.sortOrder ?? 0, 253 indexedAt: new Date(), 254 }; 255 }, 256 }; 257 258 private boardConfig: CollectionConfig<Board.Record> = { 259 name: "Board", 260 table: boards, 261 deleteStrategy: "hard", 262 toInsertValues: async (event, record, tx) => { 263 // Boards are owned by Forum DID 264 const categoryId = await this.getCategoryIdByUri( 265 record.category.category.uri, 266 tx 267 ); 268 269 if (!categoryId) { 270 this.logger.error("Failed to index board: category not found", { 271 operation: "Board CREATE", 272 boardDid: event.did, 273 boardRkey: event.commit.rkey, 274 categoryUri: record.category.category.uri, 275 errorId: "BOARD_CATEGORY_MISSING", 276 }); 277 throw new Error(`Category not found: ${record.category.category.uri}`); 278 } 279 280 return { 281 did: event.did, 282 rkey: event.commit.rkey, 283 cid: event.commit.cid, 284 name: record.name, 285 description: record.description ?? null, 286 slug: record.slug ?? null, 287 sortOrder: record.sortOrder ?? null, 288 categoryId, 289 categoryUri: record.category.category.uri, 290 createdAt: new Date(record.createdAt), 291 indexedAt: new Date(), 292 }; 293 }, 294 toUpdateValues: async (event, record, tx) => { 295 const categoryId = await this.getCategoryIdByUri( 296 record.category.category.uri, 297 tx 298 ); 299 300 if (!categoryId) { 301 this.logger.error("Failed to index board: category not found", { 302 operation: "Board UPDATE", 303 boardDid: event.did, 304 boardRkey: event.commit.rkey, 305 categoryUri: record.category.category.uri, 306 errorId: "BOARD_CATEGORY_MISSING", 307 }); 308 throw new Error(`Category not found: ${record.category.category.uri}`); 309 } 310 311 return { 312 cid: event.commit.cid, 313 name: record.name, 314 description: record.description ?? null, 315 slug: record.slug ?? null, 316 sortOrder: record.sortOrder ?? null, 317 categoryId, 318 categoryUri: record.category.category.uri, 319 indexedAt: new Date(), 320 }; 321 }, 322 }; 323 324 private roleConfig: CollectionConfig<Role.Record> = { 325 name: "Role", 326 table: roles, 327 deleteStrategy: "hard", 328 toInsertValues: async (event, record) => ({ 329 did: event.did, 330 rkey: event.commit.rkey, 331 cid: event.commit.cid, 332 name: record.name, 333 description: record.description ?? null, 334 priority: record.priority, 335 createdAt: new Date(record.createdAt), 336 indexedAt: new Date(), 337 }), 338 toUpdateValues: async (event, record) => ({ 339 cid: event.commit.cid, 340 name: record.name, 341 description: record.description ?? null, 342 priority: record.priority, 343 indexedAt: new Date(), 344 }), 345 afterUpsert: async (event, record, roleId, tx) => { 346 // Replace all permissions for this role atomically 347 await tx 348 .delete(rolePermissions) 349 .where(eq(rolePermissions.roleId, roleId)); 350 351 if (record.permissions && record.permissions.length > 0) { 352 await tx.insert(rolePermissions).values( 353 record.permissions.map((permission: string) => ({ 354 roleId, 355 permission, 356 })) 357 ); 358 } 359 }, 360 }; 361 362 private themeConfig: CollectionConfig<Theme.Record> = { 363 name: "Theme", 364 table: themes, 365 deleteStrategy: "hard", 366 toInsertValues: async (event, record) => ({ 367 did: event.did, 368 rkey: event.commit.rkey, 369 cid: event.commit.cid, 370 name: record.name, 371 colorScheme: record.colorScheme as string, 372 tokens: record.tokens, 373 cssOverrides: (record.cssOverrides as string | undefined) ?? null, 374 fontUrls: (record.fontUrls as string[] | undefined) ?? null, 375 createdAt: new Date(record.createdAt as string), 376 indexedAt: new Date(), 377 }), 378 toUpdateValues: async (event, record) => ({ 379 cid: event.commit.cid, 380 name: record.name, 381 colorScheme: record.colorScheme as string, 382 tokens: record.tokens, 383 cssOverrides: (record.cssOverrides as string | undefined) ?? null, 384 fontUrls: (record.fontUrls as string[] | undefined) ?? null, 385 indexedAt: new Date(), 386 }), 387 }; 388 389 private themePolicyConfig: CollectionConfig<ThemePolicy.Record> = { 390 name: "ThemePolicy", 391 table: themePolicies, 392 deleteStrategy: "hard", 393 toInsertValues: async (event, record) => ({ 394 did: event.did, 395 rkey: event.commit.rkey, 396 cid: event.commit.cid, 397 defaultLightThemeUri: record.defaultLightTheme.uri, 398 defaultDarkThemeUri: record.defaultDarkTheme.uri, 399 allowUserChoice: record.allowUserChoice, 400 indexedAt: new Date(), 401 }), 402 toUpdateValues: async (event, record) => ({ 403 cid: event.commit.cid, 404 defaultLightThemeUri: record.defaultLightTheme.uri, 405 defaultDarkThemeUri: record.defaultDarkTheme.uri, 406 allowUserChoice: record.allowUserChoice, 407 indexedAt: new Date(), 408 }), 409 afterUpsert: async (_event, record, policyId, tx) => { 410 // Atomically replace all available-theme rows for this policy 411 await tx 412 .delete(themePolicyAvailableThemes) 413 .where(eq(themePolicyAvailableThemes.policyId, policyId)); 414 415 const available = record.availableThemes ?? []; 416 if (available.length > 0) { 417 await tx.insert(themePolicyAvailableThemes).values( 418 available.map((themeRef) => ({ 419 policyId, 420 themeUri: themeRef.uri, 421 themeCid: themeRef.cid ?? null, 422 })) 423 ); 424 } 425 }, 426 }; 427 428 private membershipConfig: CollectionConfig<Membership.Record> = { 429 name: "Membership", 430 table: memberships, 431 deleteStrategy: "hard", 432 ensureUserOnCreate: true, 433 toInsertValues: async (event, record, tx) => { 434 // Look up forum by URI (inside transaction) 435 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx); 436 437 if (!forumId) { 438 this.logger.warn("Membership: Forum not found", { 439 operation: "Membership CREATE", 440 forumUri: record.forum.forum.uri, 441 }); 442 return null; 443 } 444 445 return { 446 did: event.did, 447 rkey: event.commit.rkey, 448 cid: event.commit.cid, 449 forumId, 450 forumUri: record.forum.forum.uri, 451 role: null, // TODO: Extract role name from roleUri or lexicon 452 roleUri: record.role?.role.uri ?? null, 453 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, 454 createdAt: new Date(record.createdAt), 455 indexedAt: new Date(), 456 }; 457 }, 458 toUpdateValues: async (event, record, tx) => { 459 // Look up forum by URI (may have changed) 460 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx); 461 462 if (!forumId) { 463 this.logger.warn("Membership: Forum not found", { 464 operation: "Membership UPDATE", 465 forumUri: record.forum.forum.uri, 466 }); 467 return null; 468 } 469 470 return { 471 cid: event.commit.cid, 472 forumId, 473 forumUri: record.forum.forum.uri, 474 role: null, // TODO: Extract role name from roleUri or lexicon 475 roleUri: record.role?.role.uri ?? null, 476 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, 477 indexedAt: new Date(), 478 }; 479 }, 480 }; 481 482 private modActionConfig: CollectionConfig<ModAction.Record> = { 483 name: "ModAction", 484 table: modActions, 485 deleteStrategy: "hard", 486 toInsertValues: async (event, record, tx) => { 487 // ModActions are owned by the Forum DID, so event.did IS the forum DID 488 const forumId = await this.getForumIdByDid(event.did, tx); 489 490 if (!forumId) { 491 this.logger.warn("ModAction: Forum not found for DID", { 492 operation: "ModAction CREATE", 493 did: event.did, 494 }); 495 return null; 496 } 497 498 // Ensure moderator exists 499 await this.ensureUser(record.createdBy, tx); 500 501 // Determine subject type (post or user) 502 let subjectPostUri: string | null = null; 503 let subjectDid: string | null = null; 504 505 if (record.subject.post) { 506 subjectPostUri = record.subject.post.uri; 507 } 508 if (record.subject.did) { 509 subjectDid = record.subject.did; 510 } 511 512 return { 513 did: event.did, 514 rkey: event.commit.rkey, 515 cid: event.commit.cid, 516 forumId, 517 action: record.action, 518 subjectPostUri, 519 subjectDid, 520 reason: record.reason ?? null, 521 createdBy: record.createdBy, 522 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 523 createdAt: new Date(record.createdAt), 524 indexedAt: new Date(), 525 }; 526 }, 527 toUpdateValues: async (event, record, tx) => { 528 // ModActions are owned by the Forum DID, so event.did IS the forum DID 529 const forumId = await this.getForumIdByDid(event.did, tx); 530 531 if (!forumId) { 532 this.logger.warn("ModAction: Forum not found for DID", { 533 operation: "ModAction UPDATE", 534 did: event.did, 535 }); 536 return null; 537 } 538 539 // Determine subject type (post or user) 540 let subjectPostUri: string | null = null; 541 let subjectDid: string | null = null; 542 543 if (record.subject.post) { 544 subjectPostUri = record.subject.post.uri; 545 } 546 if (record.subject.did) { 547 subjectDid = record.subject.did; 548 } 549 550 return { 551 cid: event.commit.cid, 552 forumId, 553 action: record.action, 554 subjectPostUri, 555 subjectDid, 556 reason: record.reason ?? null, 557 createdBy: record.createdBy, 558 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 559 indexedAt: new Date(), 560 }; 561 }, 562 }; 563 564 // ── Generic Handler Methods ───────────────────────────── 565 566 /** 567 * Generic create handler. Wraps the insert in a transaction, 568 * optionally ensures the user exists, and delegates to the 569 * config's toInsertValues callback for collection-specific logic. 570 */ 571 private async genericCreate<TRecord>( 572 config: CollectionConfig<TRecord>, 573 event: any 574 ): Promise<boolean> { 575 try { 576 const record = event.commit.record as unknown as TRecord; 577 let skipped = false; 578 579 await this.db.transaction(async (tx) => { 580 if (config.ensureUserOnCreate) { 581 await this.ensureUser(event.did, tx); 582 } 583 584 const values = await config.toInsertValues(event, record, tx); 585 if (!values) { 586 skipped = true; 587 return; // Skip insert (e.g. foreign key not found) 588 } 589 590 if (config.afterUpsert) { 591 const [inserted] = await tx 592 .insert(config.table) 593 .values(values) 594 .returning({ id: config.table.id }); 595 await config.afterUpsert(event, record, inserted.id, tx); 596 } else { 597 await tx.insert(config.table).values(values); 598 } 599 }); 600 601 // Only log success if insert actually happened 602 if (!skipped) { 603 this.logger.info(`${config.name} created`, { 604 did: event.did, 605 rkey: event.commit.rkey, 606 }); 607 } 608 return !skipped; 609 } catch (error) { 610 this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, { 611 did: event.did, 612 rkey: event.commit.rkey, 613 error: error instanceof Error ? error.message : String(error), 614 }); 615 throw error; 616 } 617 } 618 619 /** 620 * Generic update handler. Wraps the update in a transaction 621 * and delegates to the config's toUpdateValues callback for 622 * collection-specific logic. 623 */ 624 private async genericUpdate<TRecord>( 625 config: CollectionConfig<TRecord>, 626 event: any 627 ) { 628 try { 629 const record = event.commit.record as unknown as TRecord; 630 let skipped = false; 631 632 await this.db.transaction(async (tx) => { 633 const values = await config.toUpdateValues(event, record, tx); 634 if (!values) { 635 skipped = true; 636 return; // Skip update (e.g. foreign key not found) 637 } 638 639 if (config.afterUpsert) { 640 const [updated] = await tx 641 .update(config.table) 642 .set(values) 643 .where( 644 and( 645 eq(config.table.did, event.did), 646 eq(config.table.rkey, event.commit.rkey) 647 ) 648 ) 649 .returning({ id: config.table.id }); 650 if (!updated) return; // Out-of-order UPDATE before CREATE: no row to update yet 651 await config.afterUpsert(event, record, updated.id, tx); 652 } else { 653 await tx 654 .update(config.table) 655 .set(values) 656 .where( 657 and( 658 eq(config.table.did, event.did), 659 eq(config.table.rkey, event.commit.rkey) 660 ) 661 ); 662 } 663 }); 664 665 // Only log success if update actually happened 666 if (!skipped) { 667 this.logger.info(`${config.name} updated`, { 668 did: event.did, 669 rkey: event.commit.rkey, 670 }); 671 } 672 } catch (error) { 673 this.logger.error(`Failed to update ${config.name.toLowerCase()}`, { 674 did: event.did, 675 rkey: event.commit.rkey, 676 error: error instanceof Error ? error.message : String(error), 677 }); 678 throw error; 679 } 680 } 681 682 /** 683 * Generic delete handler. Hard-deletes a record (DELETE FROM). 684 * Posts use handlePostDelete instead (always tombstone). 685 */ 686 private async genericDelete(config: CollectionConfig<any>, event: any) { 687 try { 688 await this.db 689 .delete(config.table) 690 .where( 691 and( 692 eq(config.table.did, event.did), 693 eq(config.table.rkey, event.commit.rkey) 694 ) 695 ); 696 697 this.logger.info(`${config.name} deleted`, { 698 did: event.did, 699 rkey: event.commit.rkey, 700 }); 701 } catch (error) { 702 this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, { 703 did: event.did, 704 rkey: event.commit.rkey, 705 error: error instanceof Error ? error.message : String(error), 706 }); 707 throw error; 708 } 709 } 710 711 // ── Post Handlers ─────────────────────────────────────── 712 713 async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) { 714 const banned = await this.banEnforcer.isBanned(event.did); 715 if (banned) { 716 this.logger.info("Skipping post from banned user", { 717 did: event.did, 718 rkey: event.commit.rkey, 719 }); 720 return; 721 } 722 await this.genericCreate(this.postConfig, event); 723 } 724 725 async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) { 726 await this.genericUpdate(this.postConfig, event); 727 } 728 729 /** 730 * Handles a user-initiated post delete from the PDS. 731 * Always tombstones: replaces personal content with a placeholder and marks 732 * deletedByUser=true. The row is kept so threads referencing this post as 733 * their root or parent remain intact. Personal content is gone; structure is preserved. 734 */ 735 async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) { 736 const { did, commit: { rkey } } = event; 737 try { 738 await this.db 739 .update(posts) 740 .set({ text: "[user deleted this post]", deletedByUser: true }) 741 .where(and(eq(posts.did, did), eq(posts.rkey, rkey))); 742 this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey }); 743 } catch (error) { 744 this.logger.error("Failed to tombstone post", { 745 did, 746 rkey, 747 error: error instanceof Error ? error.message : String(error), 748 }); 749 throw error; 750 } 751 } 752 753 // ── Forum Handlers ────────────────────────────────────── 754 755 async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) { 756 await this.genericCreate(this.forumConfig, event); 757 } 758 759 async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) { 760 await this.genericUpdate(this.forumConfig, event); 761 } 762 763 async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) { 764 await this.genericDelete(this.forumConfig, event); 765 } 766 767 // ── Category Handlers ─────────────────────────────────── 768 769 async handleCategoryCreate( 770 event: CommitCreateEvent<"space.atbb.forum.category"> 771 ) { 772 await this.genericCreate(this.categoryConfig, event); 773 } 774 775 async handleCategoryUpdate( 776 event: CommitUpdateEvent<"space.atbb.forum.category"> 777 ) { 778 await this.genericUpdate(this.categoryConfig, event); 779 } 780 781 async handleCategoryDelete( 782 event: CommitDeleteEvent<"space.atbb.forum.category"> 783 ) { 784 await this.genericDelete(this.categoryConfig, event); 785 } 786 787 // ── Board Handlers ────────────────────────────────────── 788 789 async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) { 790 await this.genericCreate(this.boardConfig, event); 791 } 792 793 async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) { 794 await this.genericUpdate(this.boardConfig, event); 795 } 796 797 async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) { 798 await this.genericDelete(this.boardConfig, event); 799 } 800 801 // ── Role Handlers ─────────────────────────────────────── 802 803 async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) { 804 await this.genericCreate(this.roleConfig, event); 805 } 806 807 async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) { 808 await this.genericUpdate(this.roleConfig, event); 809 } 810 811 async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) { 812 await this.genericDelete(this.roleConfig, event); 813 } 814 815 // ── Theme Handlers ────────────────────────────────────── 816 817 async handleThemeCreate(event: CommitCreateEvent<"space.atbb.forum.theme">) { 818 await this.genericCreate(this.themeConfig, event); 819 } 820 821 async handleThemeUpdate(event: CommitUpdateEvent<"space.atbb.forum.theme">) { 822 await this.genericUpdate(this.themeConfig, event); 823 } 824 825 async handleThemeDelete(event: CommitDeleteEvent<"space.atbb.forum.theme">) { 826 await this.genericDelete(this.themeConfig, event); 827 } 828 829 // ── ThemePolicy Handlers ───────────────────────────────── 830 831 async handleThemePolicyCreate(event: CommitCreateEvent<"space.atbb.forum.themePolicy">) { 832 await this.genericCreate(this.themePolicyConfig, event); 833 } 834 835 async handleThemePolicyUpdate(event: CommitUpdateEvent<"space.atbb.forum.themePolicy">) { 836 await this.genericUpdate(this.themePolicyConfig, event); 837 } 838 839 async handleThemePolicyDelete(event: CommitDeleteEvent<"space.atbb.forum.themePolicy">) { 840 await this.genericDelete(this.themePolicyConfig, event); 841 } 842 843 // ── Membership Handlers ───────────────────────────────── 844 845 async handleMembershipCreate( 846 event: CommitCreateEvent<"space.atbb.membership"> 847 ) { 848 await this.genericCreate(this.membershipConfig, event); 849 } 850 851 async handleMembershipUpdate( 852 event: CommitUpdateEvent<"space.atbb.membership"> 853 ) { 854 await this.genericUpdate(this.membershipConfig, event); 855 } 856 857 async handleMembershipDelete( 858 event: CommitDeleteEvent<"space.atbb.membership"> 859 ) { 860 await this.genericDelete(this.membershipConfig, event); 861 } 862 863 // ── ModAction Handlers ────────────────────────────────── 864 865 async handleModActionCreate( 866 event: CommitCreateEvent<"space.atbb.modAction"> 867 ) { 868 const record = event.commit.record as unknown as ModAction.Record; 869 const isBan = 870 record.action === "space.atbb.modAction.ban" && record.subject.did; 871 const isUnban = 872 record.action === "space.atbb.modAction.unban" && record.subject.did; 873 874 try { 875 if (isBan) { 876 // Custom atomic path: insert ban record + applyBan in one transaction 877 let skipped = false; 878 await this.db.transaction(async (tx) => { 879 const forumId = await this.getForumIdByDid(event.did, tx); 880 if (!forumId) { 881 this.logger.warn("ModAction (ban): Forum not found for DID", { 882 operation: "ModAction CREATE", 883 did: event.did, 884 }); 885 skipped = true; 886 return; 887 } 888 await this.ensureUser(record.createdBy, tx); 889 await tx.insert(modActions).values({ 890 did: event.did, 891 rkey: event.commit.rkey, 892 cid: event.commit.cid, 893 forumId, 894 action: record.action, 895 subjectPostUri: null, 896 subjectDid: record.subject.did ?? null, 897 reason: record.reason ?? null, 898 createdBy: record.createdBy, 899 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 900 createdAt: new Date(record.createdAt), 901 indexedAt: new Date(), 902 }); 903 await this.banEnforcer.applyBan(record.subject.did!, tx); 904 }); 905 if (!skipped) { 906 this.logger.info("ModAction (ban) created", { 907 did: event.did, 908 rkey: event.commit.rkey, 909 }); 910 } 911 } else if (isUnban) { 912 // Custom atomic path: insert unban record + liftBan in one transaction 913 let skipped = false; 914 await this.db.transaction(async (tx) => { 915 const forumId = await this.getForumIdByDid(event.did, tx); 916 if (!forumId) { 917 this.logger.warn("ModAction (unban): Forum not found for DID", { 918 operation: "ModAction CREATE", 919 did: event.did, 920 }); 921 skipped = true; 922 return; 923 } 924 await this.ensureUser(record.createdBy, tx); 925 await tx.insert(modActions).values({ 926 did: event.did, 927 rkey: event.commit.rkey, 928 cid: event.commit.cid, 929 forumId, 930 action: record.action, 931 subjectPostUri: null, 932 subjectDid: record.subject.did ?? null, 933 reason: record.reason ?? null, 934 createdBy: record.createdBy, 935 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 936 createdAt: new Date(record.createdAt), 937 indexedAt: new Date(), 938 }); 939 await this.banEnforcer.liftBan(record.subject.did!, tx); 940 }); 941 if (!skipped) { 942 this.logger.info("ModAction (unban) created", { 943 did: event.did, 944 rkey: event.commit.rkey, 945 }); 946 } 947 } else { 948 // Generic path for all other mod actions (mute, pin, lock, delete, etc.) 949 await this.genericCreate(this.modActionConfig, event); 950 951 // Ban/unban without subject.did — shouldn't happen but log if it does 952 if ( 953 record.action === "space.atbb.modAction.ban" || 954 record.action === "space.atbb.modAction.unban" 955 ) { 956 this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", { 957 did: event.did, 958 rkey: event.commit.rkey, 959 action: record.action, 960 }); 961 } 962 } 963 } catch (error) { 964 this.logger.error("Failed to index ModAction create", { 965 did: event.did, 966 rkey: event.commit.rkey, 967 error: error instanceof Error ? error.message : String(error), 968 }); 969 throw error; 970 } 971 } 972 973 async handleModActionUpdate( 974 event: CommitUpdateEvent<"space.atbb.modAction"> 975 ) { 976 await this.genericUpdate(this.modActionConfig, event); 977 } 978 979 async handleModActionDelete( 980 event: CommitDeleteEvent<"space.atbb.modAction"> 981 ) { 982 try { 983 await this.db.transaction(async (tx) => { 984 // 1. Read before delete to capture action type and subject 985 const [existing] = await tx 986 .select({ 987 action: modActions.action, 988 subjectDid: modActions.subjectDid, 989 }) 990 .from(modActions) 991 .where( 992 and( 993 eq(modActions.did, event.did), 994 eq(modActions.rkey, event.commit.rkey) 995 ) 996 ) 997 .limit(1); 998 999 // 2. Hard delete the record 1000 await tx 1001 .delete(modActions) 1002 .where( 1003 and( 1004 eq(modActions.did, event.did), 1005 eq(modActions.rkey, event.commit.rkey) 1006 ) 1007 ); 1008 1009 // 3. Restore posts if the deleted record was a ban 1010 if ( 1011 existing?.action === "space.atbb.modAction.ban" && 1012 existing?.subjectDid 1013 ) { 1014 await this.banEnforcer.liftBan(existing.subjectDid, tx); 1015 } 1016 }); 1017 1018 this.logger.info("ModAction deleted", { 1019 did: event.did, 1020 rkey: event.commit.rkey, 1021 }); 1022 } catch (error) { 1023 this.logger.error("Failed to delete modAction", { 1024 did: event.did, 1025 rkey: event.commit.rkey, 1026 error: error instanceof Error ? error.message : String(error), 1027 }); 1028 throw error; 1029 } 1030 } 1031 1032 // ── Reaction Handlers (Stub) ──────────────────────────── 1033 1034 async handleReactionCreate( 1035 event: CommitCreateEvent<"space.atbb.reaction"> 1036 ) { 1037 this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey }); 1038 // TODO: Add reactions table to schema 1039 } 1040 1041 async handleReactionUpdate( 1042 event: CommitUpdateEvent<"space.atbb.reaction"> 1043 ) { 1044 this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey }); 1045 // TODO: Add reactions table to schema 1046 } 1047 1048 async handleReactionDelete( 1049 event: CommitDeleteEvent<"space.atbb.reaction"> 1050 ) { 1051 this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey }); 1052 // TODO: Add reactions table to schema 1053 } 1054 1055 // ── Helper Methods ────────────────────────────────────── 1056 1057 /** 1058 * Ensure a user exists in the database. Creates if not exists. 1059 * @param dbOrTx - Database instance or transaction 1060 */ 1061 private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) { 1062 try { 1063 const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1); 1064 1065 if (existing.length === 0) { 1066 await dbOrTx.insert(users).values({ 1067 did, 1068 handle: null, // Will be updated by identity events 1069 indexedAt: new Date(), 1070 }); 1071 this.logger.info("Created user", { did }); 1072 } 1073 } catch (error) { 1074 this.logger.error("Failed to ensure user exists", { 1075 did, 1076 error: error instanceof Error ? error.message : String(error), 1077 }); 1078 throw error; 1079 } 1080 } 1081 1082 /** 1083 * Look up a forum ID by its AT URI 1084 * @param dbOrTx - Database instance or transaction 1085 */ 1086 private async getForumIdByUri( 1087 forumUri: string, 1088 dbOrTx: DbOrTransaction = this.db 1089 ): Promise<bigint | null> { 1090 const parsed = parseAtUri(forumUri); 1091 if (!parsed) return null; 1092 1093 try { 1094 const result = await dbOrTx 1095 .select({ id: forums.id }) 1096 .from(forums) 1097 .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey))) 1098 .limit(1); 1099 1100 return result.length > 0 ? result[0].id : null; 1101 } catch (error) { 1102 this.logger.error("Database error in getForumIdByUri", { 1103 operation: "getForumIdByUri", 1104 forumUri, 1105 error: error instanceof Error ? error.message : String(error), 1106 }); 1107 throw error; 1108 } 1109 } 1110 1111 /** 1112 * Look up a forum ID by the forum's DID 1113 * Used for records owned by the forum (categories, modActions) 1114 * @param dbOrTx - Database instance or transaction 1115 */ 1116 private async getForumIdByDid( 1117 forumDid: string, 1118 dbOrTx: DbOrTransaction = this.db 1119 ): Promise<bigint | null> { 1120 try { 1121 const result = await dbOrTx 1122 .select({ id: forums.id }) 1123 .from(forums) 1124 .where(eq(forums.did, forumDid)) 1125 .limit(1); 1126 1127 return result.length > 0 ? result[0].id : null; 1128 } catch (error) { 1129 this.logger.error("Database error in getForumIdByDid", { 1130 operation: "getForumIdByDid", 1131 forumDid, 1132 error: error instanceof Error ? error.message : String(error), 1133 }); 1134 throw error; 1135 } 1136 } 1137 1138 /** 1139 * Look up a post ID by its AT URI 1140 * @param dbOrTx - Database instance or transaction 1141 */ 1142 private async getPostIdByUri( 1143 postUri: string, 1144 dbOrTx: DbOrTransaction = this.db 1145 ): Promise<bigint | null> { 1146 const parsed = parseAtUri(postUri); 1147 if (!parsed) return null; 1148 1149 try { 1150 const result = await dbOrTx 1151 .select({ id: posts.id }) 1152 .from(posts) 1153 .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey))) 1154 .limit(1); 1155 1156 return result.length > 0 ? result[0].id : null; 1157 } catch (error) { 1158 this.logger.error("Database error in getPostIdByUri", { 1159 operation: "getPostIdByUri", 1160 postUri, 1161 error: error instanceof Error ? error.message : String(error), 1162 }); 1163 throw error; 1164 } 1165 } 1166 1167 /** 1168 * Look up board ID by AT URI (at://did/collection/rkey) 1169 * @param uri - AT URI of the board 1170 * @param dbOrTx - Database instance or transaction 1171 */ 1172 private async getBoardIdByUri( 1173 uri: string, 1174 dbOrTx: DbOrTransaction = this.db 1175 ): Promise<bigint | null> { 1176 const parsed = parseAtUri(uri); 1177 if (!parsed) return null; 1178 1179 try { 1180 const [result] = await dbOrTx 1181 .select({ id: boards.id }) 1182 .from(boards) 1183 .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey))) 1184 .limit(1); 1185 return result?.id ?? null; 1186 } catch (error) { 1187 this.logger.error("Database error in getBoardIdByUri", { 1188 operation: "getBoardIdByUri", 1189 uri, 1190 did: parsed.did, 1191 rkey: parsed.rkey, 1192 error: error instanceof Error ? error.message : String(error), 1193 }); 1194 throw error; 1195 } 1196 } 1197 1198 /** 1199 * Look up category ID by AT URI (at://did/collection/rkey) 1200 * @param uri - AT URI of the category 1201 * @param dbOrTx - Database instance or transaction 1202 */ 1203 private async getCategoryIdByUri( 1204 uri: string, 1205 dbOrTx: DbOrTransaction = this.db 1206 ): Promise<bigint | null> { 1207 const parsed = parseAtUri(uri); 1208 if (!parsed) return null; 1209 1210 try { 1211 const [result] = await dbOrTx 1212 .select({ id: categories.id }) 1213 .from(categories) 1214 .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey))) 1215 .limit(1); 1216 return result?.id ?? null; 1217 } catch (error) { 1218 this.logger.error("Database error in getCategoryIdByUri", { 1219 operation: "getCategoryIdByUri", 1220 uri, 1221 did: parsed.did, 1222 rkey: parsed.rkey, 1223 error: error instanceof Error ? error.message : String(error), 1224 }); 1225 throw error; 1226 } 1227 } 1228}