this repo has no description
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/bluesky-social/jetstream/pkg/client"
10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/sotangled/tangled/log"
13)
14
// DB is the storage the jetstream client uses to remember its cursor: a
// time_us value, i.e. a Unix timestamp in microseconds.
type DB interface {
	// GetLastTimeUs returns the stored time_us, or an error when it cannot
	// be loaded.
	GetLastTimeUs() (lastTimeUs int64, err error)
	// SaveLastTimeUs stores lastTimeUs as the new cursor value.
	SaveLastTimeUs(lastTimeUs int64) error
}
19
// JetstreamClient bundles a jetstream client.Client with the configuration,
// cursor storage and signalling this package uses to run it.
type JetstreamClient struct {
	// cfg is the configuration passed to client.NewClient by StartJetstream.
	// AddDid and UpdateDids append to cfg.WantedDids.
	cfg *client.ClientConfig
	// client is created and assigned by StartJetstream; it is nil before that.
	client *client.Client
	// ident is handed to the sequential scheduler when it is created.
	ident string

	// db loads and stores the last time_us (see getLastTimeUs).
	db DB
	// reconnectCh is signalled by AddDid and UpdateDids and received from in
	// connectAndRead. NewJetstreamClient creates it with a buffer of one.
	reconnectCh chan struct{}
	// waitForDid makes the goroutine started by StartJetstream wait until
	// cfg.WantedDids is non-empty before connecting.
	waitForDid bool
	// mu is held by AddDid and UpdateDids while they append to cfg.WantedDids.
	mu sync.RWMutex
}
30
31func (j *JetstreamClient) AddDid(did string) {
32 if did == "" {
33 return
34 }
35 j.mu.Lock()
36 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
37 j.mu.Unlock()
38 j.reconnectCh <- struct{}{}
39}
40
41func (j *JetstreamClient) UpdateDids(dids []string) {
42 j.mu.Lock()
43 for _, did := range dids {
44 if did != "" {
45 }
46 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
47 }
48 j.mu.Unlock()
49 j.reconnectCh <- struct{}{}
50}
51
52func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) {
53 if cfg == nil {
54 cfg = client.DefaultClientConfig()
55 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
56 cfg.WantedCollections = collections
57 }
58
59 return &JetstreamClient{
60 cfg: cfg,
61 ident: ident,
62 db: db,
63
64 // This will make the goroutine in StartJetstream wait until
65 // cfg.WantedDids has been populated, typically using UpdateDids.
66 waitForDid: waitForDid,
67 reconnectCh: make(chan struct{}, 1),
68 }, nil
69}
70
71// StartJetstream starts the jetstream client and processes events using the provided processFunc.
72// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
73func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
74 logger := log.FromContext(ctx)
75
76 sched := sequential.NewScheduler(j.ident, logger, processFunc)
77
78 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
79 if err != nil {
80 return fmt.Errorf("failed to create jetstream client: %w", err)
81 }
82 j.client = client
83
84 go func() {
85 lastTimeUs := j.getLastTimeUs(ctx)
86 if j.waitForDid {
87 for len(j.cfg.WantedDids) == 0 {
88 time.Sleep(time.Second)
89 }
90 }
91 logger.Info("done waiting for did")
92 j.connectAndRead(ctx, &lastTimeUs)
93 }()
94
95 return nil
96}
97
98func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {
99 l := log.FromContext(ctx)
100 for {
101 select {
102 case <-j.reconnectCh:
103 l.Info("(re)connecting jetstream client")
104 j.client.Scheduler.Shutdown()
105 if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
106 l.Error("error reading jetstream", "error", err)
107 }
108 default:
109 if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
110 l.Error("error reading jetstream", "error", err)
111 }
112 }
113 }
114}
115
116func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {
117 l := log.FromContext(ctx)
118 lastTimeUs, err := j.db.GetLastTimeUs()
119 if err != nil {
120 l.Warn("couldn't get last time us, starting from now", "error", err)
121 lastTimeUs = time.Now().UnixMicro()
122 err = j.db.SaveLastTimeUs(lastTimeUs)
123 if err != nil {
124 l.Error("failed to save last time us")
125 }
126 }
127
128 // If last time is older than a week, start from now
129 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
130 lastTimeUs = time.Now().UnixMicro()
131 l.Warn("last time us is older than a week. discarding that and starting from now")
132 err = j.db.SaveLastTimeUs(lastTimeUs)
133 if err != nil {
134 l.Error("failed to save last time us")
135 }
136 }
137
138 l.Info("found last time_us", "time_us", lastTimeUs)
139 return lastTimeUs
140}