WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto

fix: address PR #7 critical review issues

Address all 7 blocking issues identified in comprehensive PR review:

1. parseAtUri: Replace URL constructor with regex for at:// scheme support
2. Collection names: Use full lexicon IDs (space.atbb.forum.forum, space.atbb.forum.category)
3. Forum resolution: Add getForumIdByDid() for category/modAction records owned by Forum DID
4. ModAction subject: Access record.subject.post.uri and record.subject.did correctly
5. Circuit breaker: Track consecutive failures (max 100), stop firehose on threshold
6. Transactions: Wrap ensureUser + insert operations in db.transaction()
7. Reconnection state: Set isRunning=false on exhaustion, add health check methods

Additional improvements:
- Propagate errors from all handlers to circuit breaker
- Update test collection names and add type assertions
- Enhance error logging with event context

+289 -163
+22 -22
apps/appview/src/lib/__tests__/indexer.test.ts
··· 61 $type: "space.atbb.post", 62 text: "Hello world", 63 createdAt: "2024-01-01T00:00:00Z", 64 - }, 65 }, 66 }; 67 ··· 93 }, 94 }, 95 createdAt: "2024-01-01T00:00:00Z", 96 - }, 97 }, 98 }; 99 ··· 129 }, 130 }, 131 createdAt: "2024-01-01T01:00:00Z", 132 - }, 133 }, 134 }; 135 ··· 155 $type: "space.atbb.post", 156 text: "Updated text", 157 createdAt: "2024-01-01T00:00:00Z", 158 - }, 159 }, 160 }; 161 ··· 189 it("should handle forum creation", async () => { 190 const { handleForumCreate } = await import("../indexer.js"); 191 192 - const event: CommitCreateEvent<"space.atbb.forum"> = { 193 did: "did:plc:forum", 194 time_us: 1234567890, 195 kind: "commit", 196 commit: { 197 rev: "abc", 198 operation: "create", 199 - collection: "space.atbb.forum", 200 rkey: "self", 201 cid: "cidForum", 202 record: { 203 - $type: "space.atbb.forum", 204 name: "Test Forum", 205 description: "A test forum", 206 - }, 207 }, 208 }; 209 ··· 215 it("should handle forum update", async () => { 216 const { handleForumUpdate } = await import("../indexer.js"); 217 218 - const event: CommitUpdateEvent<"space.atbb.forum"> = { 219 did: "did:plc:forum", 220 time_us: 1234567890, 221 kind: "commit", 222 commit: { 223 rev: "abc", 224 operation: "update", 225 - collection: "space.atbb.forum", 226 rkey: "self", 227 cid: "cidForumNew", 228 record: { 229 - $type: "space.atbb.forum", 230 name: "Updated Forum Name", 231 description: "Updated description", 232 - }, 233 }, 234 }; 235 ··· 241 it("should handle forum deletion", async () => { 242 const { handleForumDelete } = await import("../indexer.js"); 243 244 - const event: CommitDeleteEvent<"space.atbb.forum"> = { 245 did: "did:plc:forum", 246 time_us: 1234567890, 247 kind: "commit", 248 commit: { 249 rev: "abc", 250 operation: "delete", 251 - collection: "space.atbb.forum", 252 rkey: "self", 253 }, 254 }; ··· 263 it("should handle category creation without errors", async () => { 264 const { handleCategoryCreate } = await import("../indexer.js"); 265 266 - const event: CommitCreateEvent<"space.atbb.category"> = { 267 did: "did:plc:forum", 268 time_us: 1234567890, 269 kind: "commit", 270 commit: { 271 rev: "abc", 272 operation: "create", 273 - collection: "space.atbb.category", 274 rkey: "cat1", 275 cid: "cidCat", 276 record: { 277 - $type: "space.atbb.category", 278 name: "General Discussion", 279 forum: { 280 forum: { ··· 285 slug: "general-discussion", 286 sortOrder: 0, 287 createdAt: "2024-01-01T00:00:00Z", 288 - }, 289 }, 290 }; 291 ··· 306 }), 307 } as any); 308 309 - const event: CommitCreateEvent<"space.atbb.category"> = { 310 did: "did:plc:forum", 311 time_us: 1234567890, 312 kind: "commit", 313 commit: { 314 rev: "abc", 315 operation: "create", 316 - collection: "space.atbb.category", 317 rkey: "cat1", 318 cid: "cidCat", 319 record: { 320 - $type: "space.atbb.category", 321 name: "General Discussion", 322 forum: { 323 forum: { ··· 326 }, 327 }, 328 createdAt: "2024-01-01T00:00:00Z", 329 - }, 330 }, 331 }; 332
··· 61 $type: "space.atbb.post", 62 text: "Hello world", 63 createdAt: "2024-01-01T00:00:00Z", 64 + } as any, 65 }, 66 }; 67 ··· 93 }, 94 }, 95 createdAt: "2024-01-01T00:00:00Z", 96 + } as any, 97 }, 98 }; 99 ··· 129 }, 130 }, 131 createdAt: "2024-01-01T01:00:00Z", 132 + } as any, 133 }, 134 }; 135 ··· 155 $type: "space.atbb.post", 156 text: "Updated text", 157 createdAt: "2024-01-01T00:00:00Z", 158 + } as any, 159 }, 160 }; 161 ··· 189 it("should handle forum creation", async () => { 190 const { handleForumCreate } = await import("../indexer.js"); 191 192 + const event: CommitCreateEvent<"space.atbb.forum.forum"> = { 193 did: "did:plc:forum", 194 time_us: 1234567890, 195 kind: "commit", 196 commit: { 197 rev: "abc", 198 operation: "create", 199 + collection: "space.atbb.forum.forum", 200 rkey: "self", 201 cid: "cidForum", 202 record: { 203 + $type: "space.atbb.forum.forum", 204 name: "Test Forum", 205 description: "A test forum", 206 + } as any, 207 }, 208 }; 209 ··· 215 it("should handle forum update", async () => { 216 const { handleForumUpdate } = await import("../indexer.js"); 217 218 + const event: CommitUpdateEvent<"space.atbb.forum.forum"> = { 219 did: "did:plc:forum", 220 time_us: 1234567890, 221 kind: "commit", 222 commit: { 223 rev: "abc", 224 operation: "update", 225 + collection: "space.atbb.forum.forum", 226 rkey: "self", 227 cid: "cidForumNew", 228 record: { 229 + $type: "space.atbb.forum.forum", 230 name: "Updated Forum Name", 231 description: "Updated description", 232 + } as any, 233 }, 234 }; 235 ··· 241 it("should handle forum deletion", async () => { 242 const { handleForumDelete } = await import("../indexer.js"); 243 244 + const event: CommitDeleteEvent<"space.atbb.forum.forum"> = { 245 did: "did:plc:forum", 246 time_us: 1234567890, 247 kind: "commit", 248 commit: { 249 rev: "abc", 250 operation: "delete", 251 + collection: "space.atbb.forum.forum", 252 rkey: "self", 253 }, 254 }; ··· 263 it("should handle category creation without errors", async () => { 264 const { handleCategoryCreate } = await import("../indexer.js"); 265 266 + const event: CommitCreateEvent<"space.atbb.forum.category"> = { 267 did: "did:plc:forum", 268 time_us: 1234567890, 269 kind: "commit", 270 commit: { 271 rev: "abc", 272 operation: "create", 273 + collection: "space.atbb.forum.category", 274 rkey: "cat1", 275 cid: "cidCat", 276 record: { 277 + $type: "space.atbb.forum.category", 278 name: "General Discussion", 279 forum: { 280 forum: { ··· 285 slug: "general-discussion", 286 sortOrder: 0, 287 createdAt: "2024-01-01T00:00:00Z", 288 + } as any, 289 }, 290 }; 291 ··· 306 }), 307 } as any); 308 309 + const event: CommitCreateEvent<"space.atbb.forum.category"> = { 310 did: "did:plc:forum", 311 time_us: 1234567890, 312 kind: "commit", 313 commit: { 314 rev: "abc", 315 operation: "create", 316 + collection: "space.atbb.forum.category", 317 rkey: "cat1", 318 cid: "cidCat", 319 record: { 320 + $type: "space.atbb.forum.category", 321 name: "General Discussion", 322 forum: { 323 forum: { ··· 326 }, 327 }, 328 createdAt: "2024-01-01T00:00:00Z", 329 + } as any, 330 }, 331 }; 332
+111 -28
apps/appview/src/lib/firehose.ts
··· 14 private readonly maxReconnectAttempts = 10; 15 private readonly reconnectDelayMs = 5000; 16 17 - // Collections we're interested in 18 private readonly wantedCollections = [ 19 "space.atbb.post", 20 - "space.atbb.forum", 21 - "space.atbb.category", 22 "space.atbb.membership", 23 "space.atbb.modAction", 24 "space.atbb.reaction", ··· 56 this.handlePostCreate(event); 57 }); 58 59 - this.jetstream.onCreate("space.atbb.forum", (event) => { 60 this.handleForumCreate(event); 61 }); 62 63 - this.jetstream.onCreate("space.atbb.category", (event) => { 64 this.handleCategoryCreate(event); 65 }); 66 ··· 81 this.handlePostUpdate(event); 82 }); 83 84 - this.jetstream.onUpdate("space.atbb.forum", (event) => { 85 this.handleForumUpdate(event); 86 }); 87 88 - this.jetstream.onUpdate("space.atbb.category", (event) => { 89 this.handleCategoryUpdate(event); 90 }); 91 ··· 106 this.handlePostDelete(event); 107 }); 108 109 - this.jetstream.onDelete("space.atbb.forum", (event) => { 110 this.handleForumDelete(event); 111 }); 112 113 - this.jetstream.onDelete("space.atbb.category", (event) => { 114 this.handleCategoryDelete(event); 115 }); 116 ··· 186 } 187 188 /** 189 * Handle reconnection with exponential backoff 190 */ 191 private async handleReconnect() { 192 if (this.reconnectAttempts >= this.maxReconnectAttempts) { 193 console.error( 194 - `Max reconnect attempts (${this.maxReconnectAttempts}) reached. Giving up.` 195 ); 196 return; 197 } 198 ··· 251 } 252 } 253 254 // ── Event Handlers ────────────────────────────────────── 255 256 - private handlePostCreate = indexer.handlePostCreate; 257 - private handlePostUpdate = indexer.handlePostUpdate; 258 - private handlePostDelete = indexer.handlePostDelete; 259 260 - private handleForumCreate = indexer.handleForumCreate; 261 - private handleForumUpdate = indexer.handleForumUpdate; 262 - private handleForumDelete = indexer.handleForumDelete; 263 264 - private handleCategoryCreate = indexer.handleCategoryCreate; 265 - private handleCategoryUpdate = indexer.handleCategoryUpdate; 266 - private handleCategoryDelete = indexer.handleCategoryDelete; 267 268 - private handleMembershipCreate = indexer.handleMembershipCreate; 269 - private handleMembershipUpdate = indexer.handleMembershipUpdate; 270 - private handleMembershipDelete = indexer.handleMembershipDelete; 271 272 - private handleModActionCreate = indexer.handleModActionCreate; 273 - private handleModActionUpdate = indexer.handleModActionUpdate; 274 - private handleModActionDelete = indexer.handleModActionDelete; 275 276 - private handleReactionCreate = indexer.handleReactionCreate; 277 - private handleReactionUpdate = indexer.handleReactionUpdate; 278 - private handleReactionDelete = indexer.handleReactionDelete; 279 }
··· 14 private readonly maxReconnectAttempts = 10; 15 private readonly reconnectDelayMs = 5000; 16 17 + // Circuit breaker for handler failures 18 + private consecutiveFailures = 0; 19 + private readonly maxConsecutiveFailures = 100; 20 + 21 + // Collections we're interested in (full lexicon IDs) 22 private readonly wantedCollections = [ 23 "space.atbb.post", 24 + "space.atbb.forum.forum", 25 + "space.atbb.forum.category", 26 "space.atbb.membership", 27 "space.atbb.modAction", 28 "space.atbb.reaction", ··· 60 this.handlePostCreate(event); 61 }); 62 63 + this.jetstream.onCreate("space.atbb.forum.forum", (event) => { 64 this.handleForumCreate(event); 65 }); 66 67 + this.jetstream.onCreate("space.atbb.forum.category", (event) => { 68 this.handleCategoryCreate(event); 69 }); 70 ··· 85 this.handlePostUpdate(event); 86 }); 87 88 + this.jetstream.onUpdate("space.atbb.forum.forum", (event) => { 89 this.handleForumUpdate(event); 90 }); 91 92 + this.jetstream.onUpdate("space.atbb.forum.category", (event) => { 93 this.handleCategoryUpdate(event); 94 }); 95 ··· 110 this.handlePostDelete(event); 111 }); 112 113 + this.jetstream.onDelete("space.atbb.forum.forum", (event) => { 114 this.handleForumDelete(event); 115 }); 116 117 + this.jetstream.onDelete("space.atbb.forum.category", (event) => { 118 this.handleCategoryDelete(event); 119 }); 120 ··· 190 } 191 192 /** 193 + * Check if the firehose is healthy and actively indexing 194 + */ 195 + isHealthy(): boolean { 196 + return this.isRunning; 197 + } 198 + 199 + /** 200 + * Get detailed health status for monitoring 201 + */ 202 + getHealthStatus(): { 203 + isRunning: boolean; 204 + reconnectAttempts: number; 205 + consecutiveFailures: number; 206 + maxReconnectAttempts: number; 207 + maxConsecutiveFailures: number; 208 + } { 209 + return { 210 + isRunning: this.isRunning, 211 + reconnectAttempts: this.reconnectAttempts, 212 + consecutiveFailures: this.consecutiveFailures, 213 + maxReconnectAttempts: this.maxReconnectAttempts, 214 + maxConsecutiveFailures: this.maxConsecutiveFailures, 215 + }; 216 + } 217 + 218 + /** 219 * Handle reconnection with exponential backoff 220 */ 221 private async handleReconnect() { 222 if (this.reconnectAttempts >= this.maxReconnectAttempts) { 223 console.error( 224 + `[FATAL] Max reconnect attempts (${this.maxReconnectAttempts}) reached. Firehose indexing has stopped.` 225 + ); 226 + console.error( 227 + `[FATAL] The appview will continue serving stale data. Manual intervention required.` 228 ); 229 + this.isRunning = false; 230 return; 231 } 232 ··· 285 } 286 } 287 288 + // ── Circuit Breaker ───────────────────────────────────── 289 + 290 + /** 291 + * Wrap handler to track failures and stop firehose on excessive errors 292 + */ 293 + private async wrapHandler<T>( 294 + handler: (event: T) => Promise<void>, 295 + event: T, 296 + handlerName: string 297 + ): Promise<void> { 298 + try { 299 + await handler(event); 300 + // Success - reset failure counter 301 + this.consecutiveFailures = 0; 302 + } catch (error) { 303 + this.consecutiveFailures++; 304 + console.error( 305 + `[HANDLER ERROR] ${handlerName} failed (${this.consecutiveFailures}/${this.maxConsecutiveFailures}):`, 306 + error 307 + ); 308 + 309 + // Check circuit breaker threshold 310 + if (this.consecutiveFailures >= this.maxConsecutiveFailures) { 311 + console.error( 312 + `[CIRCUIT BREAKER] Max consecutive failures (${this.maxConsecutiveFailures}) reached. Stopping firehose to prevent data loss.` 313 + ); 314 + await this.stop(); 315 + } 316 + } 317 + } 318 + 319 // ── Event Handlers ────────────────────────────────────── 320 321 + private handlePostCreate = async (event: Parameters<typeof indexer.handlePostCreate>[0]) => 322 + this.wrapHandler(indexer.handlePostCreate, event, "handlePostCreate"); 323 + private handlePostUpdate = async (event: Parameters<typeof indexer.handlePostUpdate>[0]) => 324 + this.wrapHandler(indexer.handlePostUpdate, event, "handlePostUpdate"); 325 + private handlePostDelete = async (event: Parameters<typeof indexer.handlePostDelete>[0]) => 326 + this.wrapHandler(indexer.handlePostDelete, event, "handlePostDelete"); 327 328 + private handleForumCreate = async (event: Parameters<typeof indexer.handleForumCreate>[0]) => 329 + this.wrapHandler(indexer.handleForumCreate, event, "handleForumCreate"); 330 + private handleForumUpdate = async (event: Parameters<typeof indexer.handleForumUpdate>[0]) => 331 + this.wrapHandler(indexer.handleForumUpdate, event, "handleForumUpdate"); 332 + private handleForumDelete = async (event: Parameters<typeof indexer.handleForumDelete>[0]) => 333 + this.wrapHandler(indexer.handleForumDelete, event, "handleForumDelete"); 334 335 + private handleCategoryCreate = async (event: Parameters<typeof indexer.handleCategoryCreate>[0]) => 336 + this.wrapHandler(indexer.handleCategoryCreate, event, "handleCategoryCreate"); 337 + private handleCategoryUpdate = async (event: Parameters<typeof indexer.handleCategoryUpdate>[0]) => 338 + this.wrapHandler(indexer.handleCategoryUpdate, event, "handleCategoryUpdate"); 339 + private handleCategoryDelete = async (event: Parameters<typeof indexer.handleCategoryDelete>[0]) => 340 + this.wrapHandler(indexer.handleCategoryDelete, event, "handleCategoryDelete"); 341 342 + private handleMembershipCreate = async (event: Parameters<typeof indexer.handleMembershipCreate>[0]) => 343 + this.wrapHandler(indexer.handleMembershipCreate, event, "handleMembershipCreate"); 344 + private handleMembershipUpdate = async (event: Parameters<typeof indexer.handleMembershipUpdate>[0]) => 345 + this.wrapHandler(indexer.handleMembershipUpdate, event, "handleMembershipUpdate"); 346 + private handleMembershipDelete = async (event: Parameters<typeof indexer.handleMembershipDelete>[0]) => 347 + this.wrapHandler(indexer.handleMembershipDelete, event, "handleMembershipDelete"); 348 349 + private handleModActionCreate = async (event: Parameters<typeof indexer.handleModActionCreate>[0]) => 350 + this.wrapHandler(indexer.handleModActionCreate, event, "handleModActionCreate"); 351 + private handleModActionUpdate = async (event: Parameters<typeof indexer.handleModActionUpdate>[0]) => 352 + this.wrapHandler(indexer.handleModActionUpdate, event, "handleModActionUpdate"); 353 + private handleModActionDelete = async (event: Parameters<typeof indexer.handleModActionDelete>[0]) => 354 + this.wrapHandler(indexer.handleModActionDelete, event, "handleModActionDelete"); 355 356 + private handleReactionCreate = async (event: Parameters<typeof indexer.handleReactionCreate>[0]) => 357 + this.wrapHandler(indexer.handleReactionCreate, event, "handleReactionCreate"); 358 + private handleReactionUpdate = async (event: Parameters<typeof indexer.handleReactionUpdate>[0]) => 359 + this.wrapHandler(indexer.handleReactionUpdate, event, "handleReactionUpdate"); 360 + private handleReactionDelete = async (event: Parameters<typeof indexer.handleReactionDelete>[0]) => 361 + this.wrapHandler(indexer.handleReactionDelete, event, "handleReactionDelete"); 362 }
+156 -113
apps/appview/src/lib/indexer.ts
··· 31 32 /** 33 * Parse an AT Proto URI to extract DID, collection, and rkey 34 - * Format: at://did:plc:xxx/collection/rkey 35 */ 36 function parseAtUri(uri: string): { 37 did: string; ··· 39 rkey: string; 40 } | null { 41 try { 42 - const url = new URL(uri); 43 - if (url.protocol !== "at:") return null; 44 - 45 - const did = url.hostname; 46 - const parts = url.pathname.split("/").filter((p) => p); 47 - if (parts.length < 2) return null; 48 - 49 - const rkey = parts[parts.length - 1]; 50 - const collection = parts.slice(0, -1).join("."); 51 52 return { did, collection, rkey }; 53 - } catch { 54 return null; 55 } 56 } 57 58 /** 59 * Ensure a user exists in the database. Creates if not exists. 60 */ 61 - async function ensureUser(did: string) { 62 try { 63 - const existing = await db.select().from(users).where(eq(users.did, did)).limit(1); 64 65 if (existing.length === 0) { 66 - await db.insert(users).values({ 67 did, 68 handle: null, // Will be updated by identity events 69 indexedAt: new Date(), ··· 93 } 94 95 /** 96 * Look up a post ID by its AT URI 97 */ 98 async function getPostIdByUri(postUri: string): Promise<bigint | null> { ··· 116 try { 117 const record = event.commit.record as unknown as Post.Record; 118 119 - // Ensure author exists 120 - await ensureUser(event.did); 121 122 - // Look up parent/root for replies 123 - let rootId: bigint | null = null; 124 - let parentId: bigint | null = null; 125 126 - if (Post.isReplyRef(record.reply)) { 127 - rootId = await getPostIdByUri(record.reply.root.uri); 128 - parentId = await getPostIdByUri(record.reply.parent.uri); 129 - } 130 131 - // Insert post 132 - await db.insert(posts).values({ 133 - did: event.did, 134 - rkey: event.commit.rkey, 135 - cid: event.commit.cid, 136 - text: record.text, 137 - forumUri: record.forum?.forum.uri ?? null, 138 - rootPostId: rootId, 139 - rootUri: record.reply?.root.uri ?? null, 140 - parentPostId: parentId, 141 - parentUri: record.reply?.parent.uri ?? null, 142 - createdAt: new Date(record.createdAt), 143 - indexedAt: new Date(), 144 }); 145 146 console.log(`[CREATE] Post: ${event.did}/${event.commit.rkey}`); ··· 149 `Failed to index post create: ${event.did}/${event.commit.rkey}`, 150 error 151 ); 152 } 153 } 154 ··· 175 `Failed to update post: ${event.did}/${event.commit.rkey}`, 176 error 177 ); 178 } 179 } 180 ··· 196 `Failed to delete post: ${event.did}/${event.commit.rkey}`, 197 error 198 ); 199 } 200 } 201 202 // ── Forum Handlers ────────────────────────────────────── 203 204 export async function handleForumCreate( 205 - event: CommitCreateEvent<"space.atbb.forum"> 206 ) { 207 try { 208 const record = event.commit.record as unknown as Forum.Record; 209 210 - // Ensure owner exists 211 - await ensureUser(event.did); 212 213 - // Insert forum 214 - await db.insert(forums).values({ 215 - did: event.did, 216 - rkey: event.commit.rkey, 217 - cid: event.commit.cid, 218 - name: record.name, 219 - description: record.description ?? null, 220 - indexedAt: new Date(), 221 }); 222 223 console.log(`[CREATE] Forum: ${event.did}/${event.commit.rkey}`); ··· 226 `Failed to index forum create: ${event.did}/${event.commit.rkey}`, 227 error 228 ); 229 } 230 } 231 232 export async function handleForumUpdate( 233 - event: CommitUpdateEvent<"space.atbb.forum"> 234 ) { 235 try { 236 const record = event.commit.record as unknown as Forum.Record; ··· 251 `Failed to update forum: ${event.did}/${event.commit.rkey}`, 252 error 253 ); 254 } 255 } 256 257 export async function handleForumDelete( 258 - event: CommitDeleteEvent<"space.atbb.forum"> 259 ) { 260 try { 261 // Hard delete ··· 271 `Failed to delete forum: ${event.did}/${event.commit.rkey}`, 272 error 273 ); 274 } 275 } 276 277 // ── Category Handlers ─────────────────────────────────── 278 279 export async function handleCategoryCreate( 280 - event: CommitCreateEvent<"space.atbb.category"> 281 ) { 282 try { 283 const record = event.commit.record as unknown as Category.Record; 284 285 - // Look up forum by URI 286 - const forumId = await getForumIdByUri((record.forum as any).forum.uri); 287 288 if (!forumId) { 289 console.warn( 290 - `[CREATE] Category: Forum not found for ${(record.forum as any).forum.uri}` 291 ); 292 return; 293 } ··· 312 `Failed to index category create: ${event.did}/${event.commit.rkey}`, 313 error 314 ); 315 } 316 } 317 318 export async function handleCategoryUpdate( 319 - event: CommitUpdateEvent<"space.atbb.category"> 320 ) { 321 try { 322 const record = event.commit.record as unknown as Category.Record; 323 324 - // Look up forum by URI (may have changed) 325 - const forumId = await getForumIdByUri((record.forum as any).forum.uri); 326 327 if (!forumId) { 328 console.warn( 329 - `[UPDATE] Category: Forum not found for ${(record.forum as any).forum.uri}` 330 ); 331 return; 332 } ··· 352 `Failed to update category: ${event.did}/${event.commit.rkey}`, 353 error 354 ); 355 } 356 } 357 358 export async function handleCategoryDelete( 359 - event: CommitDeleteEvent<"space.atbb.category"> 360 ) { 361 try { 362 // Hard delete ··· 372 `Failed to delete category: ${event.did}/${event.commit.rkey}`, 373 error 374 ); 375 } 376 } 377 ··· 383 try { 384 const record = event.commit.record as unknown as Membership.Record; 385 386 - // Ensure user exists 387 - await ensureUser(event.did); 388 - 389 - // Look up forum by URI 390 - const forumId = await getForumIdByUri((record.forum as any).forum.uri); 391 392 if (!forumId) { 393 console.warn( 394 - `[CREATE] Membership: Forum not found for ${(record.forum as any).forum.uri}` 395 ); 396 return; 397 } 398 399 - // Insert membership 400 - await db.insert(memberships).values({ 401 - did: event.did, 402 - rkey: event.commit.rkey, 403 - cid: event.commit.cid, 404 - forumId, 405 - forumUri: record.forum.forum.uri, 406 - role: null, // TODO: Extract role name from roleUri or lexicon 407 - roleUri: record.role?.role.uri ?? null, 408 - joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, 409 - createdAt: new Date(record.createdAt), 410 - indexedAt: new Date(), 411 }); 412 413 console.log(`[CREATE] Membership: ${event.did}/${event.commit.rkey}`); ··· 416 `Failed to index membership create: ${event.did}/${event.commit.rkey}`, 417 error 418 ); 419 } 420 } 421 ··· 456 `Failed to update membership: ${event.did}/${event.commit.rkey}`, 457 error 458 ); 459 } 460 } 461 ··· 476 `Failed to delete membership: ${event.did}/${event.commit.rkey}`, 477 error 478 ); 479 } 480 } 481 ··· 487 try { 488 const record = event.commit.record as unknown as ModAction.Record; 489 490 - // Ensure moderator exists 491 - await ensureUser(event.did); 492 - 493 - // Look up forum by URI 494 - const forumId = await getForumIdByUri((record.forum as any).forum.uri); 495 496 if (!forumId) { 497 console.warn( 498 - `[CREATE] ModAction: Forum not found for ${(record.forum as any).forum.uri}` 499 ); 500 return; 501 } 502 503 - // Determine subject type (post or user) 504 - let subjectPostUri: string | null = null; 505 - let subjectDid: string | null = null; 506 507 - if ((record.subject as any).uri.includes("/space.atbb.post/")) { 508 - subjectPostUri = (record.subject as any).uri; 509 - } else { 510 - const parsed = parseAtUri((record.subject as any).uri); 511 - if (parsed) subjectDid = parsed.did; 512 - } 513 514 - // Insert mod action 515 - await db.insert(modActions).values({ 516 - did: event.did, 517 - rkey: event.commit.rkey, 518 - cid: event.commit.cid, 519 - forumId, 520 - action: record.action, 521 - subjectPostUri, 522 - subjectDid, 523 - reason: record.reason ?? null, 524 - createdBy: record.createdBy, 525 - expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 526 - createdAt: new Date(record.createdAt), 527 - indexedAt: new Date(), 528 }); 529 530 console.log(`[CREATE] ModAction: ${event.did}/${event.commit.rkey}`); ··· 533 `Failed to index mod action create: ${event.did}/${event.commit.rkey}`, 534 error 535 ); 536 } 537 } 538 ··· 542 try { 543 const record = event.commit.record as unknown as ModAction.Record; 544 545 - // Look up forum by URI (may have changed) 546 - const forumId = await getForumIdByUri((record.forum as any).forum.uri); 547 548 if (!forumId) { 549 console.warn( 550 - `[UPDATE] ModAction: Forum not found for ${(record.forum as any).forum.uri}` 551 ); 552 return; 553 } ··· 556 let subjectPostUri: string | null = null; 557 let subjectDid: string | null = null; 558 559 - if ((record.subject as any).uri.includes("/space.atbb.post/")) { 560 - subjectPostUri = (record.subject as any).uri; 561 - } else { 562 - const parsed = parseAtUri((record.subject as any).uri); 563 - if (parsed) subjectDid = parsed.did; 564 } 565 566 await db ··· 586 `Failed to update mod action: ${event.did}/${event.commit.rkey}`, 587 error 588 ); 589 } 590 } 591 ··· 606 `Failed to delete mod action: ${event.did}/${event.commit.rkey}`, 607 error 608 ); 609 } 610 } 611
··· 31 32 /** 33 * Parse an AT Proto URI to extract DID, collection, and rkey 34 + * Format: at://did:plc:xxx/collection.name/rkey 35 */ 36 function parseAtUri(uri: string): { 37 did: string; ··· 39 rkey: string; 40 } | null { 41 try { 42 + // AT Protocol URIs use at:// scheme which isn't recognized by URL constructor 43 + // Pattern: at://did:plc:xxx/space.atbb.post/rkey123 44 + const match = uri.match(/^at:\/\/([^/]+)\/([^/]+)\/(.+)$/); 45 + if (!match) { 46 + console.error(`Invalid AT URI format: ${uri}`); 47 + return null; 48 + } 49 50 + const [, did, collection, rkey] = match; 51 return { did, collection, rkey }; 52 + } catch (error) { 53 + console.error(`Failed to parse AT URI: ${uri}`, error); 54 return null; 55 } 56 } 57 58 /** 59 * Ensure a user exists in the database. Creates if not exists. 60 + * @param dbOrTx - Database instance or transaction 61 */ 62 + async function ensureUser(did: string, dbOrTx: Database | Parameters<Parameters<Database['transaction']>[0]>[0] = db) { 63 try { 64 + const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1); 65 66 if (existing.length === 0) { 67 + await dbOrTx.insert(users).values({ 68 did, 69 handle: null, // Will be updated by identity events 70 indexedAt: new Date(), ··· 94 } 95 96 /** 97 + * Look up a forum ID by the forum's DID 98 + * Used for records owned by the forum (categories, modActions) 99 + */ 100 + async function getForumIdByDid(forumDid: string): Promise<bigint | null> { 101 + try { 102 + const result = await db 103 + .select({ id: forums.id }) 104 + .from(forums) 105 + .where(eq(forums.did, forumDid)) 106 + .limit(1); 107 + 108 + return result.length > 0 ? result[0].id : null; 109 + } catch (error) { 110 + console.error(`Failed to look up forum by DID: ${forumDid}`, error); 111 + return null; 112 + } 113 + } 114 + 115 + /** 116 * Look up a post ID by its AT URI 117 */ 118 async function getPostIdByUri(postUri: string): Promise<bigint | null> { ··· 136 try { 137 const record = event.commit.record as unknown as Post.Record; 138 139 + await db.transaction(async (tx) => { 140 + // Ensure author exists 141 + await ensureUser(event.did, tx); 142 143 + // Look up parent/root for replies 144 + let rootId: bigint | null = null; 145 + let parentId: bigint | null = null; 146 147 + if (Post.isReplyRef(record.reply)) { 148 + rootId = await getPostIdByUri(record.reply.root.uri); 149 + parentId = await getPostIdByUri(record.reply.parent.uri); 150 + } 151 152 + // Insert post 153 + await tx.insert(posts).values({ 154 + did: event.did, 155 + rkey: event.commit.rkey, 156 + cid: event.commit.cid, 157 + text: record.text, 158 + forumUri: record.forum?.forum.uri ?? null, 159 + rootPostId: rootId, 160 + rootUri: record.reply?.root.uri ?? null, 161 + parentPostId: parentId, 162 + parentUri: record.reply?.parent.uri ?? null, 163 + createdAt: new Date(record.createdAt), 164 + indexedAt: new Date(), 165 + }); 166 }); 167 168 console.log(`[CREATE] Post: ${event.did}/${event.commit.rkey}`); ··· 171 `Failed to index post create: ${event.did}/${event.commit.rkey}`, 172 error 173 ); 174 + throw error; 175 } 176 } 177 ··· 198 `Failed to update post: ${event.did}/${event.commit.rkey}`, 199 error 200 ); 201 + throw error; 202 } 203 } 204 ··· 220 `Failed to delete post: ${event.did}/${event.commit.rkey}`, 221 error 222 ); 223 + throw error; 224 } 225 } 226 227 // ── Forum Handlers ────────────────────────────────────── 228 229 export async function handleForumCreate( 230 + event: CommitCreateEvent<"space.atbb.forum.forum"> 231 ) { 232 try { 233 const record = event.commit.record as unknown as Forum.Record; 234 235 + await db.transaction(async (tx) => { 236 + // Ensure owner exists 237 + await ensureUser(event.did, tx); 238 239 + // Insert forum 240 + await tx.insert(forums).values({ 241 + did: event.did, 242 + rkey: event.commit.rkey, 243 + cid: event.commit.cid, 244 + name: record.name, 245 + description: record.description ?? null, 246 + indexedAt: new Date(), 247 + }); 248 }); 249 250 console.log(`[CREATE] Forum: ${event.did}/${event.commit.rkey}`); ··· 253 `Failed to index forum create: ${event.did}/${event.commit.rkey}`, 254 error 255 ); 256 + throw error; 257 } 258 } 259 260 export async function handleForumUpdate( 261 + event: CommitUpdateEvent<"space.atbb.forum.forum"> 262 ) { 263 try { 264 const record = event.commit.record as unknown as Forum.Record; ··· 279 `Failed to update forum: ${event.did}/${event.commit.rkey}`, 280 error 281 ); 282 + throw error; 283 } 284 } 285 286 export async function handleForumDelete( 287 + event: CommitDeleteEvent<"space.atbb.forum.forum"> 288 ) { 289 try { 290 // Hard delete ··· 300 `Failed to delete forum: ${event.did}/${event.commit.rkey}`, 301 error 302 ); 303 + throw error; 304 } 305 } 306 307 // ── Category Handlers ─────────────────────────────────── 308 309 export async function handleCategoryCreate( 310 + event: CommitCreateEvent<"space.atbb.forum.category"> 311 ) { 312 try { 313 const record = event.commit.record as unknown as Category.Record; 314 315 + // Categories are owned by the Forum DID, so event.did IS the forum DID 316 + const forumId = await getForumIdByDid(event.did); 317 318 if (!forumId) { 319 console.warn( 320 + `[CREATE] Category: Forum not found for DID ${event.did}` 321 ); 322 return; 323 } ··· 342 `Failed to index category create: ${event.did}/${event.commit.rkey}`, 343 error 344 ); 345 + throw error; 346 } 347 } 348 349 export async function handleCategoryUpdate( 350 + event: CommitUpdateEvent<"space.atbb.forum.category"> 351 ) { 352 try { 353 const record = event.commit.record as unknown as Category.Record; 354 355 + // Categories are owned by the Forum DID, so event.did IS the forum DID 356 + const forumId = await getForumIdByDid(event.did); 357 358 if (!forumId) { 359 console.warn( 360 + `[UPDATE] Category: Forum not found for DID ${event.did}` 361 ); 362 return; 363 } ··· 383 `Failed to update category: ${event.did}/${event.commit.rkey}`, 384 error 385 ); 386 + throw error; 387 } 388 } 389 390 export async function handleCategoryDelete( 391 + event: CommitDeleteEvent<"space.atbb.forum.category"> 392 ) { 393 try { 394 // Hard delete ··· 404 `Failed to delete category: ${event.did}/${event.commit.rkey}`, 405 error 406 ); 407 + throw error; 408 } 409 } 410 ··· 416 try { 417 const record = event.commit.record as unknown as Membership.Record; 418 419 + // Look up forum by URI (outside transaction) 420 + const forumId = await getForumIdByUri(record.forum.forum.uri); 421 422 if (!forumId) { 423 console.warn( 424 + `[CREATE] Membership: Forum not found for ${record.forum.forum.uri}` 425 ); 426 return; 427 } 428 429 + await db.transaction(async (tx) => { 430 + // Ensure user exists 431 + await ensureUser(event.did, tx); 432 + 433 + // Insert membership 434 + await tx.insert(memberships).values({ 435 + did: event.did, 436 + rkey: event.commit.rkey, 437 + cid: event.commit.cid, 438 + forumId, 439 + forumUri: record.forum.forum.uri, 440 + role: null, // TODO: Extract role name from roleUri or lexicon 441 + roleUri: record.role?.role.uri ?? null, 442 + joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, 443 + createdAt: new Date(record.createdAt), 444 + indexedAt: new Date(), 445 + }); 446 }); 447 448 console.log(`[CREATE] Membership: ${event.did}/${event.commit.rkey}`); ··· 451 `Failed to index membership create: ${event.did}/${event.commit.rkey}`, 452 error 453 ); 454 + throw error; 455 } 456 } 457 ··· 492 `Failed to update membership: ${event.did}/${event.commit.rkey}`, 493 error 494 ); 495 + throw error; 496 } 497 } 498 ··· 513 `Failed to delete membership: ${event.did}/${event.commit.rkey}`, 514 error 515 ); 516 + throw error; 517 } 518 } 519 ··· 525 try { 526 const record = event.commit.record as unknown as ModAction.Record; 527 528 + // ModActions are owned by the Forum DID, so event.did IS the forum DID 529 + const forumId = await getForumIdByDid(event.did); 530 531 if (!forumId) { 532 console.warn( 533 + `[CREATE] ModAction: Forum not found for DID ${event.did}` 534 ); 535 return; 536 } 537 538 + await db.transaction(async (tx) => { 539 + // Ensure moderator exists 540 + await ensureUser(record.createdBy, tx); 541 542 + // Determine subject type (post or user) 543 + let subjectPostUri: string | null = null; 544 + let subjectDid: string | null = null; 545 546 + if (record.subject.post) { 547 + subjectPostUri = record.subject.post.uri; 548 + } 549 + if (record.subject.did) { 550 + subjectDid = record.subject.did; 551 + } 552 + 553 + // Insert mod action 554 + await tx.insert(modActions).values({ 555 + did: event.did, 556 + rkey: event.commit.rkey, 557 + cid: event.commit.cid, 558 + forumId, 559 + action: record.action, 560 + subjectPostUri, 561 + subjectDid, 562 + reason: record.reason ?? null, 563 + createdBy: record.createdBy, 564 + expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 565 + createdAt: new Date(record.createdAt), 566 + indexedAt: new Date(), 567 + }); 568 }); 569 570 console.log(`[CREATE] ModAction: ${event.did}/${event.commit.rkey}`); ··· 573 `Failed to index mod action create: ${event.did}/${event.commit.rkey}`, 574 error 575 ); 576 + throw error; 577 } 578 } 579 ··· 583 try { 584 const record = event.commit.record as unknown as ModAction.Record; 585 586 + // ModActions are owned by the Forum DID, so event.did IS the forum DID 587 + const forumId = await getForumIdByDid(event.did); 588 589 if (!forumId) { 590 console.warn( 591 + `[UPDATE] ModAction: Forum not found for DID ${event.did}` 592 ); 593 return; 594 } ··· 597 let subjectPostUri: string | null = null; 598 let subjectDid: string | null = null; 599 600 + if (record.subject.post) { 601 + subjectPostUri = record.subject.post.uri; 602 + } 603 + if (record.subject.did) { 604 + subjectDid = record.subject.did; 605 } 606 607 await db ··· 627 `Failed to update mod action: ${event.did}/${event.commit.rkey}`, 628 error 629 ); 630 + throw error; 631 } 632 } 633 ··· 648 `Failed to delete mod action: ${event.did}/${event.commit.rkey}`, 649 error 650 ); 651 + throw error; 652 } 653 } 654