WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import type {
2 CommitCreateEvent,
3 CommitDeleteEvent,
4 CommitUpdateEvent,
5} from "@skyware/jetstream";
6import type { Database, DbOrTransaction } from "@atbb/db";
7import type { Logger } from "@atbb/logger";
8import {
9 posts,
10 forums,
11 categories,
12 boards,
13 users,
14 memberships,
15 modActions,
16 roles,
17 rolePermissions,
18 themes,
19 themePolicies,
20 themePolicyAvailableThemes,
21} from "@atbb/db";
22import { eq, and } from "drizzle-orm";
23import { parseAtUri } from "./at-uri.js";
24import { BanEnforcer } from "./ban-enforcer.js";
25import {
26 SpaceAtbbPost as Post,
27 SpaceAtbbForumForum as Forum,
28 SpaceAtbbForumCategory as Category,
29 SpaceAtbbForumBoard as Board,
30 SpaceAtbbMembership as Membership,
31 SpaceAtbbModAction as ModAction,
32 SpaceAtbbForumRole as Role,
33 SpaceAtbbForumTheme as Theme,
34 SpaceAtbbForumThemePolicy as ThemePolicy,
35} from "@atbb/lexicon";
36
37// ── Collection Config Types ─────────────────────────────
38
/**
 * Produces the column values for a single write against a collection's table.
 * Resolving to null tells the generic handler to skip the write (e.g. when a
 * required foreign key cannot be found).
 */
type CollectionValuesBuilder<TRecord> = (
  event: any,
  record: TRecord,
  tx: DbOrTransaction
) => Promise<Record<string, any> | null>;

/**
 * Per-collection settings consumed by the generic create/update/delete handlers.
 * Each config carries only the logic that differs between collections; the
 * generic handlers provide the shared transaction, logging and rethrow scaffolding.
 */
interface CollectionConfig<TRecord> {
  /** Display name used in log messages (e.g. "Post", "Forum"). */
  name: string;

  /** Drizzle table the collection is stored in. */
  table: any;

  /** Deletion mode; "hard" means the row is removed with DELETE FROM. */
  deleteStrategy: "hard";

  /** When true, ensureUser(event.did) runs before the insert (user-owned records). */
  ensureUserOnCreate?: boolean;

  /** Maps event + record to the values for an INSERT; null skips the insert. */
  toInsertValues: CollectionValuesBuilder<TRecord>;

  /**
   * Maps event + record to the SET values for an UPDATE; null skips the update.
   * Invoked inside the handler's transaction.
   */
  toUpdateValues: CollectionValuesBuilder<TRecord>;

  /**
   * Optional hook run after a row has been inserted or updated, inside the same
   * transaction. It is given the row's numeric id (bigint) so it can maintain
   * child tables (e.g. role_permissions).
   */
  afterUpsert?: (
    event: any,
    record: TRecord,
    rowId: bigint,
    tx: DbOrTransaction
  ) => Promise<void>;
}
83
84
85/**
86 * Indexer class for processing AT Proto firehose events
87 * Converts events into database records for the atBB AppView
88 */
89export class Indexer {
90 private banEnforcer: BanEnforcer;
91
92 constructor(private db: Database, private logger: Logger) {
93 this.banEnforcer = new BanEnforcer(db, logger);
94 }
95
96 // ── Collection Configs ──────────────────────────────────
97
98 private postConfig: CollectionConfig<Post.Record> = {
99 name: "Post",
100 table: posts,
101 deleteStrategy: "hard",
102 ensureUserOnCreate: true,
103 toInsertValues: async (event, record, tx) => {
104 // Look up parent/root for replies
105 let rootId: bigint | null = null;
106 let parentId: bigint | null = null;
107
108 if (Post.isReplyRef(record.reply)) {
109 rootId = await this.getPostIdByUri(record.reply.root.uri, tx);
110 parentId = await this.getPostIdByUri(record.reply.parent.uri, tx);
111 } else if (record.reply) {
112 // reply ref present but $type omitted — rootPostId/parentPostId will be null,
113 // making this reply unreachable in thread navigation (data corruption).
114 this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", {
115 operation: "Post CREATE",
116 postDid: event.did,
117 postRkey: event.commit.rkey,
118 errorId: "POST_REPLY_REF_MISSING_TYPE",
119 });
120 }
121
122 // Look up board ID if board reference exists
123 let boardId: bigint | null = null;
124 if (record.board?.board.uri) {
125 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
126 if (!boardId) {
127 this.logger.error("Failed to index post: board not found", {
128 operation: "Post CREATE",
129 postDid: event.did,
130 postRkey: event.commit.rkey,
131 boardUri: record.board.board.uri,
132 errorId: "POST_BOARD_MISSING",
133 });
134 throw new Error(`Board not found: ${record.board.board.uri}`);
135 }
136 }
137
138 return {
139 did: event.did,
140 rkey: event.commit.rkey,
141 cid: event.commit.cid,
142 title: record.reply ? null : (record.title ?? null),
143 text: record.text,
144 forumUri: record.forum?.forum.uri ?? null,
145 boardUri: record.board?.board.uri ?? null,
146 boardId,
147 rootPostId: rootId,
148 rootUri: record.reply?.root.uri ?? null,
149 parentPostId: parentId,
150 parentUri: record.reply?.parent.uri ?? null,
151 createdAt: new Date(record.createdAt),
152 indexedAt: new Date(),
153 };
154 },
155 toUpdateValues: async (event, record, tx) => {
156 // Look up board ID if board reference exists
157 let boardId: bigint | null = null;
158 if (record.board?.board.uri) {
159 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
160 if (!boardId) {
161 this.logger.error("Failed to index post: board not found", {
162 operation: "Post UPDATE",
163 postDid: event.did,
164 postRkey: event.commit.rkey,
165 boardUri: record.board.board.uri,
166 errorId: "POST_BOARD_MISSING",
167 });
168 throw new Error(`Board not found: ${record.board.board.uri}`);
169 }
170 }
171
172 return {
173 cid: event.commit.cid,
174 title: record.reply ? null : (record.title ?? null),
175 text: record.text,
176 forumUri: record.forum?.forum.uri ?? null,
177 boardUri: record.board?.board.uri ?? null,
178 boardId,
179 indexedAt: new Date(),
180 };
181 },
182 };
183
184 private forumConfig: CollectionConfig<Forum.Record> = {
185 name: "Forum",
186 table: forums,
187 deleteStrategy: "hard",
188 ensureUserOnCreate: true,
189 toInsertValues: async (event, record) => ({
190 did: event.did,
191 rkey: event.commit.rkey,
192 cid: event.commit.cid,
193 name: record.name,
194 description: record.description ?? null,
195 indexedAt: new Date(),
196 }),
197 toUpdateValues: async (event, record) => ({
198 cid: event.commit.cid,
199 name: record.name,
200 description: record.description ?? null,
201 indexedAt: new Date(),
202 }),
203 };
204
205 private categoryConfig: CollectionConfig<Category.Record> = {
206 name: "Category",
207 table: categories,
208 deleteStrategy: "hard",
209 toInsertValues: async (event, record, tx) => {
210 // Categories are owned by the Forum DID, so event.did IS the forum DID
211 const forumId = await this.getForumIdByDid(event.did, tx);
212
213 if (!forumId) {
214 this.logger.warn("Category: Forum not found for DID", {
215 operation: "Category CREATE",
216 did: event.did,
217 });
218 return null;
219 }
220
221 return {
222 did: event.did,
223 rkey: event.commit.rkey,
224 cid: event.commit.cid,
225 forumId,
226 name: record.name,
227 description: record.description ?? null,
228 slug: record.slug ?? null,
229 sortOrder: record.sortOrder ?? 0,
230 createdAt: new Date(record.createdAt),
231 indexedAt: new Date(),
232 };
233 },
234 toUpdateValues: async (event, record, tx) => {
235 // Categories are owned by the Forum DID, so event.did IS the forum DID
236 const forumId = await this.getForumIdByDid(event.did, tx);
237
238 if (!forumId) {
239 this.logger.warn("Category: Forum not found for DID", {
240 operation: "Category UPDATE",
241 did: event.did,
242 });
243 return null;
244 }
245
246 return {
247 cid: event.commit.cid,
248 forumId,
249 name: record.name,
250 description: record.description ?? null,
251 slug: record.slug ?? null,
252 sortOrder: record.sortOrder ?? 0,
253 indexedAt: new Date(),
254 };
255 },
256 };
257
258 private boardConfig: CollectionConfig<Board.Record> = {
259 name: "Board",
260 table: boards,
261 deleteStrategy: "hard",
262 toInsertValues: async (event, record, tx) => {
263 // Boards are owned by Forum DID
264 const categoryId = await this.getCategoryIdByUri(
265 record.category.category.uri,
266 tx
267 );
268
269 if (!categoryId) {
270 this.logger.error("Failed to index board: category not found", {
271 operation: "Board CREATE",
272 boardDid: event.did,
273 boardRkey: event.commit.rkey,
274 categoryUri: record.category.category.uri,
275 errorId: "BOARD_CATEGORY_MISSING",
276 });
277 throw new Error(`Category not found: ${record.category.category.uri}`);
278 }
279
280 return {
281 did: event.did,
282 rkey: event.commit.rkey,
283 cid: event.commit.cid,
284 name: record.name,
285 description: record.description ?? null,
286 slug: record.slug ?? null,
287 sortOrder: record.sortOrder ?? null,
288 categoryId,
289 categoryUri: record.category.category.uri,
290 createdAt: new Date(record.createdAt),
291 indexedAt: new Date(),
292 };
293 },
294 toUpdateValues: async (event, record, tx) => {
295 const categoryId = await this.getCategoryIdByUri(
296 record.category.category.uri,
297 tx
298 );
299
300 if (!categoryId) {
301 this.logger.error("Failed to index board: category not found", {
302 operation: "Board UPDATE",
303 boardDid: event.did,
304 boardRkey: event.commit.rkey,
305 categoryUri: record.category.category.uri,
306 errorId: "BOARD_CATEGORY_MISSING",
307 });
308 throw new Error(`Category not found: ${record.category.category.uri}`);
309 }
310
311 return {
312 cid: event.commit.cid,
313 name: record.name,
314 description: record.description ?? null,
315 slug: record.slug ?? null,
316 sortOrder: record.sortOrder ?? null,
317 categoryId,
318 categoryUri: record.category.category.uri,
319 indexedAt: new Date(),
320 };
321 },
322 };
323
324 private roleConfig: CollectionConfig<Role.Record> = {
325 name: "Role",
326 table: roles,
327 deleteStrategy: "hard",
328 toInsertValues: async (event, record) => ({
329 did: event.did,
330 rkey: event.commit.rkey,
331 cid: event.commit.cid,
332 name: record.name,
333 description: record.description ?? null,
334 priority: record.priority,
335 createdAt: new Date(record.createdAt),
336 indexedAt: new Date(),
337 }),
338 toUpdateValues: async (event, record) => ({
339 cid: event.commit.cid,
340 name: record.name,
341 description: record.description ?? null,
342 priority: record.priority,
343 indexedAt: new Date(),
344 }),
345 afterUpsert: async (event, record, roleId, tx) => {
346 // Replace all permissions for this role atomically
347 await tx
348 .delete(rolePermissions)
349 .where(eq(rolePermissions.roleId, roleId));
350
351 if (record.permissions && record.permissions.length > 0) {
352 await tx.insert(rolePermissions).values(
353 record.permissions.map((permission: string) => ({
354 roleId,
355 permission,
356 }))
357 );
358 }
359 },
360 };
361
362 private themeConfig: CollectionConfig<Theme.Record> = {
363 name: "Theme",
364 table: themes,
365 deleteStrategy: "hard",
366 toInsertValues: async (event, record) => ({
367 did: event.did,
368 rkey: event.commit.rkey,
369 cid: event.commit.cid,
370 name: record.name,
371 colorScheme: record.colorScheme as string,
372 tokens: record.tokens,
373 cssOverrides: (record.cssOverrides as string | undefined) ?? null,
374 fontUrls: (record.fontUrls as string[] | undefined) ?? null,
375 createdAt: new Date(record.createdAt as string),
376 indexedAt: new Date(),
377 }),
378 toUpdateValues: async (event, record) => ({
379 cid: event.commit.cid,
380 name: record.name,
381 colorScheme: record.colorScheme as string,
382 tokens: record.tokens,
383 cssOverrides: (record.cssOverrides as string | undefined) ?? null,
384 fontUrls: (record.fontUrls as string[] | undefined) ?? null,
385 indexedAt: new Date(),
386 }),
387 };
388
389 private themePolicyConfig: CollectionConfig<ThemePolicy.Record> = {
390 name: "ThemePolicy",
391 table: themePolicies,
392 deleteStrategy: "hard",
393 toInsertValues: async (event, record) => ({
394 did: event.did,
395 rkey: event.commit.rkey,
396 cid: event.commit.cid,
397 defaultLightThemeUri: record.defaultLightTheme.uri,
398 defaultDarkThemeUri: record.defaultDarkTheme.uri,
399 allowUserChoice: record.allowUserChoice,
400 indexedAt: new Date(),
401 }),
402 toUpdateValues: async (event, record) => ({
403 cid: event.commit.cid,
404 defaultLightThemeUri: record.defaultLightTheme.uri,
405 defaultDarkThemeUri: record.defaultDarkTheme.uri,
406 allowUserChoice: record.allowUserChoice,
407 indexedAt: new Date(),
408 }),
409 afterUpsert: async (_event, record, policyId, tx) => {
410 // Atomically replace all available-theme rows for this policy
411 await tx
412 .delete(themePolicyAvailableThemes)
413 .where(eq(themePolicyAvailableThemes.policyId, policyId));
414
415 const available = record.availableThemes ?? [];
416 if (available.length > 0) {
417 await tx.insert(themePolicyAvailableThemes).values(
418 available.map((themeRef) => ({
419 policyId,
420 themeUri: themeRef.uri,
421 themeCid: themeRef.cid ?? null,
422 }))
423 );
424 }
425 },
426 };
427
428 private membershipConfig: CollectionConfig<Membership.Record> = {
429 name: "Membership",
430 table: memberships,
431 deleteStrategy: "hard",
432 ensureUserOnCreate: true,
433 toInsertValues: async (event, record, tx) => {
434 // Look up forum by URI (inside transaction)
435 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
436
437 if (!forumId) {
438 this.logger.warn("Membership: Forum not found", {
439 operation: "Membership CREATE",
440 forumUri: record.forum.forum.uri,
441 });
442 return null;
443 }
444
445 return {
446 did: event.did,
447 rkey: event.commit.rkey,
448 cid: event.commit.cid,
449 forumId,
450 forumUri: record.forum.forum.uri,
451 role: null, // TODO: Extract role name from roleUri or lexicon
452 roleUri: record.role?.role.uri ?? null,
453 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
454 createdAt: new Date(record.createdAt),
455 indexedAt: new Date(),
456 };
457 },
458 toUpdateValues: async (event, record, tx) => {
459 // Look up forum by URI (may have changed)
460 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
461
462 if (!forumId) {
463 this.logger.warn("Membership: Forum not found", {
464 operation: "Membership UPDATE",
465 forumUri: record.forum.forum.uri,
466 });
467 return null;
468 }
469
470 return {
471 cid: event.commit.cid,
472 forumId,
473 forumUri: record.forum.forum.uri,
474 role: null, // TODO: Extract role name from roleUri or lexicon
475 roleUri: record.role?.role.uri ?? null,
476 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
477 indexedAt: new Date(),
478 };
479 },
480 };
481
482 private modActionConfig: CollectionConfig<ModAction.Record> = {
483 name: "ModAction",
484 table: modActions,
485 deleteStrategy: "hard",
486 toInsertValues: async (event, record, tx) => {
487 // ModActions are owned by the Forum DID, so event.did IS the forum DID
488 const forumId = await this.getForumIdByDid(event.did, tx);
489
490 if (!forumId) {
491 this.logger.warn("ModAction: Forum not found for DID", {
492 operation: "ModAction CREATE",
493 did: event.did,
494 });
495 return null;
496 }
497
498 // Ensure moderator exists
499 await this.ensureUser(record.createdBy, tx);
500
501 // Determine subject type (post or user)
502 let subjectPostUri: string | null = null;
503 let subjectDid: string | null = null;
504
505 if (record.subject.post) {
506 subjectPostUri = record.subject.post.uri;
507 }
508 if (record.subject.did) {
509 subjectDid = record.subject.did;
510 }
511
512 return {
513 did: event.did,
514 rkey: event.commit.rkey,
515 cid: event.commit.cid,
516 forumId,
517 action: record.action,
518 subjectPostUri,
519 subjectDid,
520 reason: record.reason ?? null,
521 createdBy: record.createdBy,
522 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
523 createdAt: new Date(record.createdAt),
524 indexedAt: new Date(),
525 };
526 },
527 toUpdateValues: async (event, record, tx) => {
528 // ModActions are owned by the Forum DID, so event.did IS the forum DID
529 const forumId = await this.getForumIdByDid(event.did, tx);
530
531 if (!forumId) {
532 this.logger.warn("ModAction: Forum not found for DID", {
533 operation: "ModAction UPDATE",
534 did: event.did,
535 });
536 return null;
537 }
538
539 // Determine subject type (post or user)
540 let subjectPostUri: string | null = null;
541 let subjectDid: string | null = null;
542
543 if (record.subject.post) {
544 subjectPostUri = record.subject.post.uri;
545 }
546 if (record.subject.did) {
547 subjectDid = record.subject.did;
548 }
549
550 return {
551 cid: event.commit.cid,
552 forumId,
553 action: record.action,
554 subjectPostUri,
555 subjectDid,
556 reason: record.reason ?? null,
557 createdBy: record.createdBy,
558 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
559 indexedAt: new Date(),
560 };
561 },
562 };
563
564 // ── Generic Handler Methods ─────────────────────────────
565
566 /**
567 * Generic create handler. Wraps the insert in a transaction,
568 * optionally ensures the user exists, and delegates to the
569 * config's toInsertValues callback for collection-specific logic.
570 */
571 private async genericCreate<TRecord>(
572 config: CollectionConfig<TRecord>,
573 event: any
574 ): Promise<boolean> {
575 try {
576 const record = event.commit.record as unknown as TRecord;
577 let skipped = false;
578
579 await this.db.transaction(async (tx) => {
580 if (config.ensureUserOnCreate) {
581 await this.ensureUser(event.did, tx);
582 }
583
584 const values = await config.toInsertValues(event, record, tx);
585 if (!values) {
586 skipped = true;
587 return; // Skip insert (e.g. foreign key not found)
588 }
589
590 if (config.afterUpsert) {
591 const [inserted] = await tx
592 .insert(config.table)
593 .values(values)
594 .returning({ id: config.table.id });
595 await config.afterUpsert(event, record, inserted.id, tx);
596 } else {
597 await tx.insert(config.table).values(values);
598 }
599 });
600
601 // Only log success if insert actually happened
602 if (!skipped) {
603 this.logger.info(`${config.name} created`, {
604 did: event.did,
605 rkey: event.commit.rkey,
606 });
607 }
608 return !skipped;
609 } catch (error) {
610 this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, {
611 did: event.did,
612 rkey: event.commit.rkey,
613 error: error instanceof Error ? error.message : String(error),
614 });
615 throw error;
616 }
617 }
618
619 /**
620 * Generic update handler. Wraps the update in a transaction
621 * and delegates to the config's toUpdateValues callback for
622 * collection-specific logic.
623 */
624 private async genericUpdate<TRecord>(
625 config: CollectionConfig<TRecord>,
626 event: any
627 ) {
628 try {
629 const record = event.commit.record as unknown as TRecord;
630 let skipped = false;
631
632 await this.db.transaction(async (tx) => {
633 const values = await config.toUpdateValues(event, record, tx);
634 if (!values) {
635 skipped = true;
636 return; // Skip update (e.g. foreign key not found)
637 }
638
639 if (config.afterUpsert) {
640 const [updated] = await tx
641 .update(config.table)
642 .set(values)
643 .where(
644 and(
645 eq(config.table.did, event.did),
646 eq(config.table.rkey, event.commit.rkey)
647 )
648 )
649 .returning({ id: config.table.id });
650 if (!updated) return; // Out-of-order UPDATE before CREATE: no row to update yet
651 await config.afterUpsert(event, record, updated.id, tx);
652 } else {
653 await tx
654 .update(config.table)
655 .set(values)
656 .where(
657 and(
658 eq(config.table.did, event.did),
659 eq(config.table.rkey, event.commit.rkey)
660 )
661 );
662 }
663 });
664
665 // Only log success if update actually happened
666 if (!skipped) {
667 this.logger.info(`${config.name} updated`, {
668 did: event.did,
669 rkey: event.commit.rkey,
670 });
671 }
672 } catch (error) {
673 this.logger.error(`Failed to update ${config.name.toLowerCase()}`, {
674 did: event.did,
675 rkey: event.commit.rkey,
676 error: error instanceof Error ? error.message : String(error),
677 });
678 throw error;
679 }
680 }
681
682 /**
683 * Generic delete handler. Hard-deletes a record (DELETE FROM).
684 * Posts use handlePostDelete instead (always tombstone).
685 */
686 private async genericDelete(config: CollectionConfig<any>, event: any) {
687 try {
688 await this.db
689 .delete(config.table)
690 .where(
691 and(
692 eq(config.table.did, event.did),
693 eq(config.table.rkey, event.commit.rkey)
694 )
695 );
696
697 this.logger.info(`${config.name} deleted`, {
698 did: event.did,
699 rkey: event.commit.rkey,
700 });
701 } catch (error) {
702 this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, {
703 did: event.did,
704 rkey: event.commit.rkey,
705 error: error instanceof Error ? error.message : String(error),
706 });
707 throw error;
708 }
709 }
710
711 // ── Post Handlers ───────────────────────────────────────
712
713 async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) {
714 const banned = await this.banEnforcer.isBanned(event.did);
715 if (banned) {
716 this.logger.info("Skipping post from banned user", {
717 did: event.did,
718 rkey: event.commit.rkey,
719 });
720 return;
721 }
722 await this.genericCreate(this.postConfig, event);
723 }
724
725 async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) {
726 await this.genericUpdate(this.postConfig, event);
727 }
728
729 /**
730 * Handles a user-initiated post delete from the PDS.
731 * Always tombstones: replaces personal content with a placeholder and marks
732 * deletedByUser=true. The row is kept so threads referencing this post as
733 * their root or parent remain intact. Personal content is gone; structure is preserved.
734 */
735 async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) {
736 const { did, commit: { rkey } } = event;
737 try {
738 await this.db
739 .update(posts)
740 .set({ text: "[user deleted this post]", deletedByUser: true })
741 .where(and(eq(posts.did, did), eq(posts.rkey, rkey)));
742 this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey });
743 } catch (error) {
744 this.logger.error("Failed to tombstone post", {
745 did,
746 rkey,
747 error: error instanceof Error ? error.message : String(error),
748 });
749 throw error;
750 }
751 }
752
753 // ── Forum Handlers ──────────────────────────────────────
754
755 async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) {
756 await this.genericCreate(this.forumConfig, event);
757 }
758
759 async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) {
760 await this.genericUpdate(this.forumConfig, event);
761 }
762
763 async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) {
764 await this.genericDelete(this.forumConfig, event);
765 }
766
767 // ── Category Handlers ───────────────────────────────────
768
769 async handleCategoryCreate(
770 event: CommitCreateEvent<"space.atbb.forum.category">
771 ) {
772 await this.genericCreate(this.categoryConfig, event);
773 }
774
775 async handleCategoryUpdate(
776 event: CommitUpdateEvent<"space.atbb.forum.category">
777 ) {
778 await this.genericUpdate(this.categoryConfig, event);
779 }
780
781 async handleCategoryDelete(
782 event: CommitDeleteEvent<"space.atbb.forum.category">
783 ) {
784 await this.genericDelete(this.categoryConfig, event);
785 }
786
787 // ── Board Handlers ──────────────────────────────────────
788
789 async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) {
790 await this.genericCreate(this.boardConfig, event);
791 }
792
793 async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) {
794 await this.genericUpdate(this.boardConfig, event);
795 }
796
797 async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) {
798 await this.genericDelete(this.boardConfig, event);
799 }
800
801 // ── Role Handlers ───────────────────────────────────────
802
803 async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) {
804 await this.genericCreate(this.roleConfig, event);
805 }
806
807 async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) {
808 await this.genericUpdate(this.roleConfig, event);
809 }
810
811 async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) {
812 await this.genericDelete(this.roleConfig, event);
813 }
814
815 // ── Theme Handlers ──────────────────────────────────────
816
817 async handleThemeCreate(event: CommitCreateEvent<"space.atbb.forum.theme">) {
818 await this.genericCreate(this.themeConfig, event);
819 }
820
821 async handleThemeUpdate(event: CommitUpdateEvent<"space.atbb.forum.theme">) {
822 await this.genericUpdate(this.themeConfig, event);
823 }
824
825 async handleThemeDelete(event: CommitDeleteEvent<"space.atbb.forum.theme">) {
826 await this.genericDelete(this.themeConfig, event);
827 }
828
829 // ── ThemePolicy Handlers ─────────────────────────────────
830
831 async handleThemePolicyCreate(event: CommitCreateEvent<"space.atbb.forum.themePolicy">) {
832 await this.genericCreate(this.themePolicyConfig, event);
833 }
834
835 async handleThemePolicyUpdate(event: CommitUpdateEvent<"space.atbb.forum.themePolicy">) {
836 await this.genericUpdate(this.themePolicyConfig, event);
837 }
838
839 async handleThemePolicyDelete(event: CommitDeleteEvent<"space.atbb.forum.themePolicy">) {
840 await this.genericDelete(this.themePolicyConfig, event);
841 }
842
843 // ── Membership Handlers ─────────────────────────────────
844
845 async handleMembershipCreate(
846 event: CommitCreateEvent<"space.atbb.membership">
847 ) {
848 await this.genericCreate(this.membershipConfig, event);
849 }
850
851 async handleMembershipUpdate(
852 event: CommitUpdateEvent<"space.atbb.membership">
853 ) {
854 await this.genericUpdate(this.membershipConfig, event);
855 }
856
857 async handleMembershipDelete(
858 event: CommitDeleteEvent<"space.atbb.membership">
859 ) {
860 await this.genericDelete(this.membershipConfig, event);
861 }
862
863 // ── ModAction Handlers ──────────────────────────────────
864
865 async handleModActionCreate(
866 event: CommitCreateEvent<"space.atbb.modAction">
867 ) {
868 const record = event.commit.record as unknown as ModAction.Record;
869 const isBan =
870 record.action === "space.atbb.modAction.ban" && record.subject.did;
871 const isUnban =
872 record.action === "space.atbb.modAction.unban" && record.subject.did;
873
874 try {
875 if (isBan) {
876 // Custom atomic path: insert ban record + applyBan in one transaction
877 let skipped = false;
878 await this.db.transaction(async (tx) => {
879 const forumId = await this.getForumIdByDid(event.did, tx);
880 if (!forumId) {
881 this.logger.warn("ModAction (ban): Forum not found for DID", {
882 operation: "ModAction CREATE",
883 did: event.did,
884 });
885 skipped = true;
886 return;
887 }
888 await this.ensureUser(record.createdBy, tx);
889 await tx.insert(modActions).values({
890 did: event.did,
891 rkey: event.commit.rkey,
892 cid: event.commit.cid,
893 forumId,
894 action: record.action,
895 subjectPostUri: null,
896 subjectDid: record.subject.did ?? null,
897 reason: record.reason ?? null,
898 createdBy: record.createdBy,
899 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
900 createdAt: new Date(record.createdAt),
901 indexedAt: new Date(),
902 });
903 await this.banEnforcer.applyBan(record.subject.did!, tx);
904 });
905 if (!skipped) {
906 this.logger.info("ModAction (ban) created", {
907 did: event.did,
908 rkey: event.commit.rkey,
909 });
910 }
911 } else if (isUnban) {
912 // Custom atomic path: insert unban record + liftBan in one transaction
913 let skipped = false;
914 await this.db.transaction(async (tx) => {
915 const forumId = await this.getForumIdByDid(event.did, tx);
916 if (!forumId) {
917 this.logger.warn("ModAction (unban): Forum not found for DID", {
918 operation: "ModAction CREATE",
919 did: event.did,
920 });
921 skipped = true;
922 return;
923 }
924 await this.ensureUser(record.createdBy, tx);
925 await tx.insert(modActions).values({
926 did: event.did,
927 rkey: event.commit.rkey,
928 cid: event.commit.cid,
929 forumId,
930 action: record.action,
931 subjectPostUri: null,
932 subjectDid: record.subject.did ?? null,
933 reason: record.reason ?? null,
934 createdBy: record.createdBy,
935 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
936 createdAt: new Date(record.createdAt),
937 indexedAt: new Date(),
938 });
939 await this.banEnforcer.liftBan(record.subject.did!, tx);
940 });
941 if (!skipped) {
942 this.logger.info("ModAction (unban) created", {
943 did: event.did,
944 rkey: event.commit.rkey,
945 });
946 }
947 } else {
948 // Generic path for all other mod actions (mute, pin, lock, delete, etc.)
949 await this.genericCreate(this.modActionConfig, event);
950
951 // Ban/unban without subject.did — shouldn't happen but log if it does
952 if (
953 record.action === "space.atbb.modAction.ban" ||
954 record.action === "space.atbb.modAction.unban"
955 ) {
956 this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", {
957 did: event.did,
958 rkey: event.commit.rkey,
959 action: record.action,
960 });
961 }
962 }
963 } catch (error) {
964 this.logger.error("Failed to index ModAction create", {
965 did: event.did,
966 rkey: event.commit.rkey,
967 error: error instanceof Error ? error.message : String(error),
968 });
969 throw error;
970 }
971 }
972
973 async handleModActionUpdate(
974 event: CommitUpdateEvent<"space.atbb.modAction">
975 ) {
976 await this.genericUpdate(this.modActionConfig, event);
977 }
978
979 async handleModActionDelete(
980 event: CommitDeleteEvent<"space.atbb.modAction">
981 ) {
982 try {
983 await this.db.transaction(async (tx) => {
984 // 1. Read before delete to capture action type and subject
985 const [existing] = await tx
986 .select({
987 action: modActions.action,
988 subjectDid: modActions.subjectDid,
989 })
990 .from(modActions)
991 .where(
992 and(
993 eq(modActions.did, event.did),
994 eq(modActions.rkey, event.commit.rkey)
995 )
996 )
997 .limit(1);
998
999 // 2. Hard delete the record
1000 await tx
1001 .delete(modActions)
1002 .where(
1003 and(
1004 eq(modActions.did, event.did),
1005 eq(modActions.rkey, event.commit.rkey)
1006 )
1007 );
1008
1009 // 3. Restore posts if the deleted record was a ban
1010 if (
1011 existing?.action === "space.atbb.modAction.ban" &&
1012 existing?.subjectDid
1013 ) {
1014 await this.banEnforcer.liftBan(existing.subjectDid, tx);
1015 }
1016 });
1017
1018 this.logger.info("ModAction deleted", {
1019 did: event.did,
1020 rkey: event.commit.rkey,
1021 });
1022 } catch (error) {
1023 this.logger.error("Failed to delete modAction", {
1024 did: event.did,
1025 rkey: event.commit.rkey,
1026 error: error instanceof Error ? error.message : String(error),
1027 });
1028 throw error;
1029 }
1030 }
1031
1032 // ── Reaction Handlers (Stub) ────────────────────────────
1033
1034 async handleReactionCreate(
1035 event: CommitCreateEvent<"space.atbb.reaction">
1036 ) {
1037 this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey });
1038 // TODO: Add reactions table to schema
1039 }
1040
1041 async handleReactionUpdate(
1042 event: CommitUpdateEvent<"space.atbb.reaction">
1043 ) {
1044 this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey });
1045 // TODO: Add reactions table to schema
1046 }
1047
1048 async handleReactionDelete(
1049 event: CommitDeleteEvent<"space.atbb.reaction">
1050 ) {
1051 this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey });
1052 // TODO: Add reactions table to schema
1053 }
1054
1055 // ── Helper Methods ──────────────────────────────────────
1056
/**
 * Ensure a user row exists for the given DID, inserting a placeholder
 * (null handle, current timestamp) when none is found.
 *
 * The existence check and the insert are two separate statements, so another
 * writer may create the same user in between. The insert therefore uses
 * ON CONFLICT DO NOTHING: losing that race is a no-op instead of an error
 * that would fail the caller (and abort its transaction).
 * NOTE(review): this relies on `users.did` carrying a unique constraint —
 * confirm against the schema.
 *
 * @param did - DID of the user that must exist
 * @param dbOrTx - Database instance or transaction
 * @throws Rethrows any database error after logging it
 */
private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) {
  try {
    // Fast path: skip the write entirely when the user is already indexed.
    const existing = await dbOrTx
      .select()
      .from(users)
      .where(eq(users.did, did))
      .limit(1);
    if (existing.length > 0) {
      return;
    }

    const inserted = await dbOrTx
      .insert(users)
      .values({
        did,
        handle: null, // Will be updated by identity events
        indexedAt: new Date(),
      })
      .onConflictDoNothing()
      .returning({ did: users.did });

    // An empty result means a concurrent writer created the row first,
    // so only report a creation when this call actually inserted it.
    if (inserted.length > 0) {
      this.logger.info("Created user", { did });
    }
  } catch (error) {
    this.logger.error("Failed to ensure user exists", {
      did,
      error: error instanceof Error ? error.message : String(error),
    });
    throw error;
  }
}
1081
/**
 * Resolve a forum's database ID from its AT URI.
 * Yields null when `parseAtUri` rejects the URI or no forum row matches
 * the parsed DID/rkey pair. Database errors are logged and rethrown.
 * @param forumUri - AT URI of the forum
 * @param dbOrTx - Database instance or transaction
 */
private async getForumIdByUri(
  forumUri: string,
  dbOrTx: DbOrTransaction = this.db
): Promise<bigint | null> {
  const target = parseAtUri(forumUri);
  if (!target) {
    return null;
  }

  try {
    const sameRecord = and(
      eq(forums.did, target.did),
      eq(forums.rkey, target.rkey)
    );
    const [match] = await dbOrTx
      .select({ id: forums.id })
      .from(forums)
      .where(sameRecord)
      .limit(1);

    return match?.id ?? null;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    this.logger.error("Database error in getForumIdByUri", {
      operation: "getForumIdByUri",
      forumUri,
      error: message,
    });
    throw error;
  }
}
1110
/**
 * Resolve a forum's database ID from the DID that owns it.
 * Used for records owned by the forum (categories, modActions).
 * Yields null when no forum row has that DID; database errors are
 * logged and rethrown.
 * @param forumDid - DID of the forum
 * @param dbOrTx - Database instance or transaction
 */
private async getForumIdByDid(
  forumDid: string,
  dbOrTx: DbOrTransaction = this.db
): Promise<bigint | null> {
  try {
    const [match] = await dbOrTx
      .select({ id: forums.id })
      .from(forums)
      .where(eq(forums.did, forumDid))
      .limit(1);

    return match?.id ?? null;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    this.logger.error("Database error in getForumIdByDid", {
      operation: "getForumIdByDid",
      forumDid,
      error: message,
    });
    throw error;
  }
}
1137
/**
 * Resolve a post's database ID from its AT URI.
 * Yields null when `parseAtUri` rejects the URI or no post row matches
 * the parsed DID/rkey pair. Database errors are logged and rethrown.
 * @param postUri - AT URI of the post
 * @param dbOrTx - Database instance or transaction
 */
private async getPostIdByUri(
  postUri: string,
  dbOrTx: DbOrTransaction = this.db
): Promise<bigint | null> {
  const target = parseAtUri(postUri);
  if (!target) {
    return null;
  }

  try {
    const sameRecord = and(
      eq(posts.did, target.did),
      eq(posts.rkey, target.rkey)
    );
    const [match] = await dbOrTx
      .select({ id: posts.id })
      .from(posts)
      .where(sameRecord)
      .limit(1);

    return match?.id ?? null;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    this.logger.error("Database error in getPostIdByUri", {
      operation: "getPostIdByUri",
      postUri,
      error: message,
    });
    throw error;
  }
}
1166
/**
 * Resolve a board's database ID from its AT URI (at://did/collection/rkey).
 * Yields null when `parseAtUri` rejects the URI or no board row matches
 * the parsed DID/rkey pair. Database errors are logged and rethrown.
 * @param uri - AT URI of the board
 * @param dbOrTx - Database instance or transaction
 */
private async getBoardIdByUri(
  uri: string,
  dbOrTx: DbOrTransaction = this.db
): Promise<bigint | null> {
  const target = parseAtUri(uri);
  if (!target) {
    return null;
  }

  try {
    const sameRecord = and(
      eq(boards.did, target.did),
      eq(boards.rkey, target.rkey)
    );
    const rows = await dbOrTx
      .select({ id: boards.id })
      .from(boards)
      .where(sameRecord)
      .limit(1);

    return rows[0]?.id ?? null;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    this.logger.error("Database error in getBoardIdByUri", {
      operation: "getBoardIdByUri",
      uri,
      did: target.did,
      rkey: target.rkey,
      error: message,
    });
    throw error;
  }
}
1197
/**
 * Resolve a category's database ID from its AT URI (at://did/collection/rkey).
 * Yields null when `parseAtUri` rejects the URI or no category row matches
 * the parsed DID/rkey pair. Database errors are logged and rethrown.
 * @param uri - AT URI of the category
 * @param dbOrTx - Database instance or transaction
 */
private async getCategoryIdByUri(
  uri: string,
  dbOrTx: DbOrTransaction = this.db
): Promise<bigint | null> {
  const target = parseAtUri(uri);
  if (!target) {
    return null;
  }

  try {
    const sameRecord = and(
      eq(categories.did, target.did),
      eq(categories.rkey, target.rkey)
    );
    const rows = await dbOrTx
      .select({ id: categories.id })
      .from(categories)
      .where(sameRecord)
      .limit(1);

    return rows[0]?.id ?? null;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    this.logger.error("Database error in getCategoryIdByUri", {
      operation: "getCategoryIdByUri",
      uri,
      did: target.did,
      rkey: target.rkey,
      error: message,
    });
    throw error;
  }
}
1228}