···1+// Package sqlitestore provides SQLite-backed store implementations.
2+package sqlitestore
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"arabica/internal/moderation"
)
// ModerationStore implements moderation.Store using SQLite.
// It shares the database connection with the firehose FeedIndex.
type ModerationStore struct {
	db *sql.DB // shared connection; the moderation schema must already be applied
}
19+20+// NewModerationStore creates a ModerationStore backed by the given database.
21+// The database must already have the moderation schema applied.
22+func NewModerationStore(db *sql.DB) *ModerationStore {
23+ return &ModerationStore{db: db}
24+}
// Compile-time assertion that ModerationStore satisfies moderation.Store;
// the build fails if the interface and this implementation drift apart.
var _ moderation.Store = (*ModerationStore)(nil)
28+29+// ========== Hidden Records ==========
30+31+func (s *ModerationStore) HideRecord(ctx context.Context, entry moderation.HiddenRecord) error {
32+ autoHidden := 0
33+ if entry.AutoHidden {
34+ autoHidden = 1
35+ }
36+ _, err := s.db.ExecContext(ctx, `
37+ INSERT INTO moderation_hidden_records (uri, hidden_at, hidden_by, reason, auto_hidden)
38+ VALUES (?, ?, ?, ?, ?)
39+ ON CONFLICT(uri) DO UPDATE SET
40+ hidden_at = excluded.hidden_at,
41+ hidden_by = excluded.hidden_by,
42+ reason = excluded.reason,
43+ auto_hidden = excluded.auto_hidden
44+ `, entry.ATURI, entry.HiddenAt.Format(time.RFC3339Nano), entry.HiddenBy, entry.Reason, autoHidden)
45+ if err != nil {
46+ return fmt.Errorf("hide record: %w", err)
47+ }
48+ return nil
49+}
50+51+func (s *ModerationStore) UnhideRecord(ctx context.Context, atURI string) error {
52+ _, err := s.db.ExecContext(ctx, `DELETE FROM moderation_hidden_records WHERE uri = ?`, atURI)
53+ return err
54+}
55+56+func (s *ModerationStore) IsRecordHidden(ctx context.Context, atURI string) bool {
57+ var exists int
58+ _ = s.db.QueryRowContext(ctx, `SELECT 1 FROM moderation_hidden_records WHERE uri = ?`, atURI).Scan(&exists)
59+ return exists == 1
60+}
61+62+func (s *ModerationStore) GetHiddenRecord(ctx context.Context, atURI string) (*moderation.HiddenRecord, error) {
63+ var r moderation.HiddenRecord
64+ var hiddenAtStr string
65+ var autoHidden int
66+ err := s.db.QueryRowContext(ctx, `
67+ SELECT uri, hidden_at, hidden_by, reason, auto_hidden
68+ FROM moderation_hidden_records WHERE uri = ?
69+ `, atURI).Scan(&r.ATURI, &hiddenAtStr, &r.HiddenBy, &r.Reason, &autoHidden)
70+ if err == sql.ErrNoRows {
71+ return nil, nil
72+ }
73+ if err != nil {
74+ return nil, err
75+ }
76+ r.HiddenAt, _ = time.Parse(time.RFC3339Nano, hiddenAtStr)
77+ r.AutoHidden = autoHidden == 1
78+ return &r, nil
79+}
80+81+func (s *ModerationStore) ListHiddenRecords(ctx context.Context) ([]moderation.HiddenRecord, error) {
82+ rows, err := s.db.QueryContext(ctx, `
83+ SELECT uri, hidden_at, hidden_by, reason, auto_hidden
84+ FROM moderation_hidden_records ORDER BY hidden_at DESC
85+ `)
86+ if err != nil {
87+ return nil, err
88+ }
89+ defer rows.Close()
90+91+ var records []moderation.HiddenRecord
92+ for rows.Next() {
93+ var r moderation.HiddenRecord
94+ var hiddenAtStr string
95+ var autoHidden int
96+ if err := rows.Scan(&r.ATURI, &hiddenAtStr, &r.HiddenBy, &r.Reason, &autoHidden); err != nil {
97+ continue
98+ }
99+ r.HiddenAt, _ = time.Parse(time.RFC3339Nano, hiddenAtStr)
100+ r.AutoHidden = autoHidden == 1
101+ records = append(records, r)
102+ }
103+ return records, rows.Err()
104+}
105+106+// ========== Blacklist ==========
107+108+func (s *ModerationStore) BlacklistUser(ctx context.Context, entry moderation.BlacklistedUser) error {
109+ _, err := s.db.ExecContext(ctx, `
110+ INSERT INTO moderation_blacklist (did, blacklisted_at, blacklisted_by, reason)
111+ VALUES (?, ?, ?, ?)
112+ ON CONFLICT(did) DO UPDATE SET
113+ blacklisted_at = excluded.blacklisted_at,
114+ blacklisted_by = excluded.blacklisted_by,
115+ reason = excluded.reason
116+ `, entry.DID, entry.BlacklistedAt.Format(time.RFC3339Nano), entry.BlacklistedBy, entry.Reason)
117+ if err != nil {
118+ return fmt.Errorf("blacklist user: %w", err)
119+ }
120+ return nil
121+}
122+123+func (s *ModerationStore) UnblacklistUser(ctx context.Context, did string) error {
124+ _, err := s.db.ExecContext(ctx, `DELETE FROM moderation_blacklist WHERE did = ?`, did)
125+ return err
126+}
127+128+func (s *ModerationStore) IsBlacklisted(ctx context.Context, did string) bool {
129+ var exists int
130+ _ = s.db.QueryRowContext(ctx, `SELECT 1 FROM moderation_blacklist WHERE did = ?`, did).Scan(&exists)
131+ return exists == 1
132+}
133+134+func (s *ModerationStore) GetBlacklistedUser(ctx context.Context, did string) (*moderation.BlacklistedUser, error) {
135+ var u moderation.BlacklistedUser
136+ var blacklistedAtStr string
137+ err := s.db.QueryRowContext(ctx, `
138+ SELECT did, blacklisted_at, blacklisted_by, reason
139+ FROM moderation_blacklist WHERE did = ?
140+ `, did).Scan(&u.DID, &blacklistedAtStr, &u.BlacklistedBy, &u.Reason)
141+ if err == sql.ErrNoRows {
142+ return nil, nil
143+ }
144+ if err != nil {
145+ return nil, err
146+ }
147+ u.BlacklistedAt, _ = time.Parse(time.RFC3339Nano, blacklistedAtStr)
148+ return &u, nil
149+}
150+151+func (s *ModerationStore) ListBlacklistedUsers(ctx context.Context) ([]moderation.BlacklistedUser, error) {
152+ rows, err := s.db.QueryContext(ctx, `
153+ SELECT did, blacklisted_at, blacklisted_by, reason
154+ FROM moderation_blacklist ORDER BY blacklisted_at DESC
155+ `)
156+ if err != nil {
157+ return nil, err
158+ }
159+ defer rows.Close()
160+161+ var users []moderation.BlacklistedUser
162+ for rows.Next() {
163+ var u moderation.BlacklistedUser
164+ var blacklistedAtStr string
165+ if err := rows.Scan(&u.DID, &blacklistedAtStr, &u.BlacklistedBy, &u.Reason); err != nil {
166+ continue
167+ }
168+ u.BlacklistedAt, _ = time.Parse(time.RFC3339Nano, blacklistedAtStr)
169+ users = append(users, u)
170+ }
171+ return users, rows.Err()
172+}
173+174+// ========== Reports ==========
175+176+func (s *ModerationStore) CreateReport(ctx context.Context, report moderation.Report) error {
177+ _, err := s.db.ExecContext(ctx, `
178+ INSERT INTO moderation_reports
179+ (id, subject_uri, subject_did, reporter_did, reason, created_at, status, resolved_by, resolved_at)
180+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
181+ `, report.ID, report.SubjectURI, report.SubjectDID, report.ReporterDID, report.Reason,
182+ report.CreatedAt.Format(time.RFC3339Nano), string(report.Status), report.ResolvedBy, nil)
183+ if err != nil {
184+ return fmt.Errorf("create report: %w", err)
185+ }
186+ return nil
187+}
188+189+func (s *ModerationStore) GetReport(ctx context.Context, id string) (*moderation.Report, error) {
190+ var r moderation.Report
191+ var createdAtStr string
192+ var resolvedAtStr sql.NullString
193+ err := s.db.QueryRowContext(ctx, `
194+ SELECT id, subject_uri, subject_did, reporter_did, reason, created_at, status, resolved_by, resolved_at
195+ FROM moderation_reports WHERE id = ?
196+ `, id).Scan(&r.ID, &r.SubjectURI, &r.SubjectDID, &r.ReporterDID, &r.Reason,
197+ &createdAtStr, &r.Status, &r.ResolvedBy, &resolvedAtStr)
198+ if err == sql.ErrNoRows {
199+ return nil, nil
200+ }
201+ if err != nil {
202+ return nil, err
203+ }
204+ r.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
205+ if resolvedAtStr.Valid {
206+ t, _ := time.Parse(time.RFC3339Nano, resolvedAtStr.String)
207+ r.ResolvedAt = &t
208+ }
209+ return &r, nil
210+}
211+212+func (s *ModerationStore) ListPendingReports(ctx context.Context) ([]moderation.Report, error) {
213+ return s.listReports(ctx, `WHERE status = 'pending' ORDER BY created_at DESC`)
214+}
215+216+func (s *ModerationStore) ListAllReports(ctx context.Context) ([]moderation.Report, error) {
217+ return s.listReports(ctx, `ORDER BY created_at DESC`)
218+}
219+220+func (s *ModerationStore) listReports(ctx context.Context, clause string) ([]moderation.Report, error) {
221+ rows, err := s.db.QueryContext(ctx, `
222+ SELECT id, subject_uri, subject_did, reporter_did, reason, created_at, status, resolved_by, resolved_at
223+ FROM moderation_reports `+clause)
224+ if err != nil {
225+ return nil, err
226+ }
227+ defer rows.Close()
228+ return scanReports(rows)
229+}
230+231+func scanReports(rows *sql.Rows) ([]moderation.Report, error) {
232+ var reports []moderation.Report
233+ for rows.Next() {
234+ var r moderation.Report
235+ var createdAtStr string
236+ var resolvedAtStr sql.NullString
237+ if err := rows.Scan(&r.ID, &r.SubjectURI, &r.SubjectDID, &r.ReporterDID, &r.Reason,
238+ &createdAtStr, &r.Status, &r.ResolvedBy, &resolvedAtStr); err != nil {
239+ continue
240+ }
241+ r.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
242+ if resolvedAtStr.Valid {
243+ t, _ := time.Parse(time.RFC3339Nano, resolvedAtStr.String)
244+ r.ResolvedAt = &t
245+ }
246+ reports = append(reports, r)
247+ }
248+ return reports, rows.Err()
249+}
250+251+func (s *ModerationStore) ResolveReport(ctx context.Context, id string, status moderation.ReportStatus, resolvedBy string) error {
252+ now := time.Now().Format(time.RFC3339Nano)
253+ res, err := s.db.ExecContext(ctx, `
254+ UPDATE moderation_reports SET status = ?, resolved_by = ?, resolved_at = ? WHERE id = ?
255+ `, string(status), resolvedBy, now, id)
256+ if err != nil {
257+ return fmt.Errorf("resolve report: %w", err)
258+ }
259+ n, _ := res.RowsAffected()
260+ if n == 0 {
261+ return fmt.Errorf("report not found: %s", id)
262+ }
263+ return nil
264+}
265+266+func (s *ModerationStore) CountReportsForURI(ctx context.Context, atURI string) (int, error) {
267+ var count int
268+ err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM moderation_reports WHERE subject_uri = ?`, atURI).Scan(&count)
269+ return count, err
270+}
271+272+func (s *ModerationStore) CountReportsForDID(ctx context.Context, did string) (int, error) {
273+ var count int
274+ err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM moderation_reports WHERE subject_did = ?`, did).Scan(&count)
275+ return count, err
276+}
277+278+func (s *ModerationStore) CountReportsForDIDSince(ctx context.Context, did string, since time.Time) (int, error) {
279+ var count int
280+ err := s.db.QueryRowContext(ctx, `
281+ SELECT COUNT(*) FROM moderation_reports WHERE subject_did = ? AND created_at > ?
282+ `, did, since.Format(time.RFC3339Nano)).Scan(&count)
283+ return count, err
284+}
285+286+func (s *ModerationStore) HasReportedURI(ctx context.Context, reporterDID, subjectURI string) (bool, error) {
287+ var exists int
288+ err := s.db.QueryRowContext(ctx, `
289+ SELECT 1 FROM moderation_reports WHERE reporter_did = ? AND subject_uri = ? LIMIT 1
290+ `, reporterDID, subjectURI).Scan(&exists)
291+ if err == sql.ErrNoRows {
292+ return false, nil
293+ }
294+ return exists == 1, err
295+}
296+297+func (s *ModerationStore) CountReportsFromUserSince(ctx context.Context, reporterDID string, since time.Time) (int, error) {
298+ var count int
299+ err := s.db.QueryRowContext(ctx, `
300+ SELECT COUNT(*) FROM moderation_reports WHERE reporter_did = ? AND created_at > ?
301+ `, reporterDID, since.Format(time.RFC3339Nano)).Scan(&count)
302+ return count, err
303+}
304+305+// ========== Audit Log ==========
306+307+func (s *ModerationStore) LogAction(ctx context.Context, entry moderation.AuditEntry) error {
308+ details, err := json.Marshal(entry.Details)
309+ if err != nil {
310+ details = []byte("{}")
311+ }
312+ autoMod := 0
313+ if entry.AutoMod {
314+ autoMod = 1
315+ }
316+ _, err = s.db.ExecContext(ctx, `
317+ INSERT INTO moderation_audit_log (id, action, actor_did, target_uri, reason, details, timestamp, auto_mod)
318+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
319+ `, entry.ID, string(entry.Action), entry.ActorDID, entry.TargetURI, entry.Reason,
320+ string(details), entry.Timestamp.Format(time.RFC3339Nano), autoMod)
321+ if err != nil {
322+ return fmt.Errorf("log action: %w", err)
323+ }
324+ return nil
325+}
326+327+func (s *ModerationStore) ListAuditLog(ctx context.Context, limit int) ([]moderation.AuditEntry, error) {
328+ rows, err := s.db.QueryContext(ctx, `
329+ SELECT id, action, actor_did, target_uri, reason, details, timestamp, auto_mod
330+ FROM moderation_audit_log ORDER BY timestamp DESC LIMIT ?
331+ `, limit)
332+ if err != nil {
333+ return nil, err
334+ }
335+ defer rows.Close()
336+337+ var entries []moderation.AuditEntry
338+ for rows.Next() {
339+ var e moderation.AuditEntry
340+ var timestampStr, detailsStr string
341+ var autoMod int
342+ if err := rows.Scan(&e.ID, &e.Action, &e.ActorDID, &e.TargetURI, &e.Reason,
343+ &detailsStr, ×tampStr, &autoMod); err != nil {
344+ continue
345+ }
346+ e.Timestamp, _ = time.Parse(time.RFC3339Nano, timestampStr)
347+ e.AutoMod = autoMod == 1
348+ _ = json.Unmarshal([]byte(detailsStr), &e.Details)
349+ entries = append(entries, e)
350+ }
351+ return entries, rows.Err()
352+}
353+354+// ========== Auto-hide Resets ==========
355+356+func (s *ModerationStore) SetAutoHideReset(ctx context.Context, did string, resetAt time.Time) error {
357+ _, err := s.db.ExecContext(ctx, `
358+ INSERT INTO moderation_autohide_resets (did, reset_at) VALUES (?, ?)
359+ ON CONFLICT(did) DO UPDATE SET reset_at = excluded.reset_at
360+ `, did, resetAt.Format(time.RFC3339Nano))
361+ if err != nil {
362+ return fmt.Errorf("set autohide reset: %w", err)
363+ }
364+ return nil
365+}
366+367+func (s *ModerationStore) GetAutoHideReset(ctx context.Context, did string) (time.Time, error) {
368+ var resetAtStr string
369+ err := s.db.QueryRowContext(ctx, `SELECT reset_at FROM moderation_autohide_resets WHERE did = ?`, did).Scan(&resetAtStr)
370+ if err == sql.ErrNoRows {
371+ return time.Time{}, nil
372+ }
373+ if err != nil {
374+ return time.Time{}, err
375+ }
376+ t, _ := time.Parse(time.RFC3339Nano, resetAtStr)
377+ return t, nil
378+}
+2-2
internal/firehose/config.go
···1// Package firehose provides real-time AT Protocol event consumption via Jetstream.
2-// It indexes Arabica records into a local BoltDB database for fast feed queries.
3package firehose
45import (
···36 // Compress enables zstd compression (~56% bandwidth reduction)
37 Compress bool
3839- // IndexPath is the path to the BoltDB feed index database
40 IndexPath string
4142 // ProfileCacheTTL is how long to cache profile data
···1// Package firehose provides real-time AT Protocol event consumption via Jetstream.
2+// It indexes Arabica records into a local SQLite database for fast feed queries.
3package firehose
45import (
···36 // Compress enables zstd compression (~56% bandwidth reduction)
37 Compress bool
3839+ // IndexPath is the path to the SQLite feed index database
40 IndexPath string
4142 // ProfileCacheTTL is how long to cache profile data
+479-890
internal/firehose/index.go
···1package firehose
23import (
4- "bytes"
5 "context"
6- "encoding/binary"
7- "encoding/hex"
8 "encoding/json"
9 "fmt"
10 "os"
···19 "arabica/internal/models"
2021 "github.com/rs/zerolog/log"
22- bolt "go.etcd.io/bbolt"
23-)
24-25-// Bucket names for the feed index
26-var (
27- // BucketRecords stores full record data: {at-uri} -> {IndexedRecord JSON}
28- BucketRecords = []byte("records")
29-30- // BucketByTime stores records by timestamp for chronological queries: {timestamp:at-uri} -> {}
31- BucketByTime = []byte("by_time")
32-33- // BucketByDID stores records by DID for user-specific queries: {did:at-uri} -> {}
34- BucketByDID = []byte("by_did")
35-36- // BucketByCollection stores records by type: {collection:timestamp:at-uri} -> {}
37- BucketByCollection = []byte("by_collection")
38-39- // BucketProfiles stores cached profile data: {did} -> {CachedProfile JSON}
40- BucketProfiles = []byte("profiles")
41-42- // BucketMeta stores metadata like cursor position: {key} -> {value}
43- BucketMeta = []byte("meta")
44-45- // BucketKnownDIDs stores all DIDs we've seen with Arabica records
46- BucketKnownDIDs = []byte("known_dids")
47-48- // BucketBackfilled stores DIDs that have been backfilled: {did} -> {timestamp}
49- BucketBackfilled = []byte("backfilled")
50-51- // BucketLikes stores like mappings: {subject_uri:actor_did} -> {rkey}
52- BucketLikes = []byte("likes")
53-54- // BucketLikeCounts stores aggregated like counts: {subject_uri} -> {uint64 count}
55- BucketLikeCounts = []byte("like_counts")
56-57- // BucketLikesByActor stores likes by actor for lookup: {actor_did:subject_uri} -> {rkey}
58- BucketLikesByActor = []byte("likes_by_actor")
59-60- // BucketComments stores comment data: {subject_uri:timestamp:actor_did} -> {comment JSON}
61- BucketComments = []byte("comments")
62-63- // BucketCommentCounts stores aggregated comment counts: {subject_uri} -> {uint64 count}
64- BucketCommentCounts = []byte("comment_counts")
65-66- // BucketCommentsByActor stores comments by actor for lookup: {actor_did:rkey} -> {subject_uri}
67- BucketCommentsByActor = []byte("comments_by_actor")
68-69- // BucketCommentChildren stores parent-child relationships: {parent_uri:child_rkey} -> {child_actor_did}
70- BucketCommentChildren = []byte("comment_children")
71)
7273// FeedableRecordTypes are the record types that should appear as feed items.
···89 Record json.RawMessage `json:"record"`
90 CID string `json:"cid"`
91 IndexedAt time.Time `json:"indexed_at"`
92- CreatedAt time.Time `json:"created_at"` // Parsed from record
93}
9495// CachedProfile stores profile data with TTL
···101102// FeedIndex provides persistent storage for firehose events
103type FeedIndex struct {
104- db *bolt.DB
105 publicClient *atproto.PublicClient
106 profileTTL time.Duration
107···123124// FeedQuery specifies filtering, sorting, and pagination for feed queries
125type FeedQuery struct {
126- Limit int // Max items to return
127- Cursor string // Opaque cursor for pagination (base64-encoded time key)
128- TypeFilter lexicons.RecordType // Filter to a specific record type (empty = all)
129- Sort FeedSort // Sort order (default: recent)
130}
131132// FeedResult contains feed items plus pagination info
···135 NextCursor string // Empty if no more results
136}
137138-// NewFeedIndex creates a new feed index backed by BoltDB
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000139func NewFeedIndex(path string, profileTTL time.Duration) (*FeedIndex, error) {
140 if path == "" {
141 return nil, fmt.Errorf("index path is required")
···149 }
150 }
151152- db, err := bolt.Open(path, 0600, &bolt.Options{
153- Timeout: 5 * time.Second,
154- })
155 if err != nil {
156 return nil, fmt.Errorf("failed to open index database: %w", err)
157 }
158159- // Create buckets
160- err = db.Update(func(tx *bolt.Tx) error {
161- buckets := [][]byte{
162- BucketRecords,
163- BucketByTime,
164- BucketByDID,
165- BucketByCollection,
166- BucketProfiles,
167- BucketMeta,
168- BucketKnownDIDs,
169- BucketBackfilled,
170- BucketLikes,
171- BucketLikeCounts,
172- BucketLikesByActor,
173- BucketComments,
174- BucketCommentCounts,
175- BucketCommentsByActor,
176- BucketCommentChildren,
177- BucketNotifications,
178- BucketNotificationsMeta,
179- }
180- for _, bucket := range buckets {
181- if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
182- return fmt.Errorf("failed to create bucket %s: %w", bucket, err)
183- }
184- }
185- return nil
186- })
187- if err != nil {
188 _ = db.Close()
189- return nil, err
190 }
191192 idx := &FeedIndex{
···199 return idx, nil
200}
20100000202// Close closes the index database
203func (idx *FeedIndex) Close() error {
204 if idx.db != nil {
···224// GetCursor returns the last processed cursor (microseconds timestamp)
225func (idx *FeedIndex) GetCursor() (int64, error) {
226 var cursor int64
227- err := idx.db.View(func(tx *bolt.Tx) error {
228- b := tx.Bucket(BucketMeta)
229- v := b.Get([]byte("cursor"))
230- if len(v) == 8 {
231- cursor = int64(binary.BigEndian.Uint64(v))
232- }
233- return nil
234- })
235 return cursor, err
236}
237238// SetCursor stores the cursor position
239func (idx *FeedIndex) SetCursor(cursor int64) error {
240- return idx.db.Update(func(tx *bolt.Tx) error {
241- b := tx.Bucket(BucketMeta)
242- buf := make([]byte, 8)
243- binary.BigEndian.PutUint64(buf, uint64(cursor))
244- return b.Put([]byte("cursor"), buf)
245- })
246}
247248// UpsertRecord adds or updates a record in the index
···260 }
261 }
262263- indexed := &IndexedRecord{
264- URI: uri,
265- DID: did,
266- Collection: collection,
267- RKey: rkey,
268- Record: record,
269- CID: cid,
270- IndexedAt: time.Now(),
271- CreatedAt: createdAt,
00000272 }
273274- data, err := json.Marshal(indexed)
0275 if err != nil {
276- return fmt.Errorf("failed to marshal record: %w", err)
277 }
278279- return idx.db.Update(func(tx *bolt.Tx) error {
280- // Store the record
281- records := tx.Bucket(BucketRecords)
282- if err := records.Put([]byte(uri), data); err != nil {
283- return err
284- }
285-286- // Index by time (use createdAt for sorting, not event time)
287- byTime := tx.Bucket(BucketByTime)
288- timeKey := makeTimeKey(createdAt, uri)
289- if err := byTime.Put(timeKey, nil); err != nil {
290- return err
291- }
292-293- // Index by DID
294- byDID := tx.Bucket(BucketByDID)
295- didKey := []byte(did + ":" + uri)
296- if err := byDID.Put(didKey, nil); err != nil {
297- return err
298- }
299-300- // Index by collection
301- byCollection := tx.Bucket(BucketByCollection)
302- collKey := []byte(collection + ":" + string(timeKey))
303- if err := byCollection.Put(collKey, nil); err != nil {
304- return err
305- }
306-307- // Track known DID
308- knownDIDs := tx.Bucket(BucketKnownDIDs)
309- if err := knownDIDs.Put([]byte(did), []byte("1")); err != nil {
310- return err
311- }
312-313- return nil
314- })
315}
316317// DeleteRecord removes a record from the index
318func (idx *FeedIndex) DeleteRecord(did, collection, rkey string) error {
319 uri := atproto.BuildATURI(did, collection, rkey)
000320321- return idx.db.Update(func(tx *bolt.Tx) error {
322- // Get the existing record to find its timestamp
323- records := tx.Bucket(BucketRecords)
324- existingData := records.Get([]byte(uri))
325- if existingData == nil {
326- // Record doesn't exist, nothing to delete
327- return nil
328- }
329330- var existing IndexedRecord
331- if err := json.Unmarshal(existingData, &existing); err != nil {
332- // Can't parse, just delete the main record
333- return records.Delete([]byte(uri))
334- }
000000335336- // Delete from records
337- if err := records.Delete([]byte(uri)); err != nil {
338- return err
339- }
340-341- // Delete from by_time index
342- byTime := tx.Bucket(BucketByTime)
343- timeKey := makeTimeKey(existing.CreatedAt, uri)
344- if err := byTime.Delete(timeKey); err != nil {
345- return err
346- }
347348- // Delete from by_did index
349- byDID := tx.Bucket(BucketByDID)
350- didKey := []byte(did + ":" + uri)
351- if err := byDID.Delete(didKey); err != nil {
352- return err
353- }
354-355- // Delete from by_collection index
356- byCollection := tx.Bucket(BucketByCollection)
357- collKey := []byte(collection + ":" + string(timeKey))
358- if err := byCollection.Delete(collKey); err != nil {
359- return err
360- }
361-362- return nil
363- })
364-}
365-366-// GetRecord retrieves a single record by URI
367-func (idx *FeedIndex) GetRecord(uri string) (*IndexedRecord, error) {
368- var record *IndexedRecord
369- err := idx.db.View(func(tx *bolt.Tx) error {
370- b := tx.Bucket(BucketRecords)
371- data := b.Get([]byte(uri))
372- if data == nil {
373- return nil
374- }
375- record = &IndexedRecord{}
376- return json.Unmarshal(data, record)
377- })
378- return record, err
379}
380381// FeedItem represents an item in the feed (matches feed.FeedItem structure)
···404405// GetRecentFeed returns recent feed items from the index
406func (idx *FeedIndex) GetRecentFeed(ctx context.Context, limit int) ([]*FeedItem, error) {
407- var records []*IndexedRecord
408- err := idx.db.View(func(tx *bolt.Tx) error {
409- byTime := tx.Bucket(BucketByTime)
410- recordsBucket := tx.Bucket(BucketRecords)
411-412- c := byTime.Cursor()
413-414- // Iterate in reverse (newest first)
415- count := 0
416- for k, _ := c.First(); k != nil && count < limit*2; k, _ = c.Next() {
417- // Extract URI from key (format: timestamp:uri)
418- uri := extractURIFromTimeKey(k)
419- if uri == "" {
420- continue
421- }
422-423- data := recordsBucket.Get([]byte(uri))
424- if data == nil {
425- continue
426- }
427-428- var record IndexedRecord
429- if err := json.Unmarshal(data, &record); err != nil {
430- continue
431- }
432-433- records = append(records, &record)
434- count++
435- }
436-437- return nil
438- })
439- if err != nil {
440- return nil, err
441- }
442-443- // Build lookup maps for reference resolution
444- recordsByURI := make(map[string]*IndexedRecord)
445- for _, r := range records {
446- recordsByURI[r.URI] = r
447- }
448-449- // Also load additional records we might need for references
450- err = idx.db.View(func(tx *bolt.Tx) error {
451- recordsBucket := tx.Bucket(BucketRecords)
452- return recordsBucket.ForEach(func(k, v []byte) error {
453- uri := string(k)
454- if _, exists := recordsByURI[uri]; exists {
455- return nil
456- }
457- var record IndexedRecord
458- if err := json.Unmarshal(v, &record); err != nil {
459- return nil
460- }
461- // Only load beans, roasters, grinders, brewers for reference resolution
462- switch record.Collection {
463- case atproto.NSIDBean, atproto.NSIDRoaster, atproto.NSIDGrinder, atproto.NSIDBrewer:
464- recordsByURI[uri] = &record
465- }
466- return nil
467- })
468- })
469- if err != nil {
470- return nil, err
471- }
472-473- // Convert to FeedItems
474- items := make([]*FeedItem, 0, len(records))
475- for _, record := range records {
476- // Skip likes - they're indexed for like counts but not displayed as feed items
477- if record.Collection == atproto.NSIDLike {
478- continue
479- }
480-481- item, err := idx.recordToFeedItem(ctx, record, recordsByURI)
482- if err != nil {
483- log.Warn().Err(err).Str("uri", record.URI).Msg("failed to convert record to feed item")
484- continue
485- }
486- if !FeedableRecordTypes[item.RecordType] {
487- continue
488- }
489- items = append(items, item)
490- }
491-492- // Sort by timestamp descending
493- sort.Slice(items, func(i, j int) bool {
494- return items[i].Timestamp.After(items[j].Timestamp)
495- })
496-497- // Apply limit
498- if len(items) > limit {
499- items = items[:limit]
500- }
501-502- return items, nil
503}
504505// recordTypeToNSID maps a lexicons.RecordType to its NSID collection string
···511 lexicons.RecordTypeBrewer: atproto.NSIDBrewer,
512}
513000000000514// GetFeedWithQuery returns feed items matching the given query with cursor-based pagination
515func (idx *FeedIndex) GetFeedWithQuery(ctx context.Context, q FeedQuery) (*FeedResult, error) {
516 if q.Limit <= 0 {
···520 q.Sort = FeedSortRecent
521 }
522523- // For type-filtered queries, use BucketByCollection for efficiency
524- // For unfiltered queries, use BucketByTime
525- var records []*IndexedRecord
526- var lastTimeKey []byte
527-528- // Decode cursor if provided
529- var cursorBytes []byte
530- if q.Cursor != "" {
531- var err error
532- cursorBytes, err = decodeCursor(q.Cursor)
533- if err != nil {
534- return nil, fmt.Errorf("invalid cursor: %w", err)
535 }
0536 }
537538- // Fetch more than needed to account for filtering
539- fetchLimit := q.Limit + 10
540-541- err := idx.db.View(func(tx *bolt.Tx) error {
542- recordsBucket := tx.Bucket(BucketRecords)
543544- if q.TypeFilter != "" {
545- // Use BucketByCollection for filtered queries
546- nsid, ok := recordTypeToNSID[q.TypeFilter]
547- if !ok {
548- return fmt.Errorf("unknown record type: %s", q.TypeFilter)
00549 }
000550551- byCollection := tx.Bucket(BucketByCollection)
552- c := byCollection.Cursor()
0000553554- // Collection keys: {collection}:{inverted_timestamp}:{uri}
555- prefix := []byte(nsid + ":")
556557- var k []byte
558- if cursorBytes != nil {
559- // Seek to cursor position (cursor is the full collection key)
560- k, _ = c.Seek(cursorBytes)
561- // Skip the cursor key itself (it was the last item of previous page)
562- if k != nil && string(k) == string(cursorBytes) {
563- k, _ = c.Next()
564- }
565- } else {
566- k, _ = c.Seek(prefix)
567- }
568569- count := 0
570- for ; k != nil && count < fetchLimit; k, _ = c.Next() {
571- if !bytes.HasPrefix(k, prefix) {
572- break
573- }
0000000574575- // Extract URI from collection key: {collection}:{timestamp_bytes}:{uri}
576- uri := extractURIFromCollectionKey(k, nsid)
577- if uri == "" {
578- continue
579- }
000580581- data := recordsBucket.Get([]byte(uri))
582- if data == nil {
583- continue
584- }
585586- var record IndexedRecord
587- if err := json.Unmarshal(data, &record); err != nil {
588- continue
589- }
590- records = append(records, &record)
591- lastTimeKey = make([]byte, len(k))
592- copy(lastTimeKey, k)
593- count++
594- }
595- } else {
596- // Use BucketByTime for unfiltered queries
597- byTime := tx.Bucket(BucketByTime)
598- c := byTime.Cursor()
599600- var k []byte
601- if cursorBytes != nil {
602- k, _ = c.Seek(cursorBytes)
603- if k != nil && string(k) == string(cursorBytes) {
604- k, _ = c.Next()
605- }
606- } else {
607- k, _ = c.First()
608- }
609610- count := 0
611- for ; k != nil && count < fetchLimit; k, _ = c.Next() {
612- uri := extractURIFromTimeKey(k)
613- if uri == "" {
614- continue
615- }
00000616617- data := recordsBucket.Get([]byte(uri))
618- if data == nil {
619- continue
620- }
621-622- var record IndexedRecord
623- if err := json.Unmarshal(data, &record); err != nil {
624- continue
625- }
626- // Skip non-feedable records (likes, comments) so they don't
627- // consume slots in the fetch limit, which would cause pagination
628- // to break when many non-feedable records are intermixed.
629- if record.Collection == atproto.NSIDLike || record.Collection == atproto.NSIDComment {
630- continue
631 }
632- records = append(records, &record)
633- lastTimeKey = make([]byte, len(k))
634- copy(lastTimeKey, k)
635- count++
636 }
637 }
638-639- return nil
640- })
641- if err != nil {
642 return nil, err
643 }
644645- // Build lookup maps for reference resolution
646- recordsByURI := make(map[string]*IndexedRecord)
647 for _, r := range records {
648 recordsByURI[r.URI] = r
649 }
650651- // Load additional records for reference resolution
652- err = idx.db.View(func(tx *bolt.Tx) error {
653- recordsBucket := tx.Bucket(BucketRecords)
654- return recordsBucket.ForEach(func(k, v []byte) error {
655- uri := string(k)
656- if _, exists := recordsByURI[uri]; exists {
657- return nil
658- }
659- var record IndexedRecord
660- if err := json.Unmarshal(v, &record); err != nil {
661- return nil
662- }
663- switch record.Collection {
664- case atproto.NSIDBean, atproto.NSIDRoaster, atproto.NSIDGrinder, atproto.NSIDBrewer:
665- recordsByURI[uri] = &record
000000000000000000000000000000000000000666 }
667- return nil
668- })
669- })
670- if err != nil {
671- return nil, err
672 }
673674 // Convert to FeedItems
675 items := make([]*FeedItem, 0, len(records))
676 for _, record := range records {
677- if record.Collection == atproto.NSIDLike || record.Collection == atproto.NSIDComment {
678- continue
679- }
680-681 item, err := idx.recordToFeedItem(ctx, record, recordsByURI)
682 if err != nil {
683 log.Warn().Err(err).Str("uri", record.URI).Msg("failed to convert record to feed item")
···689 items = append(items, item)
690 }
691692- // Sort based on query
693- switch q.Sort {
694- case FeedSortPopular:
695- sort.Slice(items, func(i, j int) bool {
696- scoreI := items[i].LikeCount*3 + items[i].CommentCount*2
697- scoreJ := items[j].LikeCount*3 + items[j].CommentCount*2
698- if scoreI != scoreJ {
699- return scoreI > scoreJ
700- }
701- return items[i].Timestamp.After(items[j].Timestamp)
702- })
703- default: // FeedSortRecent
704- sort.Slice(items, func(i, j int) bool {
705- return items[i].Timestamp.After(items[j].Timestamp)
706- })
707- }
708-709- // Build result with cursor
710- result := &FeedResult{Items: items}
711-712- if len(items) > q.Limit {
713- result.Items = items[:q.Limit]
714- // Cursor is the last time key we read from the DB
715- if lastTimeKey != nil {
716- result.NextCursor = encodeCursor(lastTimeKey)
717- }
718- }
719-720- return result, nil
721-}

// extractURIFromCollectionKey returns the URI portion of a collection key.
// Keys have the layout {collection}:{inverted_timestamp_8bytes}:{uri}.
func extractURIFromCollectionKey(key []byte, collection string) string {
	// Skip "{collection}:" plus the 8-byte timestamp and its trailing ":".
	uriStart := len(collection) + 1 + 8 + 1
	if len(key) <= uriStart {
		// Too short to hold a non-empty URI.
		return ""
	}
	return string(key[uriStart:])
}

// encodeCursor renders a raw index key as an opaque lowercase-hex cursor string.
func encodeCursor(key []byte) string {
	return fmt.Sprintf("%x", key)
}

// decodeCursor converts an opaque hex cursor string back into a raw index key.
func decodeCursor(s string) ([]byte, error) {
	key, err := hex.DecodeString(s)
	return key, err
}
743744// recordToFeedItem converts an IndexedRecord to a FeedItem
···757 profile, err := idx.GetProfile(ctx, record.DID)
758 if err != nil {
759 log.Warn().Err(err).Str("did", record.DID).Msg("failed to get profile")
760- // Use a placeholder profile
761 profile = &atproto.Profile{
762 DID: record.DID,
763- Handle: record.DID, // Use DID as handle if we can't resolve
764 }
765 }
766 item.Author = profile
···869 item.Brewer = brewer
870871 case atproto.NSIDLike:
872- // This should never be reached - likes are filtered before calling recordToFeedItem
873 return nil, fmt.Errorf("unexpected: likes should be filtered before conversion")
874875 default:
···896 idx.profileCacheMu.RUnlock()
897898 // Check persistent cache
899- var cached *CachedProfile
900- err := idx.db.View(func(tx *bolt.Tx) error {
901- b := tx.Bucket(BucketProfiles)
902- data := b.Get([]byte(did))
903- if data == nil {
904- return nil
000000905 }
906- cached = &CachedProfile{}
907- return json.Unmarshal(data, cached)
908- })
909- if err == nil && cached != nil && time.Now().Before(cached.ExpiresAt) {
910- // Update in-memory cache
911- idx.profileCacheMu.Lock()
912- idx.profileCache[did] = cached
913- idx.profileCacheMu.Unlock()
914- return cached.Profile, nil
915 }
916917 // Fetch from API
···922923 // Cache the result
924 now := time.Now()
925- cached = &CachedProfile{
926 Profile: profile,
927 CachedAt: now,
928 ExpiresAt: now.Add(idx.profileTTL),
···935936 // Persist to database
937 data, _ := json.Marshal(cached)
938- _ = idx.db.Update(func(tx *bolt.Tx) error {
939- b := tx.Bucket(BucketProfiles)
940- return b.Put([]byte(did), data)
941- })
942943 return profile, nil
944}
945946// GetKnownDIDs returns all DIDs that have created Arabica records
947func (idx *FeedIndex) GetKnownDIDs() ([]string, error) {
000000948 var dids []string
949- err := idx.db.View(func(tx *bolt.Tx) error {
950- b := tx.Bucket(BucketKnownDIDs)
951- return b.ForEach(func(k, v []byte) error {
952- dids = append(dids, string(k))
953- return nil
954- })
955- })
956- return dids, err
000000000000000000000000000957}
958959// RecordCount returns the total number of indexed records
960func (idx *FeedIndex) RecordCount() int {
961 var count int
962- _ = idx.db.View(func(tx *bolt.Tx) error {
963- b := tx.Bucket(BucketRecords)
964- count = b.Stats().KeyN
965- return nil
966- })
967 return count
968}
969970// KnownDIDCount returns the number of unique DIDs in the index
971func (idx *FeedIndex) KnownDIDCount() int {
972 var count int
973- _ = idx.db.View(func(tx *bolt.Tx) error {
974- b := tx.Bucket(BucketKnownDIDs)
975- count = b.Stats().KeyN
976- return nil
977- })
978 return count
979}
980981// TotalLikeCount returns the total number of likes indexed
982func (idx *FeedIndex) TotalLikeCount() int {
983 var count int
984- _ = idx.db.View(func(tx *bolt.Tx) error {
985- b := tx.Bucket(BucketLikes)
986- count = b.Stats().KeyN
987- return nil
988- })
989 return count
990}
991992// TotalCommentCount returns the total number of comments indexed
993func (idx *FeedIndex) TotalCommentCount() int {
994 var count int
995- _ = idx.db.View(func(tx *bolt.Tx) error {
996- b := tx.Bucket(BucketCommentsByActor)
997- count = b.Stats().KeyN
998- return nil
999- })
1000 return count
1001}
10021003// RecordCountByCollection returns a breakdown of record counts by collection type
1004func (idx *FeedIndex) RecordCountByCollection() map[string]int {
1005 counts := make(map[string]int)
1006- _ = idx.db.View(func(tx *bolt.Tx) error {
1007- records := tx.Bucket(BucketRecords)
1008- return records.ForEach(func(k, v []byte) error {
1009- var record IndexedRecord
1010- if err := json.Unmarshal(v, &record); err != nil {
1011- return nil
1012- }
1013- counts[record.Collection]++
1014- return nil
1015- })
1016- })
01017 return counts
1018-}
1019-1020-// Helper functions
1021-1022-func makeTimeKey(t time.Time, uri string) []byte {
1023- // Format: inverted timestamp (for reverse chronological order) + ":" + uri
1024- // Use nanoseconds for uniqueness
1025- inverted := ^uint64(t.UnixNano())
1026- buf := make([]byte, 8)
1027- binary.BigEndian.PutUint64(buf, inverted)
1028- return append(buf, []byte(":"+uri)...)
1029-}

// extractURIFromTimeKey recovers the URI suffix of a time-ordered key
// (8-byte inverted timestamp, ":", then the URI).
func extractURIFromTimeKey(key []byte) string {
	const uriOffset = 9 // 8 timestamp bytes plus the ":" separator
	if len(key) <= uriOffset {
		return ""
	}
	return string(key[uriOffset:])
}
10381039func formatTimeAgo(t time.Time) string {
···10771078// IsBackfilled checks if a DID has already been backfilled
1079func (idx *FeedIndex) IsBackfilled(did string) bool {
1080- var exists bool
1081- _ = idx.db.View(func(tx *bolt.Tx) error {
1082- b := tx.Bucket(BucketBackfilled)
1083- exists = b.Get([]byte(did)) != nil
1084- return nil
1085- })
1086- return exists
1087}
10881089// MarkBackfilled marks a DID as backfilled with current timestamp
1090func (idx *FeedIndex) MarkBackfilled(did string) error {
1091- return idx.db.Update(func(tx *bolt.Tx) error {
1092- b := tx.Bucket(BucketBackfilled)
1093- timestamp := []byte(time.Now().Format(time.RFC3339))
1094- return b.Put([]byte(did), timestamp)
1095- })
1096}
10971098// BackfillUser fetches all existing records for a DID and adds them to the index
1099-// Returns early if the DID has already been backfilled
1100func (idx *FeedIndex) BackfillUser(ctx context.Context, did string) error {
1101- // Check if already backfilled
1102 if idx.IsBackfilled(did) {
1103 log.Debug().Str("did", did).Msg("DID already backfilled, skipping")
1104 return nil
···1115 }
11161117 for _, record := range records.Records {
1118- // Extract rkey from URI
1119 parts := strings.Split(record.URI, "/")
1120 if len(parts) < 3 {
1121 continue
···1133 }
1134 recordCount++
11351136- // Index likes and comments into their specialized buckets
1137 switch collection {
1138 case atproto.NSIDLike:
1139 if subject, ok := record.Value["subject"].(map[string]interface{}); ok {
···1170 }
1171 }
11721173- // Mark as backfilled
1174 if err := idx.MarkBackfilled(did); err != nil {
1175 log.Warn().Err(err).Str("did", did).Msg("failed to mark DID as backfilled")
1176 }
···11831184// UpsertLike adds or updates a like in the index
1185func (idx *FeedIndex) UpsertLike(actorDID, rkey, subjectURI string) error {
1186- return idx.db.Update(func(tx *bolt.Tx) error {
1187- likes := tx.Bucket(BucketLikes)
1188- likeCounts := tx.Bucket(BucketLikeCounts)
1189- likesByActor := tx.Bucket(BucketLikesByActor)
1190-1191- // Key format: {subject_uri}:{actor_did}
1192- likeKey := []byte(subjectURI + ":" + actorDID)
1193-1194- // Check if this like already exists
1195- existingRKey := likes.Get(likeKey)
1196- if existingRKey != nil {
1197- // Already exists, nothing to do
1198- return nil
1199- }
1200-1201- // Store the like mapping
1202- if err := likes.Put(likeKey, []byte(rkey)); err != nil {
1203- return err
1204- }
1205-1206- // Store by actor for reverse lookup
1207- actorKey := []byte(actorDID + ":" + subjectURI)
1208- if err := likesByActor.Put(actorKey, []byte(rkey)); err != nil {
1209- return err
1210- }
1211-1212- // Increment the like count
1213- countKey := []byte(subjectURI)
1214- currentCount := uint64(0)
1215- if countData := likeCounts.Get(countKey); len(countData) == 8 {
1216- currentCount = binary.BigEndian.Uint64(countData)
1217- }
1218- currentCount++
1219- countBuf := make([]byte, 8)
1220- binary.BigEndian.PutUint64(countBuf, currentCount)
1221- return likeCounts.Put(countKey, countBuf)
1222- })
1223}
12241225// DeleteLike removes a like from the index
1226func (idx *FeedIndex) DeleteLike(actorDID, subjectURI string) error {
1227- return idx.db.Update(func(tx *bolt.Tx) error {
1228- likes := tx.Bucket(BucketLikes)
1229- likeCounts := tx.Bucket(BucketLikeCounts)
1230- likesByActor := tx.Bucket(BucketLikesByActor)
1231-1232- // Key format: {subject_uri}:{actor_did}
1233- likeKey := []byte(subjectURI + ":" + actorDID)
1234-1235- // Check if like exists
1236- if likes.Get(likeKey) == nil {
1237- // Doesn't exist, nothing to do
1238- return nil
1239- }
1240-1241- // Delete the like mapping
1242- if err := likes.Delete(likeKey); err != nil {
1243- return err
1244- }
1245-1246- // Delete by actor lookup
1247- actorKey := []byte(actorDID + ":" + subjectURI)
1248- if err := likesByActor.Delete(actorKey); err != nil {
1249- return err
1250- }
1251-1252- // Decrement the like count
1253- countKey := []byte(subjectURI)
1254- currentCount := uint64(0)
1255- if countData := likeCounts.Get(countKey); len(countData) == 8 {
1256- currentCount = binary.BigEndian.Uint64(countData)
1257- }
1258- if currentCount > 0 {
1259- currentCount--
1260- }
1261- if currentCount == 0 {
1262- return likeCounts.Delete(countKey)
1263- }
1264- countBuf := make([]byte, 8)
1265- binary.BigEndian.PutUint64(countBuf, currentCount)
1266- return likeCounts.Put(countKey, countBuf)
1267- })
1268}
12691270// GetLikeCount returns the number of likes for a record
1271func (idx *FeedIndex) GetLikeCount(subjectURI string) int {
1272- var count uint64
1273- _ = idx.db.View(func(tx *bolt.Tx) error {
1274- likeCounts := tx.Bucket(BucketLikeCounts)
1275- countData := likeCounts.Get([]byte(subjectURI))
1276- if len(countData) == 8 {
1277- count = binary.BigEndian.Uint64(countData)
1278- }
1279- return nil
1280- })
1281- return int(count)
1282}
12831284// HasUserLiked checks if a user has liked a specific record
1285func (idx *FeedIndex) HasUserLiked(actorDID, subjectURI string) bool {
1286- var exists bool
1287- _ = idx.db.View(func(tx *bolt.Tx) error {
1288- likesByActor := tx.Bucket(BucketLikesByActor)
1289- actorKey := []byte(actorDID + ":" + subjectURI)
1290- exists = likesByActor.Get(actorKey) != nil
1291- return nil
1292- })
1293- return exists
1294}
12951296// GetUserLikeRKey returns the rkey of a user's like for a specific record, or empty string if not found
1297func (idx *FeedIndex) GetUserLikeRKey(actorDID, subjectURI string) string {
1298 var rkey string
1299- _ = idx.db.View(func(tx *bolt.Tx) error {
1300- likesByActor := tx.Bucket(BucketLikesByActor)
1301- actorKey := []byte(actorDID + ":" + subjectURI)
1302- if data := likesByActor.Get(actorKey); data != nil {
1303- rkey = string(data)
1304- }
1305- return nil
1306- })
1307 return rkey
1308}
1309···13321333// UpsertComment adds or updates a comment in the index
func (idx *FeedIndex) UpsertComment(actorDID, rkey, subjectURI, parentURI, cid, text string, createdAt time.Time) error {
	return idx.db.Update(func(tx *bolt.Tx) error {
		comments := tx.Bucket(BucketComments)
		commentCounts := tx.Bucket(BucketCommentCounts)
		commentsByActor := tx.Bucket(BucketCommentsByActor)
		commentChildren := tx.Bucket(BucketCommentChildren)

		// Key format: {subject_uri}:{timestamp}:{actor_did}:{rkey}
		// Using timestamp for chronological ordering
		commentKey := []byte(subjectURI + ":" + createdAt.Format(time.RFC3339Nano) + ":" + actorDID + ":" + rkey)

		// Check if this comment already exists (by actor key)
		// The actor index maps {actor_did}:{rkey} -> subject URI, so a hit
		// means this exact comment record was indexed before.
		actorKey := []byte(actorDID + ":" + rkey)
		existingSubject := commentsByActor.Get(actorKey)
		isNew := existingSubject == nil

		// If the comment already exists, delete the old entry from BucketComments
		// to prevent duplicates (the key includes timestamp which may differ between calls)
		if !isNew {
			oldPrefix := []byte(string(existingSubject) + ":")
			suffix := ":" + actorDID + ":" + rkey
			cur := comments.Cursor()
			for k, _ := cur.Seek(oldPrefix); k != nil && strings.HasPrefix(string(k), string(oldPrefix)); k, _ = cur.Next() {
				if strings.HasSuffix(string(k), suffix) {
					// Best-effort delete: an error here only leaves a stale
					// duplicate entry behind, so it is deliberately ignored.
					_ = comments.Delete(k)
					break
				}
			}
		}

		// Extract parent rkey from parent URI if present
		// (the rkey is the last "/"-separated segment of the AT URI)
		var parentRKey string
		if parentURI != "" {
			parts := strings.Split(parentURI, "/")
			if len(parts) > 0 {
				parentRKey = parts[len(parts)-1]
			}
		}

		// Store comment data as JSON
		commentData := IndexedComment{
			RKey:       rkey,
			SubjectURI: subjectURI,
			Text:       text,
			ActorDID:   actorDID,
			CreatedAt:  createdAt,
			ParentURI:  parentURI,
			ParentRKey: parentRKey,
			CID:        cid,
		}
		commentJSON, err := json.Marshal(commentData)
		if err != nil {
			return fmt.Errorf("failed to marshal comment: %w", err)
		}

		// Store comment
		if err := comments.Put(commentKey, commentJSON); err != nil {
			return fmt.Errorf("failed to store comment: %w", err)
		}

		// Store actor lookup
		if err := commentsByActor.Put(actorKey, []byte(subjectURI)); err != nil {
			return fmt.Errorf("failed to store comment by actor: %w", err)
		}

		// Store parent-child relationship if this is a reply
		if parentURI != "" {
			childKey := []byte(parentURI + ":" + rkey)
			if err := commentChildren.Put(childKey, []byte(actorDID)); err != nil {
				return fmt.Errorf("failed to store comment child: %w", err)
			}
		}

		// Increment count only if this is a new comment
		if isNew {
			countKey := []byte(subjectURI)
			var count uint64
			// Counts are stored as 8 big-endian bytes; anything else reads as zero.
			if countData := commentCounts.Get(countKey); len(countData) == 8 {
				count = binary.BigEndian.Uint64(countData)
			}
			count++
			countBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(countBytes, count)
			if err := commentCounts.Put(countKey, countBytes); err != nil {
				return fmt.Errorf("failed to update comment count: %w", err)
			}
		}

		return nil
	})
}

// DeleteComment removes a comment from the index
// subjectURI may be empty; it is then resolved from the actor index or, failing
// that, from the stored comment itself during the full scan.
func (idx *FeedIndex) DeleteComment(actorDID, rkey, subjectURI string) error {
	return idx.db.Update(func(tx *bolt.Tx) error {
		comments := tx.Bucket(BucketComments)
		commentCounts := tx.Bucket(BucketCommentCounts)
		commentsByActor := tx.Bucket(BucketCommentsByActor)
		commentChildren := tx.Bucket(BucketCommentChildren)

		actorKey := []byte(actorDID + ":" + rkey)

		// Get subject URI from the actor index, or use the provided one
		existingSubject := commentsByActor.Get(actorKey)
		if existingSubject != nil && subjectURI == "" {
			subjectURI = string(existingSubject)
		}

		// Find and delete the comment from BucketComments
		// Comment keys end in ":{actor_did}:{rkey}", which is what we match on.
		var parentURI string
		suffix := ":" + actorDID + ":" + rkey

		if subjectURI != "" {
			// Fast path: we know the subject URI, scan only that prefix
			prefix := []byte(subjectURI + ":")
			c := comments.Cursor()
			for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
				if strings.HasSuffix(string(k), suffix) {
					var comment IndexedComment
					// Decode first so the parent link can be cleaned up below.
					if err := json.Unmarshal(v, &comment); err == nil {
						parentURI = comment.ParentURI
					}
					if err := comments.Delete(k); err != nil {
						return fmt.Errorf("failed to delete comment: %w", err)
					}
					break
				}
			}
		} else {
			// Slow path: scan all comments to find this actor+rkey
			c := comments.Cursor()
			for k, v := c.First(); k != nil; k, v = c.Next() {
				if strings.HasSuffix(string(k), suffix) {
					var comment IndexedComment
					if err := json.Unmarshal(v, &comment); err == nil {
						parentURI = comment.ParentURI
						subjectURI = comment.SubjectURI
					}
					if err := comments.Delete(k); err != nil {
						return fmt.Errorf("failed to delete comment: %w", err)
					}
					break
				}
			}
		}

		// Delete actor lookup
		if existingSubject != nil {
			if err := commentsByActor.Delete(actorKey); err != nil {
				return fmt.Errorf("failed to delete comment by actor: %w", err)
			}
		}

		// Delete parent-child relationship if this was a reply
		if parentURI != "" {
			childKey := []byte(parentURI + ":" + rkey)
			if err := commentChildren.Delete(childKey); err != nil {
				return fmt.Errorf("failed to delete comment child: %w", err)
			}
		}

		// Decrement count
		// NOTE(review): this decrement runs even when no matching comment row
		// was found above, which could drift the stored count below the true
		// number of comments — confirm whether a "found" guard is wanted.
		countKey := []byte(subjectURI)
		var count uint64
		if countData := commentCounts.Get(countKey); len(countData) == 8 {
			count = binary.BigEndian.Uint64(countData)
		}
		if count > 0 {
			count--
		}
		countBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(countBytes, count)
		if err := commentCounts.Put(countKey, countBytes); err != nil {
			return fmt.Errorf("failed to update comment count: %w", err)
		}

		return nil
	})
}
15131514// GetCommentCount returns the number of comments on a record
1515func (idx *FeedIndex) GetCommentCount(subjectURI string) int {
1516- var count uint64
1517- _ = idx.db.View(func(tx *bolt.Tx) error {
1518- commentCounts := tx.Bucket(BucketCommentCounts)
1519- countData := commentCounts.Get([]byte(subjectURI))
1520- if len(countData) == 8 {
1521- count = binary.BigEndian.Uint64(countData)
1522- }
1523- return nil
1524- })
1525- return int(count)
1526}
15271528// GetCommentsForSubject returns all comments for a specific record, ordered by creation time
1529-// This returns a flat list of comments without threading
1530func (idx *FeedIndex) GetCommentsForSubject(ctx context.Context, subjectURI string, limit int, viewerDID string) []IndexedComment {
1531- var comments []IndexedComment
1532- _ = idx.db.View(func(tx *bolt.Tx) error {
1533- bucket := tx.Bucket(BucketComments)
1534- prefix := []byte(subjectURI + ":")
1535- c := bucket.Cursor()
00015361537- for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
1538- var comment IndexedComment
1539- if err := json.Unmarshal(v, &comment); err != nil {
1540- continue
1541- }
1542- comments = append(comments, comment)
1543- if limit > 0 && len(comments) >= limit {
1544- break
1545- }
1546- }
1547 return nil
1548- })
000000000000015491550 // Populate profile and like info for each comment
1551 for i := range comments {
1552 profile, err := idx.GetProfile(ctx, comments[i].ActorDID)
1553 if err != nil {
1554- // Use DID as fallback handle
1555 comments[i].Handle = comments[i].ActorDID
1556 } else {
1557 comments[i].Handle = profile.Handle
···1570}
15711572// GetThreadedCommentsForSubject returns comments for a record in threaded order with depth
1573-// Comments are returned in depth-first order (parent followed by children)
1574-// Visual depth is capped at 2 levels for display purposes
1575func (idx *FeedIndex) GetThreadedCommentsForSubject(ctx context.Context, subjectURI string, limit int, viewerDID string) []IndexedComment {
1576- // First get all comments for this subject
1577- allComments := idx.GetCommentsForSubject(ctx, subjectURI, 0, viewerDID) // Get all, we'll limit after threading
15781579 if len(allComments) == 0 {
1580 return nil
···1593 for i := range allComments {
1594 comment := &allComments[i]
1595 if comment.ParentRKey == "" {
1596- // Top-level comment
1597 topLevel = append(topLevel, comment)
1598 } else {
1599- // Reply - add to parent's children
1600 childrenMap[comment.ParentRKey] = append(childrenMap[comment.ParentRKey], comment)
1601 }
1602 }
···1620 if limit > 0 && len(result) >= limit {
1621 return
1622 }
1623- // Cap visual depth at 2 for display
1624 visualDepth := depth
1625 if visualDepth > 2 {
1626 visualDepth = 2
···1628 comment.Depth = visualDepth
1629 result = append(result, *comment)
16301631- // Add children (if any)
1632 if children, ok := childrenMap[comment.RKey]; ok {
1633 for _, child := range children {
1634 flatten(child, depth+1)
···1package firehose
23import (
04 "context"
5+ "database/sql"
06 "encoding/json"
7 "fmt"
8 "os"
···17 "arabica/internal/models"
1819 "github.com/rs/zerolog/log"
20+ _ "modernc.org/sqlite"
00000000000000000000000000000000000000000000000021)
2223// FeedableRecordTypes are the record types that should appear as feed items.
···39 Record json.RawMessage `json:"record"`
40 CID string `json:"cid"`
41 IndexedAt time.Time `json:"indexed_at"`
42+ CreatedAt time.Time `json:"created_at"`
43}
4445// CachedProfile stores profile data with TTL
···5152// FeedIndex provides persistent storage for firehose events
53type FeedIndex struct {
54+ db *sql.DB
55 publicClient *atproto.PublicClient
56 profileTTL time.Duration
57···7374// FeedQuery specifies filtering, sorting, and pagination for feed queries
75type FeedQuery struct {
76+ Limit int // Max items to return
77+ Cursor string // Opaque cursor for pagination (created_at|uri)
78+ TypeFilter lexicons.RecordType // Filter to a specific record type (empty = all)
79+ Sort FeedSort // Sort order (default: recent)
80}
8182// FeedResult contains feed items plus pagination info
···85 NextCursor string // Empty if no more results
86}

// schemaNoTrailingPragma is the idempotent SQLite DDL for the feed index,
// notification, and moderation tables. PRAGMAs are deliberately omitted
// here; they are applied via the connection DSN in NewFeedIndex.
const schemaNoTrailingPragma = `
CREATE TABLE IF NOT EXISTS records (
	uri TEXT PRIMARY KEY,
	did TEXT NOT NULL,
	collection TEXT NOT NULL,
	rkey TEXT NOT NULL,
	record TEXT NOT NULL,
	cid TEXT NOT NULL DEFAULT '',
	indexed_at TEXT NOT NULL,
	created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_records_created ON records(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_records_did ON records(did);
CREATE INDEX IF NOT EXISTS idx_records_coll_created ON records(collection, created_at DESC);

CREATE TABLE IF NOT EXISTS meta (
	key TEXT PRIMARY KEY,
	value BLOB
);

CREATE TABLE IF NOT EXISTS known_dids (did TEXT PRIMARY KEY);
CREATE TABLE IF NOT EXISTS backfilled (did TEXT PRIMARY KEY, backfilled_at TEXT NOT NULL);

CREATE TABLE IF NOT EXISTS profiles (
	did TEXT PRIMARY KEY,
	data TEXT NOT NULL,
	expires_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS likes (
	subject_uri TEXT NOT NULL,
	actor_did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	PRIMARY KEY (subject_uri, actor_did)
);
CREATE INDEX IF NOT EXISTS idx_likes_actor ON likes(actor_did, subject_uri);

CREATE TABLE IF NOT EXISTS comments (
	actor_did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	subject_uri TEXT NOT NULL,
	parent_uri TEXT NOT NULL DEFAULT '',
	parent_rkey TEXT NOT NULL DEFAULT '',
	cid TEXT NOT NULL DEFAULT '',
	text TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (actor_did, rkey)
);
CREATE INDEX IF NOT EXISTS idx_comments_subject ON comments(subject_uri, created_at);

CREATE TABLE IF NOT EXISTS notifications (
	id TEXT NOT NULL,
	target_did TEXT NOT NULL,
	type TEXT NOT NULL,
	actor_did TEXT NOT NULL,
	subject_uri TEXT NOT NULL,
	created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_notif_target ON notifications(target_did, created_at DESC);
CREATE UNIQUE INDEX IF NOT EXISTS idx_notif_dedup ON notifications(target_did, type, actor_did, subject_uri);

CREATE TABLE IF NOT EXISTS notifications_meta (
	target_did TEXT PRIMARY KEY,
	last_read TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS moderation_hidden_records (
	uri TEXT PRIMARY KEY,
	hidden_at TEXT NOT NULL,
	hidden_by TEXT NOT NULL,
	reason TEXT NOT NULL DEFAULT '',
	auto_hidden INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS moderation_blacklist (
	did TEXT PRIMARY KEY,
	blacklisted_at TEXT NOT NULL,
	blacklisted_by TEXT NOT NULL,
	reason TEXT NOT NULL DEFAULT ''
);

CREATE TABLE IF NOT EXISTS moderation_reports (
	id TEXT PRIMARY KEY,
	subject_uri TEXT NOT NULL DEFAULT '',
	subject_did TEXT NOT NULL DEFAULT '',
	reporter_did TEXT NOT NULL,
	reason TEXT NOT NULL,
	created_at TEXT NOT NULL,
	status TEXT NOT NULL DEFAULT 'pending',
	resolved_by TEXT NOT NULL DEFAULT '',
	resolved_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_modreports_uri ON moderation_reports(subject_uri);
CREATE INDEX IF NOT EXISTS idx_modreports_did ON moderation_reports(subject_did);
CREATE INDEX IF NOT EXISTS idx_modreports_reporter ON moderation_reports(reporter_did, created_at);
CREATE INDEX IF NOT EXISTS idx_modreports_status ON moderation_reports(status);

CREATE TABLE IF NOT EXISTS moderation_audit_log (
	id TEXT PRIMARY KEY,
	action TEXT NOT NULL,
	actor_did TEXT NOT NULL,
	target_uri TEXT NOT NULL DEFAULT '',
	reason TEXT NOT NULL DEFAULT '',
	details TEXT NOT NULL DEFAULT '{}',
	timestamp TEXT NOT NULL,
	auto_mod INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_modaudit_ts ON moderation_audit_log(timestamp DESC);

CREATE TABLE IF NOT EXISTS moderation_autohide_resets (
	did TEXT PRIMARY KEY,
	reset_at TEXT NOT NULL
);
`
202+203+// NewFeedIndex creates a new feed index backed by SQLite
204func NewFeedIndex(path string, profileTTL time.Duration) (*FeedIndex, error) {
205 if path == "" {
206 return nil, fmt.Errorf("index path is required")
···214 }
215 }
216217+ db, err := sql.Open("sqlite", "file:"+path+"?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=foreign_keys(ON)&_pragma=temp_store(MEMORY)&_pragma=mmap_size(134217728)&_pragma=cache_size(-65536)")
00218 if err != nil {
219 return nil, fmt.Errorf("failed to open index database: %w", err)
220 }
221222+ // WAL mode allows concurrent reads with a single writer.
223+ // Allow multiple reader connections but limit to avoid file descriptor exhaustion.
224+ db.SetMaxOpenConns(4)
225+ db.SetMaxIdleConns(4)
226+227+ // Execute schema (skip PRAGMAs — already set via DSN)
228+ if _, err := db.Exec(schemaNoTrailingPragma); err != nil {
0000000000000000000000229 _ = db.Close()
230+ return nil, fmt.Errorf("failed to initialize schema: %w", err)
231 }
232233 idx := &FeedIndex{
···240 return idx, nil
241}

// DB returns the underlying database connection for shared use by other
// stores (e.g. the sqlitestore moderation store, which shares this
// connection). FeedIndex retains ownership; callers must not close it —
// FeedIndex.Close does.
func (idx *FeedIndex) DB() *sql.DB {
	return idx.db
}
247+248// Close closes the index database
249func (idx *FeedIndex) Close() error {
250 if idx.db != nil {
···270// GetCursor returns the last processed cursor (microseconds timestamp)
271func (idx *FeedIndex) GetCursor() (int64, error) {
272 var cursor int64
273+ err := idx.db.QueryRow(`SELECT value FROM meta WHERE key = 'cursor'`).Scan(&cursor)
274+ if err == sql.ErrNoRows {
275+ return 0, nil
276+ }
0000277 return cursor, err
278}
279280// SetCursor stores the cursor position
281func (idx *FeedIndex) SetCursor(cursor int64) error {
282+ _, err := idx.db.Exec(`INSERT OR REPLACE INTO meta (key, value) VALUES ('cursor', ?)`, cursor)
283+ return err
0000284}
285286// UpsertRecord adds or updates a record in the index
···298 }
299 }
300301+ now := time.Now()
302+303+ _, err := idx.db.Exec(`
304+ INSERT INTO records (uri, did, collection, rkey, record, cid, indexed_at, created_at)
305+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
306+ ON CONFLICT(uri) DO UPDATE SET
307+ record = excluded.record,
308+ cid = excluded.cid,
309+ indexed_at = excluded.indexed_at,
310+ created_at = excluded.created_at
311+ `, uri, did, collection, rkey, string(record), cid,
312+ now.Format(time.RFC3339Nano), createdAt.Format(time.RFC3339Nano))
313+ if err != nil {
314+ return fmt.Errorf("failed to upsert record: %w", err)
315 }
316317+ // Track known DID
318+ _, err = idx.db.Exec(`INSERT OR IGNORE INTO known_dids (did) VALUES (?)`, did)
319 if err != nil {
320+ return fmt.Errorf("failed to track known DID: %w", err)
321 }
322323+ return nil
00000000000000000000000000000000000324}
325326// DeleteRecord removes a record from the index
327func (idx *FeedIndex) DeleteRecord(did, collection, rkey string) error {
328 uri := atproto.BuildATURI(did, collection, rkey)
329+ _, err := idx.db.Exec(`DELETE FROM records WHERE uri = ?`, uri)
330+ return err
331+}
332333+// GetRecord retrieves a single record by URI
334+func (idx *FeedIndex) GetRecord(uri string) (*IndexedRecord, error) {
335+ var rec IndexedRecord
336+ var recordStr, indexedAtStr, createdAtStr string
0000337338+ err := idx.db.QueryRow(`
339+ SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at
340+ FROM records WHERE uri = ?
341+ `, uri).Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey,
342+ &recordStr, &rec.CID, &indexedAtStr, &createdAtStr)
343+ if err == sql.ErrNoRows {
344+ return nil, nil
345+ }
346+ if err != nil {
347+ return nil, err
348+ }
349350+ rec.Record = json.RawMessage(recordStr)
351+ rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr)
352+ rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
00000000353354+ return &rec, nil
000000000000000000000000000000355}
356357// FeedItem represents an item in the feed (matches feed.FeedItem structure)
···380381// GetRecentFeed returns recent feed items from the index
func (idx *FeedIndex) GetRecentFeed(ctx context.Context, limit int) ([]*FeedItem, error) {
	// Unfiltered, uncursored query: newest items across all feedable collections.
	return idx.getFeedItems(ctx, "", limit, "")
}
385386// recordTypeToNSID maps a lexicons.RecordType to its NSID collection string
···392 lexicons.RecordTypeBrewer: atproto.NSIDBrewer,
393}
394395+// feedableCollections is the set of collection NSIDs that appear in the feed
396+var feedableCollections = func() []string {
397+ out := make([]string, 0, len(recordTypeToNSID))
398+ for _, nsid := range recordTypeToNSID {
399+ out = append(out, nsid)
400+ }
401+ return out
402+}()
403+404// GetFeedWithQuery returns feed items matching the given query with cursor-based pagination
405func (idx *FeedIndex) GetFeedWithQuery(ctx context.Context, q FeedQuery) (*FeedResult, error) {
406 if q.Limit <= 0 {
···410 q.Sort = FeedSortRecent
411 }
412413+ var collectionFilter string
414+ if q.TypeFilter != "" {
415+ nsid, ok := recordTypeToNSID[q.TypeFilter]
416+ if !ok {
417+ return nil, fmt.Errorf("unknown record type: %s", q.TypeFilter)
0000000418 }
419+ collectionFilter = nsid
420 }
421422+ items, err := idx.getFeedItems(ctx, collectionFilter, q.Limit+1, q.Cursor)
423+ if err != nil {
424+ return nil, err
425+ }
0426427+ // Sort based on query
428+ if q.Sort == FeedSortPopular {
429+ sort.Slice(items, func(i, j int) bool {
430+ scoreI := items[i].LikeCount*3 + items[i].CommentCount*2
431+ scoreJ := items[j].LikeCount*3 + items[j].CommentCount*2
432+ if scoreI != scoreJ {
433+ return scoreI > scoreJ
434 }
435+ return items[i].Timestamp.After(items[j].Timestamp)
436+ })
437+ }
438439+ result := &FeedResult{Items: items}
440+ if len(items) > q.Limit {
441+ result.Items = items[:q.Limit]
442+ last := result.Items[q.Limit-1]
443+ result.NextCursor = last.Timestamp.Format(time.RFC3339Nano) + "|" + last.SubjectURI
444+ }
445446+ return result, nil
447+}
448449+// getFeedItems fetches records from SQLite, resolves references, and returns FeedItems.
450+func (idx *FeedIndex) getFeedItems(ctx context.Context, collectionFilter string, limit int, cursor string) ([]*FeedItem, error) {
451+ // Build query for feedable records
452+ var args []any
453+ query := `SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at FROM records WHERE `
000000454455+ if collectionFilter != "" {
456+ query += `collection = ? `
457+ args = append(args, collectionFilter)
458+ } else {
459+ // Only feedable collections
460+ placeholders := make([]string, len(feedableCollections))
461+ for i, c := range feedableCollections {
462+ placeholders[i] = "?"
463+ args = append(args, c)
464+ }
465+ query += `collection IN (` + strings.Join(placeholders, ",") + `) `
466+ }
467468+ // Cursor-based pagination: cursor format is "created_at|uri"
469+ if cursor != "" {
470+ parts := strings.SplitN(cursor, "|", 2)
471+ if len(parts) == 2 {
472+ query += `AND (created_at < ? OR (created_at = ? AND uri < ?)) `
473+ args = append(args, parts[0], parts[0], parts[1])
474+ }
475+ }
476477+ query += `ORDER BY created_at DESC LIMIT ?`
478+ args = append(args, limit)
00479480+ rows, err := idx.db.QueryContext(ctx, query, args...)
481+ if err != nil {
482+ return nil, err
483+ }
484+ defer rows.Close()
00000000485486+ var records []*IndexedRecord
487+ refURIs := make(map[string]bool) // URIs we need to resolve
0000000488489+ for rows.Next() {
490+ var rec IndexedRecord
491+ var recordStr, indexedAtStr, createdAtStr string
492+ if err := rows.Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey,
493+ &recordStr, &rec.CID, &indexedAtStr, &createdAtStr); err != nil {
494+ continue
495+ }
496+ rec.Record = json.RawMessage(recordStr)
497+ rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr)
498+ rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
499+ records = append(records, &rec)
500501+ // Collect reference URIs from the record data
502+ var recordData map[string]any
503+ if err := json.Unmarshal(rec.Record, &recordData); err == nil {
504+ for _, key := range []string{"beanRef", "roasterRef", "grinderRef", "brewerRef"} {
505+ if ref, ok := recordData[key].(string); ok && ref != "" {
506+ refURIs[ref] = true
00000000507 }
0000508 }
509 }
510+ }
511+ if err := rows.Err(); err != nil {
00512 return nil, err
513 }
514515+ // Build lookup map starting with the fetched records
516+ recordsByURI := make(map[string]*IndexedRecord, len(records))
517 for _, r := range records {
518 recordsByURI[r.URI] = r
519 }
520521+ // Fetch referenced records that we don't already have
522+ var missingURIs []string
523+ for uri := range refURIs {
524+ if _, ok := recordsByURI[uri]; !ok {
525+ missingURIs = append(missingURIs, uri)
526+ }
527+ }
528+529+ if len(missingURIs) > 0 {
530+ placeholders := make([]string, len(missingURIs))
531+ refArgs := make([]any, len(missingURIs))
532+ for i, uri := range missingURIs {
533+ placeholders[i] = "?"
534+ refArgs[i] = uri
535+ }
536+ refQuery := `SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at FROM records WHERE uri IN (` + strings.Join(placeholders, ",") + `)`
537+ refRows, err := idx.db.QueryContext(ctx, refQuery, refArgs...)
538+ if err == nil {
539+ defer refRows.Close()
540+ for refRows.Next() {
541+ var rec IndexedRecord
542+ var recordStr, indexedAtStr, createdAtStr string
543+ if err := refRows.Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey,
544+ &recordStr, &rec.CID, &indexedAtStr, &createdAtStr); err != nil {
545+ continue
546+ }
547+ rec.Record = json.RawMessage(recordStr)
548+ rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr)
549+ rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
550+ recordsByURI[rec.URI] = &rec
551+552+ // If this is a bean, check if it references a roaster we also need
553+ if rec.Collection == atproto.NSIDBean {
554+ var beanData map[string]any
555+ if err := json.Unmarshal(rec.Record, &beanData); err == nil {
556+ if roasterRef, ok := beanData["roasterRef"].(string); ok && roasterRef != "" {
557+ if _, ok := recordsByURI[roasterRef]; !ok {
558+ // Fetch this roaster too
559+ var rRec IndexedRecord
560+ var rStr, rIdxAt, rCreAt string
561+ err := idx.db.QueryRowContext(ctx,
562+ `SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at FROM records WHERE uri = ?`,
563+ roasterRef).Scan(&rRec.URI, &rRec.DID, &rRec.Collection, &rRec.RKey,
564+ &rStr, &rRec.CID, &rIdxAt, &rCreAt)
565+ if err == nil {
566+ rRec.Record = json.RawMessage(rStr)
567+ rRec.IndexedAt, _ = time.Parse(time.RFC3339Nano, rIdxAt)
568+ rRec.CreatedAt, _ = time.Parse(time.RFC3339Nano, rCreAt)
569+ recordsByURI[rRec.URI] = &rRec
570+ }
571+ }
572+ }
573+ }
574+ }
575 }
576+ }
0000577 }
578579 // Convert to FeedItems
580 items := make([]*FeedItem, 0, len(records))
581 for _, record := range records {
0000582 item, err := idx.recordToFeedItem(ctx, record, recordsByURI)
583 if err != nil {
584 log.Warn().Err(err).Str("uri", record.URI).Msg("failed to convert record to feed item")
···590 items = append(items, item)
591 }
592593+ return items, nil
0000000000000000000000000000000000000000000000000594}
595596// recordToFeedItem converts an IndexedRecord to a FeedItem
···609 profile, err := idx.GetProfile(ctx, record.DID)
610 if err != nil {
611 log.Warn().Err(err).Str("did", record.DID).Msg("failed to get profile")
0612 profile = &atproto.Profile{
613 DID: record.DID,
614+ Handle: record.DID,
615 }
616 }
617 item.Author = profile
···720 item.Brewer = brewer
721722 case atproto.NSIDLike:
0723 return nil, fmt.Errorf("unexpected: likes should be filtered before conversion")
724725 default:
···746 idx.profileCacheMu.RUnlock()
747748 // Check persistent cache
749+ var dataStr, expiresAtStr string
750+ err := idx.db.QueryRow(`SELECT data, expires_at FROM profiles WHERE did = ?`, did).Scan(&dataStr, &expiresAtStr)
751+ if err == nil {
752+ expiresAt, _ := time.Parse(time.RFC3339Nano, expiresAtStr)
753+ if time.Now().Before(expiresAt) {
754+ cached := &CachedProfile{}
755+ if err := json.Unmarshal([]byte(dataStr), cached); err == nil {
756+ idx.profileCacheMu.Lock()
757+ idx.profileCache[did] = cached
758+ idx.profileCacheMu.Unlock()
759+ return cached.Profile, nil
760+ }
761 }
000000000762 }
763764 // Fetch from API
···769770 // Cache the result
771 now := time.Now()
772+ cached := &CachedProfile{
773 Profile: profile,
774 CachedAt: now,
775 ExpiresAt: now.Add(idx.profileTTL),
···782783 // Persist to database
784 data, _ := json.Marshal(cached)
785+ _, _ = idx.db.Exec(`INSERT OR REPLACE INTO profiles (did, data, expires_at) VALUES (?, ?, ?)`,
786+ did, string(data), cached.ExpiresAt.Format(time.RFC3339Nano))
00787788 return profile, nil
789}
790791// GetKnownDIDs returns all DIDs that have created Arabica records
792func (idx *FeedIndex) GetKnownDIDs() ([]string, error) {
793+ rows, err := idx.db.Query(`SELECT did FROM known_dids`)
794+ if err != nil {
795+ return nil, err
796+ }
797+ defer rows.Close()
798+799 var dids []string
800+ for rows.Next() {
801+ var did string
802+ if err := rows.Scan(&did); err != nil {
803+ continue
804+ }
805+ dids = append(dids, did)
806+ }
807+ return dids, rows.Err()
808+}
809+810+// ListRecordsByCollection returns all indexed records for a given collection.
811+func (idx *FeedIndex) ListRecordsByCollection(collection string) ([]IndexedRecord, error) {
812+ rows, err := idx.db.Query(`
813+ SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at
814+ FROM records WHERE collection = ? ORDER BY created_at DESC
815+ `, collection)
816+ if err != nil {
817+ return nil, err
818+ }
819+ defer rows.Close()
820+821+ var records []IndexedRecord
822+ for rows.Next() {
823+ var rec IndexedRecord
824+ var recordStr, indexedAtStr, createdAtStr string
825+ if err := rows.Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey,
826+ &recordStr, &rec.CID, &indexedAtStr, &createdAtStr); err != nil {
827+ continue
828+ }
829+ rec.Record = json.RawMessage(recordStr)
830+ rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr)
831+ rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
832+ records = append(records, rec)
833+ }
834+ return records, rows.Err()
835}
836837// RecordCount returns the total number of indexed records
838func (idx *FeedIndex) RecordCount() int {
839 var count int
840+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM records`).Scan(&count)
0000841 return count
842}
843844// KnownDIDCount returns the number of unique DIDs in the index
845func (idx *FeedIndex) KnownDIDCount() int {
846 var count int
847+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM known_dids`).Scan(&count)
0000848 return count
849}
850851// TotalLikeCount returns the total number of likes indexed
852func (idx *FeedIndex) TotalLikeCount() int {
853 var count int
854+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM likes`).Scan(&count)
0000855 return count
856}
857858// TotalCommentCount returns the total number of comments indexed
859func (idx *FeedIndex) TotalCommentCount() int {
860 var count int
861+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM comments`).Scan(&count)
0000862 return count
863}
864865// RecordCountByCollection returns a breakdown of record counts by collection type
866func (idx *FeedIndex) RecordCountByCollection() map[string]int {
867 counts := make(map[string]int)
868+ rows, err := idx.db.Query(`SELECT collection, COUNT(*) FROM records GROUP BY collection`)
869+ if err != nil {
870+ return counts
871+ }
872+ defer rows.Close()
873+ for rows.Next() {
874+ var collection string
875+ var count int
876+ if err := rows.Scan(&collection, &count); err == nil {
877+ counts[collection] = count
878+ }
879+ }
880 return counts
0000000000000000000881}
882883func formatTimeAgo(t time.Time) string {
···921922// IsBackfilled checks if a DID has already been backfilled
923func (idx *FeedIndex) IsBackfilled(did string) bool {
924+ var exists int
925+ err := idx.db.QueryRow(`SELECT 1 FROM backfilled WHERE did = ?`, did).Scan(&exists)
926+ return err == nil
0000927}
928929// MarkBackfilled marks a DID as backfilled with current timestamp
930func (idx *FeedIndex) MarkBackfilled(did string) error {
931+ _, err := idx.db.Exec(`INSERT OR IGNORE INTO backfilled (did, backfilled_at) VALUES (?, ?)`,
932+ did, time.Now().Format(time.RFC3339))
933+ return err
00934}
935936// BackfillUser fetches all existing records for a DID and adds them to the index
0937func (idx *FeedIndex) BackfillUser(ctx context.Context, did string) error {
0938 if idx.IsBackfilled(did) {
939 log.Debug().Str("did", did).Msg("DID already backfilled, skipping")
940 return nil
···951 }
952953 for _, record := range records.Records {
0954 parts := strings.Split(record.URI, "/")
955 if len(parts) < 3 {
956 continue
···968 }
969 recordCount++
9700971 switch collection {
972 case atproto.NSIDLike:
973 if subject, ok := record.Value["subject"].(map[string]interface{}); ok {
···1004 }
1005 }
100601007 if err := idx.MarkBackfilled(did); err != nil {
1008 log.Warn().Err(err).Str("did", did).Msg("failed to mark DID as backfilled")
1009 }
···10161017// UpsertLike adds or updates a like in the index
1018func (idx *FeedIndex) UpsertLike(actorDID, rkey, subjectURI string) error {
1019+ _, err := idx.db.Exec(`INSERT OR IGNORE INTO likes (subject_uri, actor_did, rkey) VALUES (?, ?, ?)`,
1020+ subjectURI, actorDID, rkey)
1021+ return err
00000000000000000000000000000000001022}
10231024// DeleteLike removes a like from the index
1025func (idx *FeedIndex) DeleteLike(actorDID, subjectURI string) error {
1026+ _, err := idx.db.Exec(`DELETE FROM likes WHERE subject_uri = ? AND actor_did = ?`,
1027+ subjectURI, actorDID)
1028+ return err
000000000000000000000000000000000000001029}
10301031// GetLikeCount returns the number of likes for a record
1032func (idx *FeedIndex) GetLikeCount(subjectURI string) int {
1033+ var count int
1034+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM likes WHERE subject_uri = ?`, subjectURI).Scan(&count)
1035+ return count
00000001036}
10371038// HasUserLiked checks if a user has liked a specific record
1039func (idx *FeedIndex) HasUserLiked(actorDID, subjectURI string) bool {
1040+ var exists int
1041+ err := idx.db.QueryRow(`SELECT 1 FROM likes WHERE actor_did = ? AND subject_uri = ? LIMIT 1`,
1042+ actorDID, subjectURI).Scan(&exists)
1043+ return err == nil
00001044}
10451046// GetUserLikeRKey returns the rkey of a user's like for a specific record, or empty string if not found
1047func (idx *FeedIndex) GetUserLikeRKey(actorDID, subjectURI string) string {
1048 var rkey string
1049+ err := idx.db.QueryRow(`SELECT rkey FROM likes WHERE actor_did = ? AND subject_uri = ?`,
1050+ actorDID, subjectURI).Scan(&rkey)
1051+ if err != nil {
1052+ return ""
1053+ }
0001054 return rkey
1055}
1056···10791080// UpsertComment adds or updates a comment in the index
1081func (idx *FeedIndex) UpsertComment(actorDID, rkey, subjectURI, parentURI, cid, text string, createdAt time.Time) error {
1082+ // Extract parent rkey from parent URI if present
1083+ var parentRKey string
1084+ if parentURI != "" {
1085+ parts := strings.Split(parentURI, "/")
1086+ if len(parts) > 0 {
1087+ parentRKey = parts[len(parts)-1]
00000000000000000000000000000000000000000000000000000000000000000000000000000001088 }
1089+ }
10901091+ _, err := idx.db.Exec(`
1092+ INSERT INTO comments (actor_did, rkey, subject_uri, parent_uri, parent_rkey, cid, text, created_at)
1093+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1094+ ON CONFLICT(actor_did, rkey) DO UPDATE SET
1095+ subject_uri = excluded.subject_uri,
1096+ parent_uri = excluded.parent_uri,
1097+ parent_rkey = excluded.parent_rkey,
1098+ cid = excluded.cid,
1099+ text = excluded.text,
1100+ created_at = excluded.created_at
1101+ `, actorDID, rkey, subjectURI, parentURI, parentRKey, cid, text, createdAt.Format(time.RFC3339Nano))
1102+ return err
1103}
11041105// DeleteComment removes a comment from the index
1106func (idx *FeedIndex) DeleteComment(actorDID, rkey, subjectURI string) error {
1107+ _, err := idx.db.Exec(`DELETE FROM comments WHERE actor_did = ? AND rkey = ?`, actorDID, rkey)
1108+ return err
00000000000000000000000000000000000000000000000000000000000000000000000000000000001109}
11101111// GetCommentCount returns the number of comments on a record
1112func (idx *FeedIndex) GetCommentCount(subjectURI string) int {
1113+ var count int
1114+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM comments WHERE subject_uri = ?`, subjectURI).Scan(&count)
1115+ return count
00000001116}
11171118// GetCommentsForSubject returns all comments for a specific record, ordered by creation time
01119func (idx *FeedIndex) GetCommentsForSubject(ctx context.Context, subjectURI string, limit int, viewerDID string) []IndexedComment {
1120+ query := `SELECT actor_did, rkey, subject_uri, parent_uri, parent_rkey, cid, text, created_at
1121+ FROM comments WHERE subject_uri = ? ORDER BY created_at`
1122+ var args []any
1123+ args = append(args, subjectURI)
1124+ if limit > 0 {
1125+ query += ` LIMIT ?`
1126+ args = append(args, limit)
1127+ }
11281129+ rows, err := idx.db.QueryContext(ctx, query, args...)
1130+ if err != nil {
000000001131 return nil
1132+ }
1133+ defer rows.Close()
1134+1135+ var comments []IndexedComment
1136+ for rows.Next() {
1137+ var c IndexedComment
1138+ var createdAtStr string
1139+ if err := rows.Scan(&c.ActorDID, &c.RKey, &c.SubjectURI, &c.ParentURI, &c.ParentRKey,
1140+ &c.CID, &c.Text, &createdAtStr); err != nil {
1141+ continue
1142+ }
1143+ c.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
1144+ comments = append(comments, c)
1145+ }
11461147 // Populate profile and like info for each comment
1148 for i := range comments {
1149 profile, err := idx.GetProfile(ctx, comments[i].ActorDID)
1150 if err != nil {
01151 comments[i].Handle = comments[i].ActorDID
1152 } else {
1153 comments[i].Handle = profile.Handle
···1166}
11671168// GetThreadedCommentsForSubject returns comments for a record in threaded order with depth
001169func (idx *FeedIndex) GetThreadedCommentsForSubject(ctx context.Context, subjectURI string, limit int, viewerDID string) []IndexedComment {
1170+ allComments := idx.GetCommentsForSubject(ctx, subjectURI, 0, viewerDID)
011711172 if len(allComments) == 0 {
1173 return nil
···1186 for i := range allComments {
1187 comment := &allComments[i]
1188 if comment.ParentRKey == "" {
01189 topLevel = append(topLevel, comment)
1190 } else {
01191 childrenMap[comment.ParentRKey] = append(childrenMap[comment.ParentRKey], comment)
1192 }
1193 }
···1211 if limit > 0 && len(result) >= limit {
1212 return
1213 }
01214 visualDepth := depth
1215 if visualDepth > 2 {
1216 visualDepth = 2
···1218 comment.Depth = visualDepth
1219 result = append(result, *comment)
122001221 if children, ok := childrenMap[comment.RKey]; ok {
1222 for _, child := range children {
1223 flatten(child, depth+1)
+75-147
internal/firehose/notifications.go
···1package firehose
23import (
4- "encoding/json"
5 "fmt"
6 "strings"
7 "time"
···9 "arabica/internal/models"
1011 "github.com/rs/zerolog/log"
12- bolt "go.etcd.io/bbolt"
13-)
14-15-// Bucket names for notifications
16-var (
17- // BucketNotifications stores notifications: {target_did}:{inverted_timestamp}:{id} -> {Notification JSON}
18- BucketNotifications = []byte("notifications")
19-20- // BucketNotificationsMeta stores per-user metadata: {target_did}:last_read -> {timestamp RFC3339}
21- BucketNotificationsMeta = []byte("notifications_meta")
22)
2324// CreateNotification stores a notification for the target user.
25-// Deduplicates by (type + actorDID + subjectURI) to prevent duplicates from backfills.
26// Self-notifications (actorDID == targetDID) are silently skipped.
27func (idx *FeedIndex) CreateNotification(targetDID string, notif models.Notification) error {
28 if targetDID == "" || targetDID == notif.ActorDID {
29 return nil // skip self-notifications
30 }
3132- return idx.db.Update(func(tx *bolt.Tx) error {
33- b := tx.Bucket(BucketNotifications)
34-35- // Deduplication: scan for existing notification with same type+actor+subject
36- prefix := []byte(targetDID + ":")
37- c := b.Cursor()
38- for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
39- var existing models.Notification
40- if err := json.Unmarshal(v, &existing); err != nil {
41- continue
42- }
43- if existing.Type == notif.Type && existing.ActorDID == notif.ActorDID && existing.SubjectURI == notif.SubjectURI {
44- return nil // duplicate, skip
45- }
46- }
47-48- // Generate ID from timestamp
49- if notif.ID == "" {
50- notif.ID = fmt.Sprintf("%d", notif.CreatedAt.UnixNano())
51- }
52-53- data, err := json.Marshal(notif)
54- if err != nil {
55- return fmt.Errorf("failed to marshal notification: %w", err)
56- }
5758- // Key: {target_did}:{inverted_timestamp}:{id} for reverse chronological order
59- inverted := ^uint64(notif.CreatedAt.UnixNano())
60- key := fmt.Sprintf("%s:%016x:%s", targetDID, inverted, notif.ID)
61- return b.Put([]byte(key), data)
62- })
0063}
6465// GetNotifications returns notifications for a user, newest first.
66// Uses cursor-based pagination. Returns notifications, next cursor, and error.
67func (idx *FeedIndex) GetNotifications(targetDID string, limit int, cursor string) ([]models.Notification, string, error) {
68- var notifications []models.Notification
69- var nextCursor string
70-71 if limit <= 0 {
72 limit = 20
73 }
7475- // Get last_read timestamp for marking read status
76 lastRead := idx.getLastRead(targetDID)
7778- err := idx.db.View(func(tx *bolt.Tx) error {
79- b := tx.Bucket(BucketNotifications)
80- c := b.Cursor()
08182- prefix := []byte(targetDID + ":")
83- var k, v []byte
008485- if cursor != "" {
86- // Seek to cursor position, then advance past it
87- k, v = c.Seek([]byte(cursor))
88- if k != nil && string(k) == cursor {
89- k, v = c.Next()
90- }
91- } else {
92- k, v = c.Seek(prefix)
93- }
9495- var lastKey []byte
96- count := 0
97- for ; k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
98- if count >= limit {
99- // There are more items beyond our limit
100- nextCursor = string(lastKey)
101- break
102- }
103104- var notif models.Notification
105- if err := json.Unmarshal(v, ¬if); err != nil {
106- continue
107- }
108-109- // Determine read status based on last_read timestamp
110- if !lastRead.IsZero() && !notif.CreatedAt.After(lastRead) {
111- notif.Read = true
112- }
113114- notifications = append(notifications, notif)
115- lastKey = make([]byte, len(k))
116- copy(lastKey, k)
117- count++
118 }
119120- return nil
121- })
122123- return notifications, nextCursor, err
00000000124}
125126// GetUnreadCount returns the number of unread notifications for a user.
···132 lastRead := idx.getLastRead(targetDID)
133134 var count int
135- _ = idx.db.View(func(tx *bolt.Tx) error {
136- b := tx.Bucket(BucketNotifications)
137- c := b.Cursor()
138-139- prefix := []byte(targetDID + ":")
140- for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
141- var notif models.Notification
142- if err := json.Unmarshal(v, ¬if); err != nil {
143- continue
144- }
145- // If no last_read set, all are unread
146- if lastRead.IsZero() || notif.CreatedAt.After(lastRead) {
147- count++
148- } else {
149- // Since keys are in reverse chronological order,
150- // once we hit a read notification, all remaining are also read
151- break
152- }
153- }
154- return nil
155- })
156157 return count
158}
159160// MarkAllRead updates the last_read timestamp to now for the user.
161func (idx *FeedIndex) MarkAllRead(targetDID string) error {
162- return idx.db.Update(func(tx *bolt.Tx) error {
163- b := tx.Bucket(BucketNotificationsMeta)
164- key := []byte(targetDID + ":last_read")
165- return b.Put(key, []byte(time.Now().Format(time.RFC3339Nano)))
166- })
167}
168169// getLastRead returns the last_read timestamp for a user.
170func (idx *FeedIndex) getLastRead(targetDID string) time.Time {
171- var lastRead time.Time
172- _ = idx.db.View(func(tx *bolt.Tx) error {
173- b := tx.Bucket(BucketNotificationsMeta)
174- v := b.Get([]byte(targetDID + ":last_read"))
175- if v != nil {
176- if t, err := time.Parse(time.RFC3339Nano, string(v)); err == nil {
177- lastRead = t
178- }
179- }
180- return nil
181- })
182- return lastRead
183}
184185// parseTargetDID extracts the DID from an AT-URI (at://did:plc:xxx/collection/rkey)
···206 return
207 }
208209- err := idx.db.Update(func(tx *bolt.Tx) error {
210- b := tx.Bucket(BucketNotifications)
211- prefix := []byte(targetDID + ":")
212- c := b.Cursor()
213- for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
214- var existing models.Notification
215- if err := json.Unmarshal(v, &existing); err != nil {
216- continue
217- }
218- if existing.Type == notifType && existing.ActorDID == actorDID && existing.SubjectURI == subjectURI {
219- return b.Delete(k)
220- }
221- }
222- return nil
223- })
224 if err != nil {
225 log.Warn().Err(err).Str("target", targetDID).Str("actor", actorDID).Msg("failed to delete notification")
226 }
···251// Returns empty string if not found.
252func (idx *FeedIndex) GetCommentSubjectURI(actorDID, rkey string) string {
253 var subjectURI string
254- _ = idx.db.View(func(tx *bolt.Tx) error {
255- b := tx.Bucket(BucketCommentsByActor)
256- v := b.Get([]byte(actorDID + ":" + rkey))
257- if v != nil {
258- subjectURI = string(v)
259- }
260- return nil
261- })
262 return subjectURI
263}
264···301 }
302303 // If this is a reply, also notify the parent comment's author.
304- // We store the brew's subjectURI (not the parent comment URI) so the
305- // notification links directly to the brew page with comments.
306 if parentURI != "" {
307 parentAuthorDID := parseTargetDID(parentURI)
308 if parentAuthorDID != "" && parentAuthorDID != actorDID && parentAuthorDID != targetDID {
···1package firehose
23import (
04 "fmt"
5 "strings"
6 "time"
···8 "arabica/internal/models"
910 "github.com/rs/zerolog/log"
000000000011)
1213// CreateNotification stores a notification for the target user.
14+// Deduplicates by (type + actorDID + subjectURI) via unique index.
15// Self-notifications (actorDID == targetDID) are silently skipped.
16func (idx *FeedIndex) CreateNotification(targetDID string, notif models.Notification) error {
17 if targetDID == "" || targetDID == notif.ActorDID {
18 return nil // skip self-notifications
19 }
2021+ // Generate ID from timestamp
22+ if notif.ID == "" {
23+ notif.ID = fmt.Sprintf("%d", notif.CreatedAt.UnixNano())
24+ }
0000000000000000000002526+ // INSERT OR IGNORE deduplicates via the unique index on (target_did, type, actor_did, subject_uri)
27+ _, err := idx.db.Exec(`
28+ INSERT OR IGNORE INTO notifications (id, target_did, type, actor_did, subject_uri, created_at)
29+ VALUES (?, ?, ?, ?, ?, ?)
30+ `, notif.ID, targetDID, string(notif.Type), notif.ActorDID, notif.SubjectURI,
31+ notif.CreatedAt.Format(time.RFC3339Nano))
32+ return err
33}
3435// GetNotifications returns notifications for a user, newest first.
36// Uses cursor-based pagination. Returns notifications, next cursor, and error.
37func (idx *FeedIndex) GetNotifications(targetDID string, limit int, cursor string) ([]models.Notification, string, error) {
00038 if limit <= 0 {
39 limit = 20
40 }
41042 lastRead := idx.getLastRead(targetDID)
4344+ var args []any
45+ query := `SELECT id, type, actor_did, subject_uri, created_at
46+ FROM notifications WHERE target_did = ?`
47+ args = append(args, targetDID)
4849+ if cursor != "" {
50+ query += ` AND created_at < ?`
51+ args = append(args, cursor)
52+ }
5354+ query += ` ORDER BY created_at DESC LIMIT ?`
55+ // Fetch one extra to determine if there's a next page
56+ args = append(args, limit+1)
0000005758+ rows, err := idx.db.Query(query, args...)
59+ if err != nil {
60+ return nil, "", err
61+ }
62+ defer rows.Close()
0006364+ var notifications []models.Notification
65+ for rows.Next() {
66+ var notif models.Notification
67+ var typeStr, createdAtStr string
68+ if err := rows.Scan(¬if.ID, &typeStr, ¬if.ActorDID, ¬if.SubjectURI, &createdAtStr); err != nil {
69+ continue
70+ }
71+ notif.Type = models.NotificationType(typeStr)
72+ notif.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr)
7374+ if !lastRead.IsZero() && !notif.CreatedAt.After(lastRead) {
75+ notif.Read = true
0076 }
7778+ notifications = append(notifications, notif)
79+ }
8081+ var nextCursor string
82+ if len(notifications) > limit {
83+ // There are more results
84+ last := notifications[limit-1]
85+ nextCursor = last.CreatedAt.Format(time.RFC3339Nano)
86+ notifications = notifications[:limit]
87+ }
88+89+ return notifications, nextCursor, rows.Err()
90}
9192// GetUnreadCount returns the number of unread notifications for a user.
···98 lastRead := idx.getLastRead(targetDID)
99100 var count int
101+ if lastRead.IsZero() {
102+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM notifications WHERE target_did = ?`, targetDID).Scan(&count)
103+ } else {
104+ _ = idx.db.QueryRow(`SELECT COUNT(*) FROM notifications WHERE target_did = ? AND created_at > ?`,
105+ targetDID, lastRead.Format(time.RFC3339Nano)).Scan(&count)
106+ }
000000000000000107108 return count
109}
110111// MarkAllRead updates the last_read timestamp to now for the user.
112func (idx *FeedIndex) MarkAllRead(targetDID string) error {
113+ _, err := idx.db.Exec(`INSERT OR REPLACE INTO notifications_meta (target_did, last_read) VALUES (?, ?)`,
114+ targetDID, time.Now().Format(time.RFC3339Nano))
115+ return err
00116}
117118// getLastRead returns the last_read timestamp for a user.
119func (idx *FeedIndex) getLastRead(targetDID string) time.Time {
120+ var lastReadStr string
121+ err := idx.db.QueryRow(`SELECT last_read FROM notifications_meta WHERE target_did = ?`, targetDID).Scan(&lastReadStr)
122+ if err != nil {
123+ return time.Time{}
124+ }
125+ t, _ := time.Parse(time.RFC3339Nano, lastReadStr)
126+ return t
00000127}
128129// parseTargetDID extracts the DID from an AT-URI (at://did:plc:xxx/collection/rkey)
···150 return
151 }
152153+ _, err := idx.db.Exec(`
154+ DELETE FROM notifications
155+ WHERE target_did = ? AND type = ? AND actor_did = ? AND subject_uri = ?
156+ `, targetDID, string(notifType), actorDID, subjectURI)
00000000000157 if err != nil {
158 log.Warn().Err(err).Str("target", targetDID).Str("actor", actorDID).Msg("failed to delete notification")
159 }
···184// Returns empty string if not found.
185func (idx *FeedIndex) GetCommentSubjectURI(actorDID, rkey string) string {
186 var subjectURI string
187+ err := idx.db.QueryRow(`SELECT subject_uri FROM comments WHERE actor_did = ? AND rkey = ?`,
188+ actorDID, rkey).Scan(&subjectURI)
189+ if err != nil {
190+ return ""
191+ }
000192 return subjectURI
193}
194···231 }
232233 // If this is a reply, also notify the parent comment's author.
00234 if parentURI != "" {
235 parentAuthorDID := parseTargetDID(parentURI)
236 if parentAuthorDID != "" && parentAuthorDID != actorDID && parentAuthorDID != targetDID {
-220
internal/firehose/suggestions.go
···1-package firehose
2-3-import (
4- "encoding/json"
5- "sort"
6- "strings"
7-8- "arabica/internal/atproto"
9-10- bolt "go.etcd.io/bbolt"
11-)
12-13-// EntitySuggestion represents a suggestion for auto-completing an entity
14-type EntitySuggestion struct {
15- Name string `json:"name"`
16- SourceURI string `json:"source_uri"`
17- Fields map[string]string `json:"fields"`
18- Count int `json:"count"`
19-}
20-21-// entityFieldConfig defines which fields to extract and search for each entity type
22-type entityFieldConfig struct {
23- allFields []string
24- searchFields []string
25- nameField string
26-}
27-28-var entityConfigs = map[string]entityFieldConfig{
29- atproto.NSIDRoaster: {
30- allFields: []string{"name", "location", "website"},
31- searchFields: []string{"name", "location", "website"},
32- nameField: "name",
33- },
34- atproto.NSIDGrinder: {
35- allFields: []string{"name", "grinderType", "burrType"},
36- searchFields: []string{"name", "grinderType", "burrType"},
37- nameField: "name",
38- },
39- atproto.NSIDBrewer: {
40- allFields: []string{"name", "brewerType", "description"},
41- searchFields: []string{"name", "brewerType"},
42- nameField: "name",
43- },
44- atproto.NSIDBean: {
45- allFields: []string{"name", "origin", "roastLevel", "process"},
46- searchFields: []string{"name", "origin", "roastLevel"},
47- nameField: "name",
48- },
49-}
50-51-// SearchEntitySuggestions searches indexed records for entity suggestions matching a query.
52-// It scans BucketByCollection for the given collection, matches against searchable fields,
53-// deduplicates by normalized name, and returns results sorted by popularity.
54-func (idx *FeedIndex) SearchEntitySuggestions(collection, query string, limit int) ([]EntitySuggestion, error) {
55- if limit <= 0 {
56- limit = 10
57- }
58-59- config, ok := entityConfigs[collection]
60- if !ok {
61- return nil, nil
62- }
63-64- queryLower := strings.ToLower(strings.TrimSpace(query))
65- if len(queryLower) < 2 {
66- return nil, nil
67- }
68-69- // dedupKey -> aggregated suggestion
70- type candidate struct {
71- suggestion EntitySuggestion
72- fieldCount int // number of non-empty fields (to pick best representative)
73- dids map[string]struct{}
74- }
75- candidates := make(map[string]*candidate)
76-77- err := idx.db.View(func(tx *bolt.Tx) error {
78- byCollection := tx.Bucket(BucketByCollection)
79- recordsBucket := tx.Bucket(BucketRecords)
80-81- prefix := []byte(collection + ":")
82- c := byCollection.Cursor()
83-84- for k, _ := c.Seek(prefix); k != nil; k, _ = c.Next() {
85- if !hasPrefix(k, prefix) {
86- break
87- }
88-89- // Extract URI from collection key
90- uri := extractURIFromCollectionKey(k, collection)
91- if uri == "" {
92- continue
93- }
94-95- data := recordsBucket.Get([]byte(uri))
96- if data == nil {
97- continue
98- }
99-100- var indexed IndexedRecord
101- if err := json.Unmarshal(data, &indexed); err != nil {
102- continue
103- }
104-105- var recordData map[string]interface{}
106- if err := json.Unmarshal(indexed.Record, &recordData); err != nil {
107- continue
108- }
109-110- // Extract fields
111- fields := make(map[string]string)
112- for _, f := range config.allFields {
113- if v, ok := recordData[f].(string); ok && v != "" {
114- fields[f] = v
115- }
116- }
117-118- name := fields[config.nameField]
119- if name == "" {
120- continue
121- }
122-123- // Check if any searchable field matches the query
124- matched := false
125- for _, sf := range config.searchFields {
126- val := strings.ToLower(fields[sf])
127- if val == "" {
128- continue
129- }
130- if strings.HasPrefix(val, queryLower) || strings.Contains(val, queryLower) {
131- matched = true
132- break
133- }
134- }
135- if !matched {
136- continue
137- }
138-139- // Deduplicate by normalized name
140- normalizedName := strings.ToLower(strings.TrimSpace(name))
141-142- if existing, ok := candidates[normalizedName]; ok {
143- existing.dids[indexed.DID] = struct{}{}
144- // Keep the record with more complete fields
145- nonEmpty := 0
146- for _, v := range fields {
147- if v != "" {
148- nonEmpty++
149- }
150- }
151- if nonEmpty > existing.fieldCount {
152- existing.suggestion.Name = name
153- existing.suggestion.Fields = fields
154- existing.suggestion.SourceURI = indexed.URI
155- existing.fieldCount = nonEmpty
156- }
157- } else {
158- nonEmpty := 0
159- for _, v := range fields {
160- if v != "" {
161- nonEmpty++
162- }
163- }
164- candidates[normalizedName] = &candidate{
165- suggestion: EntitySuggestion{
166- Name: name,
167- SourceURI: indexed.URI,
168- Fields: fields,
169- },
170- fieldCount: nonEmpty,
171- dids: map[string]struct{}{indexed.DID: {}},
172- }
173- }
174- }
175-176- return nil
177- })
178- if err != nil {
179- return nil, err
180- }
181-182- // Build results with counts
183- results := make([]EntitySuggestion, 0, len(candidates))
184- for _, c := range candidates {
185- c.suggestion.Count = len(c.dids)
186- results = append(results, c.suggestion)
187- }
188-189- // Sort: prefix matches first, then by count desc, then alphabetically
190- sort.Slice(results, func(i, j int) bool {
191- iPrefix := strings.HasPrefix(strings.ToLower(results[i].Name), queryLower)
192- jPrefix := strings.HasPrefix(strings.ToLower(results[j].Name), queryLower)
193- if iPrefix != jPrefix {
194- return iPrefix
195- }
196- if results[i].Count != results[j].Count {
197- return results[i].Count > results[j].Count
198- }
199- return strings.ToLower(results[i].Name) < strings.ToLower(results[j].Name)
200- })
201-202- if len(results) > limit {
203- results = results[:limit]
204- }
205-206- return results, nil
207-}
// hasPrefix reports whether s begins with prefix (avoids importing bytes).
// The string conversions in the comparison are allocation-free: the compiler
// recognizes string([]byte) == string([]byte) and compares in place.
func hasPrefix(s, prefix []byte) bool {
	return len(s) >= len(prefix) && string(s[:len(prefix)]) == string(prefix)
}