A demonstration of a replicated social networking web app, built with anproto.
wiredove.net/
social
ed25519
protocols
// Pause between two drain passes; each pass makes at most one send attempt.
const SEND_DELAY_MS = 100
// How long a hash must have been out on its first transport before it is
// also offered to the other one.
const HASH_RETRY_MS = 800
// A hash that was queued less than this long ago is refused when queued again.
const HASH_QUEUE_COOLDOWN_MS = 30 * 1000
// Ceiling on the number of queued items summed over all lanes.
const MAX_QUEUE_ITEMS = 1800

// Lane names, most urgent first; the drain loop walks them in this order.
const PRIORITY_ORDER = ['high', 'normal', 'low']
// Numeric weight per lane (high: 3, normal: 2, low: 1); larger outranks smaller.
const PRIORITY_RANK = Object.fromEntries(
  PRIORITY_ORDER.map((lane, position) => [lane, PRIORITY_ORDER.length - position])
)
8
// One FIFO array per priority lane; items are pushed at the tail.
const queues = { high: [], normal: [], low: [] }

// String message -> its queued item, so a repeated message can be detected.
const pending = new Map()
// Hash message -> timestamp (ms) of the moment it was last accepted into the queue.
const hashCooldown = new Map()

// Handle of the scheduled drain pass, or null when none is scheduled.
let drainTimer = null
// True only while a drain pass is executing.
let draining = false
// Transport that the next never-sent hash will try first ('ws' or 'gossip').
let nextHashTarget = 'ws'

// Transport callbacks installed through registerNetworkSenders:
//   ws / gossip       - deliver a message on that transport
//   hasWs / hasGossip - report whether that transport can currently send
const senders = { ws: null, hasWs: null, gossip: null, hasGossip: null }

// Callbacks that receive a queue status snapshot whenever the queue changes.
const queueListeners = new Set()
28
/**
 * Coerce an arbitrary value to a valid lane name.
 *
 * @param {*} priority - Requested priority.
 * @returns {'high'|'normal'|'low'} The value itself when it is one of the
 *   three lane names, otherwise 'normal'.
 */
const normalizePriority = (priority) => {
  switch (priority) {
    case 'high':
    case 'normal':
    case 'low':
      return priority
    default:
      return 'normal'
  }
}
33
/**
 * Count the items currently queued across the three lanes.
 *
 * @returns {number} Combined length of the high, normal and low queues.
 */
const totalQueueSize = () => {
  const { high, normal, low } = queues
  return high.length + normal.length + low.length
}
35
/**
 * Derive the de-duplication key of a message.
 *
 * @param {*} msg - Outgoing or incoming message.
 * @returns {string|null} The message itself when it is a string, else null
 *   (non-string messages are never de-duplicated).
 */
const getKey = (msg) => {
  if (typeof msg !== 'string') { return null }
  return msg
}
37
/**
 * Tell whether a message is treated as a hash rather than a blob.
 *
 * The only criterion is "a string of exactly 44 characters".
 * NOTE(review): 44 is presumably the length of a base64-encoded 32-byte
 * digest - confirm against the protocol; the content is not inspected.
 *
 * @param {*} msg - Message to classify.
 * @returns {boolean} True for a 44-character string.
 */
const isHash = (msg) => {
  if (typeof msg !== 'string') { return false }
  return msg.length === 44
}
39
/**
 * Return the transport opposite to the given one.
 *
 * @param {string} target - Transport name.
 * @returns {'ws'|'gossip'} 'gossip' for 'ws'; 'ws' for anything else.
 */
const flipTarget = (target) => {
  if (target === 'ws') { return 'gossip' }
  return 'ws'
}
41
/**
 * Remove the first occurrence of an item (matched by identity) from a queue.
 *
 * @param {Array} queue - Lane array, modified in place.
 * @param {*} item - Element to take out.
 * @returns {boolean} True when the item was found and removed.
 */
const removeFromQueue = (queue, item) => {
  const position = queue.indexOf(item)
  if (position < 0) { return false }
  queue.splice(position, 1)
  return true
}
50
/**
 * Move a queued item to a more urgent lane.
 *
 * Nothing happens unless the requested lane strictly outranks the item's
 * current one, so an item is never demoted. A promoted item joins the tail
 * of its new lane.
 *
 * @param {object} item - Queued item; its `priority` field is updated.
 * @param {string} nextPriority - Lane the caller now asks for.
 */
const promoteItem = (item, nextPriority) => {
  const fromLane = item.priority
  const notAnUpgrade = PRIORITY_RANK[nextPriority] <= PRIORITY_RANK[fromLane]
  if (notAnUpgrade) { return }

  removeFromQueue(queues[fromLane], item)
  queues[nextPriority].push(item)
  item.priority = nextPriority
}
58
/**
 * Discard queued items until the total is back within MAX_QUEUE_ITEMS.
 *
 * Each round drops the oldest item of the least urgent non-empty lane, so
 * high-priority traffic is only discarded once low and normal are empty.
 * The de-duplication entry of every discarded keyed item is released too.
 */
const trimOverflow = () => {
  const evictionOrder = [queues.low, queues.normal, queues.high]
  while (totalQueueSize() > MAX_QUEUE_ITEMS) {
    let evicted = null
    for (const lane of evictionOrder) {
      evicted = lane.shift()
      if (evicted) { break }
    }
    // Nothing left to drop anywhere.
    if (!evicted) { break }
    if (evicted.key) { pending.delete(evicted.key) }
  }
}
77
/**
 * Install transport callbacks, then broadcast the queue status.
 *
 * Only truthy options are applied, so callbacks registered earlier stay in
 * place when an option is omitted (or falsy).
 *
 * @param {object} [config]
 * @param {Function} [config.sendWs] - Delivers a message over 'ws'.
 * @param {Function} [config.hasWs] - Reports whether 'ws' can send.
 * @param {Function} [config.sendGossip] - Delivers a message over 'gossip'.
 * @param {Function} [config.hasGossip] - Reports whether 'gossip' can send.
 */
export const registerNetworkSenders = (config = {}) => {
  // [option name on config, slot name on senders]
  const wiring = [
    ['sendWs', 'ws'],
    ['hasWs', 'hasWs'],
    ['sendGossip', 'gossip'],
    ['hasGossip', 'hasGossip']
  ]
  for (const [option, slot] of wiring) {
    const callback = config[option]
    if (callback) { senders[slot] = callback }
  }
  notifyQueueListeners()
}
85
/**
 * Ask the registered readiness callback whether a transport can send.
 *
 * @param {string} target - 'ws' or 'gossip'.
 * @returns {*} The callback's own return value; undefined when no callback
 *   is registered for the transport; false for an unknown transport name.
 */
const isTargetReady = (target) => {
  switch (target) {
    case 'ws':
      return senders.hasWs?.()
    case 'gossip':
      return senders.hasGossip?.()
    default:
      return false
  }
}
91
/**
 * Hand a message to the send callback of one transport.
 *
 * Silently does nothing when the transport name is unknown or no callback
 * has been registered for it.
 *
 * @param {string} target - 'ws' or 'gossip'.
 * @param {*} msg - Message passed through unchanged.
 */
const sendToTarget = (target, msg) => {
  switch (target) {
    case 'ws':
      senders.ws?.(msg)
      break
    case 'gossip':
      senders.gossip?.(msg)
      break
    default:
      break
  }
}
96
/**
 * Build a plain-object summary of the current queue state.
 *
 * @returns {{total: number, high: number, normal: number, low: number,
 *   draining: boolean, wsReady: boolean, gossipReady: boolean}}
 *   Item counts (overall and per lane), whether a drain pass is running, and
 *   each transport's readiness coerced to a boolean.
 */
const queueSnapshot = () => {
  const total = totalQueueSize()
  const { high, normal, low } = queues
  const isDraining = draining
  const wsReady = !!isTargetReady('ws')
  const gossipReady = !!isTargetReady('gossip')
  return {
    total,
    high: high.length,
    normal: normal.length,
    low: low.length,
    draining: isDraining,
    wsReady,
    gossipReady
  }
}
106
/**
 * Deliver one fresh status snapshot to every subscribed listener.
 *
 * All listeners receive the same snapshot object. A listener that throws is
 * reported with console.warn and does not prevent the others from running.
 */
const notifyQueueListeners = () => {
  const snapshot = queueSnapshot()
  for (const listener of queueListeners) {
    try {
      listener(snapshot)
    } catch (err) {
      console.warn('queue status listener failed', err)
    }
  }
}
117
/**
 * Take a fully handled item out of the queue and broadcast the new status.
 *
 * `lane` and `index` describe where the caller last saw the item. They are
 * only trusted when that slot still holds this exact item: a send callback
 * or status listener may have changed the queue in the meantime (for example
 * by acknowledging or promoting a message), and splicing a stale index would
 * silently drop an unrelated item. When the hint is stale the item is looked
 * up by identity in the lane named by its own `priority` field instead.
 *
 * @param {object} item - Item to remove.
 * @param {string} lane - Lane the caller found the item in.
 * @param {number} index - Position in that lane at lookup time.
 */
const removeItem = (item, lane, index) => {
  // Only release the de-duplication entry if it still belongs to this item.
  if (item.key && pending.get(item.key) === item) { pending.delete(item.key) }

  const hintedLane = queues[lane]
  if (hintedLane?.[index] === item) {
    hintedLane.splice(index, 1)
  } else {
    removeFromQueue(queues[item.priority], item)
  }
  notifyQueueListeners()
}
123
/**
 * Decide which transport, if any, a hash item should be sent on right now.
 *
 * - Never sent: the transport named by `nextHashTarget` when it is ready,
 *   otherwise the other transport when that one is ready.
 * - Sent on one transport: the other transport, but only when it is ready
 *   and at least HASH_RETRY_MS have passed since the first send.
 * - Sent on both, or no first transport recorded: nothing.
 *
 * @param {object} item - Hash item with `sent`, `sentAt` and `firstTarget`.
 * @returns {'ws'|'gossip'|null} Transport to use, or null to wait.
 */
const pickHashTarget = (item) => {
  const ready = {
    ws: isTargetReady('ws'),
    gossip: isTargetReady('gossip')
  }
  const { sent } = item

  if (sent.ws && sent.gossip) { return null }

  if (!sent.ws && !sent.gossip) {
    const preferred = nextHashTarget
    if (ready[preferred]) { return preferred }
    const fallback = flipTarget(preferred)
    return ready[fallback] ? fallback : null
  }

  // Exactly one transport has been used so far.
  const first = item.firstTarget
  if (!first) { return null }
  const other = flipTarget(first)
  if (sent[other] || !ready[other]) { return null }
  return Date.now() - item.sentAt[first] >= HASH_RETRY_MS ? other : null
}
146
/**
 * Attempt to transmit one queued item.
 *
 * Hash items go to a single transport per call (chosen by pickHashTarget);
 * the first transport used is remembered on the item and the preferred
 * transport for the next new hash is flipped. Every other item is sent on
 * each ready transport that has not carried it yet. Once an item has gone
 * out on both transports it is removed from the queue.
 *
 * @param {object} item - Queued item.
 * @param {string} lane - Lane the item was found in.
 * @param {number} index - Position of the item in that lane.
 * @returns {boolean} True when at least one send was performed.
 */
const trySendItem = (item, lane, index) => {
  const dropWhenFullySent = () => {
    if (item.sent.ws && item.sent.gossip) { removeItem(item, lane, index) }
  }

  if (item.kind === 'hash') {
    const target = pickHashTarget(item)
    if (!target) { return false }

    sendToTarget(target, item.msg)
    item.sent[target] = true
    item.sentAt[target] = Date.now()
    if (!item.firstTarget) {
      item.firstTarget = target
      nextHashTarget = flipTarget(target)
    }
    dropWhenFullySent()
    return true
  }

  // Readiness of both transports is sampled before anything is sent.
  const dueTargets = ['ws', 'gossip'].filter(
    (target) => !item.sent[target] && isTargetReady(target)
  )
  if (dueTargets.length === 0) { return false }

  for (const target of dueTargets) {
    sendToTarget(target, item.msg)
    item.sent[target] = true
  }
  dropWhenFullySent()
  return true
}
180
/**
 * Run one drain pass: perform at most one successful send, then schedule the
 * next pass while items remain.
 *
 * Lanes are visited in PRIORITY_ORDER and, within a lane, items in queue
 * order; the pass stops at the first item for which trySendItem reports a
 * send. Rescheduling and the status broadcast live in `finally`, so an
 * exception thrown by a send callback still propagates to the caller but can
 * no longer leave a non-empty queue without a pending drain pass.
 */
const drainQueue = () => {
  drainTimer = null
  if (draining) {
    // A pass is already running; try again after the usual delay.
    drainTimer = setTimeout(drainQueue, SEND_DELAY_MS)
    return
  }
  draining = true
  try {
    let sent = false
    for (const lane of PRIORITY_ORDER) {
      const queue = queues[lane]
      for (let i = 0; i < queue.length; i += 1) {
        if (trySendItem(queue[i], lane, i)) {
          sent = true
          break
        }
      }
      if (sent) { break }
    }
  } finally {
    draining = false
    // Code reached from a send callback or listener may have armed its own
    // timer during the pass. Cancel it so exactly one drain timer exists and
    // the SEND_DELAY_MS pacing holds.
    if (drainTimer) {
      clearTimeout(drainTimer)
      drainTimer = null
    }
    if (totalQueueSize() > 0) {
      drainTimer = setTimeout(drainQueue, SEND_DELAY_MS)
    }
    notifyQueueListeners()
  }
}
208
/**
 * Forget cooldown records that have outlived HASH_QUEUE_COOLDOWN_MS.
 *
 * queueSend re-inserts a hash each time it is accepted, so the map iterates
 * oldest-first and the sweep can stop at the first record still cooling down.
 * An expired record behaves exactly like a missing one, so dropping it only
 * frees memory.
 *
 * @param {number} now - Current time in milliseconds.
 */
const pruneExpiredHashCooldowns = (now) => {
  for (const [hash, queuedAt] of hashCooldown) {
    if (now - queuedAt < HASH_QUEUE_COOLDOWN_MS) { break }
    hashCooldown.delete(hash)
  }
}

/**
 * Queue a message for sending on both transports.
 *
 * - A string message that is already queued is not added again; the queued
 *   item is promoted when the requested priority is higher.
 * - A hash (see isHash) queued again within HASH_QUEUE_COOLDOWN_MS of the
 *   last time it was accepted is refused.
 * - Otherwise the message joins the tail of its lane, overflow is trimmed,
 *   and a drain pass is scheduled if none is pending.
 *
 * @param {*} msg - Message to send; only strings are de-duplicated.
 * @param {object} [options]
 * @param {string} [options.priority] - 'high', 'normal' or 'low'; anything
 *   else is treated as 'normal'.
 * @returns {boolean} True when a new item was queued, false when the message
 *   was merged into an existing item or refused by the cooldown.
 */
export const queueSend = (msg, options = {}) => {
  // `?.` tolerates an explicit null, which the default parameter does not cover.
  const priority = normalizePriority(options?.priority)
  const key = getKey(msg)
  if (key && pending.has(key)) {
    const existing = pending.get(key)
    promoteItem(existing, priority)
    if (!drainTimer) { drainTimer = setTimeout(drainQueue, 0) }
    notifyQueueListeners()
    return false
  }

  const hash = isHash(msg)
  if (hash) {
    const now = Date.now()
    pruneExpiredHashCooldowns(now)
    const last = hashCooldown.get(msg) ?? 0
    if (now - last < HASH_QUEUE_COOLDOWN_MS) { return false }
    // Delete before set so the record moves to the end of the map's
    // iteration order, which keeps the sweep above oldest-first.
    hashCooldown.delete(msg)
    hashCooldown.set(msg, now)
  }

  const item = {
    msg,
    key,
    priority,
    kind: hash ? 'hash' : 'blob',
    sent: { ws: false, gossip: false },
    sentAt: { ws: 0, gossip: 0 },
    firstTarget: null
  }
  queues[priority].push(item)
  if (key) { pending.set(key, item) }
  trimOverflow()
  if (!drainTimer) { drainTimer = setTimeout(drainQueue, 0) }
  notifyQueueListeners()
  return true
}
242
/**
 * Cancel the queued send of a message that has been received.
 *
 * Only string messages with a queued item are affected; in that case the
 * item is dropped from its lane and the queue status is broadcast.
 *
 * @param {*} msg - The received message.
 */
export const noteReceived = (msg) => {
  const key = getKey(msg)
  const item = key ? pending.get(key) : undefined
  if (!item) { return }

  pending.delete(key)
  removeFromQueue(queues[item.priority], item)
  notifyQueueListeners()
}
252
/**
 * Public accessor for the number of items currently queued.
 *
 * @returns {number} Total across all priority lanes.
 */
export const getQueueSize = () => {
  return totalQueueSize()
}
/**
 * Public accessor for the current queue status.
 *
 * @returns {object} A freshly built snapshot, as produced by queueSnapshot.
 */
export const getQueueStatusSnapshot = () => {
  return queueSnapshot()
}
/**
 * Subscribe to queue status changes.
 *
 * The listener is invoked once straight away with the current snapshot and
 * then on every later status broadcast. A failure in that first invocation
 * is handled the same way as in later broadcasts (reported with
 * console.warn), so the caller always receives the unsubscribe handle for a
 * listener that has been registered.
 *
 * @param {Function} listener - Called with a status snapshot object.
 * @returns {Function} Call it to stop receiving updates.
 * @throws {TypeError} When `listener` is not a function; nothing is
 *   registered in that case.
 */
export const subscribeQueueStatus = (listener) => {
  if (typeof listener !== 'function') {
    throw new TypeError('queue status listener must be a function')
  }
  queueListeners.add(listener)
  const initialState = queueSnapshot()
  try {
    listener(initialState)
  } catch (err) {
    console.warn('queue status listener failed', err)
  }
  return () => {
    queueListeners.delete(listener)
  }
}
262
/**
 * Reset the queue to its initial empty state.
 *
 * Empties every lane in place, forgets all de-duplication and hash cooldown
 * records, cancels a scheduled drain pass, clears the draining flag and
 * broadcasts the resulting status. Registered senders and status listeners
 * are left untouched.
 */
export const clearQueue = () => {
  for (const lane of [queues.high, queues.normal, queues.low]) {
    lane.length = 0
  }
  pending.clear()
  hashCooldown.clear()

  if (drainTimer) {
    clearTimeout(drainTimer)
    drainTimer = null
  }
  draining = false

  notifyQueueListeners()
}