podcast manager

simple reconnect loop

+119 -26
+28 -6
src/common/async/sleep.ts
··· 3 3 * @param signal - an aptional abort signal, to cancel the sleep 4 4 * @returns a promise that resolves after given amount of time, and is interruptable with an abort signal. 5 5 */ 6 - export function sleep(ms: number, signal?: AbortSignal): Promise<void> { 6 + export async function sleep(ms: number, signal?: AbortSignal): Promise<void> { 7 7 signal?.throwIfAborted() 8 - 9 - // not sure why this error is coming up 10 8 11 9 const {resolve, reject, promise} = Promise.withResolvers<void>() 12 10 const timeout = setTimeout(resolve, ms) 13 11 12 + // without the signal, we can't cancel it, so just let the timeout resolve 14 13 if (!signal) return promise 15 14 16 15 const abortHandler = () => { ··· 18 17 reject(signal.reason) 19 18 } 20 19 21 - signal.addEventListener('abort', abortHandler) 22 - return promise.finally(() => { 20 + try { 21 + signal.addEventListener('abort', abortHandler) 22 + await promise 23 + return 24 + } finally { 23 25 signal.removeEventListener('abort', abortHandler) 24 - }) 26 + } 27 + } 28 + 29 + export function backoff(options?: {maxAttempts?: number; baseDelay?: number; maxDelay?: number}) { 30 + const maxAttempts = options?.maxAttempts ?? 10 31 + const baseDelay = options?.baseDelay ?? 1000 32 + const maxDelay = options?.maxDelay ?? 30_000 33 + 34 + let attempts = 0 35 + const nextDelay = () => { 36 + return attempts === 0 37 + ? attempts++ // immediate at 0 38 + : Math.min(baseDelay * Math.pow(2, attempts++ - 1), maxDelay) 39 + } 40 + 41 + return async (signal?: AbortSignal) => { 42 + signal?.throwIfAborted() 43 + if (attempts > maxAttempts) throw new Error('exceeded max attempts!') 44 + 45 + await sleep(nextDelay(), signal) 46 + } 25 47 }
+89 -17
src/realm/client/service-connection.ts
··· 3 3 import {z} from 'zod/v4' 4 4 5 5 import {timeoutSignal} from '#common/async/aborts' 6 - import {sleep} from '#common/async/sleep' 6 + import {backoff, sleep} from '#common/async/sleep' 7 7 import {generateSignableJwt, jwkImport} from '#common/crypto/jwks' 8 8 import {jwtPayload, verifyJwtToken} from '#common/crypto/jwts' 9 9 import {normalizeError, normalizeProtocolError, ProtocolError} from '#common/errors' ··· 34 34 35 35 /** manages websocket and webrtc connections for a realm */ 36 36 export class RealmConnection extends EventTarget { 37 + #url: string 37 38 #socket: WebSocket 38 39 #connectopts: ConnectionOptions 40 + #reconnecting = false 39 41 #abort: AbortController 40 42 41 43 #identities: Map<IdentID, CryptoKey> ··· 51 53 52 54 constructor(url: string, db: Database, identity: RealmIdentity, options: ConnectionOptions) { 53 55 super() 56 + this.#url = url 54 57 this.#abort = new AbortController() 55 58 this.#identity = identity 56 59 this.#connectopts = options ··· 131 134 132 135 destroy() { 133 136 console.debug('realm connection destroy!') 134 - if (this.connected) { 135 - this.#socket.close() 136 - } 137 + this.#reconnecting = false 138 + 139 + // shutdown the loops 140 + this.#abort.abort() 137 141 138 - // disconnect from peers 142 + // close ports 143 + this.#closeWebSocket() 139 144 for (const peer of this.#peers.values()) { 140 145 peer.destroy() 141 146 } 142 147 143 - // shutdown loops 144 - this.#abort.abort() 145 - 148 + // empty state 146 149 this.#peers.clear() 147 150 this.#nonces.clear() 148 151 } 149 152 153 + #closeWebSocket() { 154 + if (this.connected) { 155 + this.#socket.close() 156 + } 157 + } 158 + 159 + async #attemptReconnect() { 160 + if (this.#reconnecting || this.#abort.signal.aborted) return 161 + 162 + // backoff sleeps for exponentially longer amounts of time 163 + const gate = backoff({maxAttempts: 100_000}) 164 + try { 165 + this.#reconnecting = true 166 + while (true) { 167 + await gate(this.#abort.signal) 168 + 169 + console.debug('attempting WebSocket reconnection') 170 + this.#dispatchCustomEvent('wsreconnect.start') 171 + 172 + try { 173 + this.#socket = new WebSocket(this.#url) 174 + this.#socket.onopen = this.#handleSocketOpen 175 + this.#socket.onclose = this.#handleSocketClose 176 + this.#socket.onerror = this.#handleSocketError 177 + 178 + // wait a bit to see if connection succeeds 179 + await sleep(500) 180 + 181 + if (this.connected) { 182 + console.debug('WebSocket reconnection successful') 183 + this.#reconnecting = false 184 + return 185 + } 186 + } catch (err: unknown) { 187 + console.warn('WebSocket reconnection attempt failed:', err) 188 + this.#dispatchCustomEvent('wsreconnect.failed') 189 + } 190 + } 191 + } catch (err: unknown) { 192 + // why is typescript saying signal.aborted: false, not boolean? 193 + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition 194 + if (this.#abort.signal.aborted) { 195 + console.warn('WebSocket reconnection aborted') 196 + this.#dispatchCustomEvent('wsreconnect.abandoned') 197 + } else { 198 + console.error('WebSocket reconnection failed after max attempts', err) 199 + this.#dispatchCustomEvent('wsreconnect.abandoned') 200 + } 201 + 202 + this.#reconnecting = false 203 + } 204 + } 205 + 150 206 #dispatchCustomEvent(type: string, detail?: object) { 151 207 this.dispatchEvent(new CustomEvent(type, {bubbles: true, detail})) 152 208 } ··· 178 234 for await (const data of streamSocketJson(this.#socket)) { 179 235 await this.#handleOpenMessage(data) 180 236 } 181 - } catch (exc) { 237 + } catch (exc: unknown) { 182 238 const err = normalizeProtocolError(exc) 183 - 184 239 console.error('realm connection, socket loop error', err) 185 240 this.#dispatchCustomEvent('wserror', {error: err}) 186 241 } finally { 187 - this.destroy() 242 + this.#closeWebSocket() 243 + this.#attemptReconnect().catch((err: unknown) => { 244 + console.error('realm connection, reconnect loop error:', err) 245 + this.#dispatchCustomEvent('wserror', {error: err}) 246 + }) 188 247 } 189 248 } 190 249 ··· 335 394 } 336 395 } 337 396 338 - #handleSocketError: WebSocket['onerror'] = (exc) => { 339 - this.#dispatchCustomEvent('wserror', {error: normalizeProtocolError(exc)}) 340 - this.destroy() 397 + #handleSocketError: WebSocket['onerror'] = (exc: unknown) => { 398 + const error = normalizeProtocolError(exc) 399 + 400 + console.error('socket error', error) 401 + this.#dispatchCustomEvent('wserror', {error}) 341 402 } 342 403 343 404 #handleSocketClose: WebSocket['onclose'] = () => { 344 405 this.#dispatchCustomEvent('wsclose') 345 - this.destroy() 406 + 407 + if (!this.#reconnecting && !this.#abort.signal.aborted) { 408 + this.#attemptReconnect().catch((exc: unknown) => { 409 + const error = normalizeProtocolError(exc) 410 + 411 + console.error('socket error', error) 412 + this.#dispatchCustomEvent('wserror', {error}) 413 + }) 414 + } 346 415 } 347 416 417 + // loop that handles _realm_ level socket comms 348 418 #socketLoop = async () => { 349 419 this.#abort.signal.throwIfAborted() 350 420 351 - await sleep(250) 421 + // why does typescript say #abort.signal: false, not : boolean? 422 + 423 + await sleep(250, this.#abort.signal) 352 424 await this.#pingSocket() 353 425 354 426 while (!this.#abort.signal.aborted) { 355 - await sleep(30_000) 427 + await sleep(30_000, this.#abort.signal) 356 428 await this.#pingSocket() 357 429 } 358 430 }
+2 -3
src/realm/server/state-storage.ts
··· 226 226 try { 227 227 const clockKey = `${CLOCK_KEY_PREFIX}${actor}` 228 228 return (await this.#db.get(clockKey)) as LCTimestamp 229 - } catch (err) { 230 - // Key doesn't exist 231 - return null 229 + } catch { 230 + return null // key doesn't exist 232 231 } 233 232 } 234 233