a minimal web framework for deno
at main 166 lines 4.0 kB view raw
1/** 2 * @module stream 3 * Utilities for Server-Sent Events (SSE), WebSockets, and generic streaming responses. 4 */ 5 6/** 7 * Copyright (c) 2025 adoravel 8 * SPDX-License-Identifier: LGPL-3.0-or-later 9 */ 10 11import { Context } from "./middleware.ts"; 12 13/** 14 * represents a message sent over server-sent events 15 */ 16export interface SSEMessage { 17 /** the event name */ 18 event?: string; 19 /** the data payload */ 20 data: string; 21 /** the event identifier */ 22 id?: string; 23 /** the reconnection time in milliseconds */ 24 retry?: number; 25} 26 27/** 28 * creates a `Response` that streams server-sent events 29 * @param source an async iterable or a function returning one 30 * @param init optional `ResponseInit` body. 31 */ 32export function sse( 33 source: AsyncIterable<SSEMessage> | (() => AsyncIterable<SSEMessage>), 34 init?: ResponseInit, 35): Response { 36 const encoder = new TextEncoder(); 37 const iterable = typeof source === "function" ? source() : source; 38 39 const stream = new ReadableStream({ 40 async start(controller) { 41 try { 42 for await (const message of iterable) { 43 let chunk = ""; 44 45 if (message.event) { 46 chunk += `event: ${message.event}\n`; 47 } 48 if (message.id) { 49 chunk += `id: ${message.id}\n`; 50 } 51 if (message.retry) { 52 chunk += `retry: ${message.retry}\n`; 53 } 54 55 const lines = message.data.split("\n"); 56 for (const line of lines) { 57 chunk += `data: ${line}\n`; 58 } 59 60 chunk += "\n"; // eoi 61 controller.enqueue(encoder.encode(chunk)); 62 } 63 controller.close(); 64 } catch (error) { 65 controller.error(error); 66 } 67 }, 68 }); 69 70 return new Response(stream, { 71 ...init, 72 headers: { 73 "Content-Type": "text/event-stream", 74 "Cache-Control": "no-cache", 75 "Connection": "keep-alive", 76 ...init?.headers, 77 }, 78 }); 79} 80 81/** 82 * handlers for WebSocket lifecycle events 83 */ 84export interface WebSocketHandler { 85 onOpen?: (ws: WebSocket) => void | Promise<void>; 86 onMessage?: (ws: WebSocket, event: MessageEvent) => void | Promise<void>; 87 onClose?: (ws: WebSocket, event: CloseEvent) => void | Promise<void>; 88 onError?: (ws: WebSocket, event: Event | ErrorEvent) => void | Promise<void>; 89} 90 91/** 92 * upgrades an incoming HTTP connection to a WebSocket connection 93 * @param ctx the request context 94 * @param handler the WebSocket event handlers 95 */ 96export function upgradeWebSocket( 97 ctx: Context, 98 handler: WebSocketHandler, 99): Response { 100 const upgrade = ctx.request.headers.get("upgrade")?.toLowerCase(); 101 102 if (upgrade !== "websocket") { 103 return new Response("expected websocket upgrade", { 104 status: 426, 105 headers: { 106 "Upgrade": "websocket", 107 }, 108 }); 109 } 110 111 const { socket, response } = Deno.upgradeWebSocket(ctx.request); 112 113 if (handler.onOpen) { 114 socket.addEventListener("open", () => { 115 handler.onOpen!(socket); 116 }); 117 } 118 119 if (handler.onMessage) { 120 socket.addEventListener("message", (event) => { 121 handler.onMessage!(socket, event); 122 }); 123 } 124 125 if (handler.onClose) { 126 socket.addEventListener("close", (event) => { 127 handler.onClose!(socket, event); 128 }); 129 } 130 131 if (handler.onError) { 132 socket.addEventListener("error", (event) => { 133 handler.onError!(socket, event); 134 }); 135 } 136 return response; 137} 138 139/** 140 * creates a streaming Response from an async iterable of data 141 * @param source an async iterable of strings or Uint8Arrays 142 * @param init optional `ResponseInit` body 143 */ 144export function stream( 145 source: AsyncIterable<string | Uint8Array> | (() => AsyncIterable<string | Uint8Array>), 146 init?: ResponseInit, 147): Response { 148 const encoder = new TextEncoder(); 149 const iterable = typeof source === "function" ? source() : source; 150 151 const readable = new ReadableStream({ 152 async start(controller) { 153 try { 154 for await (const chunk of iterable) { 155 const data = typeof chunk === "string" ? encoder.encode(chunk) : chunk; 156 controller.enqueue(data); 157 } 158 controller.close(); 159 } catch (error) { 160 controller.error(error); 161 } 162 }, 163 }); 164 165 return new Response(readable, init); 166}