WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import type {
2 CommitCreateEvent,
3 CommitDeleteEvent,
4 CommitUpdateEvent,
5} from "@skyware/jetstream";
6import type { Database, DbOrTransaction } from "@atbb/db";
7import type { Logger } from "@atbb/logger";
8import {
9 posts,
10 forums,
11 categories,
12 boards,
13 users,
14 memberships,
15 modActions,
16 roles,
17 rolePermissions,
18} from "@atbb/db";
19import { eq, and } from "drizzle-orm";
20import { parseAtUri } from "./at-uri.js";
21import { BanEnforcer } from "./ban-enforcer.js";
22import {
23 SpaceAtbbPost as Post,
24 SpaceAtbbForumForum as Forum,
25 SpaceAtbbForumCategory as Category,
26 SpaceAtbbForumBoard as Board,
27 SpaceAtbbMembership as Membership,
28 SpaceAtbbModAction as ModAction,
29 SpaceAtbbForumRole as Role,
30} from "@atbb/lexicon";
31
32// ── Collection Config Types ─────────────────────────────
33
34/**
35 * Configuration for a data-driven collection handler.
36 * Encodes the per-collection logic that differs across the 5 indexed types,
37 * while the generic handler methods supply the shared try/catch/log/throw scaffolding.
38 */
39interface CollectionConfig<TRecord> {
40 /** Human-readable name for logging (e.g. "Post", "Forum") */
41 name: string;
42 /** Drizzle table reference */
43 table: any;
44 /** "hard" = DELETE FROM (all non-post collections) */
45 deleteStrategy: "hard";
46 /** Call ensureUser(event.did) before insert? (user-owned records) */
47 ensureUserOnCreate?: boolean;
48 /**
49 * Transform event+record into DB insert values.
50 * Return null to skip the insert (e.g. when a required foreign key is missing).
51 */
52 toInsertValues: (
53 event: any,
54 record: TRecord,
55 tx: DbOrTransaction
56 ) => Promise<Record<string, any> | null>;
57 /**
58 * Transform event+record into DB update set values.
59 * Runs inside a transaction. Return null to skip the update.
60 */
61 toUpdateValues: (
62 event: any,
63 record: TRecord,
64 tx: DbOrTransaction
65 ) => Promise<Record<string, any> | null>;
66 /**
67 * Optional hook called after a row is inserted or updated, within the same
68 * transaction. Receives the row's numeric id (bigint) so callers can write
69 * to child tables (e.g. role_permissions).
70 */
71 afterUpsert?: (
72 event: any,
73 record: TRecord,
74 rowId: bigint,
75 tx: DbOrTransaction
76 ) => Promise<void>;
77}
78
79
80/**
81 * Indexer class for processing AT Proto firehose events
82 * Converts events into database records for the atBB AppView
83 */
84export class Indexer {
85 private banEnforcer: BanEnforcer;
86
87 constructor(private db: Database, private logger: Logger) {
88 this.banEnforcer = new BanEnforcer(db, logger);
89 }
90
91 // ── Collection Configs ──────────────────────────────────
92
93 private postConfig: CollectionConfig<Post.Record> = {
94 name: "Post",
95 table: posts,
96 deleteStrategy: "hard",
97 ensureUserOnCreate: true,
98 toInsertValues: async (event, record, tx) => {
99 // Look up parent/root for replies
100 let rootId: bigint | null = null;
101 let parentId: bigint | null = null;
102
103 if (Post.isReplyRef(record.reply)) {
104 rootId = await this.getPostIdByUri(record.reply.root.uri, tx);
105 parentId = await this.getPostIdByUri(record.reply.parent.uri, tx);
106 } else if (record.reply) {
107 // reply ref present but $type omitted — rootPostId/parentPostId will be null,
108 // making this reply unreachable in thread navigation (data corruption).
109 this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", {
110 operation: "Post CREATE",
111 postDid: event.did,
112 postRkey: event.commit.rkey,
113 errorId: "POST_REPLY_REF_MISSING_TYPE",
114 });
115 }
116
117 // Look up board ID if board reference exists
118 let boardId: bigint | null = null;
119 if (record.board?.board.uri) {
120 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
121 if (!boardId) {
122 this.logger.error("Failed to index post: board not found", {
123 operation: "Post CREATE",
124 postDid: event.did,
125 postRkey: event.commit.rkey,
126 boardUri: record.board.board.uri,
127 errorId: "POST_BOARD_MISSING",
128 });
129 throw new Error(`Board not found: ${record.board.board.uri}`);
130 }
131 }
132
133 return {
134 did: event.did,
135 rkey: event.commit.rkey,
136 cid: event.commit.cid,
137 title: record.reply ? null : (record.title ?? null),
138 text: record.text,
139 forumUri: record.forum?.forum.uri ?? null,
140 boardUri: record.board?.board.uri ?? null,
141 boardId,
142 rootPostId: rootId,
143 rootUri: record.reply?.root.uri ?? null,
144 parentPostId: parentId,
145 parentUri: record.reply?.parent.uri ?? null,
146 createdAt: new Date(record.createdAt),
147 indexedAt: new Date(),
148 };
149 },
150 toUpdateValues: async (event, record, tx) => {
151 // Look up board ID if board reference exists
152 let boardId: bigint | null = null;
153 if (record.board?.board.uri) {
154 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
155 if (!boardId) {
156 this.logger.error("Failed to index post: board not found", {
157 operation: "Post UPDATE",
158 postDid: event.did,
159 postRkey: event.commit.rkey,
160 boardUri: record.board.board.uri,
161 errorId: "POST_BOARD_MISSING",
162 });
163 throw new Error(`Board not found: ${record.board.board.uri}`);
164 }
165 }
166
167 return {
168 cid: event.commit.cid,
169 title: record.reply ? null : (record.title ?? null),
170 text: record.text,
171 forumUri: record.forum?.forum.uri ?? null,
172 boardUri: record.board?.board.uri ?? null,
173 boardId,
174 indexedAt: new Date(),
175 };
176 },
177 };
178
179 private forumConfig: CollectionConfig<Forum.Record> = {
180 name: "Forum",
181 table: forums,
182 deleteStrategy: "hard",
183 ensureUserOnCreate: true,
184 toInsertValues: async (event, record) => ({
185 did: event.did,
186 rkey: event.commit.rkey,
187 cid: event.commit.cid,
188 name: record.name,
189 description: record.description ?? null,
190 indexedAt: new Date(),
191 }),
192 toUpdateValues: async (event, record) => ({
193 cid: event.commit.cid,
194 name: record.name,
195 description: record.description ?? null,
196 indexedAt: new Date(),
197 }),
198 };
199
200 private categoryConfig: CollectionConfig<Category.Record> = {
201 name: "Category",
202 table: categories,
203 deleteStrategy: "hard",
204 toInsertValues: async (event, record, tx) => {
205 // Categories are owned by the Forum DID, so event.did IS the forum DID
206 const forumId = await this.getForumIdByDid(event.did, tx);
207
208 if (!forumId) {
209 this.logger.warn("Category: Forum not found for DID", {
210 operation: "Category CREATE",
211 did: event.did,
212 });
213 return null;
214 }
215
216 return {
217 did: event.did,
218 rkey: event.commit.rkey,
219 cid: event.commit.cid,
220 forumId,
221 name: record.name,
222 description: record.description ?? null,
223 slug: record.slug ?? null,
224 sortOrder: record.sortOrder ?? 0,
225 createdAt: new Date(record.createdAt),
226 indexedAt: new Date(),
227 };
228 },
229 toUpdateValues: async (event, record, tx) => {
230 // Categories are owned by the Forum DID, so event.did IS the forum DID
231 const forumId = await this.getForumIdByDid(event.did, tx);
232
233 if (!forumId) {
234 this.logger.warn("Category: Forum not found for DID", {
235 operation: "Category UPDATE",
236 did: event.did,
237 });
238 return null;
239 }
240
241 return {
242 cid: event.commit.cid,
243 forumId,
244 name: record.name,
245 description: record.description ?? null,
246 slug: record.slug ?? null,
247 sortOrder: record.sortOrder ?? 0,
248 indexedAt: new Date(),
249 };
250 },
251 };
252
253 private boardConfig: CollectionConfig<Board.Record> = {
254 name: "Board",
255 table: boards,
256 deleteStrategy: "hard",
257 toInsertValues: async (event, record, tx) => {
258 // Boards are owned by Forum DID
259 const categoryId = await this.getCategoryIdByUri(
260 record.category.category.uri,
261 tx
262 );
263
264 if (!categoryId) {
265 this.logger.error("Failed to index board: category not found", {
266 operation: "Board CREATE",
267 boardDid: event.did,
268 boardRkey: event.commit.rkey,
269 categoryUri: record.category.category.uri,
270 errorId: "BOARD_CATEGORY_MISSING",
271 });
272 throw new Error(`Category not found: ${record.category.category.uri}`);
273 }
274
275 return {
276 did: event.did,
277 rkey: event.commit.rkey,
278 cid: event.commit.cid,
279 name: record.name,
280 description: record.description ?? null,
281 slug: record.slug ?? null,
282 sortOrder: record.sortOrder ?? null,
283 categoryId,
284 categoryUri: record.category.category.uri,
285 createdAt: new Date(record.createdAt),
286 indexedAt: new Date(),
287 };
288 },
289 toUpdateValues: async (event, record, tx) => {
290 const categoryId = await this.getCategoryIdByUri(
291 record.category.category.uri,
292 tx
293 );
294
295 if (!categoryId) {
296 this.logger.error("Failed to index board: category not found", {
297 operation: "Board UPDATE",
298 boardDid: event.did,
299 boardRkey: event.commit.rkey,
300 categoryUri: record.category.category.uri,
301 errorId: "BOARD_CATEGORY_MISSING",
302 });
303 throw new Error(`Category not found: ${record.category.category.uri}`);
304 }
305
306 return {
307 cid: event.commit.cid,
308 name: record.name,
309 description: record.description ?? null,
310 slug: record.slug ?? null,
311 sortOrder: record.sortOrder ?? null,
312 categoryId,
313 categoryUri: record.category.category.uri,
314 indexedAt: new Date(),
315 };
316 },
317 };
318
319 private roleConfig: CollectionConfig<Role.Record> = {
320 name: "Role",
321 table: roles,
322 deleteStrategy: "hard",
323 toInsertValues: async (event, record) => ({
324 did: event.did,
325 rkey: event.commit.rkey,
326 cid: event.commit.cid,
327 name: record.name,
328 description: record.description ?? null,
329 priority: record.priority,
330 createdAt: new Date(record.createdAt),
331 indexedAt: new Date(),
332 }),
333 toUpdateValues: async (event, record) => ({
334 cid: event.commit.cid,
335 name: record.name,
336 description: record.description ?? null,
337 priority: record.priority,
338 indexedAt: new Date(),
339 }),
340 afterUpsert: async (event, record, roleId, tx) => {
341 // Replace all permissions for this role atomically
342 await tx
343 .delete(rolePermissions)
344 .where(eq(rolePermissions.roleId, roleId));
345
346 if (record.permissions && record.permissions.length > 0) {
347 await tx.insert(rolePermissions).values(
348 record.permissions.map((permission: string) => ({
349 roleId,
350 permission,
351 }))
352 );
353 }
354 },
355 };
356
357 private membershipConfig: CollectionConfig<Membership.Record> = {
358 name: "Membership",
359 table: memberships,
360 deleteStrategy: "hard",
361 ensureUserOnCreate: true,
362 toInsertValues: async (event, record, tx) => {
363 // Look up forum by URI (inside transaction)
364 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
365
366 if (!forumId) {
367 this.logger.warn("Membership: Forum not found", {
368 operation: "Membership CREATE",
369 forumUri: record.forum.forum.uri,
370 });
371 return null;
372 }
373
374 return {
375 did: event.did,
376 rkey: event.commit.rkey,
377 cid: event.commit.cid,
378 forumId,
379 forumUri: record.forum.forum.uri,
380 role: null, // TODO: Extract role name from roleUri or lexicon
381 roleUri: record.role?.role.uri ?? null,
382 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
383 createdAt: new Date(record.createdAt),
384 indexedAt: new Date(),
385 };
386 },
387 toUpdateValues: async (event, record, tx) => {
388 // Look up forum by URI (may have changed)
389 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
390
391 if (!forumId) {
392 this.logger.warn("Membership: Forum not found", {
393 operation: "Membership UPDATE",
394 forumUri: record.forum.forum.uri,
395 });
396 return null;
397 }
398
399 return {
400 cid: event.commit.cid,
401 forumId,
402 forumUri: record.forum.forum.uri,
403 role: null, // TODO: Extract role name from roleUri or lexicon
404 roleUri: record.role?.role.uri ?? null,
405 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
406 indexedAt: new Date(),
407 };
408 },
409 };
410
411 private modActionConfig: CollectionConfig<ModAction.Record> = {
412 name: "ModAction",
413 table: modActions,
414 deleteStrategy: "hard",
415 toInsertValues: async (event, record, tx) => {
416 // ModActions are owned by the Forum DID, so event.did IS the forum DID
417 const forumId = await this.getForumIdByDid(event.did, tx);
418
419 if (!forumId) {
420 this.logger.warn("ModAction: Forum not found for DID", {
421 operation: "ModAction CREATE",
422 did: event.did,
423 });
424 return null;
425 }
426
427 // Ensure moderator exists
428 await this.ensureUser(record.createdBy, tx);
429
430 // Determine subject type (post or user)
431 let subjectPostUri: string | null = null;
432 let subjectDid: string | null = null;
433
434 if (record.subject.post) {
435 subjectPostUri = record.subject.post.uri;
436 }
437 if (record.subject.did) {
438 subjectDid = record.subject.did;
439 }
440
441 return {
442 did: event.did,
443 rkey: event.commit.rkey,
444 cid: event.commit.cid,
445 forumId,
446 action: record.action,
447 subjectPostUri,
448 subjectDid,
449 reason: record.reason ?? null,
450 createdBy: record.createdBy,
451 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
452 createdAt: new Date(record.createdAt),
453 indexedAt: new Date(),
454 };
455 },
456 toUpdateValues: async (event, record, tx) => {
457 // ModActions are owned by the Forum DID, so event.did IS the forum DID
458 const forumId = await this.getForumIdByDid(event.did, tx);
459
460 if (!forumId) {
461 this.logger.warn("ModAction: Forum not found for DID", {
462 operation: "ModAction UPDATE",
463 did: event.did,
464 });
465 return null;
466 }
467
468 // Determine subject type (post or user)
469 let subjectPostUri: string | null = null;
470 let subjectDid: string | null = null;
471
472 if (record.subject.post) {
473 subjectPostUri = record.subject.post.uri;
474 }
475 if (record.subject.did) {
476 subjectDid = record.subject.did;
477 }
478
479 return {
480 cid: event.commit.cid,
481 forumId,
482 action: record.action,
483 subjectPostUri,
484 subjectDid,
485 reason: record.reason ?? null,
486 createdBy: record.createdBy,
487 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
488 indexedAt: new Date(),
489 };
490 },
491 };
492
493 // ── Generic Handler Methods ─────────────────────────────
494
495 /**
496 * Generic create handler. Wraps the insert in a transaction,
497 * optionally ensures the user exists, and delegates to the
498 * config's toInsertValues callback for collection-specific logic.
499 */
500 private async genericCreate<TRecord>(
501 config: CollectionConfig<TRecord>,
502 event: any
503 ): Promise<boolean> {
504 try {
505 const record = event.commit.record as unknown as TRecord;
506 let skipped = false;
507
508 await this.db.transaction(async (tx) => {
509 if (config.ensureUserOnCreate) {
510 await this.ensureUser(event.did, tx);
511 }
512
513 const values = await config.toInsertValues(event, record, tx);
514 if (!values) {
515 skipped = true;
516 return; // Skip insert (e.g. foreign key not found)
517 }
518
519 if (config.afterUpsert) {
520 const [inserted] = await tx
521 .insert(config.table)
522 .values(values)
523 .returning({ id: config.table.id });
524 await config.afterUpsert(event, record, inserted.id, tx);
525 } else {
526 await tx.insert(config.table).values(values);
527 }
528 });
529
530 // Only log success if insert actually happened
531 if (!skipped) {
532 this.logger.info(`${config.name} created`, {
533 did: event.did,
534 rkey: event.commit.rkey,
535 });
536 }
537 return !skipped;
538 } catch (error) {
539 this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, {
540 did: event.did,
541 rkey: event.commit.rkey,
542 error: error instanceof Error ? error.message : String(error),
543 });
544 throw error;
545 }
546 }
547
548 /**
549 * Generic update handler. Wraps the update in a transaction
550 * and delegates to the config's toUpdateValues callback for
551 * collection-specific logic.
552 */
553 private async genericUpdate<TRecord>(
554 config: CollectionConfig<TRecord>,
555 event: any
556 ) {
557 try {
558 const record = event.commit.record as unknown as TRecord;
559 let skipped = false;
560
561 await this.db.transaction(async (tx) => {
562 const values = await config.toUpdateValues(event, record, tx);
563 if (!values) {
564 skipped = true;
565 return; // Skip update (e.g. foreign key not found)
566 }
567
568 if (config.afterUpsert) {
569 const [updated] = await tx
570 .update(config.table)
571 .set(values)
572 .where(
573 and(
574 eq(config.table.did, event.did),
575 eq(config.table.rkey, event.commit.rkey)
576 )
577 )
578 .returning({ id: config.table.id });
579 if (!updated) return; // Out-of-order UPDATE before CREATE: no row to update yet
580 await config.afterUpsert(event, record, updated.id, tx);
581 } else {
582 await tx
583 .update(config.table)
584 .set(values)
585 .where(
586 and(
587 eq(config.table.did, event.did),
588 eq(config.table.rkey, event.commit.rkey)
589 )
590 );
591 }
592 });
593
594 // Only log success if update actually happened
595 if (!skipped) {
596 this.logger.info(`${config.name} updated`, {
597 did: event.did,
598 rkey: event.commit.rkey,
599 });
600 }
601 } catch (error) {
602 this.logger.error(`Failed to update ${config.name.toLowerCase()}`, {
603 did: event.did,
604 rkey: event.commit.rkey,
605 error: error instanceof Error ? error.message : String(error),
606 });
607 throw error;
608 }
609 }
610
611 /**
612 * Generic delete handler. Hard-deletes a record (DELETE FROM).
613 * Posts use handlePostDelete instead (always tombstone).
614 */
615 private async genericDelete(config: CollectionConfig<any>, event: any) {
616 try {
617 await this.db
618 .delete(config.table)
619 .where(
620 and(
621 eq(config.table.did, event.did),
622 eq(config.table.rkey, event.commit.rkey)
623 )
624 );
625
626 this.logger.info(`${config.name} deleted`, {
627 did: event.did,
628 rkey: event.commit.rkey,
629 });
630 } catch (error) {
631 this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, {
632 did: event.did,
633 rkey: event.commit.rkey,
634 error: error instanceof Error ? error.message : String(error),
635 });
636 throw error;
637 }
638 }
639
640 // ── Post Handlers ───────────────────────────────────────
641
642 async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) {
643 const banned = await this.banEnforcer.isBanned(event.did);
644 if (banned) {
645 this.logger.info("Skipping post from banned user", {
646 did: event.did,
647 rkey: event.commit.rkey,
648 });
649 return;
650 }
651 await this.genericCreate(this.postConfig, event);
652 }
653
654 async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) {
655 await this.genericUpdate(this.postConfig, event);
656 }
657
658 /**
659 * Handles a user-initiated post delete from the PDS.
660 * Always tombstones: replaces personal content with a placeholder and marks
661 * deletedByUser=true. The row is kept so threads referencing this post as
662 * their root or parent remain intact. Personal content is gone; structure is preserved.
663 */
664 async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) {
665 const { did, commit: { rkey } } = event;
666 try {
667 await this.db
668 .update(posts)
669 .set({ text: "[user deleted this post]", deletedByUser: true })
670 .where(and(eq(posts.did, did), eq(posts.rkey, rkey)));
671 this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey });
672 } catch (error) {
673 this.logger.error("Failed to tombstone post", {
674 did,
675 rkey,
676 error: error instanceof Error ? error.message : String(error),
677 });
678 throw error;
679 }
680 }
681
682 // ── Forum Handlers ──────────────────────────────────────
683
684 async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) {
685 await this.genericCreate(this.forumConfig, event);
686 }
687
688 async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) {
689 await this.genericUpdate(this.forumConfig, event);
690 }
691
692 async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) {
693 await this.genericDelete(this.forumConfig, event);
694 }
695
696 // ── Category Handlers ───────────────────────────────────
697
698 async handleCategoryCreate(
699 event: CommitCreateEvent<"space.atbb.forum.category">
700 ) {
701 await this.genericCreate(this.categoryConfig, event);
702 }
703
704 async handleCategoryUpdate(
705 event: CommitUpdateEvent<"space.atbb.forum.category">
706 ) {
707 await this.genericUpdate(this.categoryConfig, event);
708 }
709
710 async handleCategoryDelete(
711 event: CommitDeleteEvent<"space.atbb.forum.category">
712 ) {
713 await this.genericDelete(this.categoryConfig, event);
714 }
715
716 // ── Board Handlers ──────────────────────────────────────
717
718 async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) {
719 await this.genericCreate(this.boardConfig, event);
720 }
721
722 async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) {
723 await this.genericUpdate(this.boardConfig, event);
724 }
725
726 async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) {
727 await this.genericDelete(this.boardConfig, event);
728 }
729
730 // ── Role Handlers ───────────────────────────────────────
731
732 async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) {
733 await this.genericCreate(this.roleConfig, event);
734 }
735
736 async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) {
737 await this.genericUpdate(this.roleConfig, event);
738 }
739
740 async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) {
741 await this.genericDelete(this.roleConfig, event);
742 }
743
744 // ── Membership Handlers ─────────────────────────────────
745
746 async handleMembershipCreate(
747 event: CommitCreateEvent<"space.atbb.membership">
748 ) {
749 await this.genericCreate(this.membershipConfig, event);
750 }
751
752 async handleMembershipUpdate(
753 event: CommitUpdateEvent<"space.atbb.membership">
754 ) {
755 await this.genericUpdate(this.membershipConfig, event);
756 }
757
758 async handleMembershipDelete(
759 event: CommitDeleteEvent<"space.atbb.membership">
760 ) {
761 await this.genericDelete(this.membershipConfig, event);
762 }
763
764 // ── ModAction Handlers ──────────────────────────────────
765
766 async handleModActionCreate(
767 event: CommitCreateEvent<"space.atbb.modAction">
768 ) {
769 const record = event.commit.record as unknown as ModAction.Record;
770 const isBan =
771 record.action === "space.atbb.modAction.ban" && record.subject.did;
772 const isUnban =
773 record.action === "space.atbb.modAction.unban" && record.subject.did;
774
775 try {
776 if (isBan) {
777 // Custom atomic path: insert ban record + applyBan in one transaction
778 let skipped = false;
779 await this.db.transaction(async (tx) => {
780 const forumId = await this.getForumIdByDid(event.did, tx);
781 if (!forumId) {
782 this.logger.warn("ModAction (ban): Forum not found for DID", {
783 operation: "ModAction CREATE",
784 did: event.did,
785 });
786 skipped = true;
787 return;
788 }
789 await this.ensureUser(record.createdBy, tx);
790 await tx.insert(modActions).values({
791 did: event.did,
792 rkey: event.commit.rkey,
793 cid: event.commit.cid,
794 forumId,
795 action: record.action,
796 subjectPostUri: null,
797 subjectDid: record.subject.did ?? null,
798 reason: record.reason ?? null,
799 createdBy: record.createdBy,
800 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
801 createdAt: new Date(record.createdAt),
802 indexedAt: new Date(),
803 });
804 await this.banEnforcer.applyBan(record.subject.did!, tx);
805 });
806 if (!skipped) {
807 this.logger.info("ModAction (ban) created", {
808 did: event.did,
809 rkey: event.commit.rkey,
810 });
811 }
812 } else if (isUnban) {
813 // Custom atomic path: insert unban record + liftBan in one transaction
814 let skipped = false;
815 await this.db.transaction(async (tx) => {
816 const forumId = await this.getForumIdByDid(event.did, tx);
817 if (!forumId) {
818 this.logger.warn("ModAction (unban): Forum not found for DID", {
819 operation: "ModAction CREATE",
820 did: event.did,
821 });
822 skipped = true;
823 return;
824 }
825 await this.ensureUser(record.createdBy, tx);
826 await tx.insert(modActions).values({
827 did: event.did,
828 rkey: event.commit.rkey,
829 cid: event.commit.cid,
830 forumId,
831 action: record.action,
832 subjectPostUri: null,
833 subjectDid: record.subject.did ?? null,
834 reason: record.reason ?? null,
835 createdBy: record.createdBy,
836 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
837 createdAt: new Date(record.createdAt),
838 indexedAt: new Date(),
839 });
840 await this.banEnforcer.liftBan(record.subject.did!, tx);
841 });
842 if (!skipped) {
843 this.logger.info("ModAction (unban) created", {
844 did: event.did,
845 rkey: event.commit.rkey,
846 });
847 }
848 } else {
849 // Generic path for all other mod actions (mute, pin, lock, delete, etc.)
850 await this.genericCreate(this.modActionConfig, event);
851
852 // Ban/unban without subject.did — shouldn't happen but log if it does
853 if (
854 record.action === "space.atbb.modAction.ban" ||
855 record.action === "space.atbb.modAction.unban"
856 ) {
857 this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", {
858 did: event.did,
859 rkey: event.commit.rkey,
860 action: record.action,
861 });
862 }
863 }
864 } catch (error) {
865 this.logger.error("Failed to index ModAction create", {
866 did: event.did,
867 rkey: event.commit.rkey,
868 error: error instanceof Error ? error.message : String(error),
869 });
870 throw error;
871 }
872 }
873
874 async handleModActionUpdate(
875 event: CommitUpdateEvent<"space.atbb.modAction">
876 ) {
877 await this.genericUpdate(this.modActionConfig, event);
878 }
879
880 async handleModActionDelete(
881 event: CommitDeleteEvent<"space.atbb.modAction">
882 ) {
883 try {
884 await this.db.transaction(async (tx) => {
885 // 1. Read before delete to capture action type and subject
886 const [existing] = await tx
887 .select({
888 action: modActions.action,
889 subjectDid: modActions.subjectDid,
890 })
891 .from(modActions)
892 .where(
893 and(
894 eq(modActions.did, event.did),
895 eq(modActions.rkey, event.commit.rkey)
896 )
897 )
898 .limit(1);
899
900 // 2. Hard delete the record
901 await tx
902 .delete(modActions)
903 .where(
904 and(
905 eq(modActions.did, event.did),
906 eq(modActions.rkey, event.commit.rkey)
907 )
908 );
909
910 // 3. Restore posts if the deleted record was a ban
911 if (
912 existing?.action === "space.atbb.modAction.ban" &&
913 existing?.subjectDid
914 ) {
915 await this.banEnforcer.liftBan(existing.subjectDid, tx);
916 }
917 });
918
919 this.logger.info("ModAction deleted", {
920 did: event.did,
921 rkey: event.commit.rkey,
922 });
923 } catch (error) {
924 this.logger.error("Failed to delete modAction", {
925 did: event.did,
926 rkey: event.commit.rkey,
927 error: error instanceof Error ? error.message : String(error),
928 });
929 throw error;
930 }
931 }
932
933 // ── Reaction Handlers (Stub) ────────────────────────────
934
935 async handleReactionCreate(
936 event: CommitCreateEvent<"space.atbb.reaction">
937 ) {
938 this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey });
939 // TODO: Add reactions table to schema
940 }
941
942 async handleReactionUpdate(
943 event: CommitUpdateEvent<"space.atbb.reaction">
944 ) {
945 this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey });
946 // TODO: Add reactions table to schema
947 }
948
949 async handleReactionDelete(
950 event: CommitDeleteEvent<"space.atbb.reaction">
951 ) {
952 this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey });
953 // TODO: Add reactions table to schema
954 }
955
956 // ── Helper Methods ──────────────────────────────────────
957
958 /**
959 * Ensure a user exists in the database. Creates if not exists.
960 * @param dbOrTx - Database instance or transaction
961 */
962 private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) {
963 try {
964 const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1);
965
966 if (existing.length === 0) {
967 await dbOrTx.insert(users).values({
968 did,
969 handle: null, // Will be updated by identity events
970 indexedAt: new Date(),
971 });
972 this.logger.info("Created user", { did });
973 }
974 } catch (error) {
975 this.logger.error("Failed to ensure user exists", {
976 did,
977 error: error instanceof Error ? error.message : String(error),
978 });
979 throw error;
980 }
981 }
982
983 /**
984 * Look up a forum ID by its AT URI
985 * @param dbOrTx - Database instance or transaction
986 */
987 private async getForumIdByUri(
988 forumUri: string,
989 dbOrTx: DbOrTransaction = this.db
990 ): Promise<bigint | null> {
991 const parsed = parseAtUri(forumUri);
992 if (!parsed) return null;
993
994 try {
995 const result = await dbOrTx
996 .select({ id: forums.id })
997 .from(forums)
998 .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey)))
999 .limit(1);
1000
1001 return result.length > 0 ? result[0].id : null;
1002 } catch (error) {
1003 this.logger.error("Database error in getForumIdByUri", {
1004 operation: "getForumIdByUri",
1005 forumUri,
1006 error: error instanceof Error ? error.message : String(error),
1007 });
1008 throw error;
1009 }
1010 }
1011
1012 /**
1013 * Look up a forum ID by the forum's DID
1014 * Used for records owned by the forum (categories, modActions)
1015 * @param dbOrTx - Database instance or transaction
1016 */
1017 private async getForumIdByDid(
1018 forumDid: string,
1019 dbOrTx: DbOrTransaction = this.db
1020 ): Promise<bigint | null> {
1021 try {
1022 const result = await dbOrTx
1023 .select({ id: forums.id })
1024 .from(forums)
1025 .where(eq(forums.did, forumDid))
1026 .limit(1);
1027
1028 return result.length > 0 ? result[0].id : null;
1029 } catch (error) {
1030 this.logger.error("Database error in getForumIdByDid", {
1031 operation: "getForumIdByDid",
1032 forumDid,
1033 error: error instanceof Error ? error.message : String(error),
1034 });
1035 throw error;
1036 }
1037 }
1038
1039 /**
1040 * Look up a post ID by its AT URI
1041 * @param dbOrTx - Database instance or transaction
1042 */
1043 private async getPostIdByUri(
1044 postUri: string,
1045 dbOrTx: DbOrTransaction = this.db
1046 ): Promise<bigint | null> {
1047 const parsed = parseAtUri(postUri);
1048 if (!parsed) return null;
1049
1050 try {
1051 const result = await dbOrTx
1052 .select({ id: posts.id })
1053 .from(posts)
1054 .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey)))
1055 .limit(1);
1056
1057 return result.length > 0 ? result[0].id : null;
1058 } catch (error) {
1059 this.logger.error("Database error in getPostIdByUri", {
1060 operation: "getPostIdByUri",
1061 postUri,
1062 error: error instanceof Error ? error.message : String(error),
1063 });
1064 throw error;
1065 }
1066 }
1067
1068 /**
1069 * Look up board ID by AT URI (at://did/collection/rkey)
1070 * @param uri - AT URI of the board
1071 * @param dbOrTx - Database instance or transaction
1072 */
1073 private async getBoardIdByUri(
1074 uri: string,
1075 dbOrTx: DbOrTransaction = this.db
1076 ): Promise<bigint | null> {
1077 const parsed = parseAtUri(uri);
1078 if (!parsed) return null;
1079
1080 try {
1081 const [result] = await dbOrTx
1082 .select({ id: boards.id })
1083 .from(boards)
1084 .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey)))
1085 .limit(1);
1086 return result?.id ?? null;
1087 } catch (error) {
1088 this.logger.error("Database error in getBoardIdByUri", {
1089 operation: "getBoardIdByUri",
1090 uri,
1091 did: parsed.did,
1092 rkey: parsed.rkey,
1093 error: error instanceof Error ? error.message : String(error),
1094 });
1095 throw error;
1096 }
1097 }
1098
1099 /**
1100 * Look up category ID by AT URI (at://did/collection/rkey)
1101 * @param uri - AT URI of the category
1102 * @param dbOrTx - Database instance or transaction
1103 */
1104 private async getCategoryIdByUri(
1105 uri: string,
1106 dbOrTx: DbOrTransaction = this.db
1107 ): Promise<bigint | null> {
1108 const parsed = parseAtUri(uri);
1109 if (!parsed) return null;
1110
1111 try {
1112 const [result] = await dbOrTx
1113 .select({ id: categories.id })
1114 .from(categories)
1115 .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey)))
1116 .limit(1);
1117 return result?.id ?? null;
1118 } catch (error) {
1119 this.logger.error("Database error in getCategoryIdByUri", {
1120 operation: "getCategoryIdByUri",
1121 uri,
1122 did: parsed.did,
1123 rkey: parsed.rkey,
1124 error: error instanceof Error ? error.message : String(error),
1125 });
1126 throw error;
1127 }
1128 }
1129}