this repo has no description
1package jsclient
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "net/url"
8 "sync"
9 "time"
10
11 "github.com/gorilla/websocket"
12)
13
14type JetstreamClient struct {
15 collections []string
16 dids []string
17 conn *websocket.Conn
18 mu sync.RWMutex
19 reconnectCh chan struct{}
20}
21
22func NewJetstreamClient(collections, dids []string) *JetstreamClient {
23 return &JetstreamClient{
24 collections: collections,
25 dids: dids,
26 reconnectCh: make(chan struct{}, 1),
27 }
28}
29
30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL {
31
32 u := url.URL{
33 Scheme: "wss",
34 Host: "jetstream1.us-west.bsky.network",
35 Path: "/subscribe",
36 RawQuery: queryParams,
37 }
38
39 return u
40}
41
42// UpdateCollections updates the collections list and triggers a reconnection
43func (j *JetstreamClient) UpdateCollections(collections []string) {
44 j.mu.Lock()
45 j.collections = collections
46 j.mu.Unlock()
47 j.triggerReconnect()
48}
49
50// UpdateDids updates the Dids list and triggers a reconnection
51func (j *JetstreamClient) UpdateDids(dids []string) {
52 j.mu.Lock()
53 j.dids = dids
54 j.mu.Unlock()
55 j.triggerReconnect()
56}
57
58// Adds one did to the did list
59func (j *JetstreamClient) AddDid(did string) {
60 j.mu.Lock()
61 j.dids = append(j.dids, did)
62 j.mu.Unlock()
63 j.triggerReconnect()
64}
65
66func (j *JetstreamClient) triggerReconnect() {
67 select {
68 case j.reconnectCh <- struct{}{}:
69 default:
70 // Channel already has a pending reconnect
71 }
72}
73
74func (j *JetstreamClient) buildQueryParams(cursor int64) string {
75 j.mu.RLock()
76 defer j.mu.RUnlock()
77
78 var collections, dids string
79 if len(j.collections) > 0 {
80 collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor)
81 for _, collection := range j.collections[1:] {
82 collections += fmt.Sprintf("&wantedCollections=%s", collection)
83 }
84 }
85 if len(j.dids) > 0 {
86 for i, did := range j.dids {
87 if i == 0 {
88 dids = fmt.Sprintf("wantedDids=%s", did)
89 } else {
90 dids += fmt.Sprintf("&wantedDids=%s", did)
91 }
92 }
93 }
94
95 var queryStr string
96 if collections != "" && dids != "" {
97 queryStr = collections + "&" + dids
98 } else if collections != "" {
99 queryStr = collections
100 } else if dids != "" {
101 queryStr = dids
102 }
103
104 return queryStr
105}
106
107func (j *JetstreamClient) connect(cursor int64) error {
108 queryParams := j.buildQueryParams(cursor)
109 u := j.buildWebsocketURL(queryParams)
110
111 dialer := websocket.Dialer{
112 HandshakeTimeout: 10 * time.Second,
113 }
114
115 conn, _, err := dialer.Dial(u.String(), nil)
116 if err != nil {
117 return err
118 }
119
120 if j.conn != nil {
121 j.conn.Close()
122 }
123 j.conn = conn
124 return nil
125}
126
127func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) {
128 defer close(messages)
129 defer j.conn.Close()
130
131 ticker := time.NewTicker(1 * time.Second)
132 defer ticker.Stop()
133
134 for {
135 select {
136 case <-ctx.Done():
137 return
138 case <-j.reconnectCh:
139 // Reconnect with new parameters
140 cursor := time.Now().Add(-5 * time.Second).UnixMicro()
141 if err := j.connect(cursor); err != nil {
142 log.Printf("error reconnecting to jetstream: %v", err)
143 return
144 }
145 case <-ticker.C:
146 _, message, err := j.conn.ReadMessage()
147 if err != nil {
148 log.Printf("error reading from websocket: %v", err)
149 return
150 }
151 messages <- message
152 }
153 }
154}
155
156func (j *JetstreamClient) ReadJetstream(ctx context.Context, lastTimestamp int64) (chan []byte, error) {
157 if lastTimestamp == 0 {
158 lastTimestamp = time.Now().Add(-5 * time.Second).UnixMicro()
159 }
160
161 if err := j.connect(lastTimestamp); err != nil {
162 log.Printf("error connecting to jetstream: %v", err)
163 return nil, err
164 }
165
166 messages := make(chan []byte)
167 go j.readMessages(ctx, messages)
168
169 return messages, nil
170}