tangled
alpha
login
or
join now
evbogue.com
/
wiredove
1
fork
atom
a demonstration replicated social networking web app built with anproto
wiredove.net/
social
ed25519
protocols
1
fork
atom
overview
issues
pulls
pipelines
add http fallback
Everett Bogue
1 month ago
35c033e8
8d2c2982
+114
-17
2 changed files
expand all
collapse all
unified
split
connect.js
websocket.js
+2
-1
connect.js
···
1
1
import { apds } from 'apds'
2
2
import { makeRoom } from './gossip.js'
3
3
-
import { makeWs} from './websocket.js'
3
3
+
import { makeWs, startHttpGossip } from './websocket.js'
4
4
import { send } from './send.js'
5
5
6
6
await apds.start('wiredovedbversion1')
···
9
9
await makeWs('ws://localhost:9000')
10
10
//await makeWs('wss://apds.anproto.com/')
11
11
//makeWs('wss://pub.wiredove.net/')
12
12
+
await startHttpGossip('http://localhost:9000')
12
13
makeRoom('wiredovev1')
13
14
send('evSFOKnXaF9ZWSsff8bVfXP6+XnGZUj8XNp6bca590k=')
14
15
}
+112
-16
websocket.js
···
4
4
5
5
const pubs = new Set()
6
6
const wsBackoff = new Map()
7
7
+
const HTTP_POLL_INTERVAL_MS = 5000
8
8
+
const httpState = {
9
9
+
baseUrl: null,
10
10
+
ready: false,
11
11
+
pollTimer: null,
12
12
+
lastSince: 0
13
13
+
}
7
14
8
15
let wsReadyResolver
9
16
const createWsReadyPromise = () => new Promise(resolve => {
···
17
24
})
18
25
}
19
26
27
27
+
const isHash = (msg) => typeof msg === 'string' && msg.length === 44
28
28
+
29
29
+
const handleIncoming = async (msg) => {
30
30
+
noteReceived(msg)
31
31
+
if (isHash(msg)) {
32
32
+
const blob = await apds.get(msg)
33
33
+
if (blob) {
34
34
+
if (pubs.size) {
35
35
+
deliverWs(blob)
36
36
+
} else {
37
37
+
await sendHttp(blob)
38
38
+
}
39
39
+
}
40
40
+
return
41
41
+
}
42
42
+
await render.shouldWe(msg)
43
43
+
await apds.make(msg)
44
44
+
await apds.add(msg)
45
45
+
await render.blob(msg)
46
46
+
}
47
47
+
48
48
+
const toHttpBase = (wsUrl) => wsUrl.replace(/^ws:/, 'http:').replace(/^wss:/, 'https:')
49
49
+
50
50
+
const scheduleHttpPoll = () => {
51
51
+
if (httpState.pollTimer) { return }
52
52
+
httpState.pollTimer = setTimeout(pollHttp, HTTP_POLL_INTERVAL_MS)
53
53
+
}
54
54
+
55
55
+
const pollHttp = async () => {
56
56
+
httpState.pollTimer = null
57
57
+
if (!httpState.ready || pubs.size) {
58
58
+
scheduleHttpPoll()
59
59
+
return
60
60
+
}
61
61
+
try {
62
62
+
const url = new URL('/gossip/poll', httpState.baseUrl)
63
63
+
url.searchParams.set('since', String(httpState.lastSince))
64
64
+
const res = await fetch(url.toString(), { cache: 'no-store' })
65
65
+
if (res.ok) {
66
66
+
const data = await res.json()
67
67
+
const messages = Array.isArray(data.messages) ? data.messages : []
68
68
+
for (const msg of messages) {
69
69
+
await handleIncoming(msg)
70
70
+
}
71
71
+
if (Number.isFinite(data.nextSince)) {
72
72
+
httpState.lastSince = Math.max(httpState.lastSince, data.nextSince)
73
73
+
}
74
74
+
}
75
75
+
} catch (err) {
76
76
+
console.warn('http gossip poll failed', err)
77
77
+
} finally {
78
78
+
scheduleHttpPoll()
79
79
+
}
80
80
+
}
81
81
+
82
82
+
const sendHttp = async (msg) => {
83
83
+
if (!httpState.ready) { return }
84
84
+
try {
85
85
+
const url = new URL('/gossip', httpState.baseUrl)
86
86
+
const res = await fetch(url.toString(), {
87
87
+
method: 'POST',
88
88
+
headers: { 'Content-Type': 'text/plain' },
89
89
+
body: msg
90
90
+
})
91
91
+
if (!res.ok) { return }
92
92
+
const data = await res.json()
93
93
+
const messages = Array.isArray(data.messages) ? data.messages : []
94
94
+
for (const reply of messages) {
95
95
+
await handleIncoming(reply)
96
96
+
}
97
97
+
} catch (err) {
98
98
+
console.warn('http gossip send failed', err)
99
99
+
}
100
100
+
}
101
101
+
102
102
+
export const startHttpGossip = async (baseUrl) => {
103
103
+
if (httpState.ready) { return }
104
104
+
httpState.baseUrl = baseUrl
105
105
+
httpState.ready = true
106
106
+
try {
107
107
+
const q = await apds.query()
108
108
+
if (q && q.length) {
109
109
+
const last = q[q.length - 1]
110
110
+
const ts = parseInt(last?.ts || '0', 10)
111
111
+
if (Number.isFinite(ts)) {
112
112
+
httpState.lastSince = ts
113
113
+
}
114
114
+
}
115
115
+
} catch (err) {
116
116
+
console.warn('http gossip seed failed', err)
117
117
+
}
118
118
+
scheduleHttpPoll()
119
119
+
}
120
120
+
20
121
export const sendWs = async (msg) => {
21
21
-
if (pubs.size) { deliverWs(msg) }
122
122
+
if (pubs.size) {
123
123
+
deliverWs(msg)
124
124
+
} else {
125
125
+
await sendHttp(msg)
126
126
+
}
22
127
}
23
128
24
24
-
export const hasWs = () => pubs.size > 0
129
129
+
export const hasWs = () => pubs.size > 0 || httpState.ready
25
130
26
131
registerNetworkSenders({
27
132
sendWs,
···
29
134
})
30
135
31
136
export const makeWs = async (pub) => {
137
137
+
const httpBase = toHttpBase(pub)
138
138
+
await startHttpGossip(httpBase)
139
139
+
32
140
const getBackoff = () => {
33
141
let state = wsBackoff.get(pub)
34
142
if (!state) {
···
119
227
}
120
228
121
229
ws.onmessage = async (m) => {
122
122
-
noteReceived(m.data)
123
123
-
if (m.data.length === 44) {
124
124
-
//console.log('NEEDS' + m.data)
125
125
-
const blob = await apds.get(m.data)
126
126
-
if (blob) {
127
127
-
ws.send(blob)
128
128
-
}
129
129
-
} else {
130
130
-
await render.shouldWe(m.data)
131
131
-
await apds.make(m.data)
132
132
-
await apds.add(m.data)
133
133
-
await render.blob(m.data)
134
134
-
}
135
135
-
}
230
230
+
await handleIncoming(m.data)
231
231
+
}
136
232
137
233
ws.onerror = () => {
138
234
scheduleReconnect()