this repo has no description
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/bluesky-social/jetstream/pkg/client" 10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/sotangled/tangled/log" 13) 14 15type DB interface { 16 GetLastTimeUs() (int64, error) 17 SaveLastTimeUs(int64) error 18} 19 20type JetstreamClient struct { 21 cfg *client.ClientConfig 22 client *client.Client 23 ident string 24 25 db DB 26 reconnectCh chan struct{} 27 waitForDid bool 28 mu sync.RWMutex 29} 30 31func (j *JetstreamClient) AddDid(did string) { 32 if did == "" { 33 return 34 } 35 j.mu.Lock() 36 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 37 j.mu.Unlock() 38 j.reconnectCh <- struct{}{} 39} 40 41func (j *JetstreamClient) UpdateDids(dids []string) { 42 j.mu.Lock() 43 for _, did := range dids { 44 if did != "" { 45 } 46 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 47 } 48 j.mu.Unlock() 49 j.reconnectCh <- struct{}{} 50} 51 52func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) { 53 if cfg == nil { 54 cfg = client.DefaultClientConfig() 55 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 56 cfg.WantedCollections = collections 57 } 58 59 return &JetstreamClient{ 60 cfg: cfg, 61 ident: ident, 62 db: db, 63 64 // This will make the goroutine in StartJetstream wait until 65 // cfg.WantedDids has been populated, typically using UpdateDids. 66 waitForDid: waitForDid, 67 reconnectCh: make(chan struct{}, 1), 68 }, nil 69} 70 71// StartJetstream starts the jetstream client and processes events using the provided processFunc. 72// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs). 73func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 74 logger := log.FromContext(ctx) 75 76 sched := sequential.NewScheduler(j.ident, logger, processFunc) 77 78 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 79 if err != nil { 80 return fmt.Errorf("failed to create jetstream client: %w", err) 81 } 82 j.client = client 83 84 go func() { 85 lastTimeUs := j.getLastTimeUs(ctx) 86 if j.waitForDid { 87 for len(j.cfg.WantedDids) == 0 { 88 time.Sleep(time.Second) 89 } 90 } 91 logger.Info("done waiting for did") 92 j.connectAndRead(ctx, &lastTimeUs) 93 }() 94 95 return nil 96} 97 98func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) { 99 l := log.FromContext(ctx) 100 for { 101 select { 102 case <-j.reconnectCh: 103 l.Info("(re)connecting jetstream client") 104 j.client.Scheduler.Shutdown() 105 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 106 l.Error("error reading jetstream", "error", err) 107 } 108 default: 109 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 110 l.Error("error reading jetstream", "error", err) 111 } 112 } 113 } 114} 115 116func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 { 117 l := log.FromContext(ctx) 118 lastTimeUs, err := j.db.GetLastTimeUs() 119 if err != nil { 120 l.Warn("couldn't get last time us, starting from now", "error", err) 121 lastTimeUs = time.Now().UnixMicro() 122 err = j.db.SaveLastTimeUs(lastTimeUs) 123 if err != nil { 124 l.Error("failed to save last time us") 125 } 126 } 127 128 // If last time is older than a week, start from now 129 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 130 lastTimeUs = time.Now().UnixMicro() 131 l.Warn("last time us is older than a week. discarding that and starting from now") 132 err = j.db.SaveLastTimeUs(lastTimeUs) 133 if err != nil { 134 l.Error("failed to save last time us") 135 } 136 } 137 138 l.Info("found last time_us", "time_us", lastTimeUs) 139 return lastTimeUs 140}