tangled
alpha
login
or
join now
ladas552.me
/
core
forked from
tangled.org/core
0
fork
atom
Monorepo for Tangled
0
fork
atom
overview
issues
pulls
pipelines
better tap
Signed-off-by: Seongmin Lee <git@boltless.me>
boltless.me
3 weeks ago
6e7dfa98
fc083c44
verified
This commit was signed with the committer's
known signature
.
boltless.me
SSH Key Fingerprint:
SHA256:2RJEzZAkDasLTx4PI9wgenxutg9oQkR9LGk+9pzW3hM=
+36
-30
2 changed files
expand all
collapse all
unified
split
tap
readme.md
tap.go
+3
tap/readme.md
···
1
1
+
basic tap client package
2
2
+
3
3
+
Replace this to official indigo package when <https://github.com/bluesky-social/indigo/pull/1241> gets merged.
+33
-30
tap/tap.go
···
9
9
"fmt"
10
10
"net/http"
11
11
"net/url"
12
12
+
"time"
12
13
13
14
"github.com/bluesky-social/indigo/atproto/syntax"
14
15
"github.com/gorilla/websocket"
···
104
105
105
106
url := u.String()
106
107
107
107
-
// var backoff int
108
108
-
// for {
109
109
-
// select {
110
110
-
// case <-ctx.Done():
111
111
-
// return ctx.Err()
112
112
-
// default:
113
113
-
// }
114
114
-
//
115
115
-
// header := http.Header{
116
116
-
// "Authorization": []string{""},
117
117
-
// }
118
118
-
// conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
119
119
-
// if err != nil {
120
120
-
// l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
121
121
-
// time.Sleep(time.Duration(5+backoff) * time.Second)
122
122
-
// backoff++
123
123
-
//
124
124
-
// continue
125
125
-
// } else {
126
126
-
// backoff = 0
127
127
-
// }
128
128
-
//
129
129
-
// l.Info("event subscription response", "code", res.StatusCode)
130
130
-
// }
108
108
+
var backoff int
109
109
+
for {
110
110
+
select {
111
111
+
case <-ctx.Done():
112
112
+
return ctx.Err()
113
113
+
default:
114
114
+
}
131
115
132
132
-
// TODO: keep websocket connection alive
133
133
-
conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil)
134
134
-
if err != nil {
135
135
-
l.Error("failed to connect to tap", "err", err)
136
136
-
return err
116
116
+
header := http.Header{
117
117
+
"Authorization": []string{""},
118
118
+
}
119
119
+
conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
120
120
+
if err != nil {
121
121
+
l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
122
122
+
time.Sleep(time.Duration(5+backoff) * time.Second)
123
123
+
backoff++
124
124
+
125
125
+
continue
126
126
+
} else {
127
127
+
backoff = 0
128
128
+
}
129
129
+
130
130
+
l.Info("tap event subscription response", "code", res.StatusCode)
131
131
+
132
132
+
if err = c.handleConnection(ctx, conn, handler); err != nil {
133
133
+
l.Warn("tap connection failed", "err", err, "backoff", backoff)
134
134
+
}
137
135
}
138
138
-
defer conn.Close()
136
136
+
}
137
137
+
138
138
+
func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
139
139
+
l := log.FromContext(ctx)
140
140
+
139
141
defer func() {
142
142
+
conn.Close()
140
143
l.Warn("closed tap conection")
141
144
}()
142
145
l.Info("established tap conection")