tangled
alpha
login
or
join now
t0.lv
/
core
forked from
tangled.org/core
0
fork
atom
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
knotserver/jsclient: reenable cursor and better logging
anirudh.fi
1 year ago
5397a709
e72fef82
+7
-4
2 changed files
expand all
collapse all
unified
split
knotserver
handler.go
jsclient
jetstream.go
+2
knotserver/handler.go
···
94
94
}
95
95
}()
96
96
97
97
+
log.Printf("started jetstream")
98
98
+
97
99
return nil
98
100
}
99
101
+5
-4
knotserver/jsclient/jetstream.go
···
36
36
RawQuery: queryParams,
37
37
}
38
38
39
39
-
fmt.Println("URL:", u.String())
40
39
return u
41
40
}
42
41
···
70
69
71
70
var collections, dids string
72
71
if len(j.collections) > 0 {
73
73
-
collections = fmt.Sprintf("wantedCollections=%s", j.collections[0])
72
72
+
collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor)
74
73
for _, collection := range j.collections[1:] {
75
74
collections += fmt.Sprintf("&wantedCollections=%s", collection)
76
75
}
···
100
99
func (j *JetstreamClient) connect(cursor int64) error {
101
100
queryParams := j.buildQueryParams(cursor)
102
101
u := j.buildWebsocketURL(queryParams)
102
102
+
103
103
+
log.Printf("connecting to jetstream at: %s", u.String())
103
104
104
105
dialer := websocket.Dialer{
105
106
HandshakeTimeout: 10 * time.Second,
···
130
131
return
131
132
case <-j.reconnectCh:
132
133
// Reconnect with new parameters
133
133
-
// cursor := time.Now().Add(-5 * time.Second).UnixMicro()
134
134
-
if err := j.connect(0); err != nil {
134
134
+
cursor := time.Now().Add(-5 * time.Second).UnixMicro()
135
135
+
if err := j.connect(cursor); err != nil {
135
136
log.Printf("error reconnecting to jetstream: %v", err)
136
137
return
137
138
}