this repo has no description
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "sync"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 "tangled.sh/tangled.sh/core/log"
17)
18
// DB is the storage the jetstream client uses for its cursor, the "last
// time_us" value. Within this file the value is a Unix timestamp in
// microseconds (it is produced with time.Now().UnixMicro()).
type DB interface {
	// GetLastTimeUs returns the stored cursor, or an error if it cannot be read.
	GetLastTimeUs() (int64, error)
	// SaveLastTimeUs stores the given cursor, or returns an error if it cannot be written.
	SaveLastTimeUs(int64) error
}
23
// Set is a generic set of comparable values, represented as a map whose
// values are empty structs; membership is tested by key lookup.
type Set[T comparable] map[T]struct{}
25
// JetstreamClient holds the configuration and runtime state of a jetstream
// consumer that only forwards events for a set of wanted DIDs.
type JetstreamClient struct {
	// cfg is the client configuration handed to client.NewClient in StartJetstream.
	cfg *client.ClientConfig
	// client is created by StartJetstream; NewJetstreamClient leaves it nil.
	client *client.Client
	// ident is passed to the sequential scheduler in StartJetstream.
	ident string
	// l is the logger supplied to NewJetstreamClient.
	l *slog.Logger

	// wantedDids is the set of DIDs whose events reach the processor.
	wantedDids Set[string]
	// db stores and retrieves the last time_us cursor.
	db DB
	// waitForDid makes the goroutine started by StartJetstream wait until
	// wantedDids is non-empty before connecting.
	waitForDid bool
	// mu is held by AddDid while it writes to wantedDids.
	mu sync.RWMutex

	// cancel cancels the per-connection context created by connectAndRead.
	cancel context.CancelFunc
	// cancelMu guards cancel.
	cancelMu sync.Mutex
}
40
41func (j *JetstreamClient) AddDid(did string) {
42 if did == "" {
43 return
44 }
45
46 j.mu.Lock()
47 j.wantedDids[did] = struct{}{}
48 j.mu.Unlock()
49}
50
// processor is the signature of an event handler: it receives a context and a
// jetstream event and returns an error.
type processor func(context.Context, *models.Event) error
52
53func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
54 // since this closure references j.WantedDids; it should auto-update
55 // existing instances of the closure when j.WantedDids is mutated
56 return func(ctx context.Context, evt *models.Event) error {
57 if _, ok := j.wantedDids[evt.Did]; ok {
58 return processFunc(ctx, evt)
59 } else {
60 return nil
61 }
62 }
63}
64
65func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) {
66 if cfg == nil {
67 cfg = client.DefaultClientConfig()
68 cfg.WebsocketURL = endpoint
69 cfg.WantedCollections = collections
70 }
71
72 return &JetstreamClient{
73 cfg: cfg,
74 ident: ident,
75 db: db,
76 l: logger,
77 wantedDids: make(map[string]struct{}),
78
79 // This will make the goroutine in StartJetstream wait until
80 // j.wantedDids has been populated, typically using addDids.
81 waitForDid: waitForDid,
82 }, nil
83}
84
85// StartJetstream starts the jetstream client and processes events using the provided processFunc.
86// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs).
87func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
88 logger := j.l
89
90 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc))
91
92 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
93 if err != nil {
94 return fmt.Errorf("failed to create jetstream client: %w", err)
95 }
96 j.client = client
97
98 go func() {
99 if j.waitForDid {
100 for len(j.wantedDids) == 0 {
101 time.Sleep(time.Second)
102 }
103 }
104 logger.Info("done waiting for did")
105
106 go j.periodicLastTimeSave(ctx)
107 j.saveIfKilled(ctx)
108
109 j.connectAndRead(ctx)
110 }()
111
112 return nil
113}
114
115func (j *JetstreamClient) connectAndRead(ctx context.Context) {
116 l := log.FromContext(ctx)
117 for {
118 cursor := j.getLastTimeUs(ctx)
119
120 connCtx, cancel := context.WithCancel(ctx)
121 j.cancelMu.Lock()
122 j.cancel = cancel
123 j.cancelMu.Unlock()
124
125 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
126 l.Error("error reading jetstream", "error", err)
127 cancel()
128 continue
129 }
130
131 select {
132 case <-ctx.Done():
133 l.Info("context done, stopping jetstream")
134 return
135 case <-connCtx.Done():
136 l.Info("connection context done, reconnecting")
137 continue
138 }
139 }
140}
141
142// save cursor periodically
143func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) {
144 ticker := time.NewTicker(time.Minute)
145 defer ticker.Stop()
146
147 for {
148 select {
149 case <-ctx.Done():
150 return
151 case <-ticker.C:
152 j.db.SaveLastTimeUs(time.Now().UnixMicro())
153 }
154 }
155}
156
157func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
158 l := log.FromContext(ctx)
159 lastTimeUs, err := j.db.GetLastTimeUs()
160 if err != nil {
161 l.Warn("couldn't get last time us, starting from now", "error", err)
162 lastTimeUs = time.Now().UnixMicro()
163 err = j.db.SaveLastTimeUs(lastTimeUs)
164 if err != nil {
165 l.Error("failed to save last time us", "error", err)
166 }
167 }
168
169 // If last time is older than 2 days, start from now
170 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
171 lastTimeUs = time.Now().UnixMicro()
172 l.Warn("last time us is older than 2 days; discarding that and starting from now")
173 err = j.db.SaveLastTimeUs(lastTimeUs)
174 if err != nil {
175 l.Error("failed to save last time us", "error", err)
176 }
177 }
178
179 l.Info("found last time_us", "time_us", lastTimeUs)
180 return &lastTimeUs
181}
182
183func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context {
184 ctxWithCancel, cancel := context.WithCancel(ctx)
185
186 sigChan := make(chan os.Signal, 1)
187
188 signal.Notify(sigChan,
189 syscall.SIGINT,
190 syscall.SIGTERM,
191 syscall.SIGQUIT,
192 syscall.SIGHUP,
193 syscall.SIGKILL,
194 syscall.SIGSTOP,
195 )
196
197 go func() {
198 sig := <-sigChan
199 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig)
200
201 lastTimeUs := time.Now().UnixMicro()
202 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
203 j.l.Error("Failed to save last time during shutdown", "error", err)
204 }
205 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs)
206
207 j.cancelMu.Lock()
208 if j.cancel != nil {
209 j.cancel()
210 }
211 j.cancelMu.Unlock()
212
213 cancel()
214
215 os.Exit(0)
216 }()
217
218 return ctxWithCancel
219}