import type { CommitCreateEvent, CommitDeleteEvent, CommitUpdateEvent, } from "@skyware/jetstream"; import type { Database, DbOrTransaction } from "@atbb/db"; import type { Logger } from "@atbb/logger"; import { posts, forums, categories, boards, users, memberships, modActions, roles, rolePermissions, themes, themePolicies, themePolicyAvailableThemes, } from "@atbb/db"; import { eq, and } from "drizzle-orm"; import { parseAtUri } from "./at-uri.js"; import { BanEnforcer } from "./ban-enforcer.js"; import { SpaceAtbbPost as Post, SpaceAtbbForumForum as Forum, SpaceAtbbForumCategory as Category, SpaceAtbbForumBoard as Board, SpaceAtbbMembership as Membership, SpaceAtbbModAction as ModAction, SpaceAtbbForumRole as Role, SpaceAtbbForumTheme as Theme, SpaceAtbbForumThemePolicy as ThemePolicy, } from "@atbb/lexicon"; // ── Collection Config Types ───────────────────────────── /** * Configuration for a data-driven collection handler. * Encodes the per-collection logic that differs across the 5 indexed types, * while the generic handler methods supply the shared try/catch/log/throw scaffolding. */ interface CollectionConfig { /** Human-readable name for logging (e.g. "Post", "Forum") */ name: string; /** Drizzle table reference */ table: any; /** "hard" = DELETE FROM (all non-post collections) */ deleteStrategy: "hard"; /** Call ensureUser(event.did) before insert? (user-owned records) */ ensureUserOnCreate?: boolean; /** * Transform event+record into DB insert values. * Return null to skip the insert (e.g. when a required foreign key is missing). */ toInsertValues: ( event: any, record: TRecord, tx: DbOrTransaction ) => Promise | null>; /** * Transform event+record into DB update set values. * Runs inside a transaction. Return null to skip the update. */ toUpdateValues: ( event: any, record: TRecord, tx: DbOrTransaction ) => Promise | null>; /** * Optional hook called after a row is inserted or updated, within the same * transaction. Receives the row's numeric id (bigint) so callers can write * to child tables (e.g. role_permissions). */ afterUpsert?: ( event: any, record: TRecord, rowId: bigint, tx: DbOrTransaction ) => Promise; } /** * Indexer class for processing AT Proto firehose events * Converts events into database records for the atBB AppView */ export class Indexer { private banEnforcer: BanEnforcer; constructor(private db: Database, private logger: Logger) { this.banEnforcer = new BanEnforcer(db, logger); } // ── Collection Configs ────────────────────────────────── private postConfig: CollectionConfig = { name: "Post", table: posts, deleteStrategy: "hard", ensureUserOnCreate: true, toInsertValues: async (event, record, tx) => { // Look up parent/root for replies let rootId: bigint | null = null; let parentId: bigint | null = null; if (Post.isReplyRef(record.reply)) { rootId = await this.getPostIdByUri(record.reply.root.uri, tx); parentId = await this.getPostIdByUri(record.reply.parent.uri, tx); } else if (record.reply) { // reply ref present but $type omitted — rootPostId/parentPostId will be null, // making this reply unreachable in thread navigation (data corruption). this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", { operation: "Post CREATE", postDid: event.did, postRkey: event.commit.rkey, errorId: "POST_REPLY_REF_MISSING_TYPE", }); } // Look up board ID if board reference exists let boardId: bigint | null = null; if (record.board?.board.uri) { boardId = await this.getBoardIdByUri(record.board.board.uri, tx); if (!boardId) { this.logger.error("Failed to index post: board not found", { operation: "Post CREATE", postDid: event.did, postRkey: event.commit.rkey, boardUri: record.board.board.uri, errorId: "POST_BOARD_MISSING", }); throw new Error(`Board not found: ${record.board.board.uri}`); } } return { did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, title: record.reply ? null : (record.title ?? null), text: record.text, forumUri: record.forum?.forum.uri ?? null, boardUri: record.board?.board.uri ?? null, boardId, rootPostId: rootId, rootUri: record.reply?.root.uri ?? null, parentPostId: parentId, parentUri: record.reply?.parent.uri ?? null, createdAt: new Date(record.createdAt), indexedAt: new Date(), }; }, toUpdateValues: async (event, record, tx) => { // Look up board ID if board reference exists let boardId: bigint | null = null; if (record.board?.board.uri) { boardId = await this.getBoardIdByUri(record.board.board.uri, tx); if (!boardId) { this.logger.error("Failed to index post: board not found", { operation: "Post UPDATE", postDid: event.did, postRkey: event.commit.rkey, boardUri: record.board.board.uri, errorId: "POST_BOARD_MISSING", }); throw new Error(`Board not found: ${record.board.board.uri}`); } } return { cid: event.commit.cid, title: record.reply ? null : (record.title ?? null), text: record.text, forumUri: record.forum?.forum.uri ?? null, boardUri: record.board?.board.uri ?? null, boardId, indexedAt: new Date(), }; }, }; private forumConfig: CollectionConfig = { name: "Forum", table: forums, deleteStrategy: "hard", ensureUserOnCreate: true, toInsertValues: async (event, record) => ({ did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, name: record.name, description: record.description ?? null, indexedAt: new Date(), }), toUpdateValues: async (event, record) => ({ cid: event.commit.cid, name: record.name, description: record.description ?? null, indexedAt: new Date(), }), }; private categoryConfig: CollectionConfig = { name: "Category", table: categories, deleteStrategy: "hard", toInsertValues: async (event, record, tx) => { // Categories are owned by the Forum DID, so event.did IS the forum DID const forumId = await this.getForumIdByDid(event.did, tx); if (!forumId) { this.logger.warn("Category: Forum not found for DID", { operation: "Category CREATE", did: event.did, }); return null; } return { did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, forumId, name: record.name, description: record.description ?? null, slug: record.slug ?? null, sortOrder: record.sortOrder ?? 0, createdAt: new Date(record.createdAt), indexedAt: new Date(), }; }, toUpdateValues: async (event, record, tx) => { // Categories are owned by the Forum DID, so event.did IS the forum DID const forumId = await this.getForumIdByDid(event.did, tx); if (!forumId) { this.logger.warn("Category: Forum not found for DID", { operation: "Category UPDATE", did: event.did, }); return null; } return { cid: event.commit.cid, forumId, name: record.name, description: record.description ?? null, slug: record.slug ?? null, sortOrder: record.sortOrder ?? 0, indexedAt: new Date(), }; }, }; private boardConfig: CollectionConfig = { name: "Board", table: boards, deleteStrategy: "hard", toInsertValues: async (event, record, tx) => { // Boards are owned by Forum DID const categoryId = await this.getCategoryIdByUri( record.category.category.uri, tx ); if (!categoryId) { this.logger.error("Failed to index board: category not found", { operation: "Board CREATE", boardDid: event.did, boardRkey: event.commit.rkey, categoryUri: record.category.category.uri, errorId: "BOARD_CATEGORY_MISSING", }); throw new Error(`Category not found: ${record.category.category.uri}`); } return { did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, name: record.name, description: record.description ?? null, slug: record.slug ?? null, sortOrder: record.sortOrder ?? null, categoryId, categoryUri: record.category.category.uri, createdAt: new Date(record.createdAt), indexedAt: new Date(), }; }, toUpdateValues: async (event, record, tx) => { const categoryId = await this.getCategoryIdByUri( record.category.category.uri, tx ); if (!categoryId) { this.logger.error("Failed to index board: category not found", { operation: "Board UPDATE", boardDid: event.did, boardRkey: event.commit.rkey, categoryUri: record.category.category.uri, errorId: "BOARD_CATEGORY_MISSING", }); throw new Error(`Category not found: ${record.category.category.uri}`); } return { cid: event.commit.cid, name: record.name, description: record.description ?? null, slug: record.slug ?? null, sortOrder: record.sortOrder ?? null, categoryId, categoryUri: record.category.category.uri, indexedAt: new Date(), }; }, }; private roleConfig: CollectionConfig = { name: "Role", table: roles, deleteStrategy: "hard", toInsertValues: async (event, record) => ({ did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, name: record.name, description: record.description ?? null, priority: record.priority, createdAt: new Date(record.createdAt), indexedAt: new Date(), }), toUpdateValues: async (event, record) => ({ cid: event.commit.cid, name: record.name, description: record.description ?? null, priority: record.priority, indexedAt: new Date(), }), afterUpsert: async (event, record, roleId, tx) => { // Replace all permissions for this role atomically await tx .delete(rolePermissions) .where(eq(rolePermissions.roleId, roleId)); if (record.permissions && record.permissions.length > 0) { await tx.insert(rolePermissions).values( record.permissions.map((permission: string) => ({ roleId, permission, })) ); } }, }; private themeConfig: CollectionConfig = { name: "Theme", table: themes, deleteStrategy: "hard", toInsertValues: async (event, record) => ({ did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, name: record.name, colorScheme: record.colorScheme as string, tokens: record.tokens, cssOverrides: (record.cssOverrides as string | undefined) ?? null, fontUrls: (record.fontUrls as string[] | undefined) ?? null, createdAt: new Date(record.createdAt as string), indexedAt: new Date(), }), toUpdateValues: async (event, record) => ({ cid: event.commit.cid, name: record.name, colorScheme: record.colorScheme as string, tokens: record.tokens, cssOverrides: (record.cssOverrides as string | undefined) ?? null, fontUrls: (record.fontUrls as string[] | undefined) ?? null, indexedAt: new Date(), }), }; private themePolicyConfig: CollectionConfig = { name: "ThemePolicy", table: themePolicies, deleteStrategy: "hard", toInsertValues: async (event, record) => ({ did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, defaultLightThemeUri: record.defaultLightTheme.uri, defaultDarkThemeUri: record.defaultDarkTheme.uri, allowUserChoice: record.allowUserChoice, indexedAt: new Date(), }), toUpdateValues: async (event, record) => ({ cid: event.commit.cid, defaultLightThemeUri: record.defaultLightTheme.uri, defaultDarkThemeUri: record.defaultDarkTheme.uri, allowUserChoice: record.allowUserChoice, indexedAt: new Date(), }), afterUpsert: async (_event, record, policyId, tx) => { // Atomically replace all available-theme rows for this policy await tx .delete(themePolicyAvailableThemes) .where(eq(themePolicyAvailableThemes.policyId, policyId)); const available = record.availableThemes ?? []; if (available.length > 0) { await tx.insert(themePolicyAvailableThemes).values( available.map((themeRef) => ({ policyId, themeUri: themeRef.uri, themeCid: themeRef.cid ?? null, })) ); } }, }; private membershipConfig: CollectionConfig = { name: "Membership", table: memberships, deleteStrategy: "hard", ensureUserOnCreate: true, toInsertValues: async (event, record, tx) => { // Look up forum by URI (inside transaction) const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx); if (!forumId) { this.logger.warn("Membership: Forum not found", { operation: "Membership CREATE", forumUri: record.forum.forum.uri, }); return null; } return { did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, forumId, forumUri: record.forum.forum.uri, role: null, // TODO: Extract role name from roleUri or lexicon roleUri: record.role?.role.uri ?? null, joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, createdAt: new Date(record.createdAt), indexedAt: new Date(), }; }, toUpdateValues: async (event, record, tx) => { // Look up forum by URI (may have changed) const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx); if (!forumId) { this.logger.warn("Membership: Forum not found", { operation: "Membership UPDATE", forumUri: record.forum.forum.uri, }); return null; } return { cid: event.commit.cid, forumId, forumUri: record.forum.forum.uri, role: null, // TODO: Extract role name from roleUri or lexicon roleUri: record.role?.role.uri ?? null, joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, indexedAt: new Date(), }; }, }; private modActionConfig: CollectionConfig = { name: "ModAction", table: modActions, deleteStrategy: "hard", toInsertValues: async (event, record, tx) => { // ModActions are owned by the Forum DID, so event.did IS the forum DID const forumId = await this.getForumIdByDid(event.did, tx); if (!forumId) { this.logger.warn("ModAction: Forum not found for DID", { operation: "ModAction CREATE", did: event.did, }); return null; } // Ensure moderator exists await this.ensureUser(record.createdBy, tx); // Determine subject type (post or user) let subjectPostUri: string | null = null; let subjectDid: string | null = null; if (record.subject.post) { subjectPostUri = record.subject.post.uri; } if (record.subject.did) { subjectDid = record.subject.did; } return { did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, forumId, action: record.action, subjectPostUri, subjectDid, reason: record.reason ?? null, createdBy: record.createdBy, expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, createdAt: new Date(record.createdAt), indexedAt: new Date(), }; }, toUpdateValues: async (event, record, tx) => { // ModActions are owned by the Forum DID, so event.did IS the forum DID const forumId = await this.getForumIdByDid(event.did, tx); if (!forumId) { this.logger.warn("ModAction: Forum not found for DID", { operation: "ModAction UPDATE", did: event.did, }); return null; } // Determine subject type (post or user) let subjectPostUri: string | null = null; let subjectDid: string | null = null; if (record.subject.post) { subjectPostUri = record.subject.post.uri; } if (record.subject.did) { subjectDid = record.subject.did; } return { cid: event.commit.cid, forumId, action: record.action, subjectPostUri, subjectDid, reason: record.reason ?? null, createdBy: record.createdBy, expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, indexedAt: new Date(), }; }, }; // ── Generic Handler Methods ───────────────────────────── /** * Generic create handler. Wraps the insert in a transaction, * optionally ensures the user exists, and delegates to the * config's toInsertValues callback for collection-specific logic. */ private async genericCreate( config: CollectionConfig, event: any ): Promise { try { const record = event.commit.record as unknown as TRecord; let skipped = false; await this.db.transaction(async (tx) => { if (config.ensureUserOnCreate) { await this.ensureUser(event.did, tx); } const values = await config.toInsertValues(event, record, tx); if (!values) { skipped = true; return; // Skip insert (e.g. foreign key not found) } if (config.afterUpsert) { const [inserted] = await tx .insert(config.table) .values(values) .returning({ id: config.table.id }); await config.afterUpsert(event, record, inserted.id, tx); } else { await tx.insert(config.table).values(values); } }); // Only log success if insert actually happened if (!skipped) { this.logger.info(`${config.name} created`, { did: event.did, rkey: event.commit.rkey, }); } return !skipped; } catch (error) { this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, { did: event.did, rkey: event.commit.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Generic update handler. Wraps the update in a transaction * and delegates to the config's toUpdateValues callback for * collection-specific logic. */ private async genericUpdate( config: CollectionConfig, event: any ) { try { const record = event.commit.record as unknown as TRecord; let skipped = false; await this.db.transaction(async (tx) => { const values = await config.toUpdateValues(event, record, tx); if (!values) { skipped = true; return; // Skip update (e.g. foreign key not found) } if (config.afterUpsert) { const [updated] = await tx .update(config.table) .set(values) .where( and( eq(config.table.did, event.did), eq(config.table.rkey, event.commit.rkey) ) ) .returning({ id: config.table.id }); if (!updated) return; // Out-of-order UPDATE before CREATE: no row to update yet await config.afterUpsert(event, record, updated.id, tx); } else { await tx .update(config.table) .set(values) .where( and( eq(config.table.did, event.did), eq(config.table.rkey, event.commit.rkey) ) ); } }); // Only log success if update actually happened if (!skipped) { this.logger.info(`${config.name} updated`, { did: event.did, rkey: event.commit.rkey, }); } } catch (error) { this.logger.error(`Failed to update ${config.name.toLowerCase()}`, { did: event.did, rkey: event.commit.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Generic delete handler. Hard-deletes a record (DELETE FROM). * Posts use handlePostDelete instead (always tombstone). */ private async genericDelete(config: CollectionConfig, event: any) { try { await this.db .delete(config.table) .where( and( eq(config.table.did, event.did), eq(config.table.rkey, event.commit.rkey) ) ); this.logger.info(`${config.name} deleted`, { did: event.did, rkey: event.commit.rkey, }); } catch (error) { this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, { did: event.did, rkey: event.commit.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } // ── Post Handlers ─────────────────────────────────────── async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) { const banned = await this.banEnforcer.isBanned(event.did); if (banned) { this.logger.info("Skipping post from banned user", { did: event.did, rkey: event.commit.rkey, }); return; } await this.genericCreate(this.postConfig, event); } async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) { await this.genericUpdate(this.postConfig, event); } /** * Handles a user-initiated post delete from the PDS. * Always tombstones: replaces personal content with a placeholder and marks * deletedByUser=true. The row is kept so threads referencing this post as * their root or parent remain intact. Personal content is gone; structure is preserved. */ async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) { const { did, commit: { rkey } } = event; try { await this.db .update(posts) .set({ text: "[user deleted this post]", deletedByUser: true }) .where(and(eq(posts.did, did), eq(posts.rkey, rkey))); this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey }); } catch (error) { this.logger.error("Failed to tombstone post", { did, rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } // ── Forum Handlers ────────────────────────────────────── async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) { await this.genericCreate(this.forumConfig, event); } async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) { await this.genericUpdate(this.forumConfig, event); } async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) { await this.genericDelete(this.forumConfig, event); } // ── Category Handlers ─────────────────────────────────── async handleCategoryCreate( event: CommitCreateEvent<"space.atbb.forum.category"> ) { await this.genericCreate(this.categoryConfig, event); } async handleCategoryUpdate( event: CommitUpdateEvent<"space.atbb.forum.category"> ) { await this.genericUpdate(this.categoryConfig, event); } async handleCategoryDelete( event: CommitDeleteEvent<"space.atbb.forum.category"> ) { await this.genericDelete(this.categoryConfig, event); } // ── Board Handlers ────────────────────────────────────── async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) { await this.genericCreate(this.boardConfig, event); } async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) { await this.genericUpdate(this.boardConfig, event); } async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) { await this.genericDelete(this.boardConfig, event); } // ── Role Handlers ─────────────────────────────────────── async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) { await this.genericCreate(this.roleConfig, event); } async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) { await this.genericUpdate(this.roleConfig, event); } async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) { await this.genericDelete(this.roleConfig, event); } // ── Theme Handlers ────────────────────────────────────── async handleThemeCreate(event: CommitCreateEvent<"space.atbb.forum.theme">) { await this.genericCreate(this.themeConfig, event); } async handleThemeUpdate(event: CommitUpdateEvent<"space.atbb.forum.theme">) { await this.genericUpdate(this.themeConfig, event); } async handleThemeDelete(event: CommitDeleteEvent<"space.atbb.forum.theme">) { await this.genericDelete(this.themeConfig, event); } // ── ThemePolicy Handlers ───────────────────────────────── async handleThemePolicyCreate(event: CommitCreateEvent<"space.atbb.forum.themePolicy">) { await this.genericCreate(this.themePolicyConfig, event); } async handleThemePolicyUpdate(event: CommitUpdateEvent<"space.atbb.forum.themePolicy">) { await this.genericUpdate(this.themePolicyConfig, event); } async handleThemePolicyDelete(event: CommitDeleteEvent<"space.atbb.forum.themePolicy">) { await this.genericDelete(this.themePolicyConfig, event); } // ── Membership Handlers ───────────────────────────────── async handleMembershipCreate( event: CommitCreateEvent<"space.atbb.membership"> ) { await this.genericCreate(this.membershipConfig, event); } async handleMembershipUpdate( event: CommitUpdateEvent<"space.atbb.membership"> ) { await this.genericUpdate(this.membershipConfig, event); } async handleMembershipDelete( event: CommitDeleteEvent<"space.atbb.membership"> ) { await this.genericDelete(this.membershipConfig, event); } // ── ModAction Handlers ────────────────────────────────── async handleModActionCreate( event: CommitCreateEvent<"space.atbb.modAction"> ) { const record = event.commit.record as unknown as ModAction.Record; const isBan = record.action === "space.atbb.modAction.ban" && record.subject.did; const isUnban = record.action === "space.atbb.modAction.unban" && record.subject.did; try { if (isBan) { // Custom atomic path: insert ban record + applyBan in one transaction let skipped = false; await this.db.transaction(async (tx) => { const forumId = await this.getForumIdByDid(event.did, tx); if (!forumId) { this.logger.warn("ModAction (ban): Forum not found for DID", { operation: "ModAction CREATE", did: event.did, }); skipped = true; return; } await this.ensureUser(record.createdBy, tx); await tx.insert(modActions).values({ did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, forumId, action: record.action, subjectPostUri: null, subjectDid: record.subject.did ?? null, reason: record.reason ?? null, createdBy: record.createdBy, expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, createdAt: new Date(record.createdAt), indexedAt: new Date(), }); await this.banEnforcer.applyBan(record.subject.did!, tx); }); if (!skipped) { this.logger.info("ModAction (ban) created", { did: event.did, rkey: event.commit.rkey, }); } } else if (isUnban) { // Custom atomic path: insert unban record + liftBan in one transaction let skipped = false; await this.db.transaction(async (tx) => { const forumId = await this.getForumIdByDid(event.did, tx); if (!forumId) { this.logger.warn("ModAction (unban): Forum not found for DID", { operation: "ModAction CREATE", did: event.did, }); skipped = true; return; } await this.ensureUser(record.createdBy, tx); await tx.insert(modActions).values({ did: event.did, rkey: event.commit.rkey, cid: event.commit.cid, forumId, action: record.action, subjectPostUri: null, subjectDid: record.subject.did ?? null, reason: record.reason ?? null, createdBy: record.createdBy, expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, createdAt: new Date(record.createdAt), indexedAt: new Date(), }); await this.banEnforcer.liftBan(record.subject.did!, tx); }); if (!skipped) { this.logger.info("ModAction (unban) created", { did: event.did, rkey: event.commit.rkey, }); } } else { // Generic path for all other mod actions (mute, pin, lock, delete, etc.) await this.genericCreate(this.modActionConfig, event); // Ban/unban without subject.did — shouldn't happen but log if it does if ( record.action === "space.atbb.modAction.ban" || record.action === "space.atbb.modAction.unban" ) { this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", { did: event.did, rkey: event.commit.rkey, action: record.action, }); } } } catch (error) { this.logger.error("Failed to index ModAction create", { did: event.did, rkey: event.commit.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } async handleModActionUpdate( event: CommitUpdateEvent<"space.atbb.modAction"> ) { await this.genericUpdate(this.modActionConfig, event); } async handleModActionDelete( event: CommitDeleteEvent<"space.atbb.modAction"> ) { try { await this.db.transaction(async (tx) => { // 1. Read before delete to capture action type and subject const [existing] = await tx .select({ action: modActions.action, subjectDid: modActions.subjectDid, }) .from(modActions) .where( and( eq(modActions.did, event.did), eq(modActions.rkey, event.commit.rkey) ) ) .limit(1); // 2. Hard delete the record await tx .delete(modActions) .where( and( eq(modActions.did, event.did), eq(modActions.rkey, event.commit.rkey) ) ); // 3. Restore posts if the deleted record was a ban if ( existing?.action === "space.atbb.modAction.ban" && existing?.subjectDid ) { await this.banEnforcer.liftBan(existing.subjectDid, tx); } }); this.logger.info("ModAction deleted", { did: event.did, rkey: event.commit.rkey, }); } catch (error) { this.logger.error("Failed to delete modAction", { did: event.did, rkey: event.commit.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } // ── Reaction Handlers (Stub) ──────────────────────────── async handleReactionCreate( event: CommitCreateEvent<"space.atbb.reaction"> ) { this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey }); // TODO: Add reactions table to schema } async handleReactionUpdate( event: CommitUpdateEvent<"space.atbb.reaction"> ) { this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey }); // TODO: Add reactions table to schema } async handleReactionDelete( event: CommitDeleteEvent<"space.atbb.reaction"> ) { this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey }); // TODO: Add reactions table to schema } // ── Helper Methods ────────────────────────────────────── /** * Ensure a user exists in the database. Creates if not exists. * @param dbOrTx - Database instance or transaction */ private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) { try { const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1); if (existing.length === 0) { await dbOrTx.insert(users).values({ did, handle: null, // Will be updated by identity events indexedAt: new Date(), }); this.logger.info("Created user", { did }); } } catch (error) { this.logger.error("Failed to ensure user exists", { did, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Look up a forum ID by its AT URI * @param dbOrTx - Database instance or transaction */ private async getForumIdByUri( forumUri: string, dbOrTx: DbOrTransaction = this.db ): Promise { const parsed = parseAtUri(forumUri); if (!parsed) return null; try { const result = await dbOrTx .select({ id: forums.id }) .from(forums) .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey))) .limit(1); return result.length > 0 ? result[0].id : null; } catch (error) { this.logger.error("Database error in getForumIdByUri", { operation: "getForumIdByUri", forumUri, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Look up a forum ID by the forum's DID * Used for records owned by the forum (categories, modActions) * @param dbOrTx - Database instance or transaction */ private async getForumIdByDid( forumDid: string, dbOrTx: DbOrTransaction = this.db ): Promise { try { const result = await dbOrTx .select({ id: forums.id }) .from(forums) .where(eq(forums.did, forumDid)) .limit(1); return result.length > 0 ? result[0].id : null; } catch (error) { this.logger.error("Database error in getForumIdByDid", { operation: "getForumIdByDid", forumDid, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Look up a post ID by its AT URI * @param dbOrTx - Database instance or transaction */ private async getPostIdByUri( postUri: string, dbOrTx: DbOrTransaction = this.db ): Promise { const parsed = parseAtUri(postUri); if (!parsed) return null; try { const result = await dbOrTx .select({ id: posts.id }) .from(posts) .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey))) .limit(1); return result.length > 0 ? result[0].id : null; } catch (error) { this.logger.error("Database error in getPostIdByUri", { operation: "getPostIdByUri", postUri, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Look up board ID by AT URI (at://did/collection/rkey) * @param uri - AT URI of the board * @param dbOrTx - Database instance or transaction */ private async getBoardIdByUri( uri: string, dbOrTx: DbOrTransaction = this.db ): Promise { const parsed = parseAtUri(uri); if (!parsed) return null; try { const [result] = await dbOrTx .select({ id: boards.id }) .from(boards) .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey))) .limit(1); return result?.id ?? null; } catch (error) { this.logger.error("Database error in getBoardIdByUri", { operation: "getBoardIdByUri", uri, did: parsed.did, rkey: parsed.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } /** * Look up category ID by AT URI (at://did/collection/rkey) * @param uri - AT URI of the category * @param dbOrTx - Database instance or transaction */ private async getCategoryIdByUri( uri: string, dbOrTx: DbOrTransaction = this.db ): Promise { const parsed = parseAtUri(uri); if (!parsed) return null; try { const [result] = await dbOrTx .select({ id: categories.id }) .from(categories) .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey))) .limit(1); return result?.id ?? null; } catch (error) { this.logger.error("Database error in getCategoryIdByUri", { operation: "getCategoryIdByUri", uri, did: parsed.did, rkey: parsed.rkey, error: error instanceof Error ? error.message : String(error), }); throw error; } } }