// A demo of a Bluesky feed generator in Go
// at feed-interactions · 123 lines · 3.3 kB · view raw

1package feed 2 3import ( 4 "context" 5 "encoding/json" 6 "strings" 7 8 "fmt" 9 "log/slog" 10 "time" 11 12 apibsky "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 "github.com/bluesky-social/jetstream/pkg/models" 16) 17 18// JetstreamConsumer is responsible for consuming from a jetstream instance 19type JetstreamConsumer struct { 20 cfg *client.ClientConfig 21 handler *Handler 22 logger *slog.Logger 23} 24 25// NewJetstreamConsumer configures a new jetstream consumer. To run or start you should call the Consume function 26func NewJetstreamConsumer(jsAddr string, logger *slog.Logger, handler *Handler) *JetstreamConsumer { 27 cfg := client.DefaultClientConfig() 28 if jsAddr != "" { 29 cfg.WebsocketURL = jsAddr 30 } 31 cfg.WantedCollections = []string{ 32 "app.bsky.feed.post", 33 } 34 cfg.WantedDids = []string{} 35 36 return &JetstreamConsumer{ 37 cfg: cfg, 38 logger: logger, 39 handler: handler, 40 } 41} 42 43// Consume will connect to a Jetstream client and start to consume and handle messages from it 44func (c *JetstreamConsumer) Consume(ctx context.Context) error { 45 scheduler := sequential.NewScheduler("jetstream", c.logger, c.handler.HandleEvent) 46 defer scheduler.Shutdown() 47 48 client, err := client.NewClient(c.cfg, c.logger, scheduler) 49 if err != nil { 50 return fmt.Errorf("failed to create client: %w", err) 51 } 52 53 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 54 55 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 56 return fmt.Errorf("connect and read: %w", err) 57 } 58 59 slog.Info("stopping consume") 60 return nil 61} 62 63// Handler is responsible for handling a message consumed from Jetstream 64type Handler struct { 65 store PostStore 66} 67 68// NewFeedHandler returns a new handler 69func NewFeedHandler(store PostStore) *Handler { 70 return &Handler{store: store} 71} 72 73// HandleEvent will handle an 
event based on the event's commit operation 74func (h *Handler) HandleEvent(ctx context.Context, event *models.Event) error { 75 if event.Commit == nil { 76 return nil 77 } 78 79 switch event.Commit.Operation { 80 case models.CommitOperationCreate: 81 return h.handleCreateEvent(ctx, event) 82 // TODO: handle deletes too 83 default: 84 return nil 85 } 86} 87 88func (h *Handler) handleCreateEvent(_ context.Context, event *models.Event) error { 89 if event.Commit.Collection != "app.bsky.feed.post" { 90 return nil 91 } 92 93 var bskyPost apibsky.FeedPost 94 if err := json.Unmarshal(event.Commit.Record, &bskyPost); err != nil { 95 // ignore this 96 return nil 97 } 98 99 // this is where logic goes for what posts you wish to store for a feed but for this example 100 // just look for any post that contains the #golang hashtag 101 if !strings.Contains(strings.ToLower(bskyPost.Text), "#golang") { 102 return nil 103 } 104 105 createdAt, err := time.Parse(time.RFC3339, bskyPost.CreatedAt) 106 if err != nil { 107 slog.Error("parsing createdAt time from post", "error", err, "timestamp", bskyPost.CreatedAt) 108 createdAt = time.Now().UTC() 109 } 110 111 postURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey) 112 post := Post{ 113 RKey: event.Commit.RKey, 114 PostURI: postURI, 115 CreatedAt: createdAt.UnixMilli(), 116 } 117 err = h.store.CreatePost(post) 118 if err != nil { 119 slog.Error("error creating post in store", "error", err) 120 return nil 121 } 122 return nil 123}