this repo has no description
1package jsclient 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "net/url" 8 "sync" 9 "time" 10 11 "github.com/gorilla/websocket" 12) 13 14type JetstreamClient struct { 15 collections []string 16 dids []string 17 conn *websocket.Conn 18 mu sync.RWMutex 19 reconnectCh chan struct{} 20} 21 22func NewJetstreamClient(collections, dids []string) *JetstreamClient { 23 return &JetstreamClient{ 24 collections: collections, 25 dids: dids, 26 reconnectCh: make(chan struct{}, 1), 27 } 28} 29 30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL { 31 32 u := url.URL{ 33 Scheme: "wss", 34 Host: "jetstream1.us-west.bsky.network", 35 Path: "/subscribe", 36 RawQuery: queryParams, 37 } 38 39 return u 40} 41 42// UpdateCollections updates the collections list and triggers a reconnection 43func (j *JetstreamClient) UpdateCollections(collections []string) { 44 j.mu.Lock() 45 j.collections = collections 46 j.mu.Unlock() 47 j.triggerReconnect() 48} 49 50// UpdateDids updates the Dids list and triggers a reconnection 51func (j *JetstreamClient) UpdateDids(dids []string) { 52 j.mu.Lock() 53 j.dids = dids 54 j.mu.Unlock() 55 j.triggerReconnect() 56} 57 58// Adds one did to the did list 59func (j *JetstreamClient) AddDid(did string) { 60 j.mu.Lock() 61 j.dids = append(j.dids, did) 62 j.mu.Unlock() 63 j.triggerReconnect() 64} 65 66func (j *JetstreamClient) triggerReconnect() { 67 select { 68 case j.reconnectCh <- struct{}{}: 69 default: 70 // Channel already has a pending reconnect 71 } 72} 73 74func (j *JetstreamClient) buildQueryParams(cursor int64) string { 75 j.mu.RLock() 76 defer j.mu.RUnlock() 77 78 var collections, dids string 79 if len(j.collections) > 0 { 80 collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor) 81 for _, collection := range j.collections[1:] { 82 collections += fmt.Sprintf("&wantedCollections=%s", collection) 83 } 84 } 85 if len(j.dids) > 0 { 86 for i, did := range j.dids { 87 if i == 0 { 88 dids = fmt.Sprintf("wantedDids=%s", did) 89 } else { 90 dids += fmt.Sprintf("&wantedDids=%s", did) 91 } 92 } 93 } 94 95 var queryStr string 96 if collections != "" && dids != "" { 97 queryStr = collections + "&" + dids 98 } else if collections != "" { 99 queryStr = collections 100 } else if dids != "" { 101 queryStr = dids 102 } 103 104 return queryStr 105} 106 107func (j *JetstreamClient) connect(cursor int64) error { 108 queryParams := j.buildQueryParams(cursor) 109 u := j.buildWebsocketURL(queryParams) 110 111 dialer := websocket.Dialer{ 112 HandshakeTimeout: 10 * time.Second, 113 } 114 115 conn, _, err := dialer.Dial(u.String(), nil) 116 if err != nil { 117 return err 118 } 119 120 if j.conn != nil { 121 j.conn.Close() 122 } 123 j.conn = conn 124 return nil 125} 126 127func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) { 128 defer close(messages) 129 defer j.conn.Close() 130 131 ticker := time.NewTicker(1 * time.Second) 132 defer ticker.Stop() 133 134 for { 135 select { 136 case <-ctx.Done(): 137 return 138 case <-j.reconnectCh: 139 // Reconnect with new parameters 140 cursor := time.Now().Add(-5 * time.Second).UnixMicro() 141 if err := j.connect(cursor); err != nil { 142 log.Printf("error reconnecting to jetstream: %v", err) 143 return 144 } 145 case <-ticker.C: 146 _, message, err := j.conn.ReadMessage() 147 if err != nil { 148 log.Printf("error reading from websocket: %v", err) 149 return 150 } 151 messages <- message 152 } 153 } 154} 155 156func (j *JetstreamClient) ReadJetstream(ctx context.Context, lastTimestamp int64) (chan []byte, error) { 157 if lastTimestamp == 0 { 158 lastTimestamp = time.Now().Add(-5 * time.Second).UnixMicro() 159 } 160 161 if err := j.connect(lastTimestamp); err != nil { 162 log.Printf("error connecting to jetstream: %v", err) 163 return nil, err 164 } 165 166 messages := make(chan []byte) 167 go j.readMessages(ctx, messages) 168 169 return messages, nil 170}