this repo has no description
1package jsclient
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "net/url"
8 "sync"
9 "time"
10
11 "github.com/gorilla/websocket"
12)
13
14type JetstreamClient struct {
15 collections []string
16 dids []string
17 conn *websocket.Conn
18 mu sync.RWMutex
19 reconnectCh chan struct{}
20}
21
22func NewJetstreamClient(collections, dids []string) *JetstreamClient {
23 return &JetstreamClient{
24 collections: collections,
25 dids: dids,
26 reconnectCh: make(chan struct{}, 1),
27 }
28}
29
30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL {
31
32 u := url.URL{
33 Scheme: "wss",
34 Host: "jetstream1.us-west.bsky.network",
35 Path: "/subscribe",
36 RawQuery: queryParams,
37 }
38
39 return u
40}
41
42// UpdateCollections updates the collections list and triggers a reconnection
43func (j *JetstreamClient) UpdateCollections(collections []string) {
44 j.mu.Lock()
45 j.collections = collections
46 j.mu.Unlock()
47 j.triggerReconnect()
48}
49
50// UpdateDids updates the DIDs list and triggers a reconnection
51func (j *JetstreamClient) UpdateDids(dids []string) {
52 j.mu.Lock()
53 j.dids = dids
54 j.mu.Unlock()
55 j.triggerReconnect()
56}
57
58func (j *JetstreamClient) triggerReconnect() {
59 select {
60 case j.reconnectCh <- struct{}{}:
61 default:
62 // Channel already has a pending reconnect
63 }
64}
65
66func (j *JetstreamClient) buildQueryParams(cursor int64) string {
67 j.mu.RLock()
68 defer j.mu.RUnlock()
69
70 var collections, dids string
71 if len(j.collections) > 0 {
72 collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor)
73 for _, collection := range j.collections[1:] {
74 collections += fmt.Sprintf("&wantedCollections=%s", collection)
75 }
76 }
77 if len(j.dids) > 0 {
78 for i, did := range j.dids {
79 if i == 0 {
80 dids = fmt.Sprintf("wantedDids=%s", did)
81 } else {
82 dids += fmt.Sprintf("&wantedDids=%s", did)
83 }
84 }
85 }
86
87 var queryStr string
88 if collections != "" && dids != "" {
89 queryStr = collections + "&" + dids
90 } else if collections != "" {
91 queryStr = collections
92 } else if dids != "" {
93 queryStr = dids
94 }
95
96 return queryStr
97}
98
99func (j *JetstreamClient) connect(cursor int64) error {
100 queryParams := j.buildQueryParams(cursor)
101 u := j.buildWebsocketURL(queryParams)
102
103 dialer := websocket.Dialer{
104 HandshakeTimeout: 10 * time.Second,
105 }
106
107 conn, _, err := dialer.Dial(u.String(), nil)
108 if err != nil {
109 return err
110 }
111
112 if j.conn != nil {
113 j.conn.Close()
114 }
115 j.conn = conn
116 return nil
117}
118
119func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) {
120 defer close(messages)
121 defer j.conn.Close()
122
123 ticker := time.NewTicker(1 * time.Second)
124 defer ticker.Stop()
125
126 for {
127 select {
128 case <-ctx.Done():
129 return
130 case <-j.reconnectCh:
131 // Reconnect with new parameters
132 cursor := time.Now().Add(-5 * time.Second).UnixMicro()
133 if err := j.connect(cursor); err != nil {
134 log.Printf("error reconnecting to jetstream: %v", err)
135 return
136 }
137 case <-ticker.C:
138 _, message, err := j.conn.ReadMessage()
139 if err != nil {
140 log.Printf("error reading from websocket: %v", err)
141 return
142 }
143 messages <- message
144 }
145 }
146}
147
148func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) {
149 fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro()
150
151 if err := j.connect(fiveSecondsAgo); err != nil {
152 log.Printf("error connecting to jetstream: %v", err)
153 return nil, err
154 }
155
156 messages := make(chan []byte)
157 go j.readMessages(ctx, messages)
158
159 return messages, nil
160}