Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee

feat: implement Jetstream firehose consumer for real-time feed

Add firehose-based feed that consumes AT Protocol events via Jetstream,
replacing polling for dramatically improved performance:

- Add internal/firehose package with:
- config.go: Jetstream configuration and Arabica collection list
- index.go: BoltDB-backed feed index with multi-key indexes
- consumer.go: WebSocket consumer with reconnection and backoff
- adapter.go: Bridge between firehose and feed service interfaces

- Modify feed service to optionally use firehose index when available
- Add -firehose CLI flag to enable the feature
- Add graceful shutdown handling for firehose consumer

Benefits:
- Real-time feed updates (<1s latency vs 5min cache)
- Zero API calls per feed request (vs ~10N for N users)
- Automatic user discovery from firehose events
- Fallback to polling if firehose unavailable

Co-Authored-By: Claude <noreply@anthropic.com>

Authored by Claude and committed by pdewey.com (commits f5bcd286, f94d1997)

+2069 -18
-3
BACKLOG.md
··· 26 26 27 27 ## Fixes 28 28 29 - - Adding new gear (grinders, etc.) from profile page redirects to brews on profile page after (should stay on current page) 30 - - Brews on profile page are stored in chronological order, should be reverse chronological (newest first) 31 - 32 29 - [Future work]: adjust timing of caching in feed, maybe use firehose and a sqlite database since we are only storing a few anyway 33 30 - Goal: reduce pings to server when idling 34 31
+122 -10
cmd/server/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 5 + "flag" 4 6 "fmt" 5 7 "net/http" 6 8 "os" 9 + "os/signal" 7 10 "path/filepath" 11 + "syscall" 8 12 "time" 9 13 10 14 "arabica/internal/atproto" 11 15 "arabica/internal/database/boltstore" 12 16 "arabica/internal/feed" 17 + "arabica/internal/firehose" 13 18 "arabica/internal/handlers" 14 19 "arabica/internal/routing" 15 20 ··· 18 23 ) 19 24 20 25 func main() { 26 + // Parse command-line flags 27 + useFirehose := flag.Bool("firehose", false, "Enable firehose-based feed (Jetstream consumer)") 28 + flag.Parse() 29 + 21 30 // Configure zerolog 22 31 // Set log level from environment (default: info) 23 32 logLevel := os.Getenv("LOG_LEVEL") ··· 46 55 }) 47 56 } 48 57 49 - log.Info().Msg("Starting Arabica Coffee Tracker") 58 + log.Info().Bool("firehose", *useFirehose).Msg("Starting Arabica Coffee Tracker") 50 59 51 60 // Get port from env or use default 52 61 port := os.Getenv("PORT") ··· 123 132 Int("registered_users", feedRegistry.Count()). 
124 133 Msg("Feed service initialized with persistent registry") 125 134 135 + // Setup context for graceful shutdown 136 + ctx, cancel := context.WithCancel(context.Background()) 137 + defer cancel() 138 + 139 + // Handle shutdown signals 140 + sigCh := make(chan os.Signal, 1) 141 + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) 142 + 143 + // Initialize firehose consumer if enabled 144 + var firehoseConsumer *firehose.Consumer 145 + if *useFirehose { 146 + // Determine feed index path 147 + feedIndexPath := os.Getenv("ARABICA_FEED_INDEX_PATH") 148 + if feedIndexPath == "" { 149 + dataDir := os.Getenv("XDG_DATA_HOME") 150 + if dataDir == "" { 151 + home, err := os.UserHomeDir() 152 + if err != nil { 153 + log.Fatal().Err(err).Msg("Failed to get home directory for feed index") 154 + } 155 + dataDir = filepath.Join(home, ".local", "share") 156 + } 157 + feedIndexPath = filepath.Join(dataDir, "arabica", "feed-index.db") 158 + } 159 + 160 + // Create firehose config 161 + firehoseConfig := firehose.DefaultConfig() 162 + firehoseConfig.IndexPath = feedIndexPath 163 + 164 + // Parse profile cache TTL from env if set 165 + if ttlStr := os.Getenv("ARABICA_PROFILE_CACHE_TTL"); ttlStr != "" { 166 + if ttl, err := time.ParseDuration(ttlStr); err == nil { 167 + firehoseConfig.ProfileCacheTTL = int64(ttl.Seconds()) 168 + } 169 + } 170 + 171 + // Create feed index 172 + feedIndex, err := firehose.NewFeedIndex(feedIndexPath, time.Duration(firehoseConfig.ProfileCacheTTL)*time.Second) 173 + if err != nil { 174 + log.Fatal().Err(err).Str("path", feedIndexPath).Msg("Failed to create feed index") 175 + } 176 + 177 + log.Info().Str("path", feedIndexPath).Msg("Feed index opened") 178 + 179 + // Create and start consumer 180 + firehoseConsumer = firehose.NewConsumer(firehoseConfig, feedIndex) 181 + firehoseConsumer.Start(ctx) 182 + 183 + // Wire up the feed service to use the firehose index 184 + adapter := firehose.NewFeedIndexAdapter(feedIndex) 185 + 
feedService.SetFirehoseIndex(adapter) 186 + 187 + log.Info().Msg("Firehose consumer started") 188 + 189 + // Backfill registered users in background 190 + go func() { 191 + time.Sleep(5 * time.Second) // Wait for initial connection 192 + for _, did := range feedRegistry.List() { 193 + if err := firehoseConsumer.BackfillDID(ctx, did); err != nil { 194 + log.Warn().Err(err).Str("did", did).Msg("Failed to backfill user") 195 + } 196 + } 197 + log.Info().Int("count", feedRegistry.Count()).Msg("Backfill of registered users complete") 198 + }() 199 + } 200 + 126 201 // Register users in the feed when they authenticate 127 202 // This ensures users are added to the feed even if they had an existing session 128 203 oauthManager.SetOnAuthSuccess(func(did string) { 129 204 feedRegistry.Register(did) 205 + // If firehose is enabled, backfill the user's records 206 + if firehoseConsumer != nil { 207 + go func() { 208 + if err := firehoseConsumer.BackfillDID(context.Background(), did); err != nil { 209 + log.Warn().Err(err).Str("did", did).Msg("Failed to backfill new user") 210 + } 211 + }() 212 + } 130 213 }) 131 214 132 215 if clientID == "" { ··· 175 258 Logger: log.Logger, 176 259 }) 177 260 178 - // Start HTTP server 179 - log.Info(). 180 - Str("address", "0.0.0.0:"+port). 181 - Str("url", "http://localhost:"+port). 182 - Bool("secure_cookies", secureCookies). 183 - Str("database", dbPath). 184 - Msg("Starting HTTP server") 261 + // Create HTTP server 262 + server := &http.Server{ 263 + Addr: "0.0.0.0:" + port, 264 + Handler: handler, 265 + } 185 266 186 - if err := http.ListenAndServe("0.0.0.0:"+port, handler); err != nil { 187 - log.Fatal().Err(err).Msg("Server failed to start") 267 + // Start HTTP server in goroutine 268 + go func() { 269 + log.Info(). 270 + Str("address", "0.0.0.0:"+port). 271 + Str("url", "http://localhost:"+port). 272 + Bool("secure_cookies", secureCookies). 273 + Bool("firehose", *useFirehose). 274 + Str("database", dbPath). 
275 + Msg("Starting HTTP server") 276 + 277 + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { 278 + log.Fatal().Err(err).Msg("Server failed to start") 279 + } 280 + }() 281 + 282 + // Wait for shutdown signal 283 + <-sigCh 284 + log.Info().Msg("Shutdown signal received") 285 + 286 + // Stop firehose consumer first 287 + if firehoseConsumer != nil { 288 + log.Info().Msg("Stopping firehose consumer...") 289 + firehoseConsumer.Stop() 188 290 } 291 + 292 + // Graceful shutdown of HTTP server 293 + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 294 + defer shutdownCancel() 295 + 296 + if err := server.Shutdown(shutdownCtx); err != nil { 297 + log.Error().Err(err).Msg("HTTP server shutdown error") 298 + } 299 + 300 + log.Info().Msg("Server stopped") 189 301 }
+641
docs/firehose-plan.md
··· 1 + # Firehose Integration Plan for Arabica 2 + 3 + ## Executive Summary 4 + 5 + This document proposes refactoring Arabica's home page feed to consume events from the AT Protocol firehose via Jetstream, replacing the current polling-based approach. This will provide real-time updates, dramatically reduce API calls, and improve scalability. 6 + 7 + **Recommendation:** Implement Jetstream consumer with local BoltDB index as Phase 1, with optional Slingshot/Constellation integration in Phase 2. 8 + 9 + --- 10 + 11 + ## Problem Statement 12 + 13 + ### Current Architecture 14 + 15 + The feed service (`internal/feed/service.go`) polls each registered user's PDS directly: 16 + 17 + ``` 18 + For N registered users: 19 + - N profile fetches 20 + - N × 5 collection fetches (brew, bean, roaster, grinder, brewer) 21 + - N × 4 reference resolution fetches 22 + - Total: ~10N API calls per refresh 23 + ``` 24 + 25 + ### Issues 26 + 27 + | Problem | Impact | 28 + | ------------------------ | ----------------------------------- | 29 + | High API call volume | Risk of rate limiting as users grow | 30 + | 5-minute cache staleness | Users don't see recent activity | 31 + | N+1 query pattern | Linear scaling, O(N) per refresh | 32 + | PDS dependency | Feed fails if any PDS is slow/down | 33 + | No real-time updates | Requires manual refresh | 34 + 35 + --- 36 + 37 + ## Proposed Solution: Jetstream Consumer 38 + 39 + ### Architecture Overview 40 + 41 + ``` 42 + ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ 43 + │ AT Protocol │ │ Jetstream │ │ Arabica │ 44 + │ Firehose │────▶│ (Public/Self) │────▶│ Consumer │ 45 + │ (all records) │ │ JSON over WS │ │ (background) │ 46 + └─────────────────┘ └──────────────────┘ └────────┬────────┘ 47 + 48 + 49 + ┌─────────────────┐ 50 + │ Feed Index │ 51 + │ (BoltDB) │ 52 + └────────┬────────┘ 53 + 54 + 55 + ┌─────────────────┐ 56 + │ HTTP Handler │ 57 + │ (instant) │ 58 + └─────────────────┘ 59 + ``` 60 + 61 + ### How It Works 62 + 63 
+ 1. **Background Consumer** connects to Jetstream WebSocket 64 + 2. **Filters** for `social.arabica.alpha.*` collections 65 + 3. **Indexes** incoming events into local BoltDB 66 + 4. **Serves** feed requests instantly from local index 67 + 5. **Fallback** to direct polling if consumer disconnects 68 + 69 + ### Benefits 70 + 71 + | Metric | Current | With Jetstream | 72 + | --------------------- | ---------------- | ----------------- | 73 + | API calls per refresh | ~10N | 0 | 74 + | Feed latency | 5 min cache | Real-time (<1s) | 75 + | PDS dependency | High | None (after sync) | 76 + | User discovery | Manual registry | Automatic | 77 + | Scalability | O(N) per request | O(1) per request | 78 + 79 + --- 80 + 81 + ## Technical Design 82 + 83 + ### 1. Jetstream Client Configuration 84 + 85 + ```go 86 + // internal/firehose/config.go 87 + 88 + type JetstreamConfig struct { 89 + // Public endpoints (fallback rotation) 90 + Endpoints []string 91 + 92 + // Filter to Arabica collections only 93 + WantedCollections []string 94 + 95 + // Optional: filter to registered DIDs only 96 + // Leave empty to discover all Arabica users 97 + WantedDids []string 98 + 99 + // Enable zstd compression (~56% bandwidth reduction) 100 + Compress bool 101 + 102 + // Cursor file path for restart recovery 103 + CursorFile string 104 + } 105 + 106 + func DefaultConfig() *JetstreamConfig { 107 + return &JetstreamConfig{ 108 + Endpoints: []string{ 109 + "wss://jetstream1.us-east.bsky.network/subscribe", 110 + "wss://jetstream2.us-east.bsky.network/subscribe", 111 + "wss://jetstream1.us-west.bsky.network/subscribe", 112 + "wss://jetstream2.us-west.bsky.network/subscribe", 113 + }, 114 + WantedCollections: []string{ 115 + "social.arabica.alpha.brew", 116 + "social.arabica.alpha.bean", 117 + "social.arabica.alpha.roaster", 118 + "social.arabica.alpha.grinder", 119 + "social.arabica.alpha.brewer", 120 + }, 121 + Compress: true, 122 + CursorFile: "jetstream-cursor.txt", 123 + } 124 + } 125 + ``` 126 
+ 127 + ### 2. Event Processing 128 + 129 + ```go 130 + // internal/firehose/consumer.go 131 + 132 + type Consumer struct { 133 + config *JetstreamConfig 134 + index *FeedIndex 135 + client *jetstream.Client 136 + cursor atomic.Int64 137 + connected atomic.Bool 138 + } 139 + 140 + func (c *Consumer) handleEvent(ctx context.Context, event *models.Event) error { 141 + if event.Kind != "commit" || event.Commit == nil { 142 + return nil 143 + } 144 + 145 + commit := event.Commit 146 + 147 + // Only process Arabica collections 148 + if !strings.HasPrefix(commit.Collection, "social.arabica.alpha.") { 149 + return nil 150 + } 151 + 152 + switch commit.Operation { 153 + case "create", "update": 154 + return c.index.UpsertRecord(ctx, event.Did, commit) 155 + case "delete": 156 + return c.index.DeleteRecord(ctx, event.Did, commit.Collection, commit.RKey) 157 + } 158 + 159 + // Update cursor for recovery 160 + c.cursor.Store(event.TimeUS) 161 + 162 + return nil 163 + } 164 + ``` 165 + 166 + ### 3. 
Feed Index Schema (BoltDB) 167 + 168 + ```go 169 + // internal/firehose/index.go 170 + 171 + // BoltDB Buckets: 172 + // - "records" : {at-uri} -> {record JSON + metadata} 173 + // - "by_time" : {timestamp:at-uri} -> {} (for chronological queries) 174 + // - "by_did" : {did:at-uri} -> {} (for user-specific queries) 175 + // - "by_type" : {collection:timestamp:at-uri} -> {} (for type filtering) 176 + // - "profiles" : {did} -> {profile JSON} (cached profiles) 177 + // - "cursor" : "jetstream" -> {cursor value} 178 + 179 + type FeedIndex struct { 180 + db *bbolt.DB 181 + } 182 + 183 + type IndexedRecord struct { 184 + URI string `json:"uri"` 185 + DID string `json:"did"` 186 + Collection string `json:"collection"` 187 + RKey string `json:"rkey"` 188 + Record json.RawMessage `json:"record"` 189 + CID string `json:"cid"` 190 + IndexedAt time.Time `json:"indexed_at"` 191 + } 192 + 193 + func (idx *FeedIndex) GetRecentFeed(ctx context.Context, limit int) ([]*FeedItem, error) { 194 + // Query by_time bucket in reverse order 195 + // Hydrate with profile data from profiles bucket 196 + // Return feed items instantly from local data 197 + } 198 + ``` 199 + 200 + ### 4. 
Profile Resolution 201 + 202 + Profiles are not part of Arabica's lexicons, so we need a strategy: 203 + 204 + **Option A: Lazy Loading (Recommended for Phase 1)** 205 + 206 + ```go 207 + func (idx *FeedIndex) resolveProfile(ctx context.Context, did string) (*Profile, error) { 208 + // Check local cache first 209 + if profile := idx.getCachedProfile(did); profile != nil { 210 + return profile, nil 211 + } 212 + 213 + // Fetch from public API and cache 214 + profile, err := publicClient.GetProfile(ctx, did) 215 + if err != nil { 216 + return nil, err 217 + } 218 + 219 + idx.cacheProfile(did, profile, 1*time.Hour) 220 + return profile, nil 221 + } 222 + ``` 223 + 224 + **Option B: Slingshot Integration (Phase 2)** 225 + 226 + ```go 227 + // Use Slingshot's resolveMiniDoc for faster profile resolution 228 + func (idx *FeedIndex) resolveProfileViaSlingshot(ctx context.Context, did string) (*Profile, error) { 229 + url := fmt.Sprintf("https://slingshot.microcosm.blue/xrpc/com.bad-example.identity.resolveMiniDoc?identifier=%s", did) 230 + // Returns {did, handle, pds} in one call 231 + } 232 + ``` 233 + 234 + ### 5. Reference Resolution 235 + 236 + Brews reference beans, grinders, and brewers. The index already has these records: 237 + 238 + ```go 239 + func (idx *FeedIndex) resolveBrew(ctx context.Context, brew *IndexedRecord) (*FeedItem, error) { 240 + var record map[string]interface{} 241 + json.Unmarshal(brew.Record, &record) 242 + 243 + item := &FeedItem{RecordType: "brew"} 244 + 245 + // Resolve bean reference from local index 246 + if beanRef, ok := record["beanRef"].(string); ok { 247 + if bean := idx.getRecord(beanRef); bean != nil { 248 + item.Bean = recordToBean(bean) 249 + } 250 + } 251 + 252 + // Similar for grinder, brewer references 253 + // All from local index - no API calls 254 + 255 + return item, nil 256 + } 257 + ``` 258 + 259 + ### 6. 
Fallback and Resilience 260 + 261 + ```go 262 + // internal/firehose/consumer.go 263 + 264 + func (c *Consumer) Run(ctx context.Context) error { 265 + for { 266 + select { 267 + case <-ctx.Done(): 268 + return ctx.Err() 269 + default: 270 + if err := c.connectAndConsume(ctx); err != nil { 271 + log.Warn().Err(err).Msg("jetstream connection lost, reconnecting...") 272 + 273 + // Exponential backoff 274 + time.Sleep(c.backoff.NextBackOff()) 275 + 276 + // Rotate to next endpoint 277 + c.rotateEndpoint() 278 + continue 279 + } 280 + } 281 + } 282 + } 283 + 284 + func (c *Consumer) connectAndConsume(ctx context.Context) error { 285 + cursor := c.loadCursor() 286 + 287 + // Rewind cursor slightly to handle duplicates safely 288 + if cursor > 0 { 289 + cursor -= 5 * time.Second.Microseconds() 290 + } 291 + 292 + return c.client.ConnectAndRead(ctx, &cursor) 293 + } 294 + ``` 295 + 296 + ### 7. Feed Service Integration 297 + 298 + ```go 299 + // internal/feed/service.go (modified) 300 + 301 + type Service struct { 302 + registry *Registry 303 + publicClient *atproto.PublicClient 304 + cache *publicFeedCache 305 + 306 + // New: firehose index 307 + firehoseIndex *firehose.FeedIndex 308 + useFirehose bool 309 + } 310 + 311 + func (s *Service) GetRecentRecords(ctx context.Context, limit int) ([]*FeedItem, error) { 312 + // Prefer firehose index if available and populated 313 + if s.useFirehose && s.firehoseIndex.IsReady() { 314 + return s.firehoseIndex.GetRecentFeed(ctx, limit) 315 + } 316 + 317 + // Fallback to polling (existing code) 318 + return s.getRecentRecordsViaPolling(ctx, limit) 319 + } 320 + ``` 321 + 322 + --- 323 + 324 + ## Implementation Phases 325 + 326 + ### Phase 1: Core Jetstream Consumer (2 weeks) 327 + 328 + **Goal:** Replace polling with firehose consumption for the feed. 329 + 330 + **Tasks:** 331 + 332 + 1. 
Create `internal/firehose/` package 333 + - `config.go` - Jetstream configuration 334 + - `consumer.go` - WebSocket consumer with reconnection 335 + - `index.go` - BoltDB-backed feed index 336 + - `scheduler.go` - Event processing scheduler 337 + 338 + 2. Integrate with existing feed service 339 + - Add feature flag: `ARABICA_USE_FIREHOSE=true` (just use a cli flag) 340 + - Keep polling as fallback 341 + 342 + 3. Handle profile resolution 343 + - Cache profiles locally with 1-hour TTL 344 + - Lazy fetch on first access 345 + - Background refresh for active users 346 + 347 + 4. Cursor management 348 + - Persist cursor to survive restarts 349 + - Rewind on reconnection for safety 350 + 351 + **Deliverables:** 352 + 353 + - Real-time feed updates 354 + - Reduced API calls to near-zero 355 + - Automatic user discovery (anyone using Arabica lexicons) 356 + 357 + ### Phase 2: Slingshot Optimization (1 week) 358 + 359 + **Goal:** Faster profile and record hydration. 360 + 361 + **Tasks:** 362 + 363 + 1. Add Slingshot client (`internal/atproto/slingshot.go`) 364 + 2. Use `resolveMiniDoc` for profile resolution 365 + 3. Use Slingshot as fallback for missing records 366 + 367 + **Deliverables:** 368 + 369 + - Faster profile loading 370 + - Resilience to slow PDS endpoints 371 + 372 + ### Phase 3: Constellation for Social (1 week) 373 + 374 + **Goal:** Enable like/comment counts when social features are added. 375 + 376 + **Tasks:** 377 + 378 + 1. Add Constellation client (`internal/atproto/constellation.go`) 379 + 2. Query backlinks for interaction counts 380 + 3. Display counts on feed items 381 + 382 + **Deliverables:** 383 + 384 + - Like count on brews 385 + - Comment count on brews 386 + - Foundation for social features 387 + 388 + ### Phase 4: Spacedust for Real-time Notifications (Future) 389 + 390 + **Goal:** Push notifications for interactions. 391 + 392 + **Tasks:** 393 + 394 + 1. Subscribe to Spacedust for user's content interactions 395 + 2. 
Build notification storage and API 396 + 3. WebSocket to frontend for live updates 397 + 398 + --- 399 + 400 + ## Data Flow Comparison 401 + 402 + ### Before (Polling) 403 + 404 + ``` 405 + User Request → Check Cache → [Cache Miss] → Poll N PDSes → Build Feed → Return 406 + 407 + ~10N API calls 408 + 5-10 second latency 409 + ``` 410 + 411 + ### After (Jetstream) 412 + 413 + ``` 414 + Jetstream → Consumer → Index (BoltDB) 415 + 416 + User Request → Query Index → Return 417 + 418 + 0 API calls 419 + <10ms latency 420 + ``` 421 + 422 + --- 423 + 424 + ## Automatic User Discovery 425 + 426 + A major benefit of firehose consumption is automatic user discovery: 427 + 428 + **Current:** Users must explicitly register via `/api/feed/register` 429 + 430 + **With Jetstream:** Any user who creates an Arabica record is automatically indexed 431 + 432 + ```go 433 + // When we see a new DID creating Arabica records 434 + func (c *Consumer) handleNewUser(did string) { 435 + // Auto-register for feed 436 + c.registry.Register(did) 437 + 438 + // Fetch and cache their profile 439 + go c.index.fetchAndCacheProfile(did) 440 + 441 + // Backfill their existing records 442 + go c.backfillUser(did) 443 + } 444 + ``` 445 + 446 + This could replace the manual registry entirely, or supplement it for "featured" users. 
447 + 448 + --- 449 + 450 + ## Backfill Strategy 451 + 452 + When starting fresh or discovering a new user, we need historical data: 453 + 454 + **Option A: Direct PDS Fetch (Simple)** 455 + 456 + ```go 457 + func (c *Consumer) backfillUser(ctx context.Context, did string) error { 458 + for _, collection := range arabicaCollections { 459 + records, _ := publicClient.ListRecords(ctx, did, collection, 100) 460 + for _, record := range records { 461 + c.index.UpsertFromPDS(record) 462 + } 463 + } 464 + return nil 465 + } 466 + ``` 467 + 468 + **Option B: Slingshot Fetch (Faster)** 469 + 470 + ```go 471 + func (c *Consumer) backfillUserViaSlingshot(ctx context.Context, did string) error { 472 + // Single endpoint, pre-cached records 473 + // Same API as PDS but faster 474 + } 475 + ``` 476 + 477 + **Option C: Jetstream Cursor Rewind (Events Only)** 478 + 479 + - Rewind cursor to desired point in time 480 + - Replay events (no records available before cursor) 481 + - Limited to ~24h of history typically 482 + 483 + **Recommendation:** Use Option A for Phase 1, add Option B in Phase 2. 
484 + 485 + --- 486 + 487 + ## Configuration 488 + 489 + ```bash 490 + # Environment variables 491 + 492 + # Enable firehose-based feed (default: false during rollout) 493 + ARABICA_USE_FIREHOSE=true 494 + 495 + # Jetstream endpoint (default: public Bluesky instances) 496 + JETSTREAM_URL=wss://jetstream1.us-east.bsky.network/subscribe 497 + 498 + # Optional: self-hosted Jetstream 499 + # JETSTREAM_URL=ws://localhost:6008/subscribe 500 + 501 + # Feed index database path 502 + ARABICA_FEED_INDEX_PATH=~/.local/share/arabica/feed-index.db 503 + 504 + # Profile cache TTL (default: 1h) 505 + ARABICA_PROFILE_CACHE_TTL=1h 506 + 507 + # Optional: Slingshot endpoint for Phase 2 508 + # SLINGSHOT_URL=https://slingshot.microcosm.blue 509 + 510 + # Optional: Constellation endpoint for Phase 3 511 + # CONSTELLATION_URL=https://constellation.microcosm.blue 512 + ``` 513 + 514 + --- 515 + 516 + ## Monitoring and Metrics 517 + 518 + ```go 519 + // Prometheus metrics to track firehose health 520 + 521 + var ( 522 + eventsReceived = prometheus.NewCounterVec( 523 + prometheus.CounterOpts{ 524 + Name: "arabica_firehose_events_total", 525 + Help: "Total events received from Jetstream", 526 + }, 527 + []string{"collection", "operation"}, 528 + ) 529 + 530 + indexSize = prometheus.NewGauge( 531 + prometheus.GaugeOpts{ 532 + Name: "arabica_feed_index_records", 533 + Help: "Number of records in feed index", 534 + }, 535 + ) 536 + 537 + consumerLag = prometheus.NewGauge( 538 + prometheus.GaugeOpts{ 539 + Name: "arabica_firehose_lag_seconds", 540 + Help: "Lag between event time and processing time", 541 + }, 542 + ) 543 + 544 + connectionState = prometheus.NewGauge( 545 + prometheus.GaugeOpts{ 546 + Name: "arabica_firehose_connected", 547 + Help: "1 if connected to Jetstream, 0 otherwise", 548 + }, 549 + ) 550 + ) 551 + ``` 552 + 553 + --- 554 + 555 + ## Risk Assessment 556 + 557 + | Risk | Mitigation | 558 + | ----------------------- | --------------------------------------------- | 559 + | 
Jetstream unavailable | Fallback to polling, rotate endpoints | 560 + | Index corruption | Rebuild from backfill, periodic snapshots | 561 + | Duplicate events | Idempotent upserts using AT-URI as key | 562 + | Missing historical data | Backfill on startup and new user discovery | 563 + | High event volume | Filter to Arabica collections only (~0 noise) | 564 + | Profile resolution lag | Local cache with background refresh | 565 + 566 + --- 567 + 568 + ## Open Questions 569 + 570 + 1. **Should we remove the registry entirely?** 571 + - Pro: Simpler, automatic discovery 572 + - Con: Lose ability to curate "featured" users 573 + - Recommendation: Keep registry for admin features, but don't require it for feed inclusion 574 + 575 + 2. **Self-host Jetstream or use public?** 576 + - Public is free and reliable 577 + - Self-host gives control and removes dependency 578 + - Recommendation: Start with public, evaluate self-hosting if issues arise 579 + 580 + 3. **How long to keep historical data?** 581 + - Option: Rolling 30-day window 582 + - Option: Keep everything (disk is cheap) 583 + - Recommendation: Keep 90 days, prune older records 584 + 585 + 4. **Real-time feed updates to frontend?** 586 + - Could push new items via WebSocket/SSE 587 + - Or just reduce cache TTL to ~30 seconds 588 + - Recommendation: Phase 1 just reduces staleness; real-time push is future enhancement 589 + 590 + --- 591 + 592 + ## Alternatives Considered 593 + 594 + ### 1. Tap (Bluesky's Full Sync Tool) 595 + 596 + **Pros:** Full verification, automatic backfill, collection signal mode 597 + **Cons:** Heavy operational overhead, overkill for current scale 598 + **Verdict:** Revisit when user base exceeds 500+ 599 + 600 + ### 2. Direct Firehose Consumption 601 + 602 + **Pros:** No Jetstream dependency 603 + **Cons:** Complex CBOR/CAR parsing, high bandwidth 604 + **Verdict:** Jetstream provides the simplicity we need 605 + 606 + ### 3. 
Slingshot as Primary Data Source 607 + 608 + **Pros:** Pre-cached records, single endpoint 609 + **Cons:** Still polling-based, no real-time 610 + **Verdict:** Use as optimization layer, not primary 611 + 612 + ### 4. Spacedust Instead of Jetstream 613 + 614 + **Pros:** Link-focused, lightweight 615 + **Cons:** Only links, no full records 616 + **Verdict:** Use for notifications, not feed content 617 + 618 + --- 619 + 620 + ## Success Criteria 621 + 622 + | Metric | Target | 623 + | -------------------------- | ----------------------- | 624 + | Feed latency | <100ms (from >5s) | 625 + | API calls per feed request | 0 (from ~10N) | 626 + | Time to see new content | <5s (from 5min) | 627 + | Feed availability | 99.9% (with fallback) | 628 + | New user discovery | Automatic (from manual) | 629 + 630 + --- 631 + 632 + ## References 633 + 634 + - [Jetstream GitHub](https://github.com/bluesky-social/jetstream) 635 + - [Jetstream Blog Post](https://docs.bsky.app/blog/jetstream) 636 + - [Jetstream Go Client](https://pkg.go.dev/github.com/bluesky-social/jetstream/pkg/client) 637 + - [Microcosm.blue Services](https://microcosm.blue/) 638 + - [Constellation API](https://constellation.microcosm.blue/) 639 + - [Slingshot API](https://slingshot.microcosm.blue/) 640 + - [Existing Evaluation: Jetstream/Tap](./jetstream-tap-evaluation.md) 641 + - [Existing Evaluation: Microcosm Tools](./microcosm-tools-evaluation.md)
+2
go.mod
··· 18 18 github.com/golang-jwt/jwt/v5 v5.2.2 // indirect 19 19 github.com/google/go-cmp v0.6.0 // indirect 20 20 github.com/google/go-querystring v1.1.0 // indirect 21 + github.com/gorilla/websocket v1.5.3 // indirect 21 22 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 23 + github.com/klauspost/compress v1.18.3 // indirect 22 24 github.com/mattn/go-colorable v0.1.13 // indirect 23 25 github.com/mattn/go-isatty v0.0.20 // indirect 24 26 github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
+4
go.sum
··· 17 17 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 18 18 github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= 19 19 github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= 20 + github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 21 + github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 20 22 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= 21 23 github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= 22 24 github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= 23 25 github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= 26 + github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= 27 + github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= 24 28 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 25 29 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 26 30 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+76 -4
internal/feed/service.go
··· 45 45 mu sync.RWMutex 46 46 } 47 47 48 + // FirehoseIndex is the interface for the firehose feed index 49 + // This allows the feed service to use firehose data when available 50 + type FirehoseIndex interface { 51 + IsReady() bool 52 + GetRecentFeed(ctx context.Context, limit int) ([]*FirehoseFeedItem, error) 53 + } 54 + 55 + // FirehoseFeedItem matches the FeedItem structure from firehose package 56 + // This avoids import cycles 57 + type FirehoseFeedItem struct { 58 + RecordType string 59 + Action string 60 + Brew *models.Brew 61 + Bean *models.Bean 62 + Roaster *models.Roaster 63 + Grinder *models.Grinder 64 + Brewer *models.Brewer 65 + Author *atproto.Profile 66 + Timestamp time.Time 67 + TimeAgo string 68 + } 69 + 48 70 // Service fetches and aggregates brews from registered users 49 71 type Service struct { 50 - registry *Registry 51 - publicClient *atproto.PublicClient 52 - cache *publicFeedCache 72 + registry *Registry 73 + publicClient *atproto.PublicClient 74 + cache *publicFeedCache 75 + firehoseIndex FirehoseIndex 76 + useFirehose bool 53 77 } 54 78 55 79 // NewService creates a new feed service ··· 59 83 publicClient: atproto.NewPublicClient(), 60 84 cache: &publicFeedCache{}, 61 85 } 86 + } 87 + 88 + // SetFirehoseIndex configures the service to use firehose-based feed when available 89 + func (s *Service) SetFirehoseIndex(index FirehoseIndex) { 90 + s.firehoseIndex = index 91 + s.useFirehose = true 92 + log.Info().Msg("feed: firehose index configured") 62 93 } 63 94 64 95 // GetCachedPublicFeed returns cached feed items for unauthenticated users. 
··· 115 146 // GetRecentRecords fetches recent activity (brews and other records) from all registered users 116 147 // Returns up to `limit` items sorted by most recent first 117 148 func (s *Service) GetRecentRecords(ctx context.Context, limit int) ([]*FeedItem, error) { 149 + // Try firehose index first if available and ready 150 + if s.useFirehose && s.firehoseIndex != nil && s.firehoseIndex.IsReady() { 151 + log.Debug().Msg("feed: using firehose index") 152 + return s.getRecentRecordsFromFirehose(ctx, limit) 153 + } 154 + 155 + // Fallback to polling 156 + return s.getRecentRecordsViaPolling(ctx, limit) 157 + } 158 + 159 + // getRecentRecordsFromFirehose fetches feed items from the firehose index 160 + func (s *Service) getRecentRecordsFromFirehose(ctx context.Context, limit int) ([]*FeedItem, error) { 161 + firehoseItems, err := s.firehoseIndex.GetRecentFeed(ctx, limit) 162 + if err != nil { 163 + log.Warn().Err(err).Msg("feed: firehose index error, falling back to polling") 164 + return s.getRecentRecordsViaPolling(ctx, limit) 165 + } 166 + 167 + // Convert FirehoseFeedItem to FeedItem 168 + items := make([]*FeedItem, len(firehoseItems)) 169 + for i, fi := range firehoseItems { 170 + items[i] = &FeedItem{ 171 + RecordType: fi.RecordType, 172 + Action: fi.Action, 173 + Brew: fi.Brew, 174 + Bean: fi.Bean, 175 + Roaster: fi.Roaster, 176 + Grinder: fi.Grinder, 177 + Brewer: fi.Brewer, 178 + Author: fi.Author, 179 + Timestamp: fi.Timestamp, 180 + TimeAgo: fi.TimeAgo, 181 + } 182 + } 183 + 184 + log.Debug().Int("count", len(items)).Msg("feed: returning items from firehose index") 185 + return items, nil 186 + } 187 + 188 + // getRecentRecordsViaPolling fetches feed items by polling each user's PDS 189 + func (s *Service) getRecentRecordsViaPolling(ctx context.Context, limit int) ([]*FeedItem, error) { 118 190 dids := s.registry.List() 119 191 if len(dids) == 0 { 120 192 log.Debug().Msg("feed: no registered users") 121 193 return nil, nil 122 194 } 123 195 124 - 
log.Debug().Int("user_count", len(dids)).Msg("feed: fetching activity from registered users") 196 + log.Debug().Int("user_count", len(dids)).Msg("feed: fetching activity from registered users (polling)") 125 197 126 198 // Fetch all records from all users in parallel 127 199 type userActivity struct {
+51
internal/firehose/adapter.go
··· 1 + package firehose 2 + 3 + import ( 4 + "context" 5 + 6 + "arabica/internal/feed" 7 + ) 8 + 9 + // FeedIndexAdapter wraps FeedIndex to implement feed.FirehoseIndex interface 10 + // This avoids import cycles between feed and firehose packages 11 + type FeedIndexAdapter struct { 12 + index *FeedIndex 13 + } 14 + 15 + // NewFeedIndexAdapter creates a new adapter for the FeedIndex 16 + func NewFeedIndexAdapter(index *FeedIndex) *FeedIndexAdapter { 17 + return &FeedIndexAdapter{index: index} 18 + } 19 + 20 + // IsReady returns true if the index is ready to serve queries 21 + func (a *FeedIndexAdapter) IsReady() bool { 22 + return a.index.IsReady() 23 + } 24 + 25 + // GetRecentFeed returns recent feed items from the index 26 + // Converts FeedItem to feed.FirehoseFeedItem to satisfy the interface 27 + func (a *FeedIndexAdapter) GetRecentFeed(ctx context.Context, limit int) ([]*feed.FirehoseFeedItem, error) { 28 + items, err := a.index.GetRecentFeed(ctx, limit) 29 + if err != nil { 30 + return nil, err 31 + } 32 + 33 + // Convert to the type expected by feed.Service 34 + result := make([]*feed.FirehoseFeedItem, len(items)) 35 + for i, item := range items { 36 + result[i] = &feed.FirehoseFeedItem{ 37 + RecordType: item.RecordType, 38 + Action: item.Action, 39 + Brew: item.Brew, 40 + Bean: item.Bean, 41 + Roaster: item.Roaster, 42 + Grinder: item.Grinder, 43 + Brewer: item.Brewer, 44 + Author: item.Author, 45 + Timestamp: item.Timestamp, 46 + TimeAgo: item.TimeAgo, 47 + } 48 + } 49 + 50 + return result, nil 51 + }
+53
internal/firehose/config.go
··· 1 + // Package firehose provides real-time AT Protocol event consumption via Jetstream. 2 + // It indexes Arabica records into a local BoltDB database for fast feed queries. 3 + package firehose 4 + 5 + import ( 6 + "arabica/internal/atproto" 7 + ) 8 + 9 + // Default Jetstream public endpoints 10 + var DefaultJetstreamEndpoints = []string{ 11 + "wss://jetstream1.us-east.bsky.network/subscribe", 12 + "wss://jetstream2.us-east.bsky.network/subscribe", 13 + "wss://jetstream1.us-west.bsky.network/subscribe", 14 + "wss://jetstream2.us-west.bsky.network/subscribe", 15 + } 16 + 17 + // ArabicaCollections lists all Arabica lexicon collections to filter for 18 + var ArabicaCollections = []string{ 19 + atproto.NSIDBrew, 20 + atproto.NSIDBean, 21 + atproto.NSIDRoaster, 22 + atproto.NSIDGrinder, 23 + atproto.NSIDBrewer, 24 + } 25 + 26 + // Config holds configuration for the Jetstream consumer 27 + type Config struct { 28 + // Endpoints is a list of Jetstream WebSocket URLs to connect to (with fallback rotation) 29 + Endpoints []string 30 + 31 + // WantedCollections filters events to specific collection NSIDs 32 + WantedCollections []string 33 + 34 + // Compress enables zstd compression (~56% bandwidth reduction) 35 + Compress bool 36 + 37 + // IndexPath is the path to the BoltDB feed index database 38 + IndexPath string 39 + 40 + // ProfileCacheTTL is how long to cache profile data 41 + ProfileCacheTTL int64 // seconds 42 + } 43 + 44 + // DefaultConfig returns a configuration with sensible defaults 45 + func DefaultConfig() *Config { 46 + return &Config{ 47 + Endpoints: DefaultJetstreamEndpoints, 48 + WantedCollections: ArabicaCollections, 49 + Compress: false, // Disabled: Jetstream uses custom zstd dictionary 50 + IndexPath: "", // Will be set based on data directory 51 + ProfileCacheTTL: 3600, // 1 hour 52 + } 53 + }
+380
internal/firehose/consumer.go
··· 1 + package firehose 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/url" 8 + "strings" 9 + "sync" 10 + "sync/atomic" 11 + "time" 12 + 13 + "github.com/gorilla/websocket" 14 + "github.com/klauspost/compress/zstd" 15 + "github.com/rs/zerolog/log" 16 + ) 17 + 18 + // JetstreamEvent represents an event from Jetstream 19 + type JetstreamEvent struct { 20 + DID string `json:"did"` 21 + TimeUS int64 `json:"time_us"` 22 + Kind string `json:"kind"` // "commit", "identity", "account" 23 + Commit *struct { 24 + Rev string `json:"rev"` 25 + Operation string `json:"operation"` // "create", "update", "delete" 26 + Collection string `json:"collection"` 27 + RKey string `json:"rkey"` 28 + Record json.RawMessage `json:"record,omitempty"` 29 + CID string `json:"cid"` 30 + } `json:"commit,omitempty"` 31 + } 32 + 33 + // Consumer consumes events from Jetstream and indexes them 34 + type Consumer struct { 35 + config *Config 36 + index *FeedIndex 37 + 38 + // Connection state 39 + conn *websocket.Conn 40 + connMu sync.Mutex 41 + currentEndpointIdx int 42 + 43 + // Zstd decoder for compressed messages 44 + zstdDecoder *zstd.Decoder 45 + 46 + // Cursor for resume 47 + cursor atomic.Int64 48 + 49 + // Stats 50 + eventsReceived atomic.Int64 51 + bytesReceived atomic.Int64 52 + 53 + // Control 54 + connected atomic.Bool 55 + stopCh chan struct{} 56 + wg sync.WaitGroup 57 + } 58 + 59 + // NewConsumer creates a new Jetstream consumer 60 + func NewConsumer(config *Config, index *FeedIndex) *Consumer { 61 + // Create zstd decoder for compressed messages 62 + decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1)) 63 + if err != nil { 64 + log.Fatal().Err(err).Msg("firehose: failed to create zstd decoder") 65 + } 66 + 67 + c := &Consumer{ 68 + config: config, 69 + index: index, 70 + stopCh: make(chan struct{}), 71 + zstdDecoder: decoder, 72 + } 73 + 74 + // Load cursor from index 75 + if cursor, err := index.GetCursor(); err == nil && cursor > 0 { 76 + 
c.cursor.Store(cursor) 77 + log.Info().Int64("cursor", cursor).Msg("firehose: loaded cursor from index") 78 + } 79 + 80 + return c 81 + } 82 + 83 + // Start begins consuming events in a background goroutine 84 + func (c *Consumer) Start(ctx context.Context) { 85 + c.wg.Add(1) 86 + go func() { 87 + defer c.wg.Done() 88 + c.run(ctx) 89 + }() 90 + } 91 + 92 + // Stop gracefully stops the consumer 93 + func (c *Consumer) Stop() { 94 + close(c.stopCh) 95 + c.connMu.Lock() 96 + if c.conn != nil { 97 + c.conn.Close() 98 + } 99 + c.connMu.Unlock() 100 + c.wg.Wait() 101 + 102 + // Close zstd decoder 103 + if c.zstdDecoder != nil { 104 + c.zstdDecoder.Close() 105 + } 106 + } 107 + 108 + // IsConnected returns true if currently connected to Jetstream 109 + func (c *Consumer) IsConnected() bool { 110 + return c.connected.Load() 111 + } 112 + 113 + // Stats returns consumer statistics 114 + func (c *Consumer) Stats() (eventsReceived, bytesReceived int64) { 115 + return c.eventsReceived.Load(), c.bytesReceived.Load() 116 + } 117 + 118 + func (c *Consumer) run(ctx context.Context) { 119 + backoff := time.Second 120 + maxBackoff := 30 * time.Second 121 + 122 + for { 123 + select { 124 + case <-ctx.Done(): 125 + log.Info().Msg("firehose: context cancelled, stopping consumer") 126 + return 127 + case <-c.stopCh: 128 + log.Info().Msg("firehose: stop requested, stopping consumer") 129 + return 130 + default: 131 + } 132 + 133 + endpoint := c.config.Endpoints[c.currentEndpointIdx] 134 + err := c.connectAndConsume(ctx, endpoint) 135 + 136 + if err != nil { 137 + c.connected.Store(false) 138 + log.Warn().Err(err).Str("endpoint", endpoint).Msg("firehose: connection error") 139 + 140 + // Rotate to next endpoint 141 + c.currentEndpointIdx = (c.currentEndpointIdx + 1) % len(c.config.Endpoints) 142 + 143 + // Backoff before retry 144 + select { 145 + case <-ctx.Done(): 146 + return 147 + case <-c.stopCh: 148 + return 149 + case <-time.After(backoff): 150 + } 151 + 152 + // Increase backoff 
153 + backoff *= 2 154 + if backoff > maxBackoff { 155 + backoff = maxBackoff 156 + } 157 + } else { 158 + // Reset backoff on successful connection 159 + backoff = time.Second 160 + } 161 + } 162 + } 163 + 164 + func (c *Consumer) connectAndConsume(ctx context.Context, endpoint string) error { 165 + // Build WebSocket URL with query parameters 166 + wsURL, err := c.buildWebSocketURL(endpoint) 167 + if err != nil { 168 + return fmt.Errorf("failed to build WebSocket URL: %w", err) 169 + } 170 + 171 + log.Info().Str("url", wsURL).Msg("firehose: connecting to Jetstream") 172 + 173 + // Connect 174 + dialer := websocket.Dialer{ 175 + HandshakeTimeout: 10 * time.Second, 176 + } 177 + 178 + conn, _, err := dialer.DialContext(ctx, wsURL, nil) 179 + if err != nil { 180 + return fmt.Errorf("failed to connect: %w", err) 181 + } 182 + 183 + c.connMu.Lock() 184 + c.conn = conn 185 + c.connMu.Unlock() 186 + 187 + c.connected.Store(true) 188 + log.Info().Str("endpoint", endpoint).Msg("firehose: connected to Jetstream") 189 + 190 + // Mark index as ready once connected 191 + c.index.SetReady(true) 192 + 193 + defer func() { 194 + c.connMu.Lock() 195 + if c.conn != nil { 196 + c.conn.Close() 197 + c.conn = nil 198 + } 199 + c.connMu.Unlock() 200 + c.connected.Store(false) 201 + }() 202 + 203 + // Read events 204 + for { 205 + select { 206 + case <-ctx.Done(): 207 + return ctx.Err() 208 + case <-c.stopCh: 209 + return nil 210 + default: 211 + } 212 + 213 + // Set read deadline 214 + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 215 + 216 + _, message, err := conn.ReadMessage() 217 + if err != nil { 218 + return fmt.Errorf("read error: %w", err) 219 + } 220 + 221 + c.bytesReceived.Add(int64(len(message))) 222 + 223 + if err := c.processMessage(message); err != nil { 224 + log.Warn().Err(err).Msg("firehose: failed to process message") 225 + } 226 + } 227 + } 228 + 229 + func (c *Consumer) buildWebSocketURL(endpoint string) (string, error) { 230 + u, err := 
url.Parse(endpoint) 231 + if err != nil { 232 + return "", err 233 + } 234 + 235 + q := u.Query() 236 + 237 + // Add wanted collections 238 + for _, coll := range c.config.WantedCollections { 239 + q.Add("wantedCollections", coll) 240 + } 241 + 242 + // Add compression 243 + if c.config.Compress { 244 + q.Set("compress", "true") 245 + } 246 + 247 + // Add cursor if we have one (rewind slightly for safety) 248 + cursor := c.cursor.Load() 249 + if cursor > 0 { 250 + // Rewind by 5 seconds to handle any gaps 251 + cursor -= 5 * time.Second.Microseconds() 252 + q.Set("cursor", fmt.Sprintf("%d", cursor)) 253 + } 254 + 255 + u.RawQuery = q.Encode() 256 + return u.String(), nil 257 + } 258 + 259 + func (c *Consumer) processMessage(data []byte) error { 260 + // Try to decompress if compression is enabled and data looks compressed 261 + // Zstd compressed data starts with magic number 0x28 0xB5 0x2F 0xFD 262 + if c.config.Compress && len(data) >= 4 && data[0] == 0x28 && data[1] == 0xB5 && data[2] == 0x2F && data[3] == 0xFD { 263 + decompressed, err := c.zstdDecoder.DecodeAll(data, nil) 264 + if err != nil { 265 + return fmt.Errorf("failed to decompress message: %w", err) 266 + } 267 + data = decompressed 268 + } else if c.config.Compress && len(data) > 0 && data[0] != '{' { 269 + // Try decompression anyway if it doesn't look like JSON 270 + decompressed, err := c.zstdDecoder.DecodeAll(data, nil) 271 + if err == nil { 272 + data = decompressed 273 + } 274 + // If decompression fails, try parsing as-is (maybe it's uncompressed) 275 + } 276 + 277 + var event JetstreamEvent 278 + if err := json.Unmarshal(data, &event); err != nil { 279 + // Log the first few bytes for debugging 280 + preview := data 281 + if len(preview) > 50 { 282 + preview = preview[:50] 283 + } 284 + return fmt.Errorf("failed to unmarshal event (first bytes: %q): %w", preview, err) 285 + } 286 + 287 + c.eventsReceived.Add(1) 288 + 289 + // Update cursor 290 + if event.TimeUS > 0 { 291 + 
c.cursor.Store(event.TimeUS) 292 + 293 + // Persist cursor periodically (every 1000 events) 294 + if c.eventsReceived.Load()%1000 == 0 { 295 + if err := c.index.SetCursor(event.TimeUS); err != nil { 296 + log.Warn().Err(err).Msg("firehose: failed to persist cursor") 297 + } 298 + } 299 + } 300 + 301 + // Only process commit events 302 + if event.Kind != "commit" || event.Commit == nil { 303 + return nil 304 + } 305 + 306 + commit := event.Commit 307 + 308 + // Verify it's an Arabica collection 309 + if !strings.HasPrefix(commit.Collection, "social.arabica.alpha.") { 310 + return nil 311 + } 312 + 313 + log.Debug(). 314 + Str("did", event.DID). 315 + Str("collection", commit.Collection). 316 + Str("operation", commit.Operation). 317 + Str("rkey", commit.RKey). 318 + Msg("firehose: processing event") 319 + 320 + switch commit.Operation { 321 + case "create", "update": 322 + if commit.Record == nil { 323 + return nil 324 + } 325 + if err := c.index.UpsertRecord( 326 + event.DID, 327 + commit.Collection, 328 + commit.RKey, 329 + commit.CID, 330 + commit.Record, 331 + event.TimeUS, 332 + ); err != nil { 333 + return fmt.Errorf("failed to upsert record: %w", err) 334 + } 335 + 336 + case "delete": 337 + if err := c.index.DeleteRecord( 338 + event.DID, 339 + commit.Collection, 340 + commit.RKey, 341 + ); err != nil { 342 + return fmt.Errorf("failed to delete record: %w", err) 343 + } 344 + } 345 + 346 + return nil 347 + } 348 + 349 + // BackfillKnownUsers backfills records for all known DIDs 350 + // This is useful on startup to ensure we have all existing records 351 + func (c *Consumer) BackfillKnownUsers(ctx context.Context) error { 352 + dids, err := c.index.GetKnownDIDs() 353 + if err != nil { 354 + return fmt.Errorf("failed to get known DIDs: %w", err) 355 + } 356 + 357 + log.Info().Int("count", len(dids)).Msg("firehose: backfilling known users") 358 + 359 + for _, did := range dids { 360 + select { 361 + case <-ctx.Done(): 362 + return ctx.Err() 363 + default: 364 
+ } 365 + 366 + if err := c.index.BackfillUser(ctx, did); err != nil { 367 + log.Warn().Err(err).Str("did", did).Msg("firehose: failed to backfill user") 368 + } 369 + 370 + // Small delay to avoid hammering PDS servers 371 + time.Sleep(100 * time.Millisecond) 372 + } 373 + 374 + return nil 375 + } 376 + 377 + // BackfillDID backfills records for a specific DID 378 + func (c *Consumer) BackfillDID(ctx context.Context, did string) error { 379 + return c.index.BackfillUser(ctx, did) 380 + }
+726
internal/firehose/index.go
··· 1 + package firehose 2 + 3 + import ( 4 + "context" 5 + "encoding/binary" 6 + "encoding/json" 7 + "fmt" 8 + "os" 9 + "path/filepath" 10 + "sort" 11 + "strings" 12 + "sync" 13 + "time" 14 + 15 + "arabica/internal/atproto" 16 + "arabica/internal/models" 17 + 18 + "github.com/rs/zerolog/log" 19 + bolt "go.etcd.io/bbolt" 20 + ) 21 + 22 + // Bucket names for the feed index 23 + var ( 24 + // BucketRecords stores full record data: {at-uri} -> {IndexedRecord JSON} 25 + BucketRecords = []byte("records") 26 + 27 + // BucketByTime stores records by timestamp for chronological queries: {timestamp:at-uri} -> {} 28 + BucketByTime = []byte("by_time") 29 + 30 + // BucketByDID stores records by DID for user-specific queries: {did:at-uri} -> {} 31 + BucketByDID = []byte("by_did") 32 + 33 + // BucketByCollection stores records by type: {collection:timestamp:at-uri} -> {} 34 + BucketByCollection = []byte("by_collection") 35 + 36 + // BucketProfiles stores cached profile data: {did} -> {CachedProfile JSON} 37 + BucketProfiles = []byte("profiles") 38 + 39 + // BucketMeta stores metadata like cursor position: {key} -> {value} 40 + BucketMeta = []byte("meta") 41 + 42 + // BucketKnownDIDs stores all DIDs we've seen with Arabica records 43 + BucketKnownDIDs = []byte("known_dids") 44 + ) 45 + 46 + // IndexedRecord represents a record stored in the index 47 + type IndexedRecord struct { 48 + URI string `json:"uri"` 49 + DID string `json:"did"` 50 + Collection string `json:"collection"` 51 + RKey string `json:"rkey"` 52 + Record json.RawMessage `json:"record"` 53 + CID string `json:"cid"` 54 + IndexedAt time.Time `json:"indexed_at"` 55 + CreatedAt time.Time `json:"created_at"` // Parsed from record 56 + } 57 + 58 + // CachedProfile stores profile data with TTL 59 + type CachedProfile struct { 60 + Profile *atproto.Profile `json:"profile"` 61 + CachedAt time.Time `json:"cached_at"` 62 + ExpiresAt time.Time `json:"expires_at"` 63 + } 64 + 65 + // FeedIndex provides persistent storage for 
firehose events 66 + type FeedIndex struct { 67 + db *bolt.DB 68 + publicClient *atproto.PublicClient 69 + profileTTL time.Duration 70 + 71 + // In-memory cache for hot data 72 + profileCache map[string]*CachedProfile 73 + profileCacheMu sync.RWMutex 74 + 75 + ready bool 76 + readyMu sync.RWMutex 77 + } 78 + 79 + // NewFeedIndex creates a new feed index backed by BoltDB 80 + func NewFeedIndex(path string, profileTTL time.Duration) (*FeedIndex, error) { 81 + if path == "" { 82 + return nil, fmt.Errorf("index path is required") 83 + } 84 + 85 + // Ensure parent directory exists 86 + dir := filepath.Dir(path) 87 + if dir != "" && dir != "." { 88 + if err := os.MkdirAll(dir, 0755); err != nil { 89 + return nil, fmt.Errorf("failed to create index directory: %w", err) 90 + } 91 + } 92 + 93 + db, err := bolt.Open(path, 0600, &bolt.Options{ 94 + Timeout: 5 * time.Second, 95 + }) 96 + if err != nil { 97 + return nil, fmt.Errorf("failed to open index database: %w", err) 98 + } 99 + 100 + // Create buckets 101 + err = db.Update(func(tx *bolt.Tx) error { 102 + buckets := [][]byte{ 103 + BucketRecords, 104 + BucketByTime, 105 + BucketByDID, 106 + BucketByCollection, 107 + BucketProfiles, 108 + BucketMeta, 109 + BucketKnownDIDs, 110 + } 111 + for _, bucket := range buckets { 112 + if _, err := tx.CreateBucketIfNotExists(bucket); err != nil { 113 + return fmt.Errorf("failed to create bucket %s: %w", bucket, err) 114 + } 115 + } 116 + return nil 117 + }) 118 + if err != nil { 119 + db.Close() 120 + return nil, err 121 + } 122 + 123 + idx := &FeedIndex{ 124 + db: db, 125 + publicClient: atproto.NewPublicClient(), 126 + profileTTL: profileTTL, 127 + profileCache: make(map[string]*CachedProfile), 128 + } 129 + 130 + return idx, nil 131 + } 132 + 133 + // Close closes the index database 134 + func (idx *FeedIndex) Close() error { 135 + if idx.db != nil { 136 + return idx.db.Close() 137 + } 138 + return nil 139 + } 140 + 141 + // SetReady marks the index as ready to serve queries 142 + 
func (idx *FeedIndex) SetReady(ready bool) { 143 + idx.readyMu.Lock() 144 + defer idx.readyMu.Unlock() 145 + idx.ready = ready 146 + } 147 + 148 + // IsReady returns true if the index is populated and ready 149 + func (idx *FeedIndex) IsReady() bool { 150 + idx.readyMu.RLock() 151 + defer idx.readyMu.RUnlock() 152 + return idx.ready 153 + } 154 + 155 + // GetCursor returns the last processed cursor (microseconds timestamp) 156 + func (idx *FeedIndex) GetCursor() (int64, error) { 157 + var cursor int64 158 + err := idx.db.View(func(tx *bolt.Tx) error { 159 + b := tx.Bucket(BucketMeta) 160 + v := b.Get([]byte("cursor")) 161 + if v != nil && len(v) == 8 { 162 + cursor = int64(binary.BigEndian.Uint64(v)) 163 + } 164 + return nil 165 + }) 166 + return cursor, err 167 + } 168 + 169 + // SetCursor stores the cursor position 170 + func (idx *FeedIndex) SetCursor(cursor int64) error { 171 + return idx.db.Update(func(tx *bolt.Tx) error { 172 + b := tx.Bucket(BucketMeta) 173 + buf := make([]byte, 8) 174 + binary.BigEndian.PutUint64(buf, uint64(cursor)) 175 + return b.Put([]byte("cursor"), buf) 176 + }) 177 + } 178 + 179 + // UpsertRecord adds or updates a record in the index 180 + func (idx *FeedIndex) UpsertRecord(did, collection, rkey, cid string, record json.RawMessage, eventTime int64) error { 181 + uri := atproto.BuildATURI(did, collection, rkey) 182 + 183 + // Parse createdAt from record 184 + var recordData map[string]interface{} 185 + createdAt := time.Now() 186 + if err := json.Unmarshal(record, &recordData); err == nil { 187 + if createdAtStr, ok := recordData["createdAt"].(string); ok { 188 + if t, err := time.Parse(time.RFC3339, createdAtStr); err == nil { 189 + createdAt = t 190 + } 191 + } 192 + } 193 + 194 + indexed := &IndexedRecord{ 195 + URI: uri, 196 + DID: did, 197 + Collection: collection, 198 + RKey: rkey, 199 + Record: record, 200 + CID: cid, 201 + IndexedAt: time.Now(), 202 + CreatedAt: createdAt, 203 + } 204 + 205 + data, err := json.Marshal(indexed) 
206 + if err != nil { 207 + return fmt.Errorf("failed to marshal record: %w", err) 208 + } 209 + 210 + return idx.db.Update(func(tx *bolt.Tx) error { 211 + // Store the record 212 + records := tx.Bucket(BucketRecords) 213 + if err := records.Put([]byte(uri), data); err != nil { 214 + return err 215 + } 216 + 217 + // Index by time (use createdAt for sorting, not event time) 218 + byTime := tx.Bucket(BucketByTime) 219 + timeKey := makeTimeKey(createdAt, uri) 220 + if err := byTime.Put(timeKey, nil); err != nil { 221 + return err 222 + } 223 + 224 + // Index by DID 225 + byDID := tx.Bucket(BucketByDID) 226 + didKey := []byte(did + ":" + uri) 227 + if err := byDID.Put(didKey, nil); err != nil { 228 + return err 229 + } 230 + 231 + // Index by collection 232 + byCollection := tx.Bucket(BucketByCollection) 233 + collKey := []byte(collection + ":" + string(timeKey)) 234 + if err := byCollection.Put(collKey, nil); err != nil { 235 + return err 236 + } 237 + 238 + // Track known DID 239 + knownDIDs := tx.Bucket(BucketKnownDIDs) 240 + if err := knownDIDs.Put([]byte(did), []byte("1")); err != nil { 241 + return err 242 + } 243 + 244 + return nil 245 + }) 246 + } 247 + 248 + // DeleteRecord removes a record from the index 249 + func (idx *FeedIndex) DeleteRecord(did, collection, rkey string) error { 250 + uri := atproto.BuildATURI(did, collection, rkey) 251 + 252 + return idx.db.Update(func(tx *bolt.Tx) error { 253 + // Get the existing record to find its timestamp 254 + records := tx.Bucket(BucketRecords) 255 + existingData := records.Get([]byte(uri)) 256 + if existingData == nil { 257 + // Record doesn't exist, nothing to delete 258 + return nil 259 + } 260 + 261 + var existing IndexedRecord 262 + if err := json.Unmarshal(existingData, &existing); err != nil { 263 + // Can't parse, just delete the main record 264 + return records.Delete([]byte(uri)) 265 + } 266 + 267 + // Delete from records 268 + if err := records.Delete([]byte(uri)); err != nil { 269 + return err 270 + } 
271 + 272 + // Delete from by_time index 273 + byTime := tx.Bucket(BucketByTime) 274 + timeKey := makeTimeKey(existing.CreatedAt, uri) 275 + if err := byTime.Delete(timeKey); err != nil { 276 + return err 277 + } 278 + 279 + // Delete from by_did index 280 + byDID := tx.Bucket(BucketByDID) 281 + didKey := []byte(did + ":" + uri) 282 + if err := byDID.Delete(didKey); err != nil { 283 + return err 284 + } 285 + 286 + // Delete from by_collection index 287 + byCollection := tx.Bucket(BucketByCollection) 288 + collKey := []byte(collection + ":" + string(timeKey)) 289 + if err := byCollection.Delete(collKey); err != nil { 290 + return err 291 + } 292 + 293 + return nil 294 + }) 295 + } 296 + 297 + // GetRecord retrieves a single record by URI 298 + func (idx *FeedIndex) GetRecord(uri string) (*IndexedRecord, error) { 299 + var record *IndexedRecord 300 + err := idx.db.View(func(tx *bolt.Tx) error { 301 + b := tx.Bucket(BucketRecords) 302 + data := b.Get([]byte(uri)) 303 + if data == nil { 304 + return nil 305 + } 306 + record = &IndexedRecord{} 307 + return json.Unmarshal(data, record) 308 + }) 309 + return record, err 310 + } 311 + 312 + // FeedItem represents an item in the feed (matches feed.FeedItem structure) 313 + type FeedItem struct { 314 + RecordType string 315 + Action string 316 + 317 + Brew *models.Brew 318 + Bean *models.Bean 319 + Roaster *models.Roaster 320 + Grinder *models.Grinder 321 + Brewer *models.Brewer 322 + 323 + Author *atproto.Profile 324 + Timestamp time.Time 325 + TimeAgo string 326 + } 327 + 328 + // GetRecentFeed returns recent feed items from the index 329 + func (idx *FeedIndex) GetRecentFeed(ctx context.Context, limit int) ([]*FeedItem, error) { 330 + var records []*IndexedRecord 331 + 332 + err := idx.db.View(func(tx *bolt.Tx) error { 333 + byTime := tx.Bucket(BucketByTime) 334 + recordsBucket := tx.Bucket(BucketRecords) 335 + 336 + c := byTime.Cursor() 337 + 338 + // Iterate in reverse (newest first) 339 + count := 0 340 + for k, _ := 
c.Last(); k != nil && count < limit*2; k, _ = c.Prev() { 341 + // Extract URI from key (format: timestamp:uri) 342 + uri := extractURIFromTimeKey(k) 343 + if uri == "" { 344 + continue 345 + } 346 + 347 + data := recordsBucket.Get([]byte(uri)) 348 + if data == nil { 349 + continue 350 + } 351 + 352 + var record IndexedRecord 353 + if err := json.Unmarshal(data, &record); err != nil { 354 + continue 355 + } 356 + 357 + records = append(records, &record) 358 + count++ 359 + } 360 + 361 + return nil 362 + }) 363 + if err != nil { 364 + return nil, err 365 + } 366 + 367 + // Build lookup maps for reference resolution 368 + recordsByURI := make(map[string]*IndexedRecord) 369 + for _, r := range records { 370 + recordsByURI[r.URI] = r 371 + } 372 + 373 + // Also load additional records we might need for references 374 + err = idx.db.View(func(tx *bolt.Tx) error { 375 + recordsBucket := tx.Bucket(BucketRecords) 376 + return recordsBucket.ForEach(func(k, v []byte) error { 377 + uri := string(k) 378 + if _, exists := recordsByURI[uri]; exists { 379 + return nil 380 + } 381 + var record IndexedRecord 382 + if err := json.Unmarshal(v, &record); err != nil { 383 + return nil 384 + } 385 + // Only load beans, roasters, grinders, brewers for reference resolution 386 + switch record.Collection { 387 + case atproto.NSIDBean, atproto.NSIDRoaster, atproto.NSIDGrinder, atproto.NSIDBrewer: 388 + recordsByURI[uri] = &record 389 + } 390 + return nil 391 + }) 392 + }) 393 + if err != nil { 394 + return nil, err 395 + } 396 + 397 + // Convert to FeedItems 398 + items := make([]*FeedItem, 0, len(records)) 399 + for _, record := range records { 400 + item, err := idx.recordToFeedItem(ctx, record, recordsByURI) 401 + if err != nil { 402 + log.Warn().Err(err).Str("uri", record.URI).Msg("failed to convert record to feed item") 403 + continue 404 + } 405 + items = append(items, item) 406 + } 407 + 408 + // Sort by timestamp descending 409 + sort.Slice(items, func(i, j int) bool { 410 + return 
items[i].Timestamp.After(items[j].Timestamp) 411 + }) 412 + 413 + // Apply limit 414 + if len(items) > limit { 415 + items = items[:limit] 416 + } 417 + 418 + return items, nil 419 + } 420 + 421 + // recordToFeedItem converts an IndexedRecord to a FeedItem 422 + func (idx *FeedIndex) recordToFeedItem(ctx context.Context, record *IndexedRecord, refMap map[string]*IndexedRecord) (*FeedItem, error) { 423 + var recordData map[string]interface{} 424 + if err := json.Unmarshal(record.Record, &recordData); err != nil { 425 + return nil, err 426 + } 427 + 428 + item := &FeedItem{ 429 + Timestamp: record.CreatedAt, 430 + TimeAgo: formatTimeAgo(record.CreatedAt), 431 + } 432 + 433 + // Get author profile 434 + profile, err := idx.GetProfile(ctx, record.DID) 435 + if err != nil { 436 + log.Warn().Err(err).Str("did", record.DID).Msg("failed to get profile") 437 + // Use a placeholder profile 438 + profile = &atproto.Profile{ 439 + DID: record.DID, 440 + Handle: record.DID, // Use DID as handle if we can't resolve 441 + } 442 + } 443 + item.Author = profile 444 + 445 + switch record.Collection { 446 + case atproto.NSIDBrew: 447 + brew, err := atproto.RecordToBrew(recordData, record.URI) 448 + if err != nil { 449 + return nil, err 450 + } 451 + 452 + // Resolve bean reference 453 + if beanRef, ok := recordData["beanRef"].(string); ok && beanRef != "" { 454 + if beanRecord, found := refMap[beanRef]; found { 455 + var beanData map[string]interface{} 456 + if err := json.Unmarshal(beanRecord.Record, &beanData); err == nil { 457 + bean, _ := atproto.RecordToBean(beanData, beanRef) 458 + brew.Bean = bean 459 + 460 + // Resolve roaster reference for bean 461 + if roasterRef, ok := beanData["roasterRef"].(string); ok && roasterRef != "" { 462 + if roasterRecord, found := refMap[roasterRef]; found { 463 + var roasterData map[string]interface{} 464 + if err := json.Unmarshal(roasterRecord.Record, &roasterData); err == nil { 465 + roaster, _ := atproto.RecordToRoaster(roasterData, 
roasterRef) 466 + brew.Bean.Roaster = roaster 467 + } 468 + } 469 + } 470 + } 471 + } 472 + } 473 + 474 + // Resolve grinder reference 475 + if grinderRef, ok := recordData["grinderRef"].(string); ok && grinderRef != "" { 476 + if grinderRecord, found := refMap[grinderRef]; found { 477 + var grinderData map[string]interface{} 478 + if err := json.Unmarshal(grinderRecord.Record, &grinderData); err == nil { 479 + grinder, _ := atproto.RecordToGrinder(grinderData, grinderRef) 480 + brew.GrinderObj = grinder 481 + } 482 + } 483 + } 484 + 485 + // Resolve brewer reference 486 + if brewerRef, ok := recordData["brewerRef"].(string); ok && brewerRef != "" { 487 + if brewerRecord, found := refMap[brewerRef]; found { 488 + var brewerData map[string]interface{} 489 + if err := json.Unmarshal(brewerRecord.Record, &brewerData); err == nil { 490 + brewer, _ := atproto.RecordToBrewer(brewerData, brewerRef) 491 + brew.BrewerObj = brewer 492 + } 493 + } 494 + } 495 + 496 + item.RecordType = "brew" 497 + item.Action = "added a new brew" 498 + item.Brew = brew 499 + 500 + case atproto.NSIDBean: 501 + bean, err := atproto.RecordToBean(recordData, record.URI) 502 + if err != nil { 503 + return nil, err 504 + } 505 + 506 + // Resolve roaster reference 507 + if roasterRef, ok := recordData["roasterRef"].(string); ok && roasterRef != "" { 508 + if roasterRecord, found := refMap[roasterRef]; found { 509 + var roasterData map[string]interface{} 510 + if err := json.Unmarshal(roasterRecord.Record, &roasterData); err == nil { 511 + roaster, _ := atproto.RecordToRoaster(roasterData, roasterRef) 512 + bean.Roaster = roaster 513 + } 514 + } 515 + } 516 + 517 + item.RecordType = "bean" 518 + item.Action = "added a new bean" 519 + item.Bean = bean 520 + 521 + case atproto.NSIDRoaster: 522 + roaster, err := atproto.RecordToRoaster(recordData, record.URI) 523 + if err != nil { 524 + return nil, err 525 + } 526 + item.RecordType = "roaster" 527 + item.Action = "added a new roaster" 528 + item.Roaster 
= roaster 529 + 530 + case atproto.NSIDGrinder: 531 + grinder, err := atproto.RecordToGrinder(recordData, record.URI) 532 + if err != nil { 533 + return nil, err 534 + } 535 + item.RecordType = "grinder" 536 + item.Action = "added a new grinder" 537 + item.Grinder = grinder 538 + 539 + case atproto.NSIDBrewer: 540 + brewer, err := atproto.RecordToBrewer(recordData, record.URI) 541 + if err != nil { 542 + return nil, err 543 + } 544 + item.RecordType = "brewer" 545 + item.Action = "added a new brewer" 546 + item.Brewer = brewer 547 + 548 + default: 549 + return nil, fmt.Errorf("unknown collection: %s", record.Collection) 550 + } 551 + 552 + return item, nil 553 + } 554 + 555 + // GetProfile fetches a profile, using cache when possible 556 + func (idx *FeedIndex) GetProfile(ctx context.Context, did string) (*atproto.Profile, error) { 557 + // Check in-memory cache first 558 + idx.profileCacheMu.RLock() 559 + if cached, ok := idx.profileCache[did]; ok && time.Now().Before(cached.ExpiresAt) { 560 + idx.profileCacheMu.RUnlock() 561 + return cached.Profile, nil 562 + } 563 + idx.profileCacheMu.RUnlock() 564 + 565 + // Check persistent cache 566 + var cached *CachedProfile 567 + err := idx.db.View(func(tx *bolt.Tx) error { 568 + b := tx.Bucket(BucketProfiles) 569 + data := b.Get([]byte(did)) 570 + if data == nil { 571 + return nil 572 + } 573 + cached = &CachedProfile{} 574 + return json.Unmarshal(data, cached) 575 + }) 576 + if err == nil && cached != nil && time.Now().Before(cached.ExpiresAt) { 577 + // Update in-memory cache 578 + idx.profileCacheMu.Lock() 579 + idx.profileCache[did] = cached 580 + idx.profileCacheMu.Unlock() 581 + return cached.Profile, nil 582 + } 583 + 584 + // Fetch from API 585 + profile, err := idx.publicClient.GetProfile(ctx, did) 586 + if err != nil { 587 + return nil, err 588 + } 589 + 590 + // Cache the result 591 + now := time.Now() 592 + cached = &CachedProfile{ 593 + Profile: profile, 594 + CachedAt: now, 595 + ExpiresAt: 
now.Add(idx.profileTTL), 596 + } 597 + 598 + // Update in-memory cache 599 + idx.profileCacheMu.Lock() 600 + idx.profileCache[did] = cached 601 + idx.profileCacheMu.Unlock() 602 + 603 + // Persist to database 604 + data, _ := json.Marshal(cached) 605 + _ = idx.db.Update(func(tx *bolt.Tx) error { 606 + b := tx.Bucket(BucketProfiles) 607 + return b.Put([]byte(did), data) 608 + }) 609 + 610 + return profile, nil 611 + } 612 + 613 + // GetKnownDIDs returns all DIDs that have created Arabica records 614 + func (idx *FeedIndex) GetKnownDIDs() ([]string, error) { 615 + var dids []string 616 + err := idx.db.View(func(tx *bolt.Tx) error { 617 + b := tx.Bucket(BucketKnownDIDs) 618 + return b.ForEach(func(k, v []byte) error { 619 + dids = append(dids, string(k)) 620 + return nil 621 + }) 622 + }) 623 + return dids, err 624 + } 625 + 626 + // RecordCount returns the total number of indexed records 627 + func (idx *FeedIndex) RecordCount() int { 628 + var count int 629 + _ = idx.db.View(func(tx *bolt.Tx) error { 630 + b := tx.Bucket(BucketRecords) 631 + count = b.Stats().KeyN 632 + return nil 633 + }) 634 + return count 635 + } 636 + 637 + // Helper functions 638 + 639 + func makeTimeKey(t time.Time, uri string) []byte { 640 + // Format: inverted timestamp (for reverse chronological order) + ":" + uri 641 + // Use nanoseconds for uniqueness 642 + inverted := ^uint64(t.UnixNano()) 643 + buf := make([]byte, 8) 644 + binary.BigEndian.PutUint64(buf, inverted) 645 + return append(buf, []byte(":"+uri)...) 
646 + } 647 + 648 + func extractURIFromTimeKey(key []byte) string { 649 + if len(key) < 10 { // 8 bytes timestamp + ":" + at least 1 char 650 + return "" 651 + } 652 + // Skip 8 bytes timestamp + 1 byte ":" 653 + return string(key[9:]) 654 + } 655 + 656 + func formatTimeAgo(t time.Time) string { 657 + now := time.Now() 658 + diff := now.Sub(t) 659 + 660 + switch { 661 + case diff < time.Minute: 662 + return "just now" 663 + case diff < time.Hour: 664 + mins := int(diff.Minutes()) 665 + if mins == 1 { 666 + return "1 minute ago" 667 + } 668 + return fmt.Sprintf("%d minutes ago", mins) 669 + case diff < 24*time.Hour: 670 + hours := int(diff.Hours()) 671 + if hours == 1 { 672 + return "1 hour ago" 673 + } 674 + return fmt.Sprintf("%d hours ago", hours) 675 + case diff < 48*time.Hour: 676 + return "yesterday" 677 + case diff < 7*24*time.Hour: 678 + days := int(diff.Hours() / 24) 679 + return fmt.Sprintf("%d days ago", days) 680 + case diff < 30*24*time.Hour: 681 + weeks := int(diff.Hours() / 24 / 7) 682 + if weeks == 1 { 683 + return "1 week ago" 684 + } 685 + return fmt.Sprintf("%d weeks ago", weeks) 686 + default: 687 + months := int(diff.Hours() / 24 / 30) 688 + if months == 1 { 689 + return "1 month ago" 690 + } 691 + return fmt.Sprintf("%d months ago", months) 692 + } 693 + } 694 + 695 + // BackfillUser fetches all existing records for a DID and adds them to the index 696 + func (idx *FeedIndex) BackfillUser(ctx context.Context, did string) error { 697 + log.Info().Str("did", did).Msg("backfilling user records") 698 + 699 + for _, collection := range ArabicaCollections { 700 + records, err := idx.publicClient.ListRecords(ctx, did, collection, 100) 701 + if err != nil { 702 + log.Warn().Err(err).Str("did", did).Str("collection", collection).Msg("failed to list records for backfill") 703 + continue 704 + } 705 + 706 + for _, record := range records.Records { 707 + // Extract rkey from URI 708 + parts := strings.Split(record.URI, "/") 709 + if len(parts) < 3 { 710 + 
continue 711 + } 712 + rkey := parts[len(parts)-1] 713 + 714 + recordJSON, err := json.Marshal(record.Value) 715 + if err != nil { 716 + continue 717 + } 718 + 719 + if err := idx.UpsertRecord(did, collection, rkey, record.CID, recordJSON, 0); err != nil { 720 + log.Warn().Err(err).Str("uri", record.URI).Msg("failed to upsert record during backfill") 721 + } 722 + } 723 + } 724 + 725 + return nil 726 + }
+3
justfile
··· 4 4 run-production: 5 5 @LOG_FORMAT=json SECURE_COOKIES=true go run cmd/server/main.go 6 6 7 + run-firehose: 8 + @LOG_LEVEL=debug LOG_FORMAT=console go run cmd/server/main.go -firehose 9 + 7 10 test: 8 11 @go test ./... -cover -coverprofile=cover.out 9 12
+11 -1
module.nix
··· 36 36 default = true; 37 37 description = "Whether to set the Secure flag on cookies. Should be true when using HTTPS."; 38 38 }; 39 + 40 + firehose = lib.mkOption { 41 + type = lib.types.bool; 42 + default = false; 43 + description = '' 44 + Enable firehose-based feed using Jetstream. 45 + This provides real-time feed updates with zero API calls per request, 46 + instead of polling each user's PDS. 47 + ''; 48 + }; 39 49 }; 40 50 41 51 oauth = { ··· 103 113 Type = "simple"; 104 114 User = cfg.user; 105 115 Group = cfg.group; 106 - ExecStart = "${cfg.package}/bin/arabica"; 116 + ExecStart = "${cfg.package}/bin/arabica${lib.optionalString cfg.settings.firehose " -firehose"}"; 107 117 Restart = "on-failure"; 108 118 RestartSec = "10s"; 109 119