this repo has no description
1package jsclient
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "net/url"
8 "sync"
9 "time"
10
11 "github.com/gorilla/websocket"
12)
13
14type JetstreamClient struct {
15 collections []string
16 dids []string
17 conn *websocket.Conn
18 mu sync.RWMutex
19 reconnectCh chan struct{}
20}
21
22func NewJetstreamClient(collections, dids []string) *JetstreamClient {
23 return &JetstreamClient{
24 collections: collections,
25 dids: dids,
26 reconnectCh: make(chan struct{}, 1),
27 }
28}
29
30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL {
31
32 u := url.URL{
33 Scheme: "wss",
34 Host: "jetstream1.us-west.bsky.network",
35 Path: "/subscribe",
36 RawQuery: queryParams,
37 }
38
39 return u
40}
41
42// UpdateCollections updates the collections list and triggers a reconnection
43func (j *JetstreamClient) UpdateCollections(collections []string) {
44 j.mu.Lock()
45 j.collections = collections
46 j.mu.Unlock()
47 j.triggerReconnect()
48}
49
50// UpdateDids updates the DIDs list and triggers a reconnection
51func (j *JetstreamClient) UpdateDids(dids []string) {
52 j.mu.Lock()
53 j.dids = dids
54 j.mu.Unlock()
55 j.triggerReconnect()
56}
57
58func (j *JetstreamClient) triggerReconnect() {
59 select {
60 case j.reconnectCh <- struct{}{}:
61 default:
62 // Channel already has a pending reconnect
63 }
64}
65
66func (j *JetstreamClient) buildQueryParams(cursor int64) string {
67 j.mu.RLock()
68 defer j.mu.RUnlock()
69
70 var collections, dids string
71 if len(j.collections) > 0 {
72 collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor)
73 for _, collection := range j.collections[1:] {
74 collections += fmt.Sprintf("&wantedCollections=%s", collection)
75 }
76 }
77 if len(j.dids) > 0 {
78 for i, did := range j.dids {
79 if i == 0 {
80 dids = fmt.Sprintf("wantedDids=%s", did)
81 } else {
82 dids += fmt.Sprintf("&wantedDids=%s", did)
83 }
84 }
85 }
86
87 var queryStr string
88 if collections != "" && dids != "" {
89 queryStr = collections + "&" + dids
90 } else if collections != "" {
91 queryStr = collections
92 } else if dids != "" {
93 queryStr = dids
94 }
95
96 return queryStr
97}
98
99func (j *JetstreamClient) connect(cursor int64) error {
100 queryParams := j.buildQueryParams(cursor)
101 u := j.buildWebsocketURL(queryParams)
102
103 log.Printf("connecting to jetstream at: %s", u.String())
104
105 dialer := websocket.Dialer{
106 HandshakeTimeout: 10 * time.Second,
107 }
108
109 conn, _, err := dialer.Dial(u.String(), nil)
110 if err != nil {
111 return err
112 }
113
114 if j.conn != nil {
115 j.conn.Close()
116 }
117 j.conn = conn
118 return nil
119}
120
121func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) {
122 defer close(messages)
123 defer j.conn.Close()
124
125 ticker := time.NewTicker(1 * time.Second)
126 defer ticker.Stop()
127
128 for {
129 select {
130 case <-ctx.Done():
131 return
132 case <-j.reconnectCh:
133 // Reconnect with new parameters
134 cursor := time.Now().Add(-5 * time.Second).UnixMicro()
135 if err := j.connect(cursor); err != nil {
136 log.Printf("error reconnecting to jetstream: %v", err)
137 return
138 }
139 case <-ticker.C:
140 _, message, err := j.conn.ReadMessage()
141 if err != nil {
142 log.Printf("error reading from websocket: %v", err)
143 return
144 }
145 messages <- message
146 }
147 }
148}
149
150func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) {
151 fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro()
152
153 if err := j.connect(fiveSecondsAgo); err != nil {
154 log.Printf("error connecting to jetstream: %v", err)
155 return nil, err
156 }
157
158 messages := make(chan []byte)
159 go j.readMessages(ctx, messages)
160
161 return messages, nil
162}