Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
at main 472 lines 14 kB view raw
1package feed 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "arabica/internal/atproto" 10 "arabica/internal/lexicons" 11 "arabica/internal/metrics" 12 "arabica/internal/models" 13 14 "github.com/rs/zerolog/log" 15) 16 17// ModerationFilter provides content filtering for moderation. 18// This interface allows the feed service to filter hidden/blacklisted content. 19type ModerationFilter interface { 20 IsRecordHidden(ctx context.Context, atURI string) bool 21 IsBlacklisted(ctx context.Context, did string) bool 22} 23 24const ( 25 // PublicFeedCacheTTL is the duration for which the public feed cache is valid. 26 // This value can be adjusted based on desired freshness vs. performance tradeoff. 27 // Consider values between 5-10 minutes for a good balance. 28 PublicFeedCacheTTL = 5 * time.Minute 29 30 // PublicFeedCacheSize is the number of items to cache in the server 31 PublicFeedCacheSize = 20 32 // PublicFeedLimit is the number of items to show for unauthenticated users 33 PublicFeedLimit = 5 34 // Number of feed items to show for authenticated users. 35 FeedLimit = 20 36) 37 38// FeedItem represents an activity in the social feed with author info 39type FeedItem struct { 40 // Record type and data (only one will be non-nil) 41 RecordType lexicons.RecordType // Use lexicons.RecordTypeBrew, lexicons.RecordTypeBean, etc. 42 Action string // "added a new brew", "added a new bean", etc. 43 44 Brew *models.Brew 45 Bean *models.Bean 46 Roaster *models.Roaster 47 Grinder *models.Grinder 48 Brewer *models.Brewer 49 50 Author *atproto.Profile 51 Timestamp time.Time 52 TimeAgo string // "2 hours ago", "yesterday", etc. 53 54 // Like-related fields 55 LikeCount int // Number of likes on this record 56 SubjectURI string // AT-URI of this record (for like button) 57 SubjectCID string // CID of this record (for like button) 58 IsLikedByViewer bool // Whether the current viewer has liked this record 59 60 // Comment-related fields 61 CommentCount int // Number of comments on this record 62 63 // Ownership 64 IsOwner bool // Whether the current viewer owns this record 65} 66 67// publicFeedCache holds cached feed items for unauthenticated users 68type publicFeedCache struct { 69 items []*FeedItem 70 expiresAt time.Time 71 mu sync.RWMutex 72} 73 74// FeedSort defines the sort order for feed queries 75type FeedSort string 76 77const ( 78 FeedSortRecent FeedSort = "recent" 79 FeedSortPopular FeedSort = "popular" 80) 81 82// FeedQuery specifies filtering, sorting, and pagination for feed queries 83type FeedQuery struct { 84 Limit int 85 Cursor string 86 TypeFilter lexicons.RecordType 87 Sort FeedSort 88} 89 90// FeedResult contains feed items plus pagination info 91type FeedResult struct { 92 Items []*FeedItem 93 NextCursor string 94} 95 96// FirehoseIndex is the interface for the firehose feed index 97// This allows the feed service to use firehose data when available 98type FirehoseIndex interface { 99 IsReady() bool 100 GetRecentFeed(ctx context.Context, limit int) ([]*FirehoseFeedItem, error) 101 GetFeedWithQuery(ctx context.Context, q FirehoseFeedQuery) (*FirehoseFeedResult, error) 102} 103 104// FirehoseFeedQuery mirrors FeedQuery for the firehose layer 105type FirehoseFeedQuery struct { 106 Limit int 107 Cursor string 108 TypeFilter lexicons.RecordType 109 Sort string // "recent" or "popular" 110} 111 112// FirehoseFeedResult mirrors FeedResult for the firehose layer 113type FirehoseFeedResult struct { 114 Items []*FirehoseFeedItem 115 NextCursor string 116} 117 118// FirehoseFeedItem matches the FeedItem structure from firehose package 119// This avoids import cycles 120type FirehoseFeedItem struct { 121 RecordType lexicons.RecordType 122 Action string 123 Brew *models.Brew 124 Bean *models.Bean 125 Roaster *models.Roaster 126 Grinder *models.Grinder 127 Brewer *models.Brewer 128 Author *atproto.Profile 129 Timestamp time.Time 130 TimeAgo string 131 LikeCount int 132 CommentCount int 133 SubjectURI string 134 SubjectCID string 135} 136 137// Service fetches and aggregates brews from registered users 138type Service struct { 139 registry *Registry 140 cache *publicFeedCache 141 firehoseIndex FirehoseIndex 142 moderationFilter ModerationFilter 143} 144 145// NewService creates a new feed service 146func NewService(registry *Registry) *Service { 147 return &Service{ 148 registry: registry, 149 cache: &publicFeedCache{}, 150 } 151} 152 153// SetFirehoseIndex configures the service to use firehose-based feed 154func (s *Service) SetFirehoseIndex(index FirehoseIndex) { 155 s.firehoseIndex = index 156 log.Info().Msg("feed: firehose index configured") 157} 158 159// SetModerationFilter configures the service to filter moderated content 160func (s *Service) SetModerationFilter(filter ModerationFilter) { 161 s.moderationFilter = filter 162 log.Info().Msg("feed: moderation filter configured") 163} 164 165// filterModeratedItems removes hidden records and content from blacklisted users 166func (s *Service) filterModeratedItems(ctx context.Context, items []*FeedItem) []*FeedItem { 167 if s.moderationFilter == nil { 168 return items 169 } 170 171 filtered := make([]*FeedItem, 0, len(items)) 172 for _, item := range items { 173 // Get author DID from the item 174 authorDID := s.getAuthorDID(item) 175 if authorDID != "" && s.moderationFilter.IsBlacklisted(ctx, authorDID) { 176 log.Debug().Str("author", authorDID).Msg("feed: filtering blacklisted user's content") 177 continue 178 } 179 180 // Check if the record is hidden 181 if item.SubjectURI != "" && s.moderationFilter.IsRecordHidden(ctx, item.SubjectURI) { 182 log.Debug().Str("uri", item.SubjectURI).Msg("feed: filtering hidden record") 183 continue 184 } 185 186 filtered = append(filtered, item) 187 } 188 189 if len(items) != len(filtered) { 190 log.Debug(). 191 Int("original", len(items)). 192 Int("filtered", len(filtered)). 193 Msg("feed: moderation filtering applied") 194 } 195 196 return filtered 197} 198 199// getAuthorDID extracts the author DID from a feed item 200func (s *Service) getAuthorDID(item *FeedItem) string { 201 if item.Author != nil { 202 return item.Author.DID 203 } 204 // Author should always be set on feed items, but handle gracefully 205 return "" 206} 207 208// GetCachedPublicFeed returns cached feed items for unauthenticated users. 209// It returns up to PublicFeedLimit items from the cache, refreshing if expired. 210// The cache stores PublicFeedCacheSize items internally but only returns PublicFeedLimit. 211// Moderated content is filtered even from cached items to ensure hidden content 212// doesn't appear if it was hidden after caching. 213func (s *Service) GetCachedPublicFeed(ctx context.Context) ([]*FeedItem, error) { 214 s.cache.mu.RLock() 215 cacheValid := time.Now().Before(s.cache.expiresAt) && len(s.cache.items) > 0 216 items := s.cache.items 217 s.cache.mu.RUnlock() 218 219 if cacheValid { 220 metrics.FeedCacheHitsTotal.Inc() 221 // Apply moderation filtering to cached items 222 // This ensures recently hidden content doesn't appear 223 items = s.filterModeratedItems(ctx, items) 224 225 // Return only the first PublicFeedLimit items from the cache 226 if len(items) > PublicFeedLimit { 227 items = items[:PublicFeedLimit] 228 } 229 log.Debug().Int("item_count", len(items)).Msg("feed: returning cached public feed") 230 return items, nil 231 } 232 233 // Cache is expired or empty, refresh it 234 return s.refreshPublicFeedCache(ctx) 235} 236 237// refreshPublicFeedCache fetches fresh feed items and updates the cache 238func (s *Service) refreshPublicFeedCache(ctx context.Context) ([]*FeedItem, error) { 239 s.cache.mu.Lock() 240 defer s.cache.mu.Unlock() 241 242 // Double-check if another goroutine already refreshed the cache 243 if time.Now().Before(s.cache.expiresAt) && len(s.cache.items) > 0 { 244 // Return only the first PublicFeedLimit items 245 items := s.cache.items 246 if len(items) > PublicFeedLimit { 247 items = items[:PublicFeedLimit] 248 } 249 return items, nil 250 } 251 252 metrics.FeedCacheMissesTotal.Inc() 253 log.Debug().Msg("feed: refreshing public feed cache") 254 255 // Fetch PublicFeedCacheSize items to cache (20 items) 256 items, err := s.GetRecentRecords(ctx, PublicFeedCacheSize) 257 if err != nil { 258 // If we have stale data, return it rather than failing 259 if len(s.cache.items) > 0 { 260 log.Warn().Err(err).Msg("feed: failed to refresh cache, returning stale data") 261 cachedItems := s.cache.items 262 if len(cachedItems) > PublicFeedLimit { 263 cachedItems = cachedItems[:PublicFeedLimit] 264 } 265 return cachedItems, nil 266 } 267 return nil, err 268 } 269 270 // Update cache with all fetched items 271 s.cache.items = items 272 s.cache.expiresAt = time.Now().Add(PublicFeedCacheTTL) 273 274 log.Debug(). 275 Int("cached_count", len(items)). 276 Time("expires_at", s.cache.expiresAt). 277 Msg("feed: updated public feed cache") 278 279 // Return only the first PublicFeedLimit items to the user 280 displayItems := items 281 if len(displayItems) > PublicFeedLimit { 282 displayItems = displayItems[:PublicFeedLimit] 283 } 284 285 return displayItems, nil 286} 287 288// GetRecentRecords fetches recent activity (brews and other records) from firehose index 289// Returns up to `limit` items sorted by most recent first 290// Moderated content (hidden records, blacklisted users) is filtered out 291func (s *Service) GetRecentRecords(ctx context.Context, limit int) ([]*FeedItem, error) { 292 if s.firehoseIndex == nil || !s.firehoseIndex.IsReady() { 293 log.Warn().Msg("feed: firehose index not ready") 294 return nil, fmt.Errorf("firehose index not ready") 295 } 296 297 log.Debug().Msg("feed: using firehose index") 298 299 // Fetch more items than requested to account for filtered content 300 // This ensures we can still return `limit` items after filtering 301 fetchLimit := limit 302 if s.moderationFilter != nil { 303 fetchLimit = limit + 10 // Buffer for filtered items 304 } 305 306 items, err := s.getRecentRecordsFromFirehose(ctx, fetchLimit) 307 if err != nil { 308 return nil, err 309 } 310 311 // Apply moderation filtering 312 items = s.filterModeratedItems(ctx, items) 313 314 // Trim to requested limit 315 if len(items) > limit { 316 items = items[:limit] 317 } 318 319 return items, nil 320} 321 322// GetFeedWithQuery fetches feed items with filtering, sorting, and pagination 323func (s *Service) GetFeedWithQuery(ctx context.Context, q FeedQuery) (*FeedResult, error) { 324 if s.firehoseIndex == nil || !s.firehoseIndex.IsReady() { 325 return nil, fmt.Errorf("firehose index not ready") 326 } 327 328 if q.Limit <= 0 { 329 q.Limit = FeedLimit 330 } 331 if q.Sort == "" { 332 q.Sort = FeedSortRecent 333 } 334 335 // Fetch more than needed to account for moderation filtering 336 fetchLimit := q.Limit 337 if s.moderationFilter != nil { 338 fetchLimit = q.Limit + 10 339 } 340 341 firehoseResult, err := s.firehoseIndex.GetFeedWithQuery(ctx, FirehoseFeedQuery{ 342 Limit: fetchLimit, 343 Cursor: q.Cursor, 344 TypeFilter: q.TypeFilter, 345 Sort: string(q.Sort), 346 }) 347 if err != nil { 348 return nil, err 349 } 350 351 // Convert to FeedItems 352 items := make([]*FeedItem, 0, len(firehoseResult.Items)) 353 for _, fi := range firehoseResult.Items { 354 items = append(items, &FeedItem{ 355 RecordType: fi.RecordType, 356 Action: fi.Action, 357 Brew: fi.Brew, 358 Bean: fi.Bean, 359 Roaster: fi.Roaster, 360 Grinder: fi.Grinder, 361 Brewer: fi.Brewer, 362 Author: fi.Author, 363 Timestamp: fi.Timestamp, 364 TimeAgo: fi.TimeAgo, 365 LikeCount: fi.LikeCount, 366 CommentCount: fi.CommentCount, 367 SubjectURI: fi.SubjectURI, 368 SubjectCID: fi.SubjectCID, 369 }) 370 } 371 372 // Apply moderation filtering 373 items = s.filterModeratedItems(ctx, items) 374 375 // Trim to requested limit 376 result := &FeedResult{ 377 NextCursor: firehoseResult.NextCursor, 378 } 379 if len(items) > q.Limit { 380 result.Items = items[:q.Limit] 381 } else { 382 result.Items = items 383 } 384 385 return result, nil 386} 387 388// getRecentRecordsFromFirehose fetches feed items from the firehose index 389func (s *Service) getRecentRecordsFromFirehose(ctx context.Context, limit int) ([]*FeedItem, error) { 390 firehoseItems, err := s.firehoseIndex.GetRecentFeed(ctx, limit) 391 if err != nil { 392 log.Warn().Err(err).Msg("feed: firehose index error") 393 return nil, err 394 } 395 396 // Convert FirehoseFeedItem to FeedItem 397 items := make([]*FeedItem, len(firehoseItems)) 398 for i, fi := range firehoseItems { 399 items[i] = &FeedItem{ 400 RecordType: fi.RecordType, 401 Action: fi.Action, 402 Brew: fi.Brew, 403 Bean: fi.Bean, 404 Roaster: fi.Roaster, 405 Grinder: fi.Grinder, 406 Brewer: fi.Brewer, 407 Author: fi.Author, 408 Timestamp: fi.Timestamp, 409 TimeAgo: fi.TimeAgo, 410 LikeCount: fi.LikeCount, 411 CommentCount: fi.CommentCount, 412 SubjectURI: fi.SubjectURI, 413 SubjectCID: fi.SubjectCID, 414 } 415 } 416 417 log.Debug().Int("count", len(items)).Msg("feed: returning items from firehose index") 418 return items, nil 419} 420 421// FormatTimeAgo returns a human-readable relative time string 422func FormatTimeAgo(t time.Time) string { 423 now := time.Now() 424 diff := now.Sub(t) 425 426 switch { 427 case diff < time.Minute: 428 return "just now" 429 case diff < time.Hour: 430 mins := int(diff.Minutes()) 431 if mins == 1 { 432 return "1 minute ago" 433 } 434 return formatPlural(mins, "minute") 435 case diff < 24*time.Hour: 436 hours := int(diff.Hours()) 437 if hours == 1 { 438 return "1 hour ago" 439 } 440 return formatPlural(hours, "hour") 441 case diff < 48*time.Hour: 442 return "yesterday" 443 case diff < 7*24*time.Hour: 444 days := int(diff.Hours() / 24) 445 return formatPlural(days, "day") 446 case diff < 30*24*time.Hour: 447 weeks := int(diff.Hours() / 24 / 7) 448 if weeks == 1 { 449 return "1 week ago" 450 } 451 return formatPlural(weeks, "week") 452 case diff < 365*24*time.Hour: 453 months := int(diff.Hours() / 24 / 30) 454 if months == 1 { 455 return "1 month ago" 456 } 457 return formatPlural(months, "month") 458 default: 459 years := int(diff.Hours() / 24 / 365) 460 if years == 1 { 461 return "1 year ago" 462 } 463 return formatPlural(years, "year") 464 } 465} 466 467func formatPlural(n int, unit string) string { 468 if n == 1 { 469 return "1 " + unit + " ago" 470 } 471 return fmt.Sprintf("%d %ss ago", n, unit) 472}