this repo has no description
1package jsclient 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "net/url" 8 "sync" 9 "time" 10 11 "github.com/gorilla/websocket" 12) 13 14type JetstreamClient struct { 15 collections []string 16 dids []string 17 conn *websocket.Conn 18 mu sync.RWMutex 19 reconnectCh chan struct{} 20} 21 22func NewJetstreamClient(collections, dids []string) *JetstreamClient { 23 return &JetstreamClient{ 24 collections: collections, 25 dids: dids, 26 reconnectCh: make(chan struct{}, 1), 27 } 28} 29 30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL { 31 32 u := url.URL{ 33 Scheme: "wss", 34 Host: "jetstream1.us-west.bsky.network", 35 Path: "/subscribe", 36 RawQuery: queryParams, 37 } 38 39 return u 40} 41 42// UpdateCollections updates the collections list and triggers a reconnection 43func (j *JetstreamClient) UpdateCollections(collections []string) { 44 j.mu.Lock() 45 j.collections = collections 46 j.mu.Unlock() 47 j.triggerReconnect() 48} 49 50// UpdateDids updates the DIDs list and triggers a reconnection 51func (j *JetstreamClient) UpdateDids(dids []string) { 52 j.mu.Lock() 53 j.dids = dids 54 j.mu.Unlock() 55 j.triggerReconnect() 56} 57 58func (j *JetstreamClient) triggerReconnect() { 59 select { 60 case j.reconnectCh <- struct{}{}: 61 default: 62 // Channel already has a pending reconnect 63 } 64} 65 66func (j *JetstreamClient) buildQueryParams(cursor int64) string { 67 j.mu.RLock() 68 defer j.mu.RUnlock() 69 70 var collections, dids string 71 if len(j.collections) > 0 { 72 collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor) 73 for _, collection := range j.collections[1:] { 74 collections += fmt.Sprintf("&wantedCollections=%s", collection) 75 } 76 } 77 if len(j.dids) > 0 { 78 for i, did := range j.dids { 79 if i == 0 { 80 dids = fmt.Sprintf("wantedDids=%s", did) 81 } else { 82 dids += fmt.Sprintf("&wantedDids=%s", did) 83 } 84 } 85 } 86 87 var queryStr string 88 if collections != "" && dids != "" { 89 queryStr = collections + "&" + dids 90 } else if collections != "" { 91 queryStr = collections 92 } else if dids != "" { 93 queryStr = dids 94 } 95 96 return queryStr 97} 98 99func (j *JetstreamClient) connect(cursor int64) error { 100 queryParams := j.buildQueryParams(cursor) 101 u := j.buildWebsocketURL(queryParams) 102 103 dialer := websocket.Dialer{ 104 HandshakeTimeout: 10 * time.Second, 105 } 106 107 conn, _, err := dialer.Dial(u.String(), nil) 108 if err != nil { 109 return err 110 } 111 112 if j.conn != nil { 113 j.conn.Close() 114 } 115 j.conn = conn 116 return nil 117} 118 119func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) { 120 defer close(messages) 121 defer j.conn.Close() 122 123 ticker := time.NewTicker(1 * time.Second) 124 defer ticker.Stop() 125 126 for { 127 select { 128 case <-ctx.Done(): 129 return 130 case <-j.reconnectCh: 131 // Reconnect with new parameters 132 cursor := time.Now().Add(-5 * time.Second).UnixMicro() 133 if err := j.connect(cursor); err != nil { 134 log.Printf("error reconnecting to jetstream: %v", err) 135 return 136 } 137 case <-ticker.C: 138 _, message, err := j.conn.ReadMessage() 139 if err != nil { 140 log.Printf("error reading from websocket: %v", err) 141 return 142 } 143 messages <- message 144 } 145 } 146} 147 148func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) { 149 fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro() 150 151 if err := j.connect(fiveSecondsAgo); err != nil { 152 log.Printf("error connecting to jetstream: %v", err) 153 return nil, err 154 } 155 156 messages := make(chan []byte) 157 go j.readMessages(ctx, messages) 158 159 return messages, nil 160}