A demo of a Bluesky feed generator in Go

some refactoring and added a script to regeister the feed

+92 -218
+7 -3
.env-example
··· 1 - FEED_DID_BASE="test" 2 - FEED_HOST_NAME="test" 3 - FEED_NAME="test-feed" 1 + BSKY_HANDLE="your-handle" 2 + BSKY_PASS="app password" 3 + FEED_HOST_NAME="demo-feed.com" 4 + FEED_NAME="demo-feed" 5 + FEED_DISPLAY_NAME="My demo feed" 6 + FEED_DESCRIPTION="This is a demo feed" 7 + FEED_DID="did:web:demo-feed.com"
+2
.gitignore
··· 1 1 .env 2 + database.db 3 + demo-feed-generator
+8 -2
consumer.go
··· 1 - package main 1 + package feed 2 2 3 3 import ( 4 4 "context" ··· 22 22 logger *slog.Logger 23 23 } 24 24 25 - func newJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 25 + // NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function 26 + func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 26 27 cfg := client.DefaultClientConfig() 27 28 if jsAddr != "" { 28 29 cfg.WebsocketURL = jsAddr ··· 62 63 // Handler is responsible for handling a message consumed from Jetstream 63 64 type Handler struct { 64 65 store PostStore 66 + } 67 + 68 + // NewFeedHandler returns a new handler 69 + func NewFeedHandler(store PostStore) *Handler { 70 + return &Handler{store: store} 65 71 } 66 72 67 73 // HandleEvent will handle an event based on the event's commit operation
database.db

This is a binary file and will not be displayed.

+12 -5
database.go
··· 1 - package main 1 + package feed 2 2 3 3 import ( 4 4 "database/sql" ··· 15 15 db *sql.DB 16 16 } 17 17 18 - func newDatabase(dbPath string) (*Database, error) { 18 + // NewDatabase will open a new database. It will ping the database to ensure it is available and error if not 19 + func NewDatabase(dbPath string) (*Database, error) { 19 20 if dbPath != ":memory:" { 20 21 err := createDbFile(dbPath) 21 22 if err != nil { ··· 41 42 return &Database{db: db}, nil 42 43 } 43 44 44 - func (d *Database) close() { 45 + // Close will cleanly stop the database connection 46 + func (d *Database) Close() { 45 47 err := d.db.Close() 46 48 if err != nil { 47 49 slog.Error("failed to close db", "error", err) ··· 57 59 if err != nil { 58 60 return fmt.Errorf("create db file : %w", err) 59 61 } 60 - f.Close() 62 + err = f.Close() 63 + if err != nil { 64 + return fmt.Errorf("failed to close DB file: %w", err) 65 + } 61 66 return nil 62 67 } 63 68 ··· 103 108 if err != nil { 104 109 return nil, fmt.Errorf("run query to get feed posts: %w", err) 105 110 } 106 - defer rows.Close() 111 + defer func() { 112 + _ = rows.Close() 113 + }() 107 114 108 115 posts := make([]Post, 0) 109 116 for rows.Next() {
-72
feedgenerator.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "log/slog" 7 - "strconv" 8 - ) 9 - 10 - // Post describes a Bluesky post 11 - type Post struct { 12 - ID int 13 - RKey string 14 - PostURI string 15 - UserDID string 16 - CreatedAt int64 17 - } 18 - 19 - // PostStore defines the interactions with a store 20 - type PostStore interface { 21 - GetFeedPosts(cursor, limit int) ([]Post, error) 22 - CreatePost(post Post) error 23 - } 24 - 25 - // FeedGenerator is responsible for generating a feed 26 - type FeedGenerator struct { 27 - store PostStore 28 - } 29 - 30 - func newFeedGenerator(store PostStore) *FeedGenerator { 31 - return &FeedGenerator{ 32 - store: store, 33 - } 34 - } 35 - 36 - // GetFeed will fetch a feed and build up a response that can be returned 37 - func (f *FeedGenerator) GetFeed(ctx context.Context, feed, cursor string, limit int) (FeedSkeletonReponse, error) { 38 - resp := FeedSkeletonReponse{ 39 - Feed: make([]FeedSkeletonPost, 0), 40 - } 41 - 42 - cursorInt, err := strconv.Atoi(cursor) 43 - if err != nil && cursor != "" { 44 - slog.Error("convert cursor to int", "error", err, "cursor value", cursor) 45 - } 46 - if cursorInt == 0 { 47 - // if no cursor provided use a date waaaaay in the future to start the less than query 48 - cursorInt = 9999999999999 49 - } 50 - 51 - posts, err := f.store.GetFeedPosts(cursorInt, limit) 52 - if err != nil { 53 - return resp, fmt.Errorf("get feed from DB: %w", err) 54 - } 55 - 56 - usersFeed := make([]FeedSkeletonPost, 0, len(posts)) 57 - for _, post := range posts { 58 - usersFeed = append(usersFeed, FeedSkeletonPost{ 59 - Post: post.PostURI, 60 - }) 61 - } 62 - 63 - resp.Feed = usersFeed 64 - 65 - // only set the return cursor if there was at least 1 record returned and that the len of records 66 - // being returned is the same as the limit 67 - if len(posts) > 0 && len(posts) == limit { 68 - lastPost := posts[len(posts)-1] 69 - resp.Cursor = fmt.Sprintf("%d", lastPost.CreatedAt) 70 - } 71 - return resp, nil 72 - }
+41 -11
handlers.go
··· 1 - package main 1 + package feed 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "fmt" 6 7 "log/slog" ··· 66 67 67 68 cursor := params.Get("cursor") 68 69 69 - resp, err := s.feeder.GetFeed(r.Context(), feed, cursor, limit) 70 + resp, err := s.getFeed(r.Context(), feed, cursor, limit) 70 71 if err != nil { 71 72 slog.Error("get feed", "error", err, "feed", feed) 72 73 http.Error(w, "error getting feed", http.StatusInternalServerError) ··· 103 104 DID: fmt.Sprintf("did:web:%s", s.feedHost), 104 105 Feeds: []Feed{ 105 106 { 106 - URI: fmt.Sprintf("at://%s/app.bsky.feed.generator/%s", s.feedDidBase, s.feedName), 107 + URI: fmt.Sprintf("at://%s/app.bsky.feed.generator/%s", s.feedHost, s.feedName), 107 108 }, 108 109 }, 109 110 } ··· 167 168 return limit, nil 168 169 } 169 170 170 - func checkUserAuth(r *http.Request) (string, error) { 171 - usersDID, err := getRequestUserDID(r) 172 - if err != nil { 173 - return "", fmt.Errorf("getting users did from request: %w", err) 174 - } 175 - return usersDID, nil 176 - } 177 - 178 171 func getRequestUserDID(r *http.Request) (string, error) { 179 172 headerValues := r.Header["Authorization"] 180 173 ··· 215 208 216 209 return string(syntax.DID(issVal)), nil 217 210 } 211 + 212 + func (s *Server) getFeed(ctx context.Context, feed, cursor string, limit int) (FeedSkeletonReponse, error) { 213 + resp := FeedSkeletonReponse{ 214 + Feed: make([]FeedSkeletonPost, 0), 215 + } 216 + 217 + cursorInt, err := strconv.Atoi(cursor) 218 + if err != nil && cursor != "" { 219 + slog.Error("convert cursor to int", "error", err, "cursor value", cursor) 220 + } 221 + if cursorInt == 0 { 222 + // if no cursor provided use a date waaaaay in the future to start the less than query 223 + cursorInt = 9999999999999 224 + } 225 + 226 + posts, err := s.postStore.GetFeedPosts(cursorInt, limit) 227 + if err != nil { 228 + return resp, fmt.Errorf("get feed from DB: %w", err) 229 + } 230 + 231 + usersFeed := make([]FeedSkeletonPost, 0, len(posts)) 232 + for _, post := range posts { 233 + usersFeed = append(usersFeed, FeedSkeletonPost{ 234 + Post: post.PostURI, 235 + }) 236 + } 237 + 238 + resp.Feed = usersFeed 239 + 240 + // only set the return cursor if there was at least 1 record returned and that the len of records 241 + // being returned is the same as the limit 242 + if len(posts) > 0 && len(posts) == limit { 243 + lastPost := posts[len(posts)-1] 244 + resp.Cursor = fmt.Sprintf("%d", lastPost.CreatedAt) 245 + } 246 + return resp, nil 247 + }
-111
main.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "errors" 6 - "fmt" 7 - "log" 8 - "log/slog" 9 - "os" 10 - "os/signal" 11 - "path" 12 - "syscall" 13 - 14 - "github.com/avast/retry-go/v4" 15 - "github.com/joho/godotenv" 16 - ) 17 - 18 - const ( 19 - defaultJetstreamAddr = "wss://jetstream.atproto.tools/subscribe" 20 - serverPort = 443 // this must be the port value used. See https://docs.bsky.app/docs/starter-templates/custom-feeds#deploying-your-feed 21 - ) 22 - 23 - func main() { 24 - err := run() 25 - if err != nil { 26 - log.Fatal(err) 27 - } 28 - } 29 - 30 - func run() error { 31 - err := godotenv.Load() 32 - if err != nil && !os.IsNotExist(err) { 33 - return fmt.Errorf("Error loading .env file") 34 - } 35 - 36 - signals := make(chan os.Signal, 1) 37 - signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 38 - 39 - feedDidBase := os.Getenv("FEED_DID_BASE") 40 - if feedDidBase == "" { 41 - return fmt.Errorf("FEED_DID_BASE not set") 42 - } 43 - feedHost := os.Getenv("FEED_HOST_NAME") 44 - if feedHost == "" { 45 - return fmt.Errorf("FEED_HOST_NAME not set") 46 - } 47 - feedName := os.Getenv("FEED_NAME") 48 - if feedHost == "" { 49 - return fmt.Errorf("FEED_NAME not set") 50 - } 51 - 52 - dbPath := os.Getenv("DATABASE_PATH") 53 - if dbPath == "" { 54 - dbPath = "./" 55 - } 56 - 57 - dbFilename := path.Join(dbPath, "database.db") 58 - database, err := newDatabase(dbFilename) 59 - if err != nil { 60 - return fmt.Errorf("create new store: %w", err) 61 - } 62 - defer database.close() 63 - 64 - feeder := newFeedGenerator(database) 65 - 66 - ctx, cancel := context.WithCancel(context.Background()) 67 - defer cancel() 68 - 69 - go consumeLoop(ctx, database) 70 - 71 - server, err := NewServer(serverPort, feedHost, feedDidBase, feedName, feeder) 72 - if err != nil { 73 - return fmt.Errorf("create new server: %w", err) 74 - } 75 - go func() { 76 - <-signals 77 - cancel() 78 - _ = server.Stop(context.Background()) 79 - }() 80 - 81 - server.Run() 82 - return nil 83 - } 84 - 85 - func consumeLoop(ctx context.Context, database *Database) { 86 - handler := Handler{ 87 - store: database, 88 - } 89 - 90 - jsServerAddr := os.Getenv("JS_SERVER_ADDR") 91 - if jsServerAddr == "" { 92 - jsServerAddr = defaultJetstreamAddr 93 - } 94 - 95 - consumer := newJetstreamConsumer(jsServerAddr, slog.Default(), &handler) 96 - 97 - _ = retry.Do(func() error { 98 - err := consumer.Consume(ctx) 99 - if err != nil { 100 - // if the context has been cancelled then it's time to exit 101 - if errors.Is(err, context.Canceled) { 102 - return nil 103 - } 104 - slog.Error("consume loop", "error", err) 105 - return err 106 - } 107 - return nil 108 - }, retry.Attempts(0)) // retry indefinitly until context canceled 109 - 110 - slog.Warn("exiting consume loop") 111 - }
+22 -14
server.go
··· 1 - package main 1 + package feed 2 2 3 3 import ( 4 4 "context" ··· 8 8 "net/http" 9 9 ) 10 10 11 - // Feeder describes building up a feed 12 - type Feeder interface { 13 - GetFeed(ctx context.Context, feed, cursor string, limit int) (FeedSkeletonReponse, error) 11 + // Post describes a Bluesky post 12 + type Post struct { 13 + ID int 14 + RKey string 15 + PostURI string 16 + UserDID string 17 + CreatedAt int64 18 + } 19 + 20 + // PostStore defines the interactions with a store 21 + type PostStore interface { 22 + GetFeedPosts(cursor, limit int) ([]Post, error) 23 + CreatePost(post Post) error 14 24 } 15 25 16 26 // Server is the feed server that will be called when a user requests to view a feed 17 27 type Server struct { 18 - httpsrv *http.Server 19 - feeder Feeder 20 - feedHost string 21 - feedDidBase string 22 - feedName string 28 + httpsrv *http.Server 29 + postStore PostStore 30 + feedHost string 31 + feedName string 23 32 } 24 33 25 34 // NewServer builds a server - call the Run function to start the server 26 - func NewServer(port int, feedHost, feedDidBase, feedName string, feeder Feeder) (*Server, error) { 35 + func NewServer(port int, feedHost, feedName string, postStore PostStore) (*Server, error) { 27 36 srv := &Server{ 28 - feedHost: feedHost, 29 - feedDidBase: feedDidBase, 30 - feedName: feedName, 31 - feeder: feeder, 37 + feedHost: feedHost, 38 + feedName: feedName, 39 + postStore: postStore, 32 40 } 33 41 34 42 mux := http.NewServeMux()