this repo has no description
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "os" 8 "os/signal" 9 "sync" 10 "syscall" 11 "time" 12 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 "github.com/bluesky-social/jetstream/pkg/models" 16 "tangled.sh/tangled.sh/core/log" 17) 18 19type DB interface { 20 GetLastTimeUs() (int64, error) 21 SaveLastTimeUs(int64) error 22} 23 24type Set[T comparable] map[T]struct{} 25 26type JetstreamClient struct { 27 cfg *client.ClientConfig 28 client *client.Client 29 ident string 30 l *slog.Logger 31 32 wantedDids Set[string] 33 db DB 34 waitForDid bool 35 mu sync.RWMutex 36 37 cancel context.CancelFunc 38 cancelMu sync.Mutex 39} 40 41func (j *JetstreamClient) AddDid(did string) { 42 if did == "" { 43 return 44 } 45 46 j.mu.Lock() 47 j.wantedDids[did] = struct{}{} 48 j.mu.Unlock() 49} 50 51type processor func(context.Context, *models.Event) error 52 53func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 54 // since this closure references j.WantedDids; it should auto-update 55 // existing instances of the closure when j.WantedDids is mutated 56 return func(ctx context.Context, evt *models.Event) error { 57 if _, ok := j.wantedDids[evt.Did]; ok { 58 return processFunc(ctx, evt) 59 } else { 60 return nil 61 } 62 } 63} 64 65func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) { 66 if cfg == nil { 67 cfg = client.DefaultClientConfig() 68 cfg.WebsocketURL = endpoint 69 cfg.WantedCollections = collections 70 } 71 72 return &JetstreamClient{ 73 cfg: cfg, 74 ident: ident, 75 db: db, 76 l: logger, 77 wantedDids: make(map[string]struct{}), 78 79 // This will make the goroutine in StartJetstream wait until 80 // j.wantedDids has been populated, typically using addDids. 81 waitForDid: waitForDid, 82 }, nil 83} 84 85// StartJetstream starts the jetstream client and processes events using the provided processFunc. 86// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs). 87func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 88 logger := j.l 89 90 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc)) 91 92 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 93 if err != nil { 94 return fmt.Errorf("failed to create jetstream client: %w", err) 95 } 96 j.client = client 97 98 go func() { 99 if j.waitForDid { 100 for len(j.wantedDids) == 0 { 101 time.Sleep(time.Second) 102 } 103 } 104 logger.Info("done waiting for did") 105 106 go j.periodicLastTimeSave(ctx) 107 j.saveIfKilled(ctx) 108 109 j.connectAndRead(ctx) 110 }() 111 112 return nil 113} 114 115func (j *JetstreamClient) connectAndRead(ctx context.Context) { 116 l := log.FromContext(ctx) 117 for { 118 cursor := j.getLastTimeUs(ctx) 119 120 connCtx, cancel := context.WithCancel(ctx) 121 j.cancelMu.Lock() 122 j.cancel = cancel 123 j.cancelMu.Unlock() 124 125 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil { 126 l.Error("error reading jetstream", "error", err) 127 cancel() 128 continue 129 } 130 131 select { 132 case <-ctx.Done(): 133 l.Info("context done, stopping jetstream") 134 return 135 case <-connCtx.Done(): 136 l.Info("connection context done, reconnecting") 137 continue 138 } 139 } 140} 141 142// save cursor periodically 143func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) { 144 ticker := time.NewTicker(time.Minute) 145 defer ticker.Stop() 146 147 for { 148 select { 149 case <-ctx.Done(): 150 return 151 case <-ticker.C: 152 j.db.SaveLastTimeUs(time.Now().UnixMicro()) 153 } 154 } 155} 156 157func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 { 158 l := log.FromContext(ctx) 159 lastTimeUs, err := j.db.GetLastTimeUs() 160 if err != nil { 161 l.Warn("couldn't get last time us, starting from now", "error", err) 162 lastTimeUs = time.Now().UnixMicro() 163 err = j.db.SaveLastTimeUs(lastTimeUs) 164 if err != nil { 165 l.Error("failed to save last time us", "error", err) 166 } 167 } 168 169 // If last time is older than 2 days, start from now 170 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 171 lastTimeUs = time.Now().UnixMicro() 172 l.Warn("last time us is older than 2 days; discarding that and starting from now") 173 err = j.db.SaveLastTimeUs(lastTimeUs) 174 if err != nil { 175 l.Error("failed to save last time us", "error", err) 176 } 177 } 178 179 l.Info("found last time_us", "time_us", lastTimeUs) 180 return &lastTimeUs 181} 182 183func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context { 184 ctxWithCancel, cancel := context.WithCancel(ctx) 185 186 sigChan := make(chan os.Signal, 1) 187 188 signal.Notify(sigChan, 189 syscall.SIGINT, 190 syscall.SIGTERM, 191 syscall.SIGQUIT, 192 syscall.SIGHUP, 193 syscall.SIGKILL, 194 syscall.SIGSTOP, 195 ) 196 197 go func() { 198 sig := <-sigChan 199 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig) 200 201 lastTimeUs := time.Now().UnixMicro() 202 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil { 203 j.l.Error("Failed to save last time during shutdown", "error", err) 204 } 205 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs) 206 207 j.cancelMu.Lock() 208 if j.cancel != nil { 209 j.cancel() 210 } 211 j.cancelMu.Unlock() 212 213 cancel() 214 215 os.Exit(0) 216 }() 217 218 return ctxWithCancel 219}