A tool for people curious about the React Server Components protocol rscexplorer.dev/
rsc react

don't wait for stream to complete to display ui

reworks how we handle streams so that they're actually streamy

fixes https://github.com/gaearon/rscexplorer/issues/7

+181 -80
+59 -53
src/client/runtime/steppable-stream.ts
··· 5 5 6 6 export type CallServerCallback = ImportedCallServerCallback; 7 7 8 - // React's Thenable type (not exported from react package) 9 8 export interface Thenable<T> { 10 9 then<TResult1 = T, TResult2 = never>( 11 10 onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | null, ··· 17 16 callServer?: CallServerCallback; 18 17 } 19 18 20 - /** 21 - * SteppableStream - makes a Flight stream steppable for debugging. 22 - * 23 - * Buffers incoming rows and controls their release to the Flight decoder. 24 - * The flightPromise only resolves when all rows have been released. 25 - */ 19 + const noop = () => {}; 20 + const encoder = new TextEncoder(); 21 + 26 22 export class SteppableStream { 27 23 rows: string[] = []; 28 - releasedCount = 0; 29 - buffered = false; 30 - closed = false; 24 + done = false; 31 25 error: Error | null = null; 32 - release: (count: number) => void; 33 26 flightPromise: Thenable<unknown>; 34 - bufferPromise: Promise<void>; 27 + 28 + private controller!: ReadableStreamDefaultController<Uint8Array>; 29 + private releasedCount = 0; 30 + private closed = false; 31 + private yieldIndex = 0; 32 + private ping = noop; 35 33 36 34 constructor(source: ReadableStream<Uint8Array>, options: SteppableStreamOptions = {}) { 37 35 const { callServer } = options; 38 36 39 - const encoder = new TextEncoder(); 40 - let controller: ReadableStreamDefaultController<Uint8Array>; 41 37 const output = new ReadableStream<Uint8Array>({ 42 38 start: (c) => { 43 - controller = c; 39 + this.controller = c; 44 40 }, 45 41 }); 46 42 47 - this.release = (count: number): void => { 48 - while (this.releasedCount < count && this.releasedCount < this.rows.length) { 49 - const row = this.rows[this.releasedCount]; 50 - if (row !== undefined) { 51 - controller.enqueue(encoder.encode(row + "\n")); 52 - } 53 - this.releasedCount++; 54 - } 55 - if (this.releasedCount >= this.rows.length && this.buffered && !this.closed) { 56 - controller.close(); 57 - this.closed = true; 58 - } 59 - }; 60 - 61 43 const streamOptions = callServer ? { callServer } : {}; 62 44 this.flightPromise = createFromReadableStream(output, streamOptions); 63 - this.bufferPromise = this.buffer(source); 45 + this.consumeSource(source); 64 46 } 65 47 66 - private async buffer(stream: ReadableStream<Uint8Array>): Promise<void> { 67 - const reader = stream.getReader(); 48 + release(count: number): void { 49 + if (this.closed) return; 50 + 51 + while (this.releasedCount < count && this.releasedCount < this.rows.length) { 52 + this.controller.enqueue(encoder.encode(this.rows[this.releasedCount]! + "\n")); 53 + this.releasedCount++; 54 + } 55 + 56 + this.maybeClose(); 57 + } 58 + 59 + async *[Symbol.asyncIterator](): AsyncGenerator<string> { 60 + while (true) { 61 + while (this.yieldIndex < this.rows.length) { 62 + yield this.rows[this.yieldIndex++]!; 63 + } 64 + if (this.error) throw this.error; 65 + if (this.done) return; 66 + 67 + await new Promise<void>((resolve) => { 68 + this.ping = resolve; 69 + }); 70 + this.ping = noop; 71 + } 72 + } 73 + 74 + private async consumeSource(source: ReadableStream<Uint8Array>): Promise<void> { 75 + const reader = source.getReader(); 68 76 const decoder = new TextDecoder(); 69 77 let partial = ""; 70 78 ··· 78 86 partial = lines.pop() ?? ""; 79 87 80 88 for (const line of lines) { 81 - if (line.trim()) this.rows.push(line); 89 + if (line.trim()) { 90 + this.rows.push(line); 91 + } 82 92 } 93 + this.ping(); 83 94 } 84 95 85 96 partial += decoder.decode(); 86 - if (partial.trim()) this.rows.push(partial); 97 + if (partial.trim()) { 98 + this.rows.push(partial); 99 + } 87 100 } catch (err) { 88 101 this.error = err instanceof Error ? err : new Error(String(err)); 89 102 } finally { 90 - this.buffered = true; 103 + this.done = true; 104 + this.ping(); 105 + this.maybeClose(); 91 106 } 92 107 } 93 108 94 - async waitForBuffer(): Promise<void> { 95 - await this.bufferPromise; 96 - if (this.error) { 97 - throw this.error; 109 + private maybeClose(): void { 110 + if (this.closed) return; 111 + if (this.done && this.releasedCount >= this.rows.length) { 112 + this.closed = true; 113 + if (this.error) { 114 + this.controller.error(this.error); 115 + } else { 116 + this.controller.close(); 117 + } 98 118 } 99 - } 100 - 101 - static fromError(error: Error): SteppableStream { 102 - const emptyStream = new ReadableStream<Uint8Array>({ 103 - start(controller) { 104 - controller.close(); 105 - }, 106 - }); 107 - const stream = new SteppableStream(emptyStream); 108 - stream.error = error; 109 - stream.buffered = true; 110 - // Override flightPromise to reject so client transitions complete 111 - stream.flightPromise = Promise.reject(error); 112 - return stream; 113 119 } 114 120 }
+21 -5
src/client/runtime/timeline.ts
··· 8 8 type: "render" | "action"; 9 9 name?: string; 10 10 args?: string; 11 - rows: string[]; 11 + rows: readonly string[]; 12 12 flightPromise: Thenable<unknown> | undefined; 13 13 error: Error | null; 14 14 chunkStart: number; ··· 24 24 totalChunks: number; 25 25 isAtStart: boolean; 26 26 isAtEnd: boolean; 27 + isStreaming: boolean; 27 28 } 28 29 29 30 type Listener = () => void; ··· 53 54 54 55 let chunkStart = 0; 55 56 const entries: EntryView[] = this.entries.map((entry) => { 56 - const chunkCount = entry.stream.rows.length; 57 + const { stream } = entry; 58 + const chunkCount = stream.rows.length; 57 59 const chunkEnd = chunkStart + chunkCount; 58 60 const base = { 59 - rows: entry.stream.rows, 60 - flightPromise: entry.stream.flightPromise, 61 - error: entry.stream.error, 61 + rows: stream.rows, 62 + flightPromise: stream.flightPromise, 63 + error: stream.error, 62 64 chunkStart, 63 65 chunkCount, 64 66 canDelete: this.cursor <= chunkStart, ··· 78 80 totalChunks: chunkStart, 79 81 isAtStart: this.cursor === 0, 80 82 isAtEnd: this.cursor >= chunkStart, 83 + isStreaming: this.entries.some((e) => !e.stream.done), 81 84 }; 82 85 return this.cachedSnapshot; 83 86 }; ··· 85 88 setRender = (stream: SteppableStream): void => { 86 89 this.entries = [{ type: "render", stream }]; 87 90 this.cursor = 0; 91 + this.watchStream(stream); 88 92 this.notify(); 89 93 }; 90 94 91 95 addAction = (name: string, args: string, stream: SteppableStream): void => { 92 96 this.entries = [...this.entries, { type: "action", name, args, stream }]; 97 + this.watchStream(stream); 93 98 this.notify(); 94 99 }; 100 + 101 + private async watchStream(stream: SteppableStream): Promise<void> { 102 + try { 103 + for await (const _ of stream) { 104 + this.notify(); 105 + } 106 + this.notify(); 107 + } catch { 108 + this.notify(); 109 + } 110 + } 95 111 96 112 deleteEntry = (entryIndex: number): void => { 97 113 let chunkStart = 0;
+5 -1
src/client/ui/LivePreview.tsx
··· 47 47 totalChunks: number; 48 48 isAtStart: boolean; 49 49 isAtEnd: boolean; 50 + isStreaming: boolean; 50 51 isLoading: boolean; 51 52 onStep: () => void; 52 53 onSkip: () => void; ··· 59 60 totalChunks, 60 61 isAtStart, 61 62 isAtEnd, 63 + isStreaming, 62 64 isLoading, 63 65 onStep, 64 66 onSkip, ··· 105 107 statusText = "Loading"; 106 108 } else if (isAtStart) { 107 109 statusText = "Ready"; 108 - } else if (isAtEnd) { 110 + } else if (isAtEnd && !isStreaming) { 109 111 statusText = "Done"; 112 + } else if (isAtEnd && isStreaming) { 113 + statusText = "Waiting"; 110 114 } else { 111 115 statusText = `${cursor} / ${totalChunks}`; 112 116 }
+2 -1
src/client/ui/Workspace.tsx
··· 54 54 } 55 55 56 56 const timeline = session?.timeline ?? loadingTimeline; 57 - const { entries, cursor, totalChunks, isAtStart, isAtEnd } = useSyncExternalStore( 57 + const { entries, cursor, totalChunks, isAtStart, isAtEnd, isStreaming } = useSyncExternalStore( 58 58 timeline.subscribe, 59 59 timeline.getSnapshot, 60 60 ); ··· 98 98 totalChunks={totalChunks} 99 99 isAtStart={isAtStart} 100 100 isAtEnd={isAtEnd} 101 + isStreaming={isStreaming} 101 102 isLoading={isLoading || isError} 102 103 onStep={timeline.stepForward} 103 104 onSkip={timeline.skipToEntryEnd}
+9 -18
src/client/workspace-session.ts
··· 21 21 totalChunks: 0, 22 22 isAtStart: true, 23 23 isAtEnd: false, 24 + isStreaming: false, 24 25 }; 25 26 26 27 export const loadingTimeline = { ··· 69 70 const renderStream = new SteppableStream(renderRaw, { 70 71 callServer: session.callServer.bind(session), 71 72 }); 72 - await renderStream.waitForBuffer(); 73 73 session.timeline.setRender(renderStream); 74 74 75 75 return session; ··· 86 86 args: EncodedArgs, 87 87 argsDisplay: string, 88 88 ): Promise<SteppableStream> { 89 - let stream: SteppableStream; 89 + let source: ReadableStream<Uint8Array>; 90 90 try { 91 - const responseRaw = await this.worker.callAction(actionName, args); 92 - stream = new SteppableStream(responseRaw, { 93 - callServer: this.callServer.bind(this), 94 - }); 95 - await stream.waitForBuffer(); 91 + source = await this.worker.callAction(actionName, args); 96 92 } catch (err) { 97 - let error = err instanceof Error ? err : new Error(String(err)); 98 - if (error.message === "Connection closed.") { 99 - error = new Error( 100 - "Connection closed.\n\nThis usually means React couldn't parse the request payload. " + 101 - "Try triggering a real action first and copying its payload format.", 102 - ); 103 - } 104 - stream = SteppableStream.fromError(error); 93 + const error = err instanceof Error ? err : new Error(String(err)); 94 + source = new ReadableStream({ start: (c) => c.error(error) }); 105 95 } 96 + 97 + const stream = new SteppableStream(source, { 98 + callServer: this.callServer.bind(this), 99 + }); 106 100 this.timeline.addAction(actionName, argsDisplay, stream); 107 - if (stream.error) { 108 - throw stream.error; 109 - } 110 101 return stream; 111 102 } 112 103
+13 -1
src/server/worker-server.ts
··· 136 136 body = encodedArgs.data; 137 137 } 138 138 139 - const decoded = await decodeReply(body, {}); 139 + let decoded; 140 + try { 141 + decoded = await decodeReply(body, {}); 142 + } catch (e: any) { 143 + let message; 144 + message = 145 + "React couldn't parse the request payload. " + 146 + "Try triggering a real action first and copying its payload format."; 147 + if (e?.message !== "Connection closed.") { 148 + message += "\n\n" + e.toString(); 149 + } 150 + throw new Error(message); 151 + } 140 152 const args = Array.isArray(decoded) ? decoded : [decoded]; 141 153 const result = await actionFn(...args); 142 154
+1 -1
tests/actionerror.spec.ts
··· 112 112 113 113 // Verify error message includes our helpful hint about payload format 114 114 const errorText = await errorEntry.innerText(); 115 - expect(errorText).toContain("couldn't parse the request payload"); 115 + expect(errorText).toContain("React couldn't parse the request payload"); 116 116 });
+71
tests/streaming.spec.ts
··· 1 + import { test, expect, beforeAll, afterAll, afterEach } from "vitest"; 2 + import { createHelpers, launchBrowser, type TestHelpers } from "./helpers.ts"; 3 + import type { Browser, Page } from "playwright"; 4 + 5 + let browser: Browser; 6 + let page: Page; 7 + let h: TestHelpers; 8 + 9 + beforeAll(async () => { 10 + browser = await launchBrowser(); 11 + page = await browser.newPage(); 12 + h = createHelpers(page); 13 + }); 14 + 15 + afterAll(async () => { 16 + await browser.close(); 17 + }); 18 + 19 + afterEach(async () => { 20 + await h.checkNoRemainingSteps(); 21 + }); 22 + 23 + test("can step through stream while async component is still pending", async () => { 24 + // This async component will never resolve - but we should still be able to 25 + // step through the synchronous parts of the stream (header + Suspense fallback) 26 + await h.loadCode( 27 + ` 28 + import { Suspense } from 'react' 29 + 30 + export default function App() { 31 + return ( 32 + <div> 33 + <h1>Streaming Test</h1> 34 + <Suspense fallback={<p>Loading forever...</p>}> 35 + <NeverResolves /> 36 + </Suspense> 37 + </div> 38 + ) 39 + } 40 + 41 + async function NeverResolves() { 42 + await new Promise(() => {}) // Never resolves 43 + return <p>This will never appear</p> 44 + } 45 + `, 46 + `'use client'`, 47 + ); 48 + 49 + // We should be able to step through the initial render 50 + // The Suspense fallback should be visible 51 + expect(await h.stepAll()).toMatchInlineSnapshot(` 52 + "<div> 53 + <h1>Streaming Test</h1> 54 + <Suspense fallback={ 55 + <p>Loading forever...</p> 56 + }> 57 + Pending 58 + </Suspense> 59 + </div>" 60 + `); 61 + 62 + // The preview should show the header and fallback 63 + expect(await h.preview("Loading forever")).toMatchInlineSnapshot(` 64 + "Streaming Test 65 + 66 + Loading forever..." 67 + `); 68 + 69 + // Step info should show "Waiting" (stream is still open) 70 + expect(await h.stepInfo()).toBe("Waiting"); 71 + });