.gitignore

 .vscode/
 .env
···
 
-# Feed Generator Binary
+# Misc Binaries
 feedgen
+indexer/indexer
 
 # Test binary, built with `go test -c`
 *.test
···
 # Go workspace file
 go.work
+
+# trash
+.DS_Store
+/*/.DS_Store
README.md (+63 −49)
-# go-bsky-feed-generator
-A minimal implementation of a BlueSky Feed Generator in Go
-
-## Requirements
-
-To run this feed generator, all you need is `docker` with `docker-compose`.
-
-## Running
-
-Start up the feed generator by running: `make up`
-
-This will build the feed generator service binary inside a Docker container and stand the service up on your machine at port `9032`.
-
-To view a sample static feed (with only one post), go to:
-
-- [`http://localhost:9032/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:replace-me-with-your-did/app.bsky.feed.generator/static`](http://localhost:9032/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:replace-me-with-your-did/app.bsky.feed.generator/static)
-
-Update the variables in `.env` when you actually want to deploy the service somewhere; at that point, `did:plc:replace-me-with-your-did` should be replaced with the value of `FEED_ACTOR_DID`.
-## Accessing
-
-This service exposes the following routes:
-
-- `/.well-known/did.json`
-  - This route is used by ATProto to verify ownership of the DID the service is claiming; it's a static JSON document.
-  - You can see how this is generated in `pkg/gin/endpoints.go:GetWellKnownDID()`
-- `/xrpc/app.bsky.feed.getFeedSkeleton`
-  - This route is what clients call to generate a feed page; it takes three query parameters for feed generation: `feed`, `cursor`, and `limit`.
-  - You can see how those are parsed and handled in `pkg/gin/endpoints.go:GetFeedSkeleton()`
-- `/xrpc/app.bsky.feed.describeFeedGenerator`
-  - This route is how the service advertises which feeds it supports to clients.
-  - You can see how it's handled in `pkg/gin/endpoints.go:DescribeFeeds()`
-
-## Publishing
-
-Once you've got your feed generator up and running and exposed to the internet, you can publish the feed using the script from the official BSky repo [here](https://github.com/bluesky-social/feed-generator/blob/main/scripts/publishFeedGen.ts).
-
-Your feed will be published under _your_ DID and should show up on your profile under the `feeds` tab.
-## Architecture
-
-This repo is structured around a `Feed` interface that lets you add all sorts of feeds to the router.
-
-These feeds can be simple static feeds like the `pkg/feeds/static/feed.go` implementation, or they can be much more complex feeds that draw on different data sources and filter them in interesting ways to produce pages of feed items.
-
-The `Feed` interface is satisfied by any struct implementing two functions:
-
-```go
-type Feed interface {
-	GetPage(ctx context.Context, feed string, userDID string, limit int64, cursor string) (feedPosts []*appbsky.FeedDefs_SkeletonFeedPost, newCursor *string, err error)
-	Describe(ctx context.Context) ([]appbsky.FeedDescribeFeedGenerator_Feed, error)
-}
-```
-
-`GetPage` fetches a page of a feed for a given user with the provided limit and cursor; this is the main function that serves posts to a user.
-
-`Describe` is used by the router to advertise which feeds are available. For forward compatibility, `Feed`s should be self-describing, in case this endpoint later allows more details about feeds to be provided.
-
-You can configure external resources and requirements in your `Feed` implementation before adding the feed to the `FeedRouter` with `feedRouter.AddFeed([]string{"{feed_name}"}, feedInstance)`.
-
-This `Feed` interface is somewhat flexible right now, but it could be better. I'm not sure if it will change in the future, so keep that in mind when using this template.
-
-- This has since been updated to allow a `Feed` to take in a feed name when generating a page and to register multiple aliases for the feeds it supports.
+# Rinds
+A collection of feeds under one roof.
+
+I don't like Docker, and you don't need it to build or run this, okay thanks!
+
+## Running
+It isn't that hard: install Go, set up Postgres, and create the required feed definitions under your user account. You can use https://pdsls.dev to create an `app.bsky.feed.generator` record.
+Set the `rkey` to the desired short URL value. It'll look like:
+```
+https://bsky.app/profile/{you}/feed/{rkey}
+```
+For the contents, you can use the example below:
+```
+{
+  "did": "did:web:${INSERT DID:WEB HERE}",
+  "$type": "app.bsky.feed.generator",
+  "createdAt": "2025-01-21T11:33:02.396Z",
+  "description": "wowww very descriptive",
+  "displayName": "Cool Feed Name"
+}
+```
+## Env
+You can check out `.env.example` for an example.
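The example file itself isn't shown in this diff, but the indexer reads `DB_HOST`, `DB_USER`, `DB_NAME`, and `DB_PASSWORD` at startup, so a minimal `.env` would look something like the sketch below. All values are placeholders, and the feed service may read additional variables not listed here:

```shell
# Postgres connection settings read by indexer.go (placeholder values)
DB_HOST=localhost
DB_USER=rinds
DB_NAME=rinds
DB_PASSWORD=changeme
```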
+## Postgres
+Be sure to set up `.env` correctly.
+
+All relevant tables should be created automatically when needed.
+## Index
+Start Postgres first. Then move into the firehose ingester's directory:
+```
+cd ./indexer
+```
+and compile and run it:
+```
+go build -o indexer ./indexer.go && export $(grep -v '^#' ../.env | xargs) && ./indexer
+```
+After it has been compiled, you can use `rerun.sh` to make sure it automatically recovers after a failure.
+## Serve
+Make sure the indexer (or at least Postgres) is running first:
+```
+go build -o feedgen cmd/main.go && export $(grep -v '^#' ./.env | xargs) && ./feedgen
+```
+Heads up: the logs are pretty verbose.
+## Todo
+- [ ] Faster Indexing
+- [ ] Proper Up-to-Date Following Indexing
+- [x] Repost Indicators
+- [ ] Cache Timeouts
+  - [x] Likes
+  - [x] Posts
+  - [x] Feed Caches
+  - [ ] Followings
+- [ ] More Fresh Feed Variants
+  - [ ] unFresh
+  - [x] +9 hrs
+  - [ ] Glimpse
+  - [ ] Media
+  - [ ] Fresh: Gram
+  - [ ] Fresh: Tube
+  - [ ] Fresh: Media Only
+  - [ ] Fresh: Text Only
+## Architecture
+Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works.
+
+### /feeds/static
+Basic example feed from the template. Kept as a sanity check in case everything else seems to fail.
+
+### /feeds/fresh
+Fresh feeds, all built around a shared Following feed builder and logic for marking posts as viewed. May still contain some references to the old name "rinds".

indexer/indexer.go
+package main
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	"github.com/gorilla/websocket"
+	"github.com/lib/pq"
+)
+
+const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like"
+// LikeMessage is the Jetstream commit envelope; despite the name it is used
+// for post, repost, and like events alike.
+type LikeMessage struct {
+	Did    string `json:"did"`
+	TimeUs int64  `json:"time_us"`
+	Kind   string `json:"kind"`
+	Commit Commit `json:"commit"`
+}
+
+type Commit struct {
+	Rev        string     `json:"rev"`
+	Operation  string     `json:"operation"`
+	Collection string     `json:"collection"`
+	RKey       string     `json:"rkey"`
+	Record     LikeRecord `json:"record"`
+	CID        string     `json:"cid"`
+}
+
+type LikeRecord struct {
+	Type      string      `json:"$type"`
+	CreatedAt string      `json:"createdAt"`
+	Subject   LikeSubject `json:"subject"`
+	Reply     *Reply      `json:"reply,omitempty"`
+}
+
+type Reply struct {
+	Parent ReplySubject `json:"parent"`
+	Root   ReplySubject `json:"root"`
+}
+
+type LikeSubject struct {
+	CID string `json:"cid"`
+	URI string `json:"uri"`
+}
+
+type ReplySubject struct {
+	CID string `json:"cid"`
+	URI string `json:"uri"`
+}
+var lastLoggedSecond int64 // Keep track of the last logged second
+
+var (
+	// NOTE: these batches are appended to by the reader loop and flushed by
+	// separate goroutines without locking; guarding them with a sync.Mutex
+	// would make this safe.
+	postBatch       []Post
+	likeBatch       []Like
+	batchInsertSize = 1000             // Adjust the batch size as needed
+	batchInterval   = 30 * time.Second // Flush every 30 seconds
+)
+type Post struct {
+	RelAuthor string
+	PostUri   string
+	RelDate   int64
+	IsRepost  bool
+	RepostUri string
+	ReplyTo   string
+}
+
+type Like struct {
+	RelAuthor string
+	PostUri   string
+	RelDate   int64
+}
+func getLastCursor(db *sql.DB) int64 {
+	var lastCursor int64
+	err := db.QueryRow("SELECT lastCursor FROM cursor WHERE id = 1").Scan(&lastCursor)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			log.Println("Cursor table is empty; starting fresh.")
+			return 0
+		}
+		log.Fatalf("Error fetching last cursor: %v", err)
+	}
+	return lastCursor
+}
+func main() {
+	// Open the database connection
+	dbHost := os.Getenv("DB_HOST")
+	dbUser := os.Getenv("DB_USER")
+	dbName := os.Getenv("DB_NAME")
+	dbPassword := os.Getenv("DB_PASSWORD")
+	db, err := sql.Open("postgres", fmt.Sprintf("user=%s dbname=%s host=%s password=%s sslmode=disable", dbUser, dbName, dbHost, dbPassword))
+	if err != nil {
+		log.Fatalf("Failed to open Postgres connection: %v", err)
+	}
+	defer db.Close()
+
+	// sql.Open does not actually connect; verify the connection up front
+	if err := db.Ping(); err != nil {
+		log.Fatalf("Failed to connect to Postgres: %v", err)
+	}
+
+	// Ensure tables exist
+	createTables(db)
+
+	// Start the cleanup job
+	go startCleanupJob(db)
+
+	// Start the batch insert jobs for posts and likes
+	go startBatchInsertJob(db)
+	go startBatchInsertLikesJob(db)
+
+	// Retrieve the last cursor
+	lastCursor := getLastCursor(db)
+
+	// If the cursor is older than 24 hours, skip it
+	if lastCursor > 0 {
+		cursorTime := time.UnixMicro(lastCursor)
+		if time.Since(cursorTime) > 24*time.Hour {
+			log.Printf("Cursor is older than 24 hours (%s); skipping it.", cursorTime.Format("2006-01-02 15:04:05"))
+			lastCursor = 0 // Ignore this cursor
+		} else {
+			log.Printf("Resuming from cursor: %d (%s)", lastCursor, cursorTime.Format("2006-01-02 15:04:05"))
+		}
+	}
+
+	// WebSocket URL with cursor if available
+	wsFullUrl := wsUrl
+	if lastCursor > 0 {
+		wsFullUrl += "&cursor=" + fmt.Sprintf("%d", lastCursor)
+	}
+
+	// Connect to WebSocket
+	conn, _, err := websocket.DefaultDialer.Dial(wsFullUrl, nil)
+	if err != nil {
+		log.Fatalf("WebSocket connection error: %v", err)
+	}
+	defer conn.Close()
+
+	log.Printf("Connected to WebSocket: %s", wsFullUrl)
+	log.Println("Listening for WebSocket messages...")
+
+	// Process WebSocket messages
+	for {
+		var msg LikeMessage
+		err := conn.ReadJSON(&msg)
+		if err != nil {
+			// A read error usually means the connection is gone; exit so a
+			// supervisor (e.g. rerun.sh) can restart from the saved cursor
+			// instead of spinning on a dead connection.
+			log.Fatalf("Error reading WebSocket message: %v", err)
+		}
+
+		processMessage(db, msg)
+	}
+}
+func createTables(db *sql.DB) {
+	_, err := db.Exec(`
+		CREATE TABLE IF NOT EXISTS posts (
+			id SERIAL PRIMARY KEY,
+			rel_author TEXT NOT NULL,
+			post_uri TEXT NOT NULL,
+			rel_date BIGINT NOT NULL,
+			is_repost BOOLEAN NOT NULL DEFAULT FALSE,
+			repost_uri TEXT,
+			reply_to TEXT,
+			UNIQUE(rel_author, post_uri, rel_date)
+		);
+	`)
+	if err != nil {
+		log.Fatalf("Error creating 'posts' table: %v", err)
+	}
+
+	_, err = db.Exec(`
+		CREATE TABLE IF NOT EXISTS likes (
+			id SERIAL PRIMARY KEY,
+			rel_author TEXT NOT NULL,
+			post_uri TEXT NOT NULL,
+			rel_date BIGINT NOT NULL,
+			-- Required by the ON CONFLICT (rel_author, post_uri) clause in
+			-- batchInsertLikes; without it Postgres rejects the insert.
+			UNIQUE(rel_author, post_uri)
+		);
+	`)
+	if err != nil {
+		log.Fatalf("Error creating 'likes' table: %v", err)
+	}
+
+	// Create a cursor table with a single-row constraint
+	_, err = db.Exec(`
+		CREATE TABLE IF NOT EXISTS cursor (
+			id INT PRIMARY KEY CHECK (id = 1),
+			lastCursor BIGINT NOT NULL
+		);
+	`)
+	if err != nil {
+		log.Fatalf("Error creating 'cursor' table: %v", err)
+	}
+
+	// Ensure the cursor table always has exactly one row
+	_, err = db.Exec(`
+		INSERT INTO cursor (id, lastCursor)
+		VALUES (1, 0)
+		ON CONFLICT (id) DO NOTHING;
+	`)
+	if err != nil {
+		log.Fatalf("Error initializing cursor table: %v", err)
+	}
+}
+
+func processMessage(db *sql.DB, msg LikeMessage) {
+	// Convert cursor to time
+	cursorTime := time.UnixMicro(msg.TimeUs)
+
+	// Get the whole second as a Unix timestamp
+	currentSecond := cursorTime.Unix()
+
+	// Log the cursor roughly once per second: only for events landing in the
+	// 100-200ms window of a second that has not been logged yet
+	if currentSecond != lastLoggedSecond && cursorTime.Nanosecond() >= 100_000_000 && cursorTime.Nanosecond() < 200_000_000 {
+		lastLoggedSecond = currentSecond
+		humanReadableTime := cursorTime.Format("2006-01-02 15:04:05.000")
+		log.Printf("Cursor (time_us): %d, Human-readable time: %s", msg.TimeUs, humanReadableTime)
+	}
+
+	// Save the record
+	record := msg.Commit.Record
+	postUri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", msg.Did, msg.Commit.RKey)
+	repostUri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", msg.Did, msg.Commit.RKey)
+	reply := ""
+	if record.Reply != nil {
+		reply = fmt.Sprintf("Parent: %s, Root: %s", record.Reply.Parent.URI, record.Reply.Root.URI)
+	}
+
+	switch msg.Commit.Collection {
+	case "app.bsky.feed.post":
+		if msg.Commit.Operation == "create" {
+			postBatch = append(postBatch, Post{msg.Did, postUri, msg.TimeUs, false, "", reply})
+		} else if msg.Commit.Operation == "delete" {
+			deletePost(db, msg.Did, postUri, msg.TimeUs)
+		}
+	case "app.bsky.feed.repost":
+		if record.Subject.URI != "" {
+			if msg.Commit.Operation == "create" {
+				postBatch = append(postBatch, Post{msg.Did, record.Subject.URI, msg.TimeUs, true, repostUri, ""})
+			} else if msg.Commit.Operation == "delete" {
+				deletePost(db, msg.Did, record.Subject.URI, msg.TimeUs)
+			}
+		}
+	case "app.bsky.feed.like":
+		if record.Subject.URI != "" {
+			if msg.Commit.Operation == "create" {
+				likeBatch = append(likeBatch, Like{msg.Did, record.Subject.URI, msg.TimeUs})
+			} else if msg.Commit.Operation == "delete" {
+				deleteLike(db, msg.Did, record.Subject.URI)
+			}
+		}
+	default:
+		// Ignore other collections
+	}
+
+	// Update the cursor in the single-row table
+	_, err := db.Exec(`
+		UPDATE cursor SET lastCursor = $1 WHERE id = 1;
+	`, msg.TimeUs)
+	if err != nil {
+		log.Printf("Error updating cursor: %v", err)
+	}
+}
+
+func deletePost(db *sql.DB, relAuthor, postUri string, relDate int64) {
+	// Match on author and URI only: relDate here is the timestamp of the
+	// delete event, not of the original create, so it would never match a row.
+	_, err := db.Exec(`
+		DELETE FROM posts WHERE rel_author = $1 AND post_uri = $2;
+	`, relAuthor, postUri)
+	if err != nil {
+		log.Printf("Error deleting post: %v", err)
+	}
+}
+
+func deleteLike(db *sql.DB, relAuthor, postUri string) {
+	_, err := db.Exec(`
+		DELETE FROM likes WHERE rel_author = $1 AND post_uri = $2;
+	`, relAuthor, postUri)
+	if err != nil {
+		log.Printf("Error deleting like: %v", err)
+	}
+}
+
+func startCleanupJob(db *sql.DB) {
+	ticker := time.NewTicker(1 * time.Hour)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		cleanupOldPosts(db)
+		if err := cleanOldFeedCaches(context.Background(), db); err != nil {
+			log.Printf("Error cleaning old feed caches: %v", err)
+		}
+	}
+}
+
+func cleanupOldPosts(db *sql.DB) {
+	threshold := time.Now().Add(-24 * time.Hour).UnixMicro()
+	_, err := db.Exec(`
+		DELETE FROM posts WHERE rel_date < $1;
+	`, threshold)
+	if err != nil {
+		log.Printf("Error deleting old posts: %v", err)
+	} else {
+		log.Printf("Deleted posts older than 24 hours.")
+	}
+}
+
+func startBatchInsertJob(db *sql.DB) {
+	ticker := time.NewTicker(batchInterval)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		// Only flush once enough posts have accumulated; smaller batches
+		// wait for a later tick.
+		if len(postBatch) >= batchInsertSize {
+			batchInsertPosts(db)
+		}
+	}
+}
+
+func batchInsertPosts(db *sql.DB) {
+	tx, err := db.Begin()
+	if err != nil {
+		log.Printf("Error starting transaction: %v", err)
+		return
+	}
+
+	stmt, err := tx.Prepare(`
+		INSERT INTO posts (rel_author, post_uri, rel_date, is_repost, repost_uri, reply_to)
+		VALUES ($1, $2, $3, $4, $5, $6)
+		ON CONFLICT (rel_author, post_uri, rel_date) DO NOTHING;
+	`)
+	if err != nil {
+		log.Printf("Error preparing statement: %v", err)
+		tx.Rollback()
+		return
+	}
+	defer stmt.Close()
+
+	for _, post := range postBatch {
+		_, err := stmt.Exec(post.RelAuthor, post.PostUri, post.RelDate, post.IsRepost, post.RepostUri, post.ReplyTo)
+		if err != nil {
+			log.Printf("Error executing statement: %v", err)
+		}
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		log.Printf("Error committing transaction: %v", err)
+	}
+
+	// Clear the batch (note: rows are dropped even if the commit failed)
+	postBatch = postBatch[:0]
+}
+
+func startBatchInsertLikesJob(db *sql.DB) {
+	ticker := time.NewTicker(1 * time.Second)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		if len(likeBatch) > 0 {
+			batchInsertLikes(db)
+		}
+	}
+}
+
+func batchInsertLikes(db *sql.DB) {
+	tx, err := db.Begin()
+	if err != nil {
+		log.Printf("Error starting transaction: %v", err)
+		return
+	}
+
+	// The ON CONFLICT target requires a matching UNIQUE(rel_author, post_uri)
+	// constraint on the likes table.
+	stmt, err := tx.Prepare(`
+		INSERT INTO likes (rel_author, post_uri, rel_date)
+		VALUES ($1, $2, $3)
+		ON CONFLICT (rel_author, post_uri) DO NOTHING;
+	`)
+	if err != nil {
+		log.Printf("Error preparing statement: %v", err)
+		tx.Rollback()
+		return
+	}
+	defer stmt.Close()
+
+	for _, like := range likeBatch {
+		_, err := stmt.Exec(like.RelAuthor, like.PostUri, like.RelDate)
+		if err != nil {
+			log.Printf("Error executing statement: %v", err)
+		}
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		log.Printf("Error committing transaction: %v", err)
+	}
+
+	// Clear the batch (note: rows are dropped even if the commit failed)
+	likeBatch = likeBatch[:0]
+}
+
+// cleanOldFeedCaches drops expired feedcache_* tables. The cachetimeout
+// bookkeeping table is not created in this file; it is assumed to be
+// maintained by the feed service.
+func cleanOldFeedCaches(ctx context.Context, db *sql.DB) error {
+	log.Println("Cleaning old feed caches")
+	// Get the current time minus 24 hours
+	expirationTime := time.Now().Add(-24 * time.Hour)
+
+	// Get all tables from cachetimeout that are older than 24 hours
+	rows, err := db.QueryContext(ctx, `
+		SELECT table_name
+		FROM cachetimeout
+		WHERE creation_time < $1
+	`, expirationTime)
+	if err != nil {
+		return fmt.Errorf("error querying cachetimeout table: %w", err)
+	}
+	defer rows.Close()
+
+	var tablesToDelete []string
+	for rows.Next() {
+		var tableName string
+		if err := rows.Scan(&tableName); err != nil {
+			return fmt.Errorf("error scanning table name: %w", err)
+		}
+		tablesToDelete = append(tablesToDelete, tableName)
+	}
+
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("error iterating cachetimeout rows: %w", err)
+	}
+
+	// Get all feedcache_* tables that do not have an entry in cachetimeout
+	rows, err = db.QueryContext(ctx, `
+		SELECT table_name
+		FROM information_schema.tables
+		WHERE table_name LIKE 'feedcache_%'
+		AND table_name NOT IN (SELECT table_name FROM cachetimeout)
+	`)
+	if err != nil {
+		return fmt.Errorf("error querying feedcache tables: %w", err)
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var tableName string
+		if err := rows.Scan(&tableName); err != nil {
+			return fmt.Errorf("error scanning table name: %w", err)
+		}
+		tablesToDelete = append(tablesToDelete, tableName)
+	}
+
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("error iterating feedcache rows: %w", err)
+	}
+
+	// Drop the old tables and remove their entries from cachetimeout
+	for _, tableName := range tablesToDelete {
+		_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(tableName)))
+		if err != nil {
+			return fmt.Errorf("error dropping table %s: %w", tableName, err)
+		}
+		_, err = db.ExecContext(ctx, "DELETE FROM cachetimeout WHERE table_name = $1", tableName)
+		if err != nil {
+			return fmt.Errorf("error deleting from cachetimeout table: %w", err)
+		}
+	}
+
+	return nil
+}
indexer/rerun.sh (+11)
+#!/bin/bash
+
+# Infinite loop to rerun the Go program.
+# Assumes the DB_* environment variables are already exported in this shell,
+# e.g. via `export $(grep -v '^#' ../.env | xargs)`.
+while true; do
+	echo "Starting Go program..."
+	go run indexer.go
+
+	echo "Program exited. Restarting in 5 seconds..."
+	sleep 5
+done
···
 	accessToken := authHeaderParts[1]
 
 	parser := jwt.Parser{
+		ValidMethods:         []string{es256k.SigningMethodES256K.Alg()},
+		SkipClaimsValidation: true, // TODO: workaround for clock skew on the VPS; this skips exp/iat checks, so revisit.
 	}
 
 	token, err := parser.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (interface{}, error) {