podcast manager

back to where I was with the deno version

+548 -80
+3
src/client/page-app.jsx
··· 1 1 import './page-app.css' 2 2 3 + /** 4 + * @returns {preact.JSX.Element} app wrapper component 5 + */ 3 6 export function App() { 4 7 return ( 5 8 <>
+5 -7
src/common.js src/types.js
··· 1 - /** 2 - * useful types and utilities that don't belong elsewhere. 3 - * 4 - * @module common 5 - */ 1 + /** @module common */ 6 2 7 3 /** 4 + * A callback function, with arbitrary arguments; use {Parameters} to extract them. 5 + * 8 6 * @typedef {function(...*): void} Callback 9 - * A callback function, with arbitrary arguments; use {Parameters} to extract them. 10 7 */ 11 8 12 9 /** 10 + * A callback function with no arguments. 11 + * 13 12 * @typedef {function(): void} VoidCallback 14 - * A callback function with no arguments. 15 13 */
+4 -1
src/common/async/aborts.js
··· 32 32 * Create an abort signal that aborts if any of the passed signals aborts. 33 33 * Better than managing them manually because we do proper cleanup of non-triggered aborts. 34 34 * 35 - * @param {...AbortSignal} signals - the signals to combine 35 + * @param {...(AbortSignal | undefined)} signals - the signals to combine 36 36 * @returns {AbortSignal} the combined signal 37 37 */ 38 38 export function combineSignals(...signals) { ··· 41 41 const cleanups = [] 42 42 43 43 for (const signal of signals) { 44 + if (!signal) 45 + continue 46 + 44 47 if (signal.aborted) { 45 48 controller.abort(signal.reason) 46 49 return controller.signal
-1
src/common/async/blocking-atom.js
··· 47 47 } 48 48 49 49 signal?.throwIfAborted() 50 - return undefined 51 50 } 52 51 53 52 }
+21
src/common/async/blocking-queue.js
··· 25 25 this.#maxsize = maxsize ? maxsize : undefined 26 26 } 27 27 28 + /** @returns {number} how deep is the queue? */ 29 + get depth() { 30 + return this.#items.length 31 + } 32 + 33 + /** 34 + * place one or more items on the queue, to be picked up by awaiters. 35 + * 36 + * @param {...T} elements the items to place on the queue. 37 + */ 38 + prequeue(...elements) { 39 + for (const el of elements.reverse()) { 40 + if (this.#maxsize && this.#items.length >= this.#maxsize) { 41 + throw Error('out of room') 42 + } 43 + 44 + this.#items.unshift(el) 45 + this.#sema.free() 46 + } 47 + } 48 + 28 49 /** 29 50 * place one or more items on the queue, to be picked up by awaiters. 30 51 *
src/common/async/gate.js src/common/breaker.js
+147
src/common/crypto/jwks.js
··· 1 + /** @module common/crypto */ 2 + 3 + import * as jose from 'jose' 4 + import { z } from 'zod/v4' 5 + 6 + const signAlgo = { name: 'ES256' } 7 + 8 + const jwkBaseSchema = z.object({ 9 + 'alg': z.string().optional(), 10 + 'ext': z.boolean().optional(), 11 + 'key_ops': z.array(z.string()).optional(), 12 + 'kid': z.string().optional(), 13 + 'use': z.string().optional(), 14 + 'x5c': z.array(z.string()).optional(), 15 + 'x5t#S256': z.string().optional(), 16 + 'x5t': z.string().optional(), 17 + 'x5u': z.string().optional(), 18 + }) 19 + 20 + const jwkOkpPublicSchema = z.object({ 21 + ...jwkBaseSchema.shape, 22 + crv: z.string(), 23 + x: z.string(), 24 + }) 25 + 26 + const jwkOkpPrivateSchema = z.object({ 27 + ...jwkOkpPublicSchema.shape, 28 + d: z.string(), 29 + }) 30 + 31 + const jwkEcPublicSchema = z.object({ 32 + ...jwkBaseSchema.shape, 33 + crv: z.string(), 34 + x: z.string(), 35 + y: z.string(), 36 + }) 37 + 38 + const jwkEcPrivateSchema = z.object({ 39 + ...jwkEcPublicSchema.shape, 40 + d: z.string(), 41 + }) 42 + 43 + const jwkRSAPublicSchema = z.object({ 44 + ...jwkBaseSchema.shape, 45 + e: z.string(), 46 + n: z.string(), 47 + }) 48 + 49 + const jwkRSAPrivateSchema = z.object({ 50 + ...jwkRSAPublicSchema.shape, 51 + d: z.string(), 52 + dp: z.string(), 53 + qp: z.string(), 54 + p: z.string(), 55 + q: z.string(), 56 + qi: z.string(), 57 + }) 58 + 59 + const jwkOctSchema = z.object({ 60 + ...jwkBaseSchema.shape, 61 + k: z.string(), 62 + }) 63 + 64 + /** 65 + * a zod schema describing a JWK from jose 66 + * EC, OKP, RSA and oct key types are supported 67 + * 68 + * @see https://www.rfc-editor.org/rfc/rfc7517 69 + * @see https://github.com/panva/jose/blob/main/src/types.d.ts#L2 70 + * @type {z.ZodType<jose.JWK>} 71 + */ 72 + export const jwkSchema = z.union([ 73 + jwkBaseSchema, 74 + jwkOkpPublicSchema, 75 + jwkOkpPrivateSchema, 76 + jwkEcPublicSchema, 77 + jwkEcPrivateSchema, 78 + jwkRSAPublicSchema, 79 + jwkRSAPrivateSchema, 80 + jwkOctSchema, 81 + ]) 82 + 83 + /** 84 + * zod transform from JWK to CryptoKey 85 + * 86 + * @type {z.ZodTransform<CryptoKey, jose.JWK>} 87 + */ 88 + export const jwkImport = z.transform(async (val, ctx) => { 89 + try { 90 + if (typeof val === 'object' && val !== null) { 91 + const key = await jose.importJWK(val, signAlgo.name) 92 + if (key instanceof CryptoKey) { 93 + return key 94 + } 95 + 96 + ctx.issues.push({ 97 + code: 'custom', 98 + message: 'symmetric keys unsupported', 99 + input: val, 100 + }) 101 + } 102 + else { 103 + ctx.issues.push({ 104 + code: 'custom', 105 + message: 'not a valid JWK object', 106 + input: val, 107 + }) 108 + } 109 + } 110 + catch (e) { 111 + ctx.issues.push({ 112 + code: 'custom', 113 + message: `could not import JWK object: ${e}`, 114 + input: val, 115 + }) 116 + } 117 + 118 + return z.NEVER 119 + }) 120 + 121 + /** 122 + * zod transform from exportable CryptoKey to JWK 123 + * 124 + * @type {z.ZodTransform<jose.JWK, CryptoKey>} 125 + */ 126 + export const jwkExport = z.transform(async (val, ctx) => { 127 + try { 128 + if (val.extractable) { 129 + return await jose.exportJWK(val) 130 + } 131 + 132 + ctx.issues.push({ 133 + code: 'custom', 134 + message: 'non-extractable key!', 135 + input: val, 136 + }) 137 + } 138 + catch (e) { 139 + ctx.issues.push({ 140 + code: 'custom', 141 + message: `could not export JWK object: ${e}`, 142 + input: val, 143 + }) 144 + } 145 + 146 + return z.NEVER 147 + })
+3 -71
src/common/crypto/signing.js
··· 3 3 import * as jose from 'jose' 4 4 import { z } from 'zod/v4' 5 5 6 - const signAlgo = { name: 'ES256' } 7 - 8 6 /** 9 - * @typedef JWTToken 7 + * @typedef {object} JWTToken 10 8 * @property {string} token the still-encoded JWT, for later verification 11 - * @augments jose.JWTPayload 9 + * @property {jose.JWTPayload} payload the decoded JWT payload, for later verification 12 10 * 13 11 * A JWTToken is both the decoded payload and the token itself, for later processing. 14 12 */ ··· 22 20 export const jwtSchema = z.jwt({ abort: true }).transform((token, ctx) => { 23 21 try { 24 22 const payload = jose.decodeJwt(token) 25 - return { ...payload, token } 23 + return { payload, token } 26 24 } 27 25 catch (e) { 28 26 ctx.issues.push({ ··· 33 31 34 32 return z.NEVER 35 33 } 36 - }) 37 - 38 - /** 39 - * schema describing a transform from JWK to CryptoKey 40 - * 41 - * @type {z.ZodTransform<CryptoKey, jose.JWK>} 42 - */ 43 - export const jwkImport = z.transform(async (val, ctx) => { 44 - try { 45 - if (typeof val === 'object' && val !== null) { 46 - const key = await jose.importJWK(val, signAlgo.name) 47 - if (key instanceof CryptoKey) { 48 - return key 49 - } 50 - 51 - ctx.issues.push({ 52 - code: 'custom', 53 - message: 'symmetric keys unsupported', 54 - input: val, 55 - }) 56 - } 57 - else { 58 - ctx.issues.push({ 59 - code: 'custom', 60 - message: 'not a valid JWK object', 61 - input: val, 62 - }) 63 - } 64 - } 65 - catch (e) { 66 - ctx.issues.push({ 67 - code: 'custom', 68 - message: `could not import JWK object: ${e}`, 69 - input: val, 70 - }) 71 - } 72 - 73 - return z.NEVER 74 - }) 75 - 76 - /** 77 - * schema describing a transform from exportable CryptoKey to JWK 78 - * 79 - * @type {z.ZodTransform<jose.JWK, CryptoKey>} 80 - */ 81 - export const jwkExport = z.transform(async (val, ctx) => { 82 - try { 83 - if (val.extractable) { 84 - return await jose.exportJWK(val) 85 - } 86 - 87 - ctx.issues.push({ 88 - code: 'custom', 89 - message: 'non-extractable key!', 90 - input: val, 91 - }) 92 - } 93 - catch (e) { 94 - ctx.issues.push({ 95 - code: 'custom', 96 - message: `could not export JWK object: ${e}`, 97 - input: val, 98 - }) 99 - } 100 - 101 - return z.NEVER 102 34 }) 103 35 104 36 /**
+71
src/common/errors.js
··· 1 + /** @module common */ 2 + 3 + import { prettifyError, ZodError } from 'zod/v4' 4 + 5 + /** 6 + * @private 7 + * @type {Record<number, string>} 8 + */ 9 + const StatusCodes = { 10 + 400: 'Bad Request', 11 + 403: 'Forbidden', 12 + 404: 'Not Found', 13 + 408: 'Request Timeout', 14 + 409: 'Conflict', 15 + 429: 'Too Many Requests', 16 + 499: 'Client Closed Request', 17 + 500: 'Internal Server Error', 18 + } 19 + 20 + /** 21 + * Common base class for Websocket Errors 22 + * 23 + * @augments Error 24 + */ 25 + export class ProtocolError extends Error { 26 + 27 + /** 28 + * @param {string} message a "details" message representing this error 29 + * @param {number} status the HTTP status code representing this error 30 + * @param {Error} [cause] a previous error we're wrapping 31 + */ 32 + constructor(message, status, cause) { 33 + const statusText = StatusCodes[status] || 'Unknown' 34 + super(`${status} ${statusText}: ${message}`) 35 + 36 + this.name = this.constructor.name 37 + this.status = status 38 + this.cause = cause 39 + } 40 + 41 + } 42 + 43 + /** 44 + * @param {Error} e the error to check 45 + * @param {number} [status] the protocol status to check, or any protocol error if undefined. 46 + * @returns {boolean} true if the given error is a protocol error with the given status. 47 + */ 48 + export function isProtocolError(e, status) { 49 + return (e instanceof ProtocolError && e.status == status) 50 + } 51 + 52 + /** 53 + * Normalizes the given error into a protocol error; passes through input that is already 54 + * protocol errors. 55 + * 56 + * @param {unknown} error some caught error 57 + * @returns {ProtocolError} an apporpriate protocol error 58 + */ 59 + export function normalizeProtocolError(error) { 60 + if (error instanceof ProtocolError) 61 + return error 62 + 63 + if (error instanceof ZodError) 64 + return new ProtocolError(prettifyError(error), 400, error) 65 + 66 + if (error instanceof Error) 67 + return new ProtocolError(error.message, 500, error) 68 + 69 + // fallback, unknown 70 + return new ProtocolError(`Error! ${error}`, 500) 71 + }
+46
src/common/protocol.js
··· 1 + /** @module common/protocol */ 2 + 3 + import { z } from 'zod/v4' 4 + import { jwkSchema } from '@/common/crypto/jwks.js' 5 + import { Brand } from '@/common/schema/brand.js' 6 + 7 + export const IdentID = new Brand('ident') 8 + export const RealmID = new Brand('realm') 9 + 10 + /** zod schema for `preauth.authn` message */ 11 + export const preauthAuthnMessageSchema = z.object({ 12 + msg: z.literal('preauth.authn'), 13 + pubkey: jwkSchema, 14 + }) 15 + 16 + /** zod schema for any `preauth` messages */ 17 + export const preauthMessageSchema = z.discriminatedUnion('msg', [ 18 + preauthAuthnMessageSchema, 19 + ]) 20 + 21 + //// 22 + 23 + /** zod schema for `realm.status` message */ 24 + export const realmStatusMessageSchema = z.object({ 25 + msg: z.literal('realm.status'), 26 + }) 27 + 28 + /** zod schema for `realm.status` response */ 29 + export const realmStatusResponseSchema = z.object({ 30 + msg: z.literal('realm.status'), 31 + realm: RealmID.schema, 32 + identities: z.array(IdentID.schema), 33 + }) 34 + 35 + /** zod schema for `realm.broadcast` message */ 36 + export const realmBroadcastMessageSchema = z.object({ 37 + msg: z.literal('realm.broadcast'), 38 + recipients: z.array(IdentID.schema), 39 + payload: z.any(), 40 + }) 41 + 42 + /** zod schema for any `realm` messages */ 43 + export const realmMessageSchema = z.discriminatedUnion('msg', [ 44 + realmStatusMessageSchema, 45 + realmBroadcastMessageSchema, 46 + ])
+69
src/common/schema/brand.js
··· 1 + /** @module common/schema */ 2 + 3 + import { nanoid } from 'nanoid' 4 + import { z } from 'zod/v4' 5 + 6 + /** 7 + * A brand creates identifiers that are typesafe by construction, 8 + * and shouldn't be able to be passed to the wrong resource type. 9 + */ 10 + export class Brand { 11 + 12 + /** 13 + * @private 14 + * @typedef {z.core.$ZodBranded<z.ZodString, symbol>} BrandSchema 15 + * @typedef {z.infer<BrandSchema>} BrandId 16 + */ 17 + 18 + /** @type {string} */ 19 + #prefix 20 + 21 + /** @type {number} */ 22 + #length 23 + 24 + /** @type {BrandSchema} */ 25 + #schema 26 + 27 + /** 28 + * @param {string} prefix string prefix for the identifier, eg. "ident", "acct") 29 + * @param {number} [length] the length of the random identifier part 30 + */ 31 + constructor(prefix, length = 16) { 32 + const brand = prefix.replace(/[.*+?^${}()|[\]\\]/g, '\\$&') // escape regex chars 33 + const pattern = new RegExp(`^${brand}-[A-Za-z0-9_-]{${length.toString()}}$`) 34 + 35 + this.#prefix = brand 36 + this.#length = length 37 + this.#schema = z.string().regex(pattern).brand(Symbol(brand)) 38 + } 39 + 40 + get schema() { 41 + return this.#schema 42 + } 43 + 44 + /** @returns {BrandId} a generated identifier */ 45 + generate() { 46 + const id = nanoid(this.#length) 47 + return this.#schema.parse(`${this.#prefix}-${id}`) 48 + } 49 + 50 + /** 51 + * @param {unknown} input the input to validate 52 + * @returns {BrandId} a valid identifier 53 + * @throws {z.ZodError} if the input could not be parsed 54 + */ 55 + parse(input) { 56 + return this.#schema.parse(input) 57 + } 58 + 59 + /** 60 + * @param {unknown} input the input to validate 61 + * @returns {boolean} true if a valid identifier, false otherwise 62 + */ 63 + validate(input) { 64 + return input != null 65 + && typeof input === 'string' 66 + && this.#schema.safeParse(input).success 67 + } 68 + 69 + }
+179
src/common/socket.js
··· 1 + /** @module common/socket */ 2 + 3 + import { combineSignals } from '@/common/async/aborts.js' 4 + import { BlockingAtom } from '@/common/async/blocking-atom.js' 5 + import { BlockingQueue } from '@/common/async/blocking-queue.js' 6 + import { Breaker } from '@/common/breaker.js' 7 + import { ProtocolError } from '@/common/errors.js' 8 + 9 + /** 10 + * Given a websocket, wait and take a single message off and return it. 11 + * 12 + * @example 13 + * const ws = new WebSocket("wss://example.com/stream") 14 + * const timeout = timeoutSignal(5000) 15 + * 16 + * try { 17 + * const msg = await takeSocket(ws, timeout.signal) 18 + * doWhatever(msg) 19 + * } finally { 20 + * timeout.cleanup() 21 + * if (ws.readyState !== ws.CLOSED) 22 + * ws.close(); 23 + * } 24 + * 25 + * @param {WebSocket} ws the socket to read 26 + * @param {AbortSignal} [signal] an abort signal to cancel the block 27 + * @returns {Promise<unknown>} the message off the socket 28 + */ 29 + export async function takeSocket(ws, signal) { 30 + signal?.throwIfAborted() 31 + 32 + const atom = new BlockingAtom() 33 + const breaker = new Breaker() 34 + 35 + const error = new AbortController() 36 + const multisignal = combineSignals(error.signal, signal) 37 + 38 + const onMessage = breaker.tripThen(m => atom.set(m.data)) 39 + const onError = breaker.tripThen(e => error.abort(e)) 40 + const onClose = breaker.tripThen(() => error.abort('closed')) 41 + 42 + try { 43 + ws.addEventListener('message', onMessage) 44 + ws.addEventListener('error', onError) 45 + ws.addEventListener('close', onClose) 46 + 47 + const data = await atom.get(multisignal) 48 + if (!data) { 49 + throw new ProtocolError('socket read aborted', 408, multisignal?.reason) 50 + } 51 + 52 + return data 53 + } 54 + finally { 55 + ws.removeEventListener('message', onMessage) 56 + ws.removeEventListener('error', onError) 57 + ws.removeEventListener('close', onClose) 58 + } 59 + } 60 + 61 + /** 62 + * stream configuration options 63 + * 64 + * @typedef ConfigProps 65 + * @property {number} maxDepth max depth of the queue, after which messages are dropped 66 + * @property {AbortSignal} [signal] an abort signal to cancel the block 67 + */ 68 + 69 + /** @type {ConfigProps} */ 70 + export const STREAM_CONFIG_DEFAULT = { maxDepth: 1000 } 71 + 72 + // symbols for iteration protocol 73 + const yield$ = Symbol('yield$') 74 + const error$ = Symbol('error$') 75 + const end$ = Symbol('end$') 76 + 77 + /** 78 + * Given a websocket, stream messages off the socket as an async generator. 79 + * 80 + * @example 81 + * const ws = new WebSocket("wss://example.com/stream") 82 + * const timeout = timeoutSignal(5000) 83 + * 84 + * try { 85 + * // signal will fire in 5s, so we'll take as many messages as we get until then 86 + * for await (const msg of streamSocket(ws, { signal: timeout.signal })) { 87 + * doWhatever(msg) 88 + * } 89 + * } finally { 90 + * timeout.cleanup() 91 + * if (ws.readyState !== ws.CLOSED) 92 + * ws.close(); 93 + * } 94 + * 95 + * 96 + * 97 + * @param {WebSocket} ws the socket to read 98 + * @param {Partial<ConfigProps>} [config_] stream configuration to merge into defaults 99 + * @yields {unknown} messages from the socket 100 + */ 101 + export async function* streamSocket(ws, config_) { 102 + const { signal, ...config } = { ...STREAM_CONFIG_DEFAULT, ...config_ } 103 + signal?.throwIfAborted() 104 + 105 + // await incoming messages without blocking 106 + /** @type {BlockingQueue<Array<*>>} */ 107 + const queue = new BlockingQueue(config.maxDepth) 108 + 109 + // if true, we're ignoring incoming messages until we drop the queue 110 + let inBackoffMode = false 111 + const backoffThresh = Math.floor(config.maxDepth * 0.9) 112 + 113 + // we don't want to keep processing after we've been closed 114 + const breaker = new Breaker() 115 + 116 + // define callback functions (need to be able to reference for removing them) 117 + 118 + /** @type {function(MessageEvent): void} */ 119 + const onMessage = breaker.untilTripped((m) => { 120 + if (inBackoffMode) { 121 + console.warn('ignoring incoming message due to backpressure protection!') 122 + return 123 + } 124 + 125 + queue.enqueue([yield$, m.data]) 126 + inBackoffMode = queue.depth > backoffThresh 127 + if (inBackoffMode) { 128 + console.warn('message stream will start dropping messages due to backpressure!') 129 + } 130 + }) 131 + 132 + /** @type {function(Event): void} */ 133 + // todo: why are we getting this on client shutdown instead of onClose? 134 + const onError = breaker.tripThen((e) => { 135 + if (e.message === 'Unexpected EOF') { 136 + queue.enqueue([end$]) 137 + } 138 + else { 139 + queue.enqueue([error$, e]) 140 + } 141 + }) 142 + 143 + /** @type {function(): void} */ 144 + const onClose = breaker.tripThen(() => { 145 + queue.enqueue([end$]) 146 + }) 147 + 148 + // finally get into our loop 149 + try { 150 + ws.addEventListener('message', onMessage) 151 + ws.addEventListener('error', onError) 152 + ws.addEventListener('close', onClose) 153 + 154 + while (true) { 155 + signal?.throwIfAborted() 156 + 157 + const [event, value] = await queue.dequeue(signal) 158 + if (queue.depth < backoffThresh) { 159 + console.log('message stream will stop dropping messages due to eased backpressure') 160 + inBackoffMode = false 161 + } 162 + 163 + switch (event) { 164 + case yield$: 165 + yield value 166 + continue 167 + case error$: 168 + throw value 169 + case end$: 170 + return 171 + } 172 + } 173 + } 174 + finally { 175 + ws.removeEventListener('message', onMessage) 176 + ws.removeEventListener('error', onError) 177 + ws.removeEventListener('close', onClose) 178 + } 179 + }