// a minimal web framework for deno
1/**
2 * @module stream
3 * Utilities for Server-Sent Events (SSE), WebSockets, and generic streaming responses.
4 */
5
6/**
7 * Copyright (c) 2025 adoravel
8 * SPDX-License-Identifier: LGPL-3.0-or-later
9 */
10
11import { Context } from "./middleware.ts";
12
/**
 * a single server-sent event, as handed to `sse` for serialisation
 */
export interface SSEMessage {
  /** payload of the event; the only required field */
  data: string;
  /** name of the event */
  event?: string;
  /** identifier of the event */
  id?: string;
  /** reconnection time, in milliseconds */
  retry?: number;
}
26
/**
 * serialises one message into the `text/event-stream` wire format
 *
 * line breaks are removed from `event` and `id` so that a value cannot inject
 * additional fields, and `data` is split on every line terminator the format
 * recognises (`\r\n`, `\r`, `\n`), producing one `data:` line per segment
 */
function formatSSEMessage(message: SSEMessage): string {
  const stripLineBreaks = (value: string): string => value.replace(/[\r\n]/g, "");
  let frame = "";

  if (message.event) {
    frame += `event: ${stripLineBreaks(message.event)}\n`;
  }
  // an empty id is still emitted: it is a distinct value from "no id"
  if (message.id != null) {
    frame += `id: ${stripLineBreaks(message.id)}\n`;
  }
  // `retry: 0` is a legitimate value; anything that would not print as plain
  // digits is skipped because the field is only honoured when it is all digits
  if (message.retry != null && Number.isSafeInteger(message.retry) && message.retry >= 0) {
    frame += `retry: ${message.retry}\n`;
  }
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    frame += `data: ${line}\n`;
  }

  return frame + "\n"; // the blank line terminates the event
}

/**
 * creates a `Response` that streams server-sent events
 *
 * messages are pulled from `source` one at a time as the body is read, and the
 * source iterator is released through `return()` when the body is cancelled
 *
 * the `Content-Type`, `Cache-Control` and `Connection` defaults are applied
 * only when `init.headers` does not already define them; `init.headers` may be
 * a `Headers` instance, an array of pairs or a plain record
 *
 * @param source an async iterable of messages, or a function returning one
 * @param init optional `ResponseInit` merged into the response
 * @returns a streaming `Response` in `text/event-stream` format
 */
export function sse(
  source: AsyncIterable<SSEMessage> | (() => AsyncIterable<SSEMessage>),
  init?: ResponseInit,
): Response {
  const encoder = new TextEncoder();
  const iterable = typeof source === "function" ? source() : source;
  let iterator: AsyncIterator<SSEMessage> | undefined;

  const stream = new ReadableStream<Uint8Array>({
    async pull(controller) {
      try {
        // obtained lazily so a throwing `[Symbol.asyncIterator]()` errors the
        // stream instead of escaping from `sse` itself
        const active = iterator ?? (iterator = iterable[Symbol.asyncIterator]());
        const next = await active.next();
        if (next.done) {
          controller.close();
          return;
        }
        controller.enqueue(encoder.encode(formatSSEMessage(next.value)));
      } catch (error) {
        controller.error(error);
      }
    },
    async cancel(reason) {
      // lets the source run its cleanup once nobody is reading any more
      await iterator?.return?.(reason);
    },
  });

  // `Headers` normalises every accepted `HeadersInit` shape and compares names
  // case-insensitively, so caller-supplied values always win over defaults
  const headers = new Headers(init?.headers);
  const defaults: ReadonlyArray<readonly [string, string]> = [
    ["Content-Type", "text/event-stream"],
    ["Cache-Control", "no-cache"],
    ["Connection", "keep-alive"],
  ];
  for (const [name, value] of defaults) {
    if (!headers.has(name)) {
      headers.set(name, value);
    }
  }

  return new Response(stream, { ...init, headers });
}
80
/**
 * optional callbacks for the lifecycle events of a WebSocket
 *
 * each callback receives the socket as its first argument and may be
 * synchronous or return a promise
 */
export interface WebSocketHandler {
  /** invoked for the socket's `open` event */
  onOpen?: (ws: WebSocket) => void | Promise<void>;
  /** invoked for each `message` event */
  onMessage?: (ws: WebSocket, event: MessageEvent) => void | Promise<void>;
  /** invoked for the socket's `error` event */
  onError?: (ws: WebSocket, event: Event | ErrorEvent) => void | Promise<void>;
  /** invoked for the socket's `close` event */
  onClose?: (ws: WebSocket, event: CloseEvent) => void | Promise<void>;
}
90
/**
 * upgrades an incoming HTTP connection to a WebSocket connection
 *
 * the request must list `websocket` in its `Upgrade` header (case-insensitive,
 * comma-separated lists are accepted); otherwise a `426` response advertising
 * `Upgrade: websocket` is returned and no upgrade is attempted
 *
 * a listener is registered only for the callbacks present on `handler` at
 * call time. anything a callback throws, or any promise it returns that
 * rejects, is caught and logged with `console.error` instead of surfacing as
 * an unhandled error
 *
 * NOTE(review): errors thrown by `Deno.upgradeWebSocket` itself are not caught
 * here and propagate to the caller — confirm the conditions against the Deno
 * API documentation
 *
 * @param ctx the request context
 * @param handler the WebSocket event handlers
 * @returns the upgrade response, or a `426` response when no upgrade was requested
 */
export function upgradeWebSocket(
  ctx: Context,
  handler: WebSocketHandler,
): Response {
  const requested = (ctx.request.headers.get("upgrade") ?? "")
    .split(",")
    .map((token) => token.trim().toLowerCase());

  if (!requested.includes("websocket")) {
    return new Response("expected websocket upgrade", {
      status: 426,
      headers: {
        "Upgrade": "websocket",
      },
    });
  }

  const { socket, response } = Deno.upgradeWebSocket(ctx.request);

  // runs a user callback, containing synchronous throws and async rejections
  const invoke = async (callback: () => void | Promise<void>): Promise<void> => {
    try {
      await callback();
    } catch (error) {
      console.error("uncaught error in websocket handler:", error);
    }
  };

  // callbacks are looked up on `handler` at dispatch time so they keep their
  // `this` binding
  if (handler.onOpen) {
    socket.addEventListener("open", () => {
      void invoke(() => handler.onOpen?.(socket));
    });
  }

  if (handler.onMessage) {
    socket.addEventListener("message", (event: MessageEvent) => {
      void invoke(() => handler.onMessage?.(socket, event));
    });
  }

  if (handler.onClose) {
    socket.addEventListener("close", (event: CloseEvent) => {
      void invoke(() => handler.onClose?.(socket, event));
    });
  }

  if (handler.onError) {
    socket.addEventListener("error", (event: Event) => {
      void invoke(() => handler.onError?.(socket, event));
    });
  }

  return response;
}
138
/**
 * creates a streaming `Response` from an async iterable of data
 *
 * chunks are pulled from `source` one at a time as the body is read, so a fast
 * or endless source is not drained eagerly into memory; string chunks are
 * encoded as UTF-8 and `Uint8Array` chunks are forwarded untouched. when the
 * body is cancelled the source iterator is released through `return()`
 *
 * an error raised by the source errors the response body
 *
 * @param source an async iterable of strings or `Uint8Array`s, or a function returning one
 * @param init optional `ResponseInit` passed to the `Response` unchanged
 * @returns a `Response` whose body streams the chunks of `source`
 */
export function stream(
  source: AsyncIterable<string | Uint8Array> | (() => AsyncIterable<string | Uint8Array>),
  init?: ResponseInit,
): Response {
  const encoder = new TextEncoder();
  const iterable = typeof source === "function" ? source() : source;
  let iterator: AsyncIterator<string | Uint8Array> | undefined;

  const readable = new ReadableStream<Uint8Array>({
    async pull(controller) {
      try {
        // obtained lazily so a throwing `[Symbol.asyncIterator]()` errors the
        // stream instead of escaping from `stream` itself
        const active = iterator ?? (iterator = iterable[Symbol.asyncIterator]());
        const next = await active.next();
        if (next.done) {
          controller.close();
          return;
        }
        const chunk = next.value;
        controller.enqueue(typeof chunk === "string" ? encoder.encode(chunk) : chunk);
      } catch (error) {
        controller.error(error);
      }
    },
    async cancel(reason) {
      // lets the source run its cleanup once nobody is reading any more
      await iterator?.return?.(reason);
    },
  });

  return new Response(readable, init);
}