WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at atb-52-css-token-extraction 1129 lines 36 kB view raw
import type {
  CommitCreateEvent,
  CommitDeleteEvent,
  CommitUpdateEvent,
} from "@skyware/jetstream";
import type { Database, DbOrTransaction } from "@atbb/db";
import type { Logger } from "@atbb/logger";
import {
  posts,
  forums,
  categories,
  boards,
  users,
  memberships,
  modActions,
  roles,
  rolePermissions,
} from "@atbb/db";
import { eq, and } from "drizzle-orm";
import { parseAtUri } from "./at-uri.js";
import { BanEnforcer } from "./ban-enforcer.js";
import {
  SpaceAtbbPost as Post,
  SpaceAtbbForumForum as Forum,
  SpaceAtbbForumCategory as Category,
  SpaceAtbbForumBoard as Board,
  SpaceAtbbMembership as Membership,
  SpaceAtbbModAction as ModAction,
  SpaceAtbbForumRole as Role,
} from "@atbb/lexicon";

// ── Shared Constants & Helpers ──────────────────────────

/** `action` value of a mod-action record that bans a user. */
const BAN_ACTION = "space.atbb.modAction.ban";
/** `action` value of a mod-action record that lifts a ban. */
const UNBAN_ACTION = "space.atbb.modAction.unban";

/**
 * Render a caught value as a loggable string.
 * Uses `Error#message` when available, otherwise `String(value)`.
 */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// ── Collection Config Types ─────────────────────────────

/**
 * Configuration for a data-driven collection handler.
 * Encodes the per-collection logic that differs across the indexed record
 * types, while the generic handler methods supply the shared
 * try/catch/log/throw scaffolding.
 */
interface CollectionConfig<TRecord> {
  /** Human-readable name for logging (e.g. "Post", "Forum") */
  name: string;
  /** Drizzle table reference */
  table: any;
  /**
   * "hard" = DELETE FROM. Only consulted by genericDelete; posts are routed
   * through handlePostDelete (tombstone) and never reach genericDelete.
   */
  deleteStrategy: "hard";
  /** Call ensureUser(event.did) before insert? (user-owned records) */
  ensureUserOnCreate?: boolean;
  /**
   * Transform event+record into DB insert values.
   * Return null to skip the insert (e.g. when a required foreign key is missing).
   */
  toInsertValues: (
    event: any,
    record: TRecord,
    tx: DbOrTransaction
  ) => Promise<Record<string, any> | null>;
  /**
   * Transform event+record into DB update set values.
   * Runs inside a transaction. Return null to skip the update.
   */
  toUpdateValues: (
    event: any,
    record: TRecord,
    tx: DbOrTransaction
  ) => Promise<Record<string, any> | null>;
  /**
   * Optional hook called after a row is inserted or updated, within the same
   * transaction. Receives the row's numeric id (bigint) so callers can write
   * to child tables (e.g. role_permissions).
   */
  afterUpsert?: (
    event: any,
    record: TRecord,
    rowId: bigint,
    tx: DbOrTransaction
  ) => Promise<void>;
}

/**
 * Indexer class for processing AT Proto firehose events.
 * Converts events into database records for the atBB AppView.
 *
 * Every public `handle*` method logs and rethrows on failure, so the caller
 * decides whether to retry or drop the event.
 */
export class Indexer {
  private banEnforcer: BanEnforcer;

  constructor(private db: Database, private logger: Logger) {
    this.banEnforcer = new BanEnforcer(db, logger);
  }

  // ── Collection Configs ──────────────────────────────────

  private postConfig: CollectionConfig<Post.Record> = {
    name: "Post",
    table: posts,
    deleteStrategy: "hard",
    ensureUserOnCreate: true,
    toInsertValues: async (event, record, tx) => {
      // Look up parent/root for replies
      let rootId: bigint | null = null;
      let parentId: bigint | null = null;

      if (Post.isReplyRef(record.reply)) {
        rootId = await this.getPostIdByUri(record.reply.root.uri, tx);
        parentId = await this.getPostIdByUri(record.reply.parent.uri, tx);
      } else if (record.reply) {
        // reply ref present but $type omitted — rootPostId/parentPostId will be null,
        // making this reply unreachable in thread navigation (data corruption).
        this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", {
          operation: "Post CREATE",
          postDid: event.did,
          postRkey: event.commit.rkey,
          errorId: "POST_REPLY_REF_MISSING_TYPE",
        });
      }

      const boardId = await this.resolvePostBoardId(event, record, "Post CREATE", tx);

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        // Replies never carry a title, even if the record supplies one.
        title: record.reply ? null : (record.title ?? null),
        text: record.text,
        forumUri: record.forum?.forum.uri ?? null,
        boardUri: record.board?.board.uri ?? null,
        boardId,
        rootPostId: rootId,
        rootUri: record.reply?.root.uri ?? null,
        parentPostId: parentId,
        parentUri: record.reply?.parent.uri ?? null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      const boardId = await this.resolvePostBoardId(event, record, "Post UPDATE", tx);

      // Thread linkage (root/parent) and createdAt are not rewritten on update.
      return {
        cid: event.commit.cid,
        title: record.reply ? null : (record.title ?? null),
        text: record.text,
        forumUri: record.forum?.forum.uri ?? null,
        boardUri: record.board?.board.uri ?? null,
        boardId,
        indexedAt: new Date(),
      };
    },
  };

  private forumConfig: CollectionConfig<Forum.Record> = {
    name: "Forum",
    table: forums,
    deleteStrategy: "hard",
    ensureUserOnCreate: true,
    toInsertValues: async (event, record) => ({
      did: event.did,
      rkey: event.commit.rkey,
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      indexedAt: new Date(),
    }),
    toUpdateValues: async (event, record) => ({
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      indexedAt: new Date(),
    }),
  };

  private categoryConfig: CollectionConfig<Category.Record> = {
    name: "Category",
    table: categories,
    deleteStrategy: "hard",
    toInsertValues: async (event, record, tx) => {
      // Categories are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("Category: Forum not found for DID", {
          operation: "Category CREATE",
          did: event.did,
        });
        return null;
      }

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? 0,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // Categories are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("Category: Forum not found for DID", {
          operation: "Category UPDATE",
          did: event.did,
        });
        return null;
      }

      return {
        cid: event.commit.cid,
        forumId,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? 0,
        indexedAt: new Date(),
      };
    },
  };

  private boardConfig: CollectionConfig<Board.Record> = {
    name: "Board",
    table: boards,
    deleteStrategy: "hard",
    toInsertValues: async (event, record, tx) => {
      // Boards are owned by Forum DID
      const categoryId = await this.getCategoryIdByUri(
        record.category.category.uri,
        tx
      );

      if (!categoryId) {
        this.logger.error("Failed to index board: category not found", {
          operation: "Board CREATE",
          boardDid: event.did,
          boardRkey: event.commit.rkey,
          categoryUri: record.category.category.uri,
          errorId: "BOARD_CATEGORY_MISSING",
        });
        throw new Error(`Category not found: ${record.category.category.uri}`);
      }

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? null,
        categoryId,
        categoryUri: record.category.category.uri,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      const categoryId = await this.getCategoryIdByUri(
        record.category.category.uri,
        tx
      );

      if (!categoryId) {
        this.logger.error("Failed to index board: category not found", {
          operation: "Board UPDATE",
          boardDid: event.did,
          boardRkey: event.commit.rkey,
          categoryUri: record.category.category.uri,
          errorId: "BOARD_CATEGORY_MISSING",
        });
        throw new Error(`Category not found: ${record.category.category.uri}`);
      }

      return {
        cid: event.commit.cid,
        name: record.name,
        description: record.description ?? null,
        slug: record.slug ?? null,
        sortOrder: record.sortOrder ?? null,
        categoryId,
        categoryUri: record.category.category.uri,
        indexedAt: new Date(),
      };
    },
  };

  private roleConfig: CollectionConfig<Role.Record> = {
    name: "Role",
    table: roles,
    deleteStrategy: "hard",
    toInsertValues: async (event, record) => ({
      did: event.did,
      rkey: event.commit.rkey,
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      priority: record.priority,
      createdAt: new Date(record.createdAt),
      indexedAt: new Date(),
    }),
    toUpdateValues: async (event, record) => ({
      cid: event.commit.cid,
      name: record.name,
      description: record.description ?? null,
      priority: record.priority,
      indexedAt: new Date(),
    }),
    afterUpsert: async (event, record, roleId, tx) => {
      // Replace all permissions for this role atomically
      await tx
        .delete(rolePermissions)
        .where(eq(rolePermissions.roleId, roleId));

      if (record.permissions && record.permissions.length > 0) {
        await tx.insert(rolePermissions).values(
          record.permissions.map((permission: string) => ({
            roleId,
            permission,
          }))
        );
      }
    },
  };

  private membershipConfig: CollectionConfig<Membership.Record> = {
    name: "Membership",
    table: memberships,
    deleteStrategy: "hard",
    ensureUserOnCreate: true,
    toInsertValues: async (event, record, tx) => {
      // Look up forum by URI (inside transaction)
      const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);

      if (!forumId) {
        this.logger.warn("Membership: Forum not found", {
          operation: "Membership CREATE",
          forumUri: record.forum.forum.uri,
        });
        return null;
      }

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        forumUri: record.forum.forum.uri,
        role: null, // TODO: Extract role name from roleUri or lexicon
        roleUri: record.role?.role.uri ?? null,
        joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // Look up forum by URI (may have changed)
      const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);

      if (!forumId) {
        this.logger.warn("Membership: Forum not found", {
          operation: "Membership UPDATE",
          forumUri: record.forum.forum.uri,
        });
        return null;
      }

      return {
        cid: event.commit.cid,
        forumId,
        forumUri: record.forum.forum.uri,
        role: null, // TODO: Extract role name from roleUri or lexicon
        roleUri: record.role?.role.uri ?? null,
        joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
        indexedAt: new Date(),
      };
    },
  };

  private modActionConfig: CollectionConfig<ModAction.Record> = {
    name: "ModAction",
    table: modActions,
    deleteStrategy: "hard",
    toInsertValues: async (event, record, tx) => {
      // ModActions are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("ModAction: Forum not found for DID", {
          operation: "ModAction CREATE",
          did: event.did,
        });
        return null;
      }

      // Ensure moderator exists
      await this.ensureUser(record.createdBy, tx);

      const { subjectPostUri, subjectDid } = this.modActionSubject(record);

      return {
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        action: record.action,
        subjectPostUri,
        subjectDid,
        reason: record.reason ?? null,
        createdBy: record.createdBy,
        expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      };
    },
    toUpdateValues: async (event, record, tx) => {
      // ModActions are owned by the Forum DID, so event.did IS the forum DID
      const forumId = await this.getForumIdByDid(event.did, tx);

      if (!forumId) {
        this.logger.warn("ModAction: Forum not found for DID", {
          operation: "ModAction UPDATE",
          did: event.did,
        });
        return null;
      }

      const { subjectPostUri, subjectDid } = this.modActionSubject(record);

      return {
        cid: event.commit.cid,
        forumId,
        action: record.action,
        subjectPostUri,
        subjectDid,
        reason: record.reason ?? null,
        createdBy: record.createdBy,
        expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
        indexedAt: new Date(),
      };
    },
  };

  // ── Config Helpers ──────────────────────────────────────

  /**
   * Resolve the numeric board id referenced by a post record.
   *
   * @param operation - Label written to the log ("Post CREATE" / "Post UPDATE")
   * @returns the board id, or null when the record carries no board reference
   * @throws Error when a board URI is present but no matching board row exists
   */
  private async resolvePostBoardId(
    event: any,
    record: Post.Record,
    operation: string,
    tx: DbOrTransaction
  ): Promise<bigint | null> {
    const boardUri = record.board?.board.uri;
    if (!boardUri) return null;

    const boardId = await this.getBoardIdByUri(boardUri, tx);
    if (!boardId) {
      this.logger.error("Failed to index post: board not found", {
        operation,
        postDid: event.did,
        postRkey: event.commit.rkey,
        boardUri,
        errorId: "POST_BOARD_MISSING",
      });
      throw new Error(`Board not found: ${boardUri}`);
    }
    return boardId;
  }

  /**
   * Extract the subject columns of a mod action (post URI and/or user DID).
   * A missing or empty subject field is stored as null.
   */
  private modActionSubject(record: ModAction.Record): {
    subjectPostUri: string | null;
    subjectDid: string | null;
  } {
    return {
      subjectPostUri: record.subject.post ? record.subject.post.uri : null,
      // `||` (not `??`) so an empty-string DID is stored as null too.
      subjectDid: record.subject.did || null,
    };
  }

  // ── Generic Handler Methods ─────────────────────────────

  /**
   * Generic create handler. Wraps the insert in a transaction,
   * optionally ensures the user exists, and delegates to the
   * config's toInsertValues callback for collection-specific logic.
   *
   * @returns true when a row was inserted, false when the config skipped it
   * @throws rethrows any error after logging it
   */
  private async genericCreate<TRecord>(
    config: CollectionConfig<TRecord>,
    event: any
  ): Promise<boolean> {
    try {
      const record = event.commit.record as unknown as TRecord;
      let skipped = false;

      await this.db.transaction(async (tx) => {
        if (config.ensureUserOnCreate) {
          await this.ensureUser(event.did, tx);
        }

        const values = await config.toInsertValues(event, record, tx);
        if (!values) {
          skipped = true;
          return; // Skip insert (e.g. foreign key not found)
        }

        if (config.afterUpsert) {
          // The hook needs the new row id, so ask the insert to return it.
          const [inserted] = await tx
            .insert(config.table)
            .values(values)
            .returning({ id: config.table.id });
          await config.afterUpsert(event, record, inserted.id, tx);
        } else {
          await tx.insert(config.table).values(values);
        }
      });

      // Only log success if insert actually happened
      if (!skipped) {
        this.logger.info(`${config.name} created`, {
          did: event.did,
          rkey: event.commit.rkey,
        });
      }
      return !skipped;
    } catch (error) {
      this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, {
        did: event.did,
        rkey: event.commit.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Generic update handler. Wraps the update in a transaction
   * and delegates to the config's toUpdateValues callback for
   * collection-specific logic.
   *
   * @throws rethrows any error after logging it
   */
  private async genericUpdate<TRecord>(
    config: CollectionConfig<TRecord>,
    event: any
  ): Promise<void> {
    try {
      const record = event.commit.record as unknown as TRecord;
      let skipped = false;

      await this.db.transaction(async (tx) => {
        const values = await config.toUpdateValues(event, record, tx);
        if (!values) {
          skipped = true;
          return; // Skip update (e.g. foreign key not found)
        }

        const matchesRecord = and(
          eq(config.table.did, event.did),
          eq(config.table.rkey, event.commit.rkey)
        );

        if (config.afterUpsert) {
          const [updated] = await tx
            .update(config.table)
            .set(values)
            .where(matchesRecord)
            .returning({ id: config.table.id });
          if (!updated) {
            // Out-of-order UPDATE before CREATE: no row to update yet.
            // Mark as skipped so the success log below is not emitted for
            // a write that never happened.
            skipped = true;
            this.logger.warn(`${config.name} update skipped: no existing row`, {
              did: event.did,
              rkey: event.commit.rkey,
            });
            return;
          }
          await config.afterUpsert(event, record, updated.id, tx);
        } else {
          await tx.update(config.table).set(values).where(matchesRecord);
        }
      });

      // Only log success if update actually happened
      if (!skipped) {
        this.logger.info(`${config.name} updated`, {
          did: event.did,
          rkey: event.commit.rkey,
        });
      }
    } catch (error) {
      this.logger.error(`Failed to update ${config.name.toLowerCase()}`, {
        did: event.did,
        rkey: event.commit.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Generic delete handler. Hard-deletes a record (DELETE FROM).
   * Posts use handlePostDelete instead (always tombstone).
   *
   * @throws rethrows any error after logging it
   */
  private async genericDelete(
    config: CollectionConfig<any>,
    event: any
  ): Promise<void> {
    try {
      await this.db
        .delete(config.table)
        .where(
          and(
            eq(config.table.did, event.did),
            eq(config.table.rkey, event.commit.rkey)
          )
        );

      this.logger.info(`${config.name} deleted`, {
        did: event.did,
        rkey: event.commit.rkey,
      });
    } catch (error) {
      this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, {
        did: event.did,
        rkey: event.commit.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  // ── Post Handlers ───────────────────────────────────────

  /** Index a new post unless the ban enforcer reports the author as banned. */
  async handlePostCreate(
    event: CommitCreateEvent<"space.atbb.post">
  ): Promise<void> {
    const banned = await this.banEnforcer.isBanned(event.did);
    if (banned) {
      this.logger.info("Skipping post from banned user", {
        did: event.did,
        rkey: event.commit.rkey,
      });
      return;
    }
    await this.genericCreate(this.postConfig, event);
  }

  async handlePostUpdate(
    event: CommitUpdateEvent<"space.atbb.post">
  ): Promise<void> {
    await this.genericUpdate(this.postConfig, event);
  }

  /**
   * Handles a user-initiated post delete from the PDS.
   * Always tombstones: replaces the post text with a placeholder and marks
   * deletedByUser=true. The row is kept so threads referencing this post as
   * their root or parent remain intact.
   */
  async handlePostDelete(
    event: CommitDeleteEvent<"space.atbb.post">
  ): Promise<void> {
    const { did, commit: { rkey } } = event;
    try {
      await this.db
        .update(posts)
        .set({ text: "[user deleted this post]", deletedByUser: true })
        .where(and(eq(posts.did, did), eq(posts.rkey, rkey)));
      this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey });
    } catch (error) {
      this.logger.error("Failed to tombstone post", {
        did,
        rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  // ── Forum Handlers ──────────────────────────────────────

  async handleForumCreate(
    event: CommitCreateEvent<"space.atbb.forum.forum">
  ): Promise<void> {
    await this.genericCreate(this.forumConfig, event);
  }

  async handleForumUpdate(
    event: CommitUpdateEvent<"space.atbb.forum.forum">
  ): Promise<void> {
    await this.genericUpdate(this.forumConfig, event);
  }

  async handleForumDelete(
    event: CommitDeleteEvent<"space.atbb.forum.forum">
  ): Promise<void> {
    await this.genericDelete(this.forumConfig, event);
  }

  // ── Category Handlers ───────────────────────────────────

  async handleCategoryCreate(
    event: CommitCreateEvent<"space.atbb.forum.category">
  ): Promise<void> {
    await this.genericCreate(this.categoryConfig, event);
  }

  async handleCategoryUpdate(
    event: CommitUpdateEvent<"space.atbb.forum.category">
  ): Promise<void> {
    await this.genericUpdate(this.categoryConfig, event);
  }

  async handleCategoryDelete(
    event: CommitDeleteEvent<"space.atbb.forum.category">
  ): Promise<void> {
    await this.genericDelete(this.categoryConfig, event);
  }

  // ── Board Handlers ──────────────────────────────────────

  async handleBoardCreate(
    event: CommitCreateEvent<"space.atbb.forum.board">
  ): Promise<void> {
    await this.genericCreate(this.boardConfig, event);
  }

  async handleBoardUpdate(
    event: CommitUpdateEvent<"space.atbb.forum.board">
  ): Promise<void> {
    await this.genericUpdate(this.boardConfig, event);
  }

  async handleBoardDelete(
    event: CommitDeleteEvent<"space.atbb.forum.board">
  ): Promise<void> {
    await this.genericDelete(this.boardConfig, event);
  }

  // ── Role Handlers ───────────────────────────────────────

  async handleRoleCreate(
    event: CommitCreateEvent<"space.atbb.forum.role">
  ): Promise<void> {
    await this.genericCreate(this.roleConfig, event);
  }

  async handleRoleUpdate(
    event: CommitUpdateEvent<"space.atbb.forum.role">
  ): Promise<void> {
    await this.genericUpdate(this.roleConfig, event);
  }

  async handleRoleDelete(
    event: CommitDeleteEvent<"space.atbb.forum.role">
  ): Promise<void> {
    await this.genericDelete(this.roleConfig, event);
  }

  // ── Membership Handlers ─────────────────────────────────

  async handleMembershipCreate(
    event: CommitCreateEvent<"space.atbb.membership">
  ): Promise<void> {
    await this.genericCreate(this.membershipConfig, event);
  }

  async handleMembershipUpdate(
    event: CommitUpdateEvent<"space.atbb.membership">
  ): Promise<void> {
    await this.genericUpdate(this.membershipConfig, event);
  }

  async handleMembershipDelete(
    event: CommitDeleteEvent<"space.atbb.membership">
  ): Promise<void> {
    await this.genericDelete(this.membershipConfig, event);
  }

  // ── ModAction Handlers ──────────────────────────────────

  /**
   * Index a new mod action.
   *
   * Ban/unban actions that name a subject DID take a custom path that inserts
   * the record and calls the ban enforcer inside one transaction. Everything
   * else goes through genericCreate.
   *
   * @throws rethrows any error after logging it
   */
  async handleModActionCreate(
    event: CommitCreateEvent<"space.atbb.modAction">
  ): Promise<void> {
    const record = event.commit.record as unknown as ModAction.Record;
    const isBanAction = record.action === BAN_ACTION;
    const isUnbanAction = record.action === UNBAN_ACTION;
    const subjectDid = record.subject.did;

    try {
      if ((isBanAction || isUnbanAction) && subjectDid) {
        await this.createEnforcedModAction(
          event,
          record,
          subjectDid,
          isBanAction ? "ban" : "unban"
        );
        return;
      }

      // Generic path for all other mod actions (mute, pin, lock, delete, etc.)
      await this.genericCreate(this.modActionConfig, event);

      // Ban/unban without subject.did — shouldn't happen but log if it does
      if (isBanAction || isUnbanAction) {
        this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", {
          did: event.did,
          rkey: event.commit.rkey,
          action: record.action,
        });
      }
    } catch (error) {
      this.logger.error("Failed to index ModAction create", {
        did: event.did,
        rkey: event.commit.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Insert a ban/unban mod action and call the matching ban-enforcer method
   * (applyBan / liftBan) in a single transaction, so the record and its
   * enforcement either both land or both roll back.
   *
   * Skips (with a warning) when no forum row exists for the event's DID.
   */
  private async createEnforcedModAction(
    event: CommitCreateEvent<"space.atbb.modAction">,
    record: ModAction.Record,
    subjectDid: string,
    kind: "ban" | "unban"
  ): Promise<void> {
    let skipped = false;

    await this.db.transaction(async (tx) => {
      const forumId = await this.getForumIdByDid(event.did, tx);
      if (!forumId) {
        this.logger.warn(`ModAction (${kind}): Forum not found for DID`, {
          operation: "ModAction CREATE",
          did: event.did,
        });
        skipped = true;
        return;
      }

      await this.ensureUser(record.createdBy, tx);
      await tx.insert(modActions).values({
        did: event.did,
        rkey: event.commit.rkey,
        cid: event.commit.cid,
        forumId,
        action: record.action,
        subjectPostUri: null,
        subjectDid,
        reason: record.reason ?? null,
        createdBy: record.createdBy,
        expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
        createdAt: new Date(record.createdAt),
        indexedAt: new Date(),
      });

      if (kind === "ban") {
        await this.banEnforcer.applyBan(subjectDid, tx);
      } else {
        await this.banEnforcer.liftBan(subjectDid, tx);
      }
    });

    if (!skipped) {
      this.logger.info(`ModAction (${kind}) created`, {
        did: event.did,
        rkey: event.commit.rkey,
      });
    }
  }

  async handleModActionUpdate(
    event: CommitUpdateEvent<"space.atbb.modAction">
  ): Promise<void> {
    await this.genericUpdate(this.modActionConfig, event);
  }

  /**
   * Hard-delete a mod action. If the deleted record was a ban with a subject
   * DID, the ban is lifted in the same transaction.
   *
   * @throws rethrows any error after logging it
   */
  async handleModActionDelete(
    event: CommitDeleteEvent<"space.atbb.modAction">
  ): Promise<void> {
    try {
      await this.db.transaction(async (tx) => {
        const matchesRecord = and(
          eq(modActions.did, event.did),
          eq(modActions.rkey, event.commit.rkey)
        );

        // 1. Read before delete to capture action type and subject
        const [existing] = await tx
          .select({
            action: modActions.action,
            subjectDid: modActions.subjectDid,
          })
          .from(modActions)
          .where(matchesRecord)
          .limit(1);

        // 2. Hard delete the record
        await tx.delete(modActions).where(matchesRecord);

        // 3. Restore posts if the deleted record was a ban
        if (existing?.action === BAN_ACTION && existing?.subjectDid) {
          await this.banEnforcer.liftBan(existing.subjectDid, tx);
        }
      });

      this.logger.info("ModAction deleted", {
        did: event.did,
        rkey: event.commit.rkey,
      });
    } catch (error) {
      this.logger.error("Failed to delete modAction", {
        did: event.did,
        rkey: event.commit.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  // ── Reaction Handlers (Stub) ────────────────────────────

  async handleReactionCreate(
    event: CommitCreateEvent<"space.atbb.reaction">
  ): Promise<void> {
    this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey });
    // TODO: Add reactions table to schema
  }

  async handleReactionUpdate(
    event: CommitUpdateEvent<"space.atbb.reaction">
  ): Promise<void> {
    this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey });
    // TODO: Add reactions table to schema
  }

  async handleReactionDelete(
    event: CommitDeleteEvent<"space.atbb.reaction">
  ): Promise<void> {
    this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey });
    // TODO: Add reactions table to schema
  }

  // ── Helper Methods ──────────────────────────────────────

  /**
   * Ensure a user exists in the database. Creates if not exists.
   *
   * NOTE(review): this is a select-then-insert, so two concurrent callers for
   * the same DID can both reach the insert. If `users.did` is unique, an
   * `onConflictDoNothing()` insert would close that window — confirm against
   * the schema before changing.
   *
   * @param dbOrTx - Database instance or transaction
   * @throws rethrows any database error after logging it
   */
  private async ensureUser(
    did: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<void> {
    try {
      const existing = await dbOrTx
        .select()
        .from(users)
        .where(eq(users.did, did))
        .limit(1);

      if (existing.length === 0) {
        await dbOrTx.insert(users).values({
          did,
          handle: null, // Will be updated by identity events
          indexedAt: new Date(),
        });
        this.logger.info("Created user", { did });
      }
    } catch (error) {
      this.logger.error("Failed to ensure user exists", {
        did,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Look up a forum ID by its AT URI
   * @param dbOrTx - Database instance or transaction
   * @returns the forum id, or null when the URI is unparsable or no row matches
   */
  private async getForumIdByUri(
    forumUri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(forumUri);
    if (!parsed) return null;

    try {
      const result = await dbOrTx
        .select({ id: forums.id })
        .from(forums)
        .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey)))
        .limit(1);

      return result.length > 0 ? result[0].id : null;
    } catch (error) {
      this.logger.error("Database error in getForumIdByUri", {
        operation: "getForumIdByUri",
        forumUri,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Look up a forum ID by the forum's DID
   * Used for records owned by the forum (categories, modActions)
   * @param dbOrTx - Database instance or transaction
   * @returns the forum id, or null when no row matches
   */
  private async getForumIdByDid(
    forumDid: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    try {
      const result = await dbOrTx
        .select({ id: forums.id })
        .from(forums)
        .where(eq(forums.did, forumDid))
        .limit(1);

      return result.length > 0 ? result[0].id : null;
    } catch (error) {
      this.logger.error("Database error in getForumIdByDid", {
        operation: "getForumIdByDid",
        forumDid,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Look up a post ID by its AT URI
   * @param dbOrTx - Database instance or transaction
   * @returns the post id, or null when the URI is unparsable or no row matches
   */
  private async getPostIdByUri(
    postUri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(postUri);
    if (!parsed) return null;

    try {
      const result = await dbOrTx
        .select({ id: posts.id })
        .from(posts)
        .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey)))
        .limit(1);

      return result.length > 0 ? result[0].id : null;
    } catch (error) {
      this.logger.error("Database error in getPostIdByUri", {
        operation: "getPostIdByUri",
        postUri,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Look up board ID by AT URI (at://did/collection/rkey)
   * @param uri - AT URI of the board
   * @param dbOrTx - Database instance or transaction
   * @returns the board id, or null when the URI is unparsable or no row matches
   */
  private async getBoardIdByUri(
    uri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(uri);
    if (!parsed) return null;

    try {
      const [result] = await dbOrTx
        .select({ id: boards.id })
        .from(boards)
        .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey)))
        .limit(1);
      return result?.id ?? null;
    } catch (error) {
      this.logger.error("Database error in getBoardIdByUri", {
        operation: "getBoardIdByUri",
        uri,
        did: parsed.did,
        rkey: parsed.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }

  /**
   * Look up category ID by AT URI (at://did/collection/rkey)
   * @param uri - AT URI of the category
   * @param dbOrTx - Database instance or transaction
   * @returns the category id, or null when the URI is unparsable or no row matches
   */
  private async getCategoryIdByUri(
    uri: string,
    dbOrTx: DbOrTransaction = this.db
  ): Promise<bigint | null> {
    const parsed = parseAtUri(uri);
    if (!parsed) return null;

    try {
      const [result] = await dbOrTx
        .select({ id: categories.id })
        .from(categories)
        .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey)))
        .limit(1);
      return result?.id ?? null;
    } catch (error) {
      this.logger.error("Database error in getCategoryIdByUri", {
        operation: "getCategoryIdByUri",
        uri,
        did: parsed.did,
        rkey: parsed.rkey,
        error: errorMessage(error),
      });
      throw error;
    }
  }
}