···11+package main
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "strings"
77+88+ "fmt"
99+ "log/slog"
1010+ "time"
1111+1212+ apibsky "github.com/bluesky-social/indigo/api/bsky"
1313+ "github.com/bluesky-social/jetstream/pkg/client"
1414+ "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
1515+ "github.com/bluesky-social/jetstream/pkg/models"
1616+)
1717+1818+// JetstreamConsumer is responsible for consuming from a jetstream instance
1919+type JetstreamConsumer struct {
2020+ cfg *client.ClientConfig
2121+ handler *Handler
2222+ logger *slog.Logger
2323+}
2424+2525+func newJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
2626+ cfg := client.DefaultClientConfig()
2727+ if jsAddr != "" {
2828+ cfg.WebsocketURL = jsAddr
2929+ }
3030+ cfg.WantedCollections = []string{
3131+ "app.bsky.feed.post",
3232+ }
3333+ cfg.WantedDids = []string{}
3434+3535+ return &JetstreamConsumer{
3636+ cfg: cfg,
3737+ logger: logger,
3838+ handler: handler,
3939+ }
4040+}
4141+4242+// Consume will connect to a Jetstream client and start to consume and handle messages from it
4343+func (c *JetstreamConsumer) Consume(ctx context.Context) error {
4444+ scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
4545+ defer scheduler.Shutdown()
4646+4747+ client, err := client.NewClient(c.cfg, c.logger, scheduler)
4848+ if err != nil {
4949+ return fmt.Errorf("failed to create client: %w", err)
5050+ }
5151+5252+ cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
5353+5454+ if err := client.ConnectAndRead(ctx, &cursor); err != nil {
5555+ return fmt.Errorf("connect and read: %w", err)
5656+ }
5757+5858+ slog.Info("stopping consume")
5959+ return nil
6060+}
6161+6262+// Handler is responsible for handling a message consumed from Jetstream
6363+type Handler struct {
6464+ store PostStore
6565+}
6666+6767+// HandleEvent will handle an event based on the event's commit operation
6868+func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
6969+ if event.Commit == nil {
7070+ return nil
7171+ }
7272+7373+ switch event.Commit.Operation {
7474+ case models.CommitOperationCreate:
7575+ return h.handleCreateEvent(ctx, event)
7676+ // TODO: handle deletes too
7777+ default:
7878+ return nil
7979+ }
8080+}
8181+8282+func (h *Handler) handleCreateEvent(_ context.Context, event *models.Event) error {
8383+ if event.Commit.Collection != "app.bsky.feed.post" {
8484+ return nil
8585+ }
8686+8787+ var bskyPost apibsky.FeedPost
8888+ if err := json.Unmarshal(event.Commit.Record, &bskyPost); err != nil {
8989+ // ignore this
9090+ return nil
9191+ }
9292+9393+ // look for any post that contains the #golang hashtag
9494+ if !strings.Contains(strings.ToLower(bskyPost.Text), "#golang") {
9595+ return nil
9696+ }
9797+9898+ createdAt, err := time.Parse(time.RFC3339, bskyPost.CreatedAt)
9999+ if err != nil {
100100+ slog.Error("parsing createdAt time from post", "error", err, "timestamp", bskyPost.CreatedAt)
101101+ createdAt = time.Now().UTC()
102102+ }
103103+104104+ postURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)
105105+ post := Post{
106106+ RKey: event.Commit.RKey,
107107+ PostURI: postURI,
108108+ CreatedAt: createdAt.UnixMilli(),
109109+ }
110110+ err = h.store.CreatePost(post)
111111+ if err != nil {
112112+ slog.Error("error creating post in store", "error", err)
113113+ return nil
114114+ }
115115+ return nil
116116+}
database.db
This is a binary file and will not be displayed.
+118
database.go
···11+package main
22+33+import (
44+ "database/sql"
55+ "errors"
66+ "fmt"
77+ "log/slog"
88+ "os"
99+1010+ _ "github.com/glebarez/go-sqlite"
1111+)
1212+1313+// Database is a sqlite database
1414+type Database struct {
1515+ db *sql.DB
1616+}
1717+1818+func newDatabase(dbPath string) (*Database, error) {
1919+ if dbPath != ":memory:" {
2020+ err := createDbFile(dbPath)
2121+ if err != nil {
2222+ return nil, fmt.Errorf("create db file: %w", err)
2323+ }
2424+ }
2525+2626+ db, err := sql.Open("sqlite", dbPath)
2727+ if err != nil {
2828+ return nil, fmt.Errorf("open database: %w", err)
2929+ }
3030+3131+ err = db.Ping()
3232+ if err != nil {
3333+ return nil, fmt.Errorf("ping db: %w", err)
3434+ }
3535+3636+ err = createPostsTable(db)
3737+ if err != nil {
3838+ return nil, fmt.Errorf("creating posts table: %w", err)
3939+ }
4040+4141+ return &Database{db: db}, nil
4242+}
4343+4444+func (d *Database) close() {
4545+ err := d.db.Close()
4646+ if err != nil {
4747+ slog.Error("failed to close db", "error", err)
4848+ }
4949+}
5050+5151+func createDbFile(dbFilename string) error {
5252+ if _, err := os.Stat(dbFilename); !errors.Is(err, os.ErrNotExist) {
5353+ return nil
5454+ }
5555+5656+ f, err := os.Create(dbFilename)
5757+ if err != nil {
5858+ return fmt.Errorf("create db file : %w", err)
5959+ }
6060+ f.Close()
6161+ return nil
6262+}
6363+6464+func createPostsTable(db *sql.DB) error {
6565+ createTableSQL := `CREATE TABLE IF NOT EXISTS posts (
6666+ "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
6767+ "postRKey" TEXT,
6868+ "postURI" TEXT,
6969+ "createdAt" integer NOT NULL,
7070+ UNIQUE(postRKey)
7171+ );`
7272+7373+ slog.Info("Create posts table...")
7474+ statement, err := db.Prepare(createTableSQL)
7575+ if err != nil {
7676+ return fmt.Errorf("prepare DB statement to create posts table: %w", err)
7777+ }
7878+ _, err = statement.Exec()
7979+ if err != nil {
8080+ return fmt.Errorf("exec sql statement to create posts table: %w", err)
8181+ }
8282+ slog.Info("posts table created")
8383+8484+ return nil
8585+}
8686+8787+// CreatePost will insert a post into a database
8888+func (d *Database) CreatePost(post Post) error {
8989+ sql := `INSERT INTO posts (postRKey, postURI, createdAt) VALUES (?, ?, ?) ON CONFLICT(postRKey) DO NOTHING;`
9090+ _, err := d.db.Exec(sql, post.RKey, post.PostURI, post.CreatedAt)
9191+ if err != nil {
9292+ return fmt.Errorf("exec insert post: %w", err)
9393+ }
9494+ return nil
9595+}
9696+9797+// GetFeedPosts return a slice of posts
9898+func (d *Database) GetFeedPosts(cursor, limit int) ([]Post, error) {
9999+ sql := `SELECT id, postRKey, postURI, createdAt FROM posts
100100+ WHERE createdAt < ?
101101+ ORDER BY createdAt DESC LIMIT ?;`
102102+ rows, err := d.db.Query(sql, cursor, limit)
103103+ if err != nil {
104104+ return nil, fmt.Errorf("run query to get feed posts: %w", err)
105105+ }
106106+ defer rows.Close()
107107+108108+ posts := make([]Post, 0)
109109+ for rows.Next() {
110110+ var post Post
111111+ if err := rows.Scan(&post.ID, &post.RKey, &post.PostURI, &post.CreatedAt); err != nil {
112112+ return nil, fmt.Errorf("scan row: %w", err)
113113+ }
114114+ posts = append(posts, post)
115115+ }
116116+117117+ return posts, nil
118118+}
+72
feedgenerator.go
···11+package main
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "strconv"
88+)
99+1010+// Post describes a Bluesky post
1111+type Post struct {
1212+ ID int
1313+ RKey string
1414+ PostURI string
1515+ UserDID string
1616+ CreatedAt int64
1717+}
1818+1919+// PostStore defines the interactions with a store
2020+type PostStore interface {
2121+ GetFeedPosts(cursor, limit int) ([]Post, error)
2222+ CreatePost(post Post) error
2323+}
2424+2525+// FeedGenerator is responsible for generating a feed
2626+type FeedGenerator struct {
2727+ store PostStore
2828+}
2929+3030+func newFeedGenerator(store PostStore) *FeedGenerator {
3131+ return &FeedGenerator{
3232+ store: store,
3333+ }
3434+}
3535+3636+// GetFeed will fetch a feed and build up a response that can be returned
3737+func (f *FeedGenerator) GetFeed(ctx context.Context, feed, cursor string, limit int) (FeedSkeletonReponse, error) {
3838+ resp := FeedSkeletonReponse{
3939+ Feed: make([]FeedSkeletonPost, 0),
4040+ }
4141+4242+ cursorInt, err := strconv.Atoi(cursor)
4343+ if err != nil && cursor != "" {
4444+ slog.Error("convert cursor to int", "error", err, "cursor value", cursor)
4545+ }
4646+ if cursorInt == 0 {
4747+ // if no cursor provided use a date waaaaay in the future to start the less than query
4848+ cursorInt = 9999999999999
4949+ }
5050+5151+ posts, err := f.store.GetFeedPosts(cursorInt, limit)
5252+ if err != nil {
5353+ return resp, fmt.Errorf("get feed from DB: %w", err)
5454+ }
5555+5656+ usersFeed := make([]FeedSkeletonPost, 0, len(posts))
5757+ for _, post := range posts {
5858+ usersFeed = append(usersFeed, FeedSkeletonPost{
5959+ Post: post.PostURI,
6060+ })
6161+ }
6262+6363+ resp.Feed = usersFeed
6464+6565+ // only set the return cursor if there was at least 1 record returned and that the len of records
6666+ // being returned is the same as the limit
6767+ if len(posts) > 0 && len(posts) == limit {
6868+ lastPost := posts[len(posts)-1]
6969+ resp.Cursor = fmt.Sprintf("%d", lastPost.CreatedAt)
7070+ }
7171+ return resp, nil
7272+}