a demonstration replicated social networking web app built with anproto wiredove.net/
social ed25519 protocols
at master 275 lines 7.3 kB view raw
/**
 * Outbound send queue shared by two transports, referred to as "ws" and "gossip".
 *
 * Messages are queued in one of three priority lanes and drained one item per
 * timer tick. Two kinds of item exist:
 *   - "hash": any 44-character string (see isHash). It is sent to one transport
 *     first and to the other only after HASH_RETRY_MS, unless noteReceived()
 *     removes it in the meantime.
 *   - "blob": everything else. It is sent to every ready transport that has not
 *     received it yet.
 * An item leaves the queue once it has been sent on both transports, when
 * noteReceived() is called for it, when it is evicted by trimOverflow(), or when
 * clearQueue() is called.
 */

// Delay between drain ticks while the queue is non-empty.
const SEND_DELAY_MS = 100
// Minimum time after a hash's first send before it is sent on the other transport.
const HASH_RETRY_MS = 800
// A hash queued less than this long ago is rejected by queueSend.
const HASH_QUEUE_COOLDOWN_MS = 30000
// Maximum number of queued items across all lanes.
const MAX_QUEUE_ITEMS = 1800

// Lanes in the order they are drained.
const PRIORITY_ORDER = ['high', 'normal', 'low']
const PRIORITY_RANK = { high: 3, normal: 2, low: 1 }

const queues = {
  high: [],
  normal: [],
  low: []
}

// String message -> queued item, used to de-duplicate queueSend calls.
const pending = new Map()
// Hash -> time it was last accepted by queueSend. Kept in chronological
// insertion order so expired entries can be pruned from the front.
const hashCooldown = new Map()
let drainTimer = null
let draining = false
// Transport preferred for the next hash that has not been sent anywhere yet.
let nextHashTarget = 'ws'

const senders = {
  ws: null,
  hasWs: null,
  gossip: null,
  hasGossip: null
}
const queueListeners = new Set()

/** Returns `priority` when it names a lane, otherwise 'normal'. */
const normalizePriority = (priority) => (
  Object.hasOwn(PRIORITY_RANK, priority) ? priority : 'normal'
)

/** Number of items currently queued across all lanes. */
const totalQueueSize = () => PRIORITY_ORDER.reduce((sum, lane) => sum + queues[lane].length, 0)

/** De-duplication key: the message itself for strings, null for anything else. */
const getKey = (msg) => (typeof msg === 'string' ? msg : null)

/**
 * True for 44-character strings.
 * NOTE(review): presumably a base64-encoded 32-byte digest — confirm with the protocol.
 */
const isHash = (msg) => typeof msg === 'string' && msg.length === 44

/** Returns the other transport name. */
const flipTarget = (target) => (target === 'ws' ? 'gossip' : 'ws')

/** Removes `item` from `queue` by identity; returns whether it was found. */
const removeFromQueue = (queue, item) => {
  const position = queue.indexOf(item)
  if (position < 0) { return false }
  queue.splice(position, 1)
  return true
}

/**
 * Moves `item` to the end of the `nextPriority` lane, but only when that lane
 * ranks strictly higher than the item's current one. Items are never demoted.
 */
const promoteItem = (item, nextPriority) => {
  const currentPriority = item.priority
  if (PRIORITY_RANK[nextPriority] <= PRIORITY_RANK[currentPriority]) { return }
  removeFromQueue(queues[currentPriority], item)
  item.priority = nextPriority
  queues[nextPriority].push(item)
}

/**
 * Evicts the oldest items until the queue fits MAX_QUEUE_ITEMS again.
 * Low-priority items go first, then normal; high-priority items are dropped
 * only when nothing else is left.
 */
const trimOverflow = () => {
  const evictionOrder = [...PRIORITY_ORDER].reverse()
  while (totalQueueSize() > MAX_QUEUE_ITEMS) {
    const lane = evictionOrder.find((name) => queues[name].length > 0)
    if (!lane) { break }
    const evicted = queues[lane].shift()
    if (evicted.key) { pending.delete(evicted.key) }
  }
}

/**
 * Drops cooldown entries that have expired, so hashCooldown does not grow
 * without bound. Relies on queueSend re-inserting refreshed hashes at the end
 * of the map, which keeps iteration order chronological.
 */
const pruneHashCooldown = (now) => {
  for (const [hash, queuedAt] of hashCooldown) {
    if (now - queuedAt < HASH_QUEUE_COOLDOWN_MS) { break }
    hashCooldown.delete(hash)
  }
}

/** Whether the named transport reports itself ready; false when no probe is registered. */
const isTargetReady = (target) => {
  if (target === 'ws') { return Boolean(senders.hasWs?.()) }
  if (target === 'gossip') { return Boolean(senders.hasGossip?.()) }
  return false
}

/** Hands `msg` to the named transport's send callback, if one is registered. */
const sendToTarget = (target, msg) => {
  if (target === 'ws') { senders.ws?.(msg) }
  if (target === 'gossip') { senders.gossip?.(msg) }
}

/** Builds the status object passed to queue listeners. */
const queueSnapshot = () => ({
  total: totalQueueSize(),
  high: queues.high.length,
  normal: queues.normal.length,
  low: queues.low.length,
  draining,
  wsReady: isTargetReady('ws'),
  gossipReady: isTargetReady('gossip')
})

/** Calls every listener with a fresh snapshot; a failing listener is logged and skipped. */
const notifyQueueListeners = () => {
  const state = queueSnapshot()
  for (const listener of queueListeners) {
    try {
      listener(state)
    } catch (err) {
      console.warn('queue status listener failed', err)
    }
  }
}

/**
 * Registers transport callbacks. Only truthy entries of `config` replace the
 * current callbacks, so transports can be registered independently.
 *
 * @param {object} [config]
 * @param {Function} [config.sendWs] - called with a message to send on "ws"
 * @param {Function} [config.hasWs] - returns truthy when "ws" can send
 * @param {Function} [config.sendGossip] - called with a message to send on "gossip"
 * @param {Function} [config.hasGossip] - returns truthy when "gossip" can send
 */
export const registerNetworkSenders = (config = {}) => {
  if (config.sendWs) { senders.ws = config.sendWs }
  if (config.hasWs) { senders.hasWs = config.hasWs }
  if (config.sendGossip) { senders.gossip = config.sendGossip }
  if (config.hasGossip) { senders.hasGossip = config.hasGossip }
  notifyQueueListeners()
}

/** Removes a fully handled item from its lane and from `pending`, then notifies listeners. */
const removeItem = (item, lane, index) => {
  if (item.key) { pending.delete(item.key) }
  const queue = queues[lane]
  if (queue[index] === item) {
    queue.splice(index, 1)
  } else {
    // The index is stale (the queue changed while the item was being sent);
    // fall back to an identity search so the wrong item is never removed.
    removeFromQueue(queue, item)
  }
  notifyQueueListeners()
}

/**
 * Chooses the transport a hash item should be sent on now, or null when it
 * should wait. An unsent hash goes to `nextHashTarget` if ready, else to the
 * other transport if ready. A hash sent once goes to the other transport only
 * after HASH_RETRY_MS has passed since the first send.
 */
const pickHashTarget = (item) => {
  const ready = { ws: isTargetReady('ws'), gossip: isTargetReady('gossip') }
  const sentOnWs = item.sent.ws
  const sentOnGossip = item.sent.gossip

  if (!sentOnWs && !sentOnGossip) {
    const preferred = nextHashTarget
    const fallback = flipTarget(preferred)
    if (ready[preferred]) { return preferred }
    if (ready[fallback]) { return fallback }
    return null
  }
  if (sentOnWs && sentOnGossip) { return null }

  const { firstTarget } = item
  if (!firstTarget) { return null }
  const otherTarget = flipTarget(firstTarget)
  const waitedMs = Date.now() - item.sentAt[firstTarget]
  const canRetry = !item.sent[otherTarget] && ready[otherTarget] && waitedMs >= HASH_RETRY_MS
  return canRetry ? otherTarget : null
}

/**
 * Attempts to send one queued item; returns true when something was sent.
 * The item is removed from the queue once it has been sent on both transports.
 */
const trySendItem = (item, lane, index) => {
  if (item.kind === 'hash') {
    const target = pickHashTarget(item)
    if (!target) { return false }
    sendToTarget(target, item.msg)
    item.sent[target] = true
    item.sentAt[target] = Date.now()
    if (!item.firstTarget) {
      item.firstTarget = target
      // Alternate the transport that gets the next fresh hash first.
      nextHashTarget = flipTarget(target)
    }
  } else {
    const sendOnWs = !item.sent.ws && isTargetReady('ws')
    const sendOnGossip = !item.sent.gossip && isTargetReady('gossip')
    if (!sendOnWs && !sendOnGossip) { return false }
    if (sendOnWs) {
      sendToTarget('ws', item.msg)
      item.sent.ws = true
    }
    if (sendOnGossip) {
      sendToTarget('gossip', item.msg)
      item.sent.gossip = true
    }
  }
  if (item.sent.ws && item.sent.gossip) {
    removeItem(item, lane, index)
  }
  return true
}

/**
 * Timer callback: sends at most one item, taken from the highest-priority lane
 * that has something sendable, then re-arms itself while items remain.
 */
const drainQueue = () => {
  // Cancel any timer that is still armed so only one drain chain ever exists.
  if (drainTimer) { clearTimeout(drainTimer) }
  drainTimer = null
  if (draining) {
    drainTimer = setTimeout(drainQueue, SEND_DELAY_MS)
    return
  }
  draining = true
  try {
    for (const lane of PRIORITY_ORDER) {
      const sentSomething = queues[lane].some((item, index) => trySendItem(item, lane, index))
      if (sentSomething) { break }
    }
  } catch (err) {
    // A throwing sender must not stop the drain loop; the item stays queued
    // and is retried on the next tick.
    console.warn('queue drain failed', err)
  } finally {
    draining = false
  }
  // A sender may have queued more work and armed the timer already.
  if (totalQueueSize() > 0 && !drainTimer) {
    drainTimer = setTimeout(drainQueue, SEND_DELAY_MS)
  }
  notifyQueueListeners()
}

/**
 * Queues `msg` for sending.
 *
 * A string message that is already queued is not queued again; its existing
 * item is promoted if `options.priority` ranks higher. A hash accepted less
 * than HASH_QUEUE_COOLDOWN_MS ago is rejected.
 *
 * @param {*} msg - message handed to the transport callbacks as-is
 * @param {object} [options]
 * @param {'high'|'normal'|'low'} [options.priority] - anything else means 'normal'
 * @returns {boolean} true when a new item was queued, false otherwise
 */
export const queueSend = (msg, options = {}) => {
  const priority = normalizePriority(options.priority)
  const key = getKey(msg)
  const existing = key ? pending.get(key) : undefined
  if (existing) {
    promoteItem(existing, priority)
    if (!drainTimer) { drainTimer = setTimeout(drainQueue, 0) }
    notifyQueueListeners()
    return false
  }

  const hash = isHash(msg)
  if (hash) {
    const now = Date.now()
    pruneHashCooldown(now)
    const last = hashCooldown.get(msg)
    if (last !== undefined && now - last < HASH_QUEUE_COOLDOWN_MS) { return false }
    // Delete before set so the refreshed entry moves to the end of the map.
    hashCooldown.delete(msg)
    hashCooldown.set(msg, now)
  }

  const item = {
    msg,
    key,
    priority,
    kind: hash ? 'hash' : 'blob',
    sent: { ws: false, gossip: false },
    sentAt: { ws: 0, gossip: 0 },
    firstTarget: null
  }
  queues[priority].push(item)
  if (key) { pending.set(key, item) }
  trimOverflow()
  if (!drainTimer) { drainTimer = setTimeout(drainQueue, 0) }
  notifyQueueListeners()
  return true
}

/**
 * Removes the queued item for `msg`, if any, so it is not sent (again).
 * Non-string messages have no key and are ignored.
 */
export const noteReceived = (msg) => {
  const key = getKey(msg)
  if (!key) { return }
  const item = pending.get(key)
  if (!item) { return }
  pending.delete(key)
  removeFromQueue(queues[item.priority], item)
  notifyQueueListeners()
}

/** @returns {number} number of items currently queued */
export const getQueueSize = () => totalQueueSize()

/** @returns {object} current lane sizes, draining flag and transport readiness */
export const getQueueStatusSnapshot = () => queueSnapshot()

/**
 * Subscribes to queue status changes. The listener is called once immediately
 * with the current snapshot; a failure in that call is logged, like any later
 * notification failure, so the caller always receives the unsubscribe function.
 *
 * @param {Function} listener - called with a status snapshot
 * @returns {Function} unsubscribe function
 * @throws {TypeError} when `listener` is not a function
 */
export const subscribeQueueStatus = (listener) => {
  if (typeof listener !== 'function') {
    throw new TypeError('queue status listener must be a function')
  }
  queueListeners.add(listener)
  try {
    listener(queueSnapshot())
  } catch (err) {
    console.warn('queue status listener failed', err)
  }
  return () => {
    queueListeners.delete(listener)
  }
}

/** Empties every lane, forgets pending keys and hash cooldowns, and stops the drain timer. */
export const clearQueue = () => {
  for (const lane of PRIORITY_ORDER) {
    queues[lane].length = 0
  }
  pending.clear()
  hashCooldown.clear()
  if (drainTimer) {
    clearTimeout(drainTimer)
    drainTimer = null
  }
  draining = false
  notifyQueueListeners()
}