A community based topic aggregation platform built on atproto

feat(server): wire up comment Jetstream consumer

Initialize comment repository and Jetstream consumer at server startup.
Consumer runs in background goroutine, indexing comment events from
atProto firehose to PostgreSQL AppView.

Consumer lifecycle:
- Start on server init
- Graceful shutdown on SIGINT/SIGTERM
- Automatic reconnection on connection loss

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+25
+25
cmd/server/main.go
··· 286 286 voteRepo := postgresRepo.NewVoteRepository(db) 287 287 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 288 288 289 + // Initialize comment repository (used by Jetstream consumer for indexing) 290 + commentRepo := postgresRepo.NewCommentRepository(db) 291 + log.Println("✅ Comment repository initialized (Jetstream indexing only)") 292 + 289 293 // Initialize feed service 290 294 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 291 295 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 369 373 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 370 374 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 371 375 log.Println(" - Updating: Post vote counts atomically") 376 + 377 + // Start Jetstream consumer for comments 378 + // This consumer indexes comments from user repositories and updates parent counts 379 + commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 380 + if commentJetstreamURL == "" { 381 + // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 382 + commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment" 383 + } 384 + 385 + commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 386 + commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 387 + 388 + go func() { 389 + if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 390 + log.Printf("Comment Jetstream consumer stopped: %v", startErr) 391 + } 392 + }() 393 + 394 + log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 395 + log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations") 396 + log.Println(" - Updating: Post comment counts and comment reply counts atomically") 372 397 373 398 // Register XRPC routes 374 399 routes.RegisterUserRoutes(r, userService)