this repo has no description
1package jsclient
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "net/url"
8 "sync"
9 "time"
10
11 "github.com/gorilla/websocket"
12)
13
14type JetstreamClient struct {
15 collections []string
16 dids []string
17 conn *websocket.Conn
18 mu sync.RWMutex
19 reconnectCh chan struct{}
20}
21
22func NewJetstreamClient(collections, dids []string) *JetstreamClient {
23 return &JetstreamClient{
24 collections: collections,
25 dids: dids,
26 reconnectCh: make(chan struct{}, 1),
27 }
28}
29
30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL {
31
32 u := url.URL{
33 Scheme: "wss",
34 Host: "jetstream1.us-west.bsky.network",
35 Path: "/subscribe",
36 RawQuery: queryParams,
37 }
38
39 fmt.Println("URL:", u.String())
40 return u
41}
42
43// UpdateCollections updates the collections list and triggers a reconnection
44func (j *JetstreamClient) UpdateCollections(collections []string) {
45 j.mu.Lock()
46 j.collections = collections
47 j.mu.Unlock()
48 j.triggerReconnect()
49}
50
51// UpdateDids updates the DIDs list and triggers a reconnection
52func (j *JetstreamClient) UpdateDids(dids []string) {
53 j.mu.Lock()
54 j.dids = dids
55 j.mu.Unlock()
56 j.triggerReconnect()
57}
58
59func (j *JetstreamClient) triggerReconnect() {
60 select {
61 case j.reconnectCh <- struct{}{}:
62 default:
63 // Channel already has a pending reconnect
64 }
65}
66
67func (j *JetstreamClient) buildQueryParams(cursor int64) string {
68 j.mu.RLock()
69 defer j.mu.RUnlock()
70
71 var collections, dids string
72 if len(j.collections) > 0 {
73 collections = fmt.Sprintf("wantedCollections=%s", j.collections[0])
74 for _, collection := range j.collections[1:] {
75 collections += fmt.Sprintf("&wantedCollections=%s", collection)
76 }
77 }
78 if len(j.dids) > 0 {
79 for i, did := range j.dids {
80 if i == 0 {
81 dids = fmt.Sprintf("wantedDids=%s", did)
82 } else {
83 dids += fmt.Sprintf("&wantedDids=%s", did)
84 }
85 }
86 }
87
88 var queryStr string
89 if collections != "" && dids != "" {
90 queryStr = collections + "&" + dids
91 } else if collections != "" {
92 queryStr = collections
93 } else if dids != "" {
94 queryStr = dids
95 }
96
97 return queryStr
98}
99
100func (j *JetstreamClient) connect(cursor int64) error {
101 queryParams := j.buildQueryParams(cursor)
102 u := j.buildWebsocketURL(queryParams)
103
104 dialer := websocket.Dialer{
105 HandshakeTimeout: 10 * time.Second,
106 }
107
108 conn, _, err := dialer.Dial(u.String(), nil)
109 if err != nil {
110 return err
111 }
112
113 if j.conn != nil {
114 j.conn.Close()
115 }
116 j.conn = conn
117 return nil
118}
119
120func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) {
121 defer close(messages)
122 defer j.conn.Close()
123
124 ticker := time.NewTicker(1 * time.Second)
125 defer ticker.Stop()
126
127 for {
128 select {
129 case <-ctx.Done():
130 return
131 case <-j.reconnectCh:
132 // Reconnect with new parameters
133 // cursor := time.Now().Add(-5 * time.Second).UnixMicro()
134 if err := j.connect(0); err != nil {
135 log.Printf("error reconnecting to jetstream: %v", err)
136 return
137 }
138 case <-ticker.C:
139 _, message, err := j.conn.ReadMessage()
140 if err != nil {
141 log.Printf("error reading from websocket: %v", err)
142 return
143 }
144 messages <- message
145 }
146 }
147}
148
149func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) {
150 fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro()
151
152 if err := j.connect(fiveSecondsAgo); err != nil {
153 log.Printf("error connecting to jetstream: %v", err)
154 return nil, err
155 }
156
157 messages := make(chan []byte)
158 go j.readMessages(ctx, messages)
159
160 return messages, nil
161}