A demo of a Bluesky feed generator in Go
1package feed
2
3import (
4 "context"
5 "encoding/json"
6 "strings"
7
8 "fmt"
9 "log/slog"
10 "time"
11
12 apibsky "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
15 "github.com/bluesky-social/jetstream/pkg/models"
16)
17
18// JetstreamConsumer is responsible for consuming from a jetstream instance
19type JetstreamConsumer struct {
20 cfg *client.ClientConfig
21 handler *Handler
22 logger *slog.Logger
23}
24
25// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function
26func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer {
27 cfg := client.DefaultClientConfig()
28 if jsAddr != "" {
29 cfg.WebsocketURL = jsAddr
30 }
31 cfg.WantedCollections = []string{
32 "app.bsky.feed.post",
33 }
34 cfg.WantedDids = []string{}
35
36 return &JetstreamConsumer{
37 cfg: cfg,
38 logger: logger,
39 handler: handler,
40 }
41}
42
43// Consume will connect to a Jetstream client and start to consume and handle messages from it
44func (c *JetstreamConsumer) Consume(ctx context.Context) error {
45 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent)
46 defer scheduler.Shutdown()
47
48 client, err := client.NewClient(c.cfg, c.logger, scheduler)
49 if err != nil {
50 return fmt.Errorf("failed to create client: %w", err)
51 }
52
53 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
54
55 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
56 return fmt.Errorf("connect and read: %w", err)
57 }
58
59 slog.Info("stopping consume")
60 return nil
61}
62
63// Handler is responsible for handling a message consumed from Jetstream
64type Handler struct {
65 store PostStore
66}
67
68// NewFeedHandler returns a new handler
69func NewFeedHandler(store PostStore) *Handler {
70 return &Handler{store: store}
71}
72
73// HandleEvent will handle an event based on the event's commit operation
74func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error {
75 if event.Commit == nil {
76 return nil
77 }
78
79 switch event.Commit.Operation {
80 case models.CommitOperationCreate:
81 return h.handleCreateEvent(ctx, event)
82 // TODO: handle deletes too
83 default:
84 return nil
85 }
86}
87
88func (h *Handler) handleCreateEvent(_ context.Context, event *models.Event) error {
89 if event.Commit.Collection != "app.bsky.feed.post" {
90 return nil
91 }
92
93 var bskyPost apibsky.FeedPost
94 if err := json.Unmarshal(event.Commit.Record, &bskyPost); err != nil {
95 // ignore this
96 return nil
97 }
98
99 // this is where logic goes for what posts you wish to store for a feed but for this example
100 // just look for any post that contains the #golang hashtag
101 if !strings.Contains(strings.ToLower(bskyPost.Text), "#golang") {
102 return nil
103 }
104
105 createdAt, err := time.Parse(time.RFC3339, bskyPost.CreatedAt)
106 if err != nil {
107 slog.Error("parsing createdAt time from post", "error", err, "timestamp", bskyPost.CreatedAt)
108 createdAt = time.Now().UTC()
109 }
110
111 postURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)
112 post := Post{
113 RKey: event.Commit.RKey,
114 PostURI: postURI,
115 CreatedAt: createdAt.UnixMilli(),
116 }
117 err = h.store.CreatePost(post)
118 if err != nil {
119 slog.Error("error creating post in store", "error", err)
120 return nil
121 }
122 return nil
123}