Coffee journaling on ATProto (alpha)
alpha.arabica.social
coffee
1package feed
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "arabica/internal/atproto"
10 "arabica/internal/lexicons"
11 "arabica/internal/metrics"
12 "arabica/internal/models"
13
14 "github.com/rs/zerolog/log"
15)
16
17// ModerationFilter provides content filtering for moderation.
18// This interface allows the feed service to filter hidden/blacklisted content.
19type ModerationFilter interface {
20 IsRecordHidden(ctx context.Context, atURI string) bool
21 IsBlacklisted(ctx context.Context, did string) bool
22}
23
24const (
25 // PublicFeedCacheTTL is the duration for which the public feed cache is valid.
26 // This value can be adjusted based on desired freshness vs. performance tradeoff.
27 // Consider values between 5-10 minutes for a good balance.
28 PublicFeedCacheTTL = 5 * time.Minute
29
30 // PublicFeedCacheSize is the number of items to cache in the server
31 PublicFeedCacheSize = 20
32 // PublicFeedLimit is the number of items to show for unauthenticated users
33 PublicFeedLimit = 5
34 // Number of feed items to show for authenticated users.
35 FeedLimit = 20
36)
37
38// FeedItem represents an activity in the social feed with author info
39type FeedItem struct {
40 // Record type and data (only one will be non-nil)
41 RecordType lexicons.RecordType // Use lexicons.RecordTypeBrew, lexicons.RecordTypeBean, etc.
42 Action string // "added a new brew", "added a new bean", etc.
43
44 Brew *models.Brew
45 Bean *models.Bean
46 Roaster *models.Roaster
47 Grinder *models.Grinder
48 Brewer *models.Brewer
49
50 Author *atproto.Profile
51 Timestamp time.Time
52 TimeAgo string // "2 hours ago", "yesterday", etc.
53
54 // Like-related fields
55 LikeCount int // Number of likes on this record
56 SubjectURI string // AT-URI of this record (for like button)
57 SubjectCID string // CID of this record (for like button)
58 IsLikedByViewer bool // Whether the current viewer has liked this record
59
60 // Comment-related fields
61 CommentCount int // Number of comments on this record
62
63 // Ownership
64 IsOwner bool // Whether the current viewer owns this record
65}
66
67// publicFeedCache holds cached feed items for unauthenticated users
68type publicFeedCache struct {
69 items []*FeedItem
70 expiresAt time.Time
71 mu sync.RWMutex
72}
73
74// FeedSort defines the sort order for feed queries
75type FeedSort string
76
77const (
78 FeedSortRecent FeedSort = "recent"
79 FeedSortPopular FeedSort = "popular"
80)
81
82// FeedQuery specifies filtering, sorting, and pagination for feed queries
83type FeedQuery struct {
84 Limit int
85 Cursor string
86 TypeFilter lexicons.RecordType
87 Sort FeedSort
88}
89
90// FeedResult contains feed items plus pagination info
91type FeedResult struct {
92 Items []*FeedItem
93 NextCursor string
94}
95
96// FirehoseIndex is the interface for the firehose feed index
97// This allows the feed service to use firehose data when available
98type FirehoseIndex interface {
99 IsReady() bool
100 GetRecentFeed(ctx context.Context, limit int) ([]*FirehoseFeedItem, error)
101 GetFeedWithQuery(ctx context.Context, q FirehoseFeedQuery) (*FirehoseFeedResult, error)
102}
103
104// FirehoseFeedQuery mirrors FeedQuery for the firehose layer
105type FirehoseFeedQuery struct {
106 Limit int
107 Cursor string
108 TypeFilter lexicons.RecordType
109 Sort string // "recent" or "popular"
110}
111
112// FirehoseFeedResult mirrors FeedResult for the firehose layer
113type FirehoseFeedResult struct {
114 Items []*FirehoseFeedItem
115 NextCursor string
116}
117
118// FirehoseFeedItem matches the FeedItem structure from firehose package
119// This avoids import cycles
120type FirehoseFeedItem struct {
121 RecordType lexicons.RecordType
122 Action string
123 Brew *models.Brew
124 Bean *models.Bean
125 Roaster *models.Roaster
126 Grinder *models.Grinder
127 Brewer *models.Brewer
128 Author *atproto.Profile
129 Timestamp time.Time
130 TimeAgo string
131 LikeCount int
132 CommentCount int
133 SubjectURI string
134 SubjectCID string
135}
136
137// Service fetches and aggregates brews from registered users
138type Service struct {
139 registry *Registry
140 cache *publicFeedCache
141 firehoseIndex FirehoseIndex
142 moderationFilter ModerationFilter
143}
144
145// NewService creates a new feed service
146func NewService(registry *Registry) *Service {
147 return &Service{
148 registry: registry,
149 cache: &publicFeedCache{},
150 }
151}
152
153// SetFirehoseIndex configures the service to use firehose-based feed
154func (s *Service) SetFirehoseIndex(index FirehoseIndex) {
155 s.firehoseIndex = index
156 log.Info().Msg("feed: firehose index configured")
157}
158
159// SetModerationFilter configures the service to filter moderated content
160func (s *Service) SetModerationFilter(filter ModerationFilter) {
161 s.moderationFilter = filter
162 log.Info().Msg("feed: moderation filter configured")
163}
164
165// filterModeratedItems removes hidden records and content from blacklisted users
166func (s *Service) filterModeratedItems(ctx context.Context, items []*FeedItem) []*FeedItem {
167 if s.moderationFilter == nil {
168 return items
169 }
170
171 filtered := make([]*FeedItem, 0, len(items))
172 for _, item := range items {
173 // Get author DID from the item
174 authorDID := s.getAuthorDID(item)
175 if authorDID != "" && s.moderationFilter.IsBlacklisted(ctx, authorDID) {
176 log.Debug().Str("author", authorDID).Msg("feed: filtering blacklisted user's content")
177 continue
178 }
179
180 // Check if the record is hidden
181 if item.SubjectURI != "" && s.moderationFilter.IsRecordHidden(ctx, item.SubjectURI) {
182 log.Debug().Str("uri", item.SubjectURI).Msg("feed: filtering hidden record")
183 continue
184 }
185
186 filtered = append(filtered, item)
187 }
188
189 if len(items) != len(filtered) {
190 log.Debug().
191 Int("original", len(items)).
192 Int("filtered", len(filtered)).
193 Msg("feed: moderation filtering applied")
194 }
195
196 return filtered
197}
198
199// getAuthorDID extracts the author DID from a feed item
200func (s *Service) getAuthorDID(item *FeedItem) string {
201 if item.Author != nil {
202 return item.Author.DID
203 }
204 // Author should always be set on feed items, but handle gracefully
205 return ""
206}
207
208// GetCachedPublicFeed returns cached feed items for unauthenticated users.
209// It returns up to PublicFeedLimit items from the cache, refreshing if expired.
210// The cache stores PublicFeedCacheSize items internally but only returns PublicFeedLimit.
211// Moderated content is filtered even from cached items to ensure hidden content
212// doesn't appear if it was hidden after caching.
213func (s *Service) GetCachedPublicFeed(ctx context.Context) ([]*FeedItem, error) {
214 s.cache.mu.RLock()
215 cacheValid := time.Now().Before(s.cache.expiresAt) && len(s.cache.items) > 0
216 items := s.cache.items
217 s.cache.mu.RUnlock()
218
219 if cacheValid {
220 metrics.FeedCacheHitsTotal.Inc()
221 // Apply moderation filtering to cached items
222 // This ensures recently hidden content doesn't appear
223 items = s.filterModeratedItems(ctx, items)
224
225 // Return only the first PublicFeedLimit items from the cache
226 if len(items) > PublicFeedLimit {
227 items = items[:PublicFeedLimit]
228 }
229 log.Debug().Int("item_count", len(items)).Msg("feed: returning cached public feed")
230 return items, nil
231 }
232
233 // Cache is expired or empty, refresh it
234 return s.refreshPublicFeedCache(ctx)
235}
236
237// refreshPublicFeedCache fetches fresh feed items and updates the cache
238func (s *Service) refreshPublicFeedCache(ctx context.Context) ([]*FeedItem, error) {
239 s.cache.mu.Lock()
240 defer s.cache.mu.Unlock()
241
242 // Double-check if another goroutine already refreshed the cache
243 if time.Now().Before(s.cache.expiresAt) && len(s.cache.items) > 0 {
244 // Return only the first PublicFeedLimit items
245 items := s.cache.items
246 if len(items) > PublicFeedLimit {
247 items = items[:PublicFeedLimit]
248 }
249 return items, nil
250 }
251
252 metrics.FeedCacheMissesTotal.Inc()
253 log.Debug().Msg("feed: refreshing public feed cache")
254
255 // Fetch PublicFeedCacheSize items to cache (20 items)
256 items, err := s.GetRecentRecords(ctx, PublicFeedCacheSize)
257 if err != nil {
258 // If we have stale data, return it rather than failing
259 if len(s.cache.items) > 0 {
260 log.Warn().Err(err).Msg("feed: failed to refresh cache, returning stale data")
261 cachedItems := s.cache.items
262 if len(cachedItems) > PublicFeedLimit {
263 cachedItems = cachedItems[:PublicFeedLimit]
264 }
265 return cachedItems, nil
266 }
267 return nil, err
268 }
269
270 // Update cache with all fetched items
271 s.cache.items = items
272 s.cache.expiresAt = time.Now().Add(PublicFeedCacheTTL)
273
274 log.Debug().
275 Int("cached_count", len(items)).
276 Time("expires_at", s.cache.expiresAt).
277 Msg("feed: updated public feed cache")
278
279 // Return only the first PublicFeedLimit items to the user
280 displayItems := items
281 if len(displayItems) > PublicFeedLimit {
282 displayItems = displayItems[:PublicFeedLimit]
283 }
284
285 return displayItems, nil
286}
287
288// GetRecentRecords fetches recent activity (brews and other records) from firehose index
289// Returns up to `limit` items sorted by most recent first
290// Moderated content (hidden records, blacklisted users) is filtered out
291func (s *Service) GetRecentRecords(ctx context.Context, limit int) ([]*FeedItem, error) {
292 if s.firehoseIndex == nil || !s.firehoseIndex.IsReady() {
293 log.Warn().Msg("feed: firehose index not ready")
294 return nil, fmt.Errorf("firehose index not ready")
295 }
296
297 log.Debug().Msg("feed: using firehose index")
298
299 // Fetch more items than requested to account for filtered content
300 // This ensures we can still return `limit` items after filtering
301 fetchLimit := limit
302 if s.moderationFilter != nil {
303 fetchLimit = limit + 10 // Buffer for filtered items
304 }
305
306 items, err := s.getRecentRecordsFromFirehose(ctx, fetchLimit)
307 if err != nil {
308 return nil, err
309 }
310
311 // Apply moderation filtering
312 items = s.filterModeratedItems(ctx, items)
313
314 // Trim to requested limit
315 if len(items) > limit {
316 items = items[:limit]
317 }
318
319 return items, nil
320}
321
322// GetFeedWithQuery fetches feed items with filtering, sorting, and pagination
323func (s *Service) GetFeedWithQuery(ctx context.Context, q FeedQuery) (*FeedResult, error) {
324 if s.firehoseIndex == nil || !s.firehoseIndex.IsReady() {
325 return nil, fmt.Errorf("firehose index not ready")
326 }
327
328 if q.Limit <= 0 {
329 q.Limit = FeedLimit
330 }
331 if q.Sort == "" {
332 q.Sort = FeedSortRecent
333 }
334
335 // Fetch more than needed to account for moderation filtering
336 fetchLimit := q.Limit
337 if s.moderationFilter != nil {
338 fetchLimit = q.Limit + 10
339 }
340
341 firehoseResult, err := s.firehoseIndex.GetFeedWithQuery(ctx, FirehoseFeedQuery{
342 Limit: fetchLimit,
343 Cursor: q.Cursor,
344 TypeFilter: q.TypeFilter,
345 Sort: string(q.Sort),
346 })
347 if err != nil {
348 return nil, err
349 }
350
351 // Convert to FeedItems
352 items := make([]*FeedItem, 0, len(firehoseResult.Items))
353 for _, fi := range firehoseResult.Items {
354 items = append(items, &FeedItem{
355 RecordType: fi.RecordType,
356 Action: fi.Action,
357 Brew: fi.Brew,
358 Bean: fi.Bean,
359 Roaster: fi.Roaster,
360 Grinder: fi.Grinder,
361 Brewer: fi.Brewer,
362 Author: fi.Author,
363 Timestamp: fi.Timestamp,
364 TimeAgo: fi.TimeAgo,
365 LikeCount: fi.LikeCount,
366 CommentCount: fi.CommentCount,
367 SubjectURI: fi.SubjectURI,
368 SubjectCID: fi.SubjectCID,
369 })
370 }
371
372 // Apply moderation filtering
373 items = s.filterModeratedItems(ctx, items)
374
375 // Trim to requested limit
376 result := &FeedResult{
377 NextCursor: firehoseResult.NextCursor,
378 }
379 if len(items) > q.Limit {
380 result.Items = items[:q.Limit]
381 } else {
382 result.Items = items
383 }
384
385 return result, nil
386}
387
388// getRecentRecordsFromFirehose fetches feed items from the firehose index
389func (s *Service) getRecentRecordsFromFirehose(ctx context.Context, limit int) ([]*FeedItem, error) {
390 firehoseItems, err := s.firehoseIndex.GetRecentFeed(ctx, limit)
391 if err != nil {
392 log.Warn().Err(err).Msg("feed: firehose index error")
393 return nil, err
394 }
395
396 // Convert FirehoseFeedItem to FeedItem
397 items := make([]*FeedItem, len(firehoseItems))
398 for i, fi := range firehoseItems {
399 items[i] = &FeedItem{
400 RecordType: fi.RecordType,
401 Action: fi.Action,
402 Brew: fi.Brew,
403 Bean: fi.Bean,
404 Roaster: fi.Roaster,
405 Grinder: fi.Grinder,
406 Brewer: fi.Brewer,
407 Author: fi.Author,
408 Timestamp: fi.Timestamp,
409 TimeAgo: fi.TimeAgo,
410 LikeCount: fi.LikeCount,
411 CommentCount: fi.CommentCount,
412 SubjectURI: fi.SubjectURI,
413 SubjectCID: fi.SubjectCID,
414 }
415 }
416
417 log.Debug().Int("count", len(items)).Msg("feed: returning items from firehose index")
418 return items, nil
419}
420
421// FormatTimeAgo returns a human-readable relative time string
422func FormatTimeAgo(t time.Time) string {
423 now := time.Now()
424 diff := now.Sub(t)
425
426 switch {
427 case diff < time.Minute:
428 return "just now"
429 case diff < time.Hour:
430 mins := int(diff.Minutes())
431 if mins == 1 {
432 return "1 minute ago"
433 }
434 return formatPlural(mins, "minute")
435 case diff < 24*time.Hour:
436 hours := int(diff.Hours())
437 if hours == 1 {
438 return "1 hour ago"
439 }
440 return formatPlural(hours, "hour")
441 case diff < 48*time.Hour:
442 return "yesterday"
443 case diff < 7*24*time.Hour:
444 days := int(diff.Hours() / 24)
445 return formatPlural(days, "day")
446 case diff < 30*24*time.Hour:
447 weeks := int(diff.Hours() / 24 / 7)
448 if weeks == 1 {
449 return "1 week ago"
450 }
451 return formatPlural(weeks, "week")
452 case diff < 365*24*time.Hour:
453 months := int(diff.Hours() / 24 / 30)
454 if months == 1 {
455 return "1 month ago"
456 }
457 return formatPlural(months, "month")
458 default:
459 years := int(diff.Hours() / 24 / 365)
460 if years == 1 {
461 return "1 year ago"
462 }
463 return formatPlural(years, "year")
464 }
465}
466
467func formatPlural(n int, unit string) string {
468 if n == 1 {
469 return "1 " + unit + " ago"
470 }
471 return fmt.Sprintf("%d %ss ago", n, unit)
472}