this repo has no description
1package jsclient 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "net/url" 8 "sync" 9 "time" 10 11 "github.com/gorilla/websocket" 12) 13 14type JetstreamClient struct { 15 collections []string 16 dids []string 17 conn *websocket.Conn 18 mu sync.RWMutex 19 reconnectCh chan struct{} 20} 21 22func NewJetstreamClient(collections, dids []string) *JetstreamClient { 23 return &JetstreamClient{ 24 collections: collections, 25 dids: dids, 26 reconnectCh: make(chan struct{}, 1), 27 } 28} 29 30func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL { 31 32 u := url.URL{ 33 Scheme: "wss", 34 Host: "jetstream1.us-west.bsky.network", 35 Path: "/subscribe", 36 RawQuery: queryParams, 37 } 38 39 fmt.Println("URL:", u.String()) 40 return u 41} 42 43// UpdateCollections updates the collections list and triggers a reconnection 44func (j *JetstreamClient) UpdateCollections(collections []string) { 45 j.mu.Lock() 46 j.collections = collections 47 j.mu.Unlock() 48 j.triggerReconnect() 49} 50 51// UpdateDids updates the DIDs list and triggers a reconnection 52func (j *JetstreamClient) UpdateDids(dids []string) { 53 j.mu.Lock() 54 j.dids = dids 55 j.mu.Unlock() 56 j.triggerReconnect() 57} 58 59func (j *JetstreamClient) triggerReconnect() { 60 select { 61 case j.reconnectCh <- struct{}{}: 62 default: 63 // Channel already has a pending reconnect 64 } 65} 66 67func (j *JetstreamClient) buildQueryParams(cursor int64) string { 68 j.mu.RLock() 69 defer j.mu.RUnlock() 70 71 var collections, dids string 72 if len(j.collections) > 0 { 73 collections = fmt.Sprintf("wantedCollections=%s", j.collections[0]) 74 for _, collection := range j.collections[1:] { 75 collections += fmt.Sprintf("&wantedCollections=%s", collection) 76 } 77 } 78 if len(j.dids) > 0 { 79 for i, did := range j.dids { 80 if i == 0 { 81 dids = fmt.Sprintf("wantedDids=%s", did) 82 } else { 83 dids += fmt.Sprintf("&wantedDids=%s", did) 84 } 85 } 86 } 87 88 var queryStr string 89 if collections != "" && dids != "" { 90 queryStr = collections + "&" + dids 91 } else if collections != "" { 92 queryStr = collections 93 } else if dids != "" { 94 queryStr = dids 95 } 96 97 return queryStr 98} 99 100func (j *JetstreamClient) connect(cursor int64) error { 101 queryParams := j.buildQueryParams(cursor) 102 u := j.buildWebsocketURL(queryParams) 103 104 dialer := websocket.Dialer{ 105 HandshakeTimeout: 10 * time.Second, 106 } 107 108 conn, _, err := dialer.Dial(u.String(), nil) 109 if err != nil { 110 return err 111 } 112 113 if j.conn != nil { 114 j.conn.Close() 115 } 116 j.conn = conn 117 return nil 118} 119 120func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) { 121 defer close(messages) 122 defer j.conn.Close() 123 124 ticker := time.NewTicker(1 * time.Second) 125 defer ticker.Stop() 126 127 for { 128 select { 129 case <-ctx.Done(): 130 return 131 case <-j.reconnectCh: 132 // Reconnect with new parameters 133 // cursor := time.Now().Add(-5 * time.Second).UnixMicro() 134 if err := j.connect(0); err != nil { 135 log.Printf("error reconnecting to jetstream: %v", err) 136 return 137 } 138 case <-ticker.C: 139 _, message, err := j.conn.ReadMessage() 140 if err != nil { 141 log.Printf("error reading from websocket: %v", err) 142 return 143 } 144 messages <- message 145 } 146 } 147} 148 149func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) { 150 fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro() 151 152 if err := j.connect(fiveSecondsAgo); err != nil { 153 log.Printf("error connecting to jetstream: %v", err) 154 return nil, err 155 } 156 157 messages := make(chan []byte) 158 go j.readMessages(ctx, messages) 159 160 return messages, nil 161}