From 835453c829b6a72836201f30504ef3eb9df27e0c Mon Sep 17 00:00:00 2001 From: Justin Ridgewell Date: Tue, 13 Jun 2023 04:00:08 -0400 Subject: [PATCH] Lazily stream responses (#396) * Use a lazy stream implementation * Add proper test for unhandled rejection in pull * Use consumeUint8ArrayReadableStream * Remove module method mocking * Extract stream helper so it can be reused by Next * Fix test * Export method * Create spicy-toys-teach.md --------- Co-authored-by: Kiko Beats --- .changeset/spicy-toys-teach.md | 5 + packages/runtime/src/index.ts | 1 + packages/runtime/src/server/body-streams.ts | 70 ++++++- packages/runtime/src/server/create-handler.ts | 11 +- packages/runtime/src/server/index.ts | 5 +- packages/runtime/tests/fixtures/pull-error.ts | 52 +++++ .../tests/fixtures/unhandled-rejection.ts | 39 ++-- .../tests/rejections-and-errors.test.ts | 14 ++ packages/runtime/tests/server.test.ts | 181 +++++++++++++----- 9 files changed, 302 insertions(+), 76 deletions(-) create mode 100644 .changeset/spicy-toys-teach.md create mode 100644 packages/runtime/tests/fixtures/pull-error.ts diff --git a/.changeset/spicy-toys-teach.md b/.changeset/spicy-toys-teach.md new file mode 100644 index 00000000..6414aecb --- /dev/null +++ b/.changeset/spicy-toys-teach.md @@ -0,0 +1,5 @@ +--- +"edge-runtime": minor +--- + +Lazily stream responses diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index 75adff1e..5c3f5c61 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -1,5 +1,6 @@ export { consumeUint8ArrayReadableStream, + pipeBodyStreamToResponse, createHandler, runServer, } from './server' diff --git a/packages/runtime/src/server/body-streams.ts b/packages/runtime/src/server/body-streams.ts index 76c941bb..7d497d5e 100644 --- a/packages/runtime/src/server/body-streams.ts +++ b/packages/runtime/src/server/body-streams.ts @@ -1,4 +1,4 @@ -import type { IncomingMessage } from 'http' +import type { IncomingMessage, ServerResponse } from 'http' import { Readable } from 'stream' type BodyStream = ReadableStream @@ -94,6 +94,10 @@ function replaceRequestBody( return base } +function isUint8ArrayChunk(value: any): value is Uint8Array { + return value?.constructor?.name == 'Uint8Array' +} + /** * Creates an async iterator from a ReadableStream that ensures that every * emitted chunk is a `Uint8Array`. If there is some invalid chunk it will @@ -108,11 +112,71 @@ export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) { return } - if (value?.constructor?.name !== 'Uint8Array') { + if (!isUint8ArrayChunk(value)) { throw new TypeError('This ReadableStream did not return bytes.') } + yield value + } + } +} + +/** + * Pipes the chunks of a BodyStream into a Response. This optimizes for + * laziness, pauses reading if we experience back-pressure, and handles early + * disconnects by the client on the other end of the server response. + */ +export async function pipeBodyStreamToResponse( + body: BodyStream | null, + res: ServerResponse +) { + if (!body) return + + // If the client has already disconnected, then we don't need to pipe anything. + if (res.destroyed) return body.cancel() + + // When the server pushes more data than the client reads, then we need to + // wait for the client to catch up before writing more data. We register this + // generic handler once so that we don't incur constant register/unregister + // calls. 
+ let drainResolve: () => void + res.on('drain', () => drainResolve?.()) + + // If the user aborts, then we'll receive a close event before the + // body closes. In that case, we want to end the streaming. + let open = true + res.on('close', () => { + open = false + drainResolve?.() + }) + + const reader = body.getReader() + while (open) { + const { done, value } = await reader.read() + if (done) break + + if (!isUint8ArrayChunk(value)) { + const error = new TypeError('This ReadableStream did not return bytes.') + reader.cancel(error) + throw error + } + + if (open) { + const bufferSpaceAvailable = res.write(value) + + // If there's no more space in the buffer, then we need to wait on the + // client to read data before pushing again. + if (!bufferSpaceAvailable) { + await new Promise((res) => { + drainResolve = res + }) + } + } - yield value as Uint8Array + // If the client disconnected early, then we need to cleanup the stream. + // This cannot be joined with the above if-statement, because the client may + // have disconnected while waiting for a drain signal. + if (!open) { + return reader.cancel() } } } diff --git a/packages/runtime/src/server/create-handler.ts b/packages/runtime/src/server/create-handler.ts index 9660ef99..8f7babf2 100644 --- a/packages/runtime/src/server/create-handler.ts +++ b/packages/runtime/src/server/create-handler.ts @@ -2,8 +2,7 @@ import type { EdgeRuntime } from '../edge-runtime' import type { IncomingMessage, ServerResponse } from 'http' import type { Logger, NodeHeaders } from '../types' import type { EdgeContext } from '@edge-runtime/vm' -import { consumeUint8ArrayReadableStream } from './body-streams' -import { getClonableBodyStream } from './body-streams' +import { getClonableBodyStream, pipeBodyStreamToResponse } from './body-streams' import prettyMs from 'pretty-ms' import timeSpan from 'time-span' @@ -68,13 +67,7 @@ export function createHandler(options: Options) { } } - if (response.body) { - for await (const chunk of consumeUint8ArrayReadableStream( - response.body - )) { - res.write(chunk) - } - } + await pipeBodyStreamToResponse(response.body, res) const subject = `${req.socket.remoteAddress} ${req.method} ${req.url}` const time = `${prettyMs(start()) diff --git a/packages/runtime/src/server/index.ts b/packages/runtime/src/server/index.ts index 9ae205c4..f1c47778 100644 --- a/packages/runtime/src/server/index.ts +++ b/packages/runtime/src/server/index.ts @@ -1,3 +1,6 @@ -export { consumeUint8ArrayReadableStream } from './body-streams' +export { + consumeUint8ArrayReadableStream, + pipeBodyStreamToResponse, +} from './body-streams' export { createHandler } from './create-handler' export { runServer, EdgeRuntimeServer } from './run-server' diff --git a/packages/runtime/tests/fixtures/pull-error.ts b/packages/runtime/tests/fixtures/pull-error.ts new file mode 100644 index 00000000..a5960b73 --- /dev/null +++ b/packages/runtime/tests/fixtures/pull-error.ts @@ -0,0 +1,52 @@ +import { EdgeRuntime, runServer } from '../../src' +import assert from 'assert' +import fetch from 'node-fetch' + +async function main() { + const runtime = new EdgeRuntime() + const deferred = new Promise((resolve) => { + runtime.context.handleRejection = (event: PromiseRejectionEvent) => { + resolve(event) + } + }) + + runtime.evaluate(` + addEventListener('fetch', event => { + const stream = new ReadableStream({ + pull(controller) { + throw new Error('expected pull error'); + } + }); + return event.respondWith( + new Response(stream, { + status: 200, + }) + ) + }) + + 
addEventListener('unhandledrejection', (event) => { + globalThis.handleRejection(event) + }) + `) + + const server = await runServer({ runtime }) + + try { + const url = new URL(server.url) + const response = await fetch(String(url)) + assert.strictEqual(response.status, 200) + assert.strictEqual(await response.text(), '') + const event = await deferred + assert.strictEqual(event.reason.message, 'expected pull error') + return 'TEST PASSED!' + } finally { + await server.close() + } +} + +main() + .then(console.log) + .catch((error) => { + console.log('TEST FAILED!') + console.log(error) + }) diff --git a/packages/runtime/tests/fixtures/unhandled-rejection.ts b/packages/runtime/tests/fixtures/unhandled-rejection.ts index 15c846d9..97b1f989 100644 --- a/packages/runtime/tests/fixtures/unhandled-rejection.ts +++ b/packages/runtime/tests/fixtures/unhandled-rejection.ts @@ -4,18 +4,22 @@ import fetch from 'node-fetch' async function main() { const runtime = new EdgeRuntime() - const deferred = new Promise((resolve) => { - runtime.context.handleRejection = (event: PromiseRejectionEvent) => { - resolve(event) - } - }) + function waitForReject() { + return new Promise((resolve) => { + runtime.context.handleRejection = (event: PromiseRejectionEvent) => { + resolve(event) + } + }) + } runtime.evaluate(` addEventListener('fetch', event => { + const url = new URL(event.request.url) + const chunk = url.searchParams.get('chunk') const stream = new ReadableStream({ start(controller) { controller.enqueue(new TextEncoder().encode('hi there')); - controller.enqueue('wrong chunk'); + controller.enqueue(JSON.parse(chunk)); controller.close(); } }); @@ -33,16 +37,21 @@ async function main() { const server = await runServer({ runtime }) + const chunks = [1, 'String', true, { b: 1 }, [1], Buffer.from('Buffer')] + try { - const url = new URL(server.url) - const response = await fetch(String(url)) - assert.strictEqual(response.status, 200) - assert.strictEqual(await response.text(), 'hi there') - const event = await deferred - assert.strictEqual( - event.reason.message, - 'This ReadableStream did not return bytes.' - ) + for (const chunk of chunks) { + const deferred = waitForReject() + const url = new URL(`${server.url}?chunk=${JSON.stringify(chunk)}`) + const response = await fetch(String(url)) + assert.strictEqual(response.status, 200) + assert.strictEqual(await response.text(), 'hi there') + const event = await deferred + assert.strictEqual( + event.reason.message, + 'This ReadableStream did not return bytes.' + ) + } return 'TEST PASSED!' 
  } finally {
    await server.close()
diff --git a/packages/runtime/tests/rejections-and-errors.test.ts b/packages/runtime/tests/rejections-and-errors.test.ts
index 6c70c0a7..aa30d4cc 100644
--- a/packages/runtime/tests/rejections-and-errors.test.ts
+++ b/packages/runtime/tests/rejections-and-errors.test.ts
@@ -18,3 +18,17 @@ it('handles correctly unhandled rejections', async () => {
     stderr: '',
   })
 })
+
+it('reports unhandled rejection for pull errors', async () => {
+  const result = await execAsync(
+    `ts-node --transpile-only ${resolve(
+      __dirname,
+      './fixtures/pull-error.ts'
+    )}`,
+    { encoding: 'utf8' }
+  )
+  expect(result).toMatchObject({
+    stdout: expect.stringContaining('TEST PASSED!'),
+    stderr: '',
+  })
+})
diff --git a/packages/runtime/tests/server.test.ts b/packages/runtime/tests/server.test.ts
index a96fd3e4..601e95ab 100644
--- a/packages/runtime/tests/server.test.ts
+++ b/packages/runtime/tests/server.test.ts
@@ -1,31 +1,18 @@
 import { EdgeRuntime } from '../src/edge-runtime'
 import { runServer } from '../src/server'
 import fetch from 'node-fetch'
+import type { Readable } from 'stream'
 
 let server: Awaited<ReturnType<typeof runServer>>
 
-const chunkErrorFn = jest.fn()
-jest.mock('../src/server/body-streams', () => {
-  const utils = jest.requireActual('../src/server/body-streams')
-  return {
-    ...utils,
-    consumeUint8ArrayReadableStream: async function* (body?: ReadableStream) {
-      try {
-        for await (const chunk of utils.consumeUint8ArrayReadableStream(body)) {
-          yield chunk
-        }
-      } catch (error) {
-        chunkErrorFn(error)
-      }
-    },
-  }
-})
-
 afterEach(() => {
   server.close()
-  chunkErrorFn.mockReset()
 })
 
+function sleep(ms: number) {
+  return new Promise((res) => setTimeout(res, ms))
+}
+
 test('starts an http server', async () => {
   const runtime = new EdgeRuntime()
   runtime.evaluate(`
@@ -129,7 +116,7 @@ test('works with POST HTTP method', async () => {
   expect(content).toStrictEqual({ body })
 })
 
-test(`allows to wait for effects created with waitUntil`, async () => {
+test('allows to wait for effects created with waitUntil', async () => {
   const runtime = new EdgeRuntime()
   runtime.evaluate(`
     async function doAsyncStuff (event) {
@@ -179,41 +166,139 @@ test(`do not fail writing to the response socket Uint8Array`, async () => {
   expect(response.status).toEqual(200)
   const text = await response.text()
   expect(text).toEqual('hi there1\nhi there2\nhi there3\n')
-  expect(chunkErrorFn).toHaveBeenCalledTimes(0)
 })
 
-test(`fails when writing to the response socket a wrong chunk`, async () => {
-  const chunks = [1, 'String', true, { b: 1 }, [1], Buffer.from('Buffer')]
+test('streamable sanity test', async () => {
   const runtime = new EdgeRuntime()
   runtime.evaluate(`
-    addEventListener('fetch', event => {
-      const url = new URL(event.request.url)
-      const chunk = url.searchParams.get('chunk')
-      const stream = new ReadableStream({
-        start(controller) {
-          controller.enqueue(new TextEncoder().encode('hi there'));
-          controller.enqueue(JSON.parse(chunk));
-          controller.close();
+    addEventListener('fetch', event => {
+      let i = 0;
+      const encoder = new TextEncoder();
+
+      return event.respondWith(new Response(new ReadableStream({
+        async pull(controller) {
+          if (i === 10) {
+            controller.close()
+          } else {
+            controller.enqueue(encoder.encode(i++));
           }
-      });
-      return event.respondWith(
-        new Response(stream, {
-          status: 200,
-        })
-      )
-    })
-  `)
+        }
+      })));
+    })
+  `)
+
+  server = await runServer({ runtime })
+
+  const response = await fetch(server.url)
+  let data = ''
+
+  for await (const chunk of response.body) {
+    data += chunk.toString()
+  }
+
+  expect(response).toBeTruthy()
+  expect(response.status).toEqual(200)
+  expect(data).toContain('9')
+})
+
+test('allows long-running streams to be cancelled immediately', async () => {
+  const runtime = new EdgeRuntime()
+
+  const pulled = (runtime.context.pulled = [])
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      let i = 0;
+
+      return event.respondWith(new Response(new ReadableStream({
+        async pull(controller) {
+          self.pulled.push(i);
+          if (i === 10) {
+            throw new Error('stream still connected: allows long-running streams to be cancelled immediately');
+          }
+          const chunk = new Uint8Array(1024 * 1024).fill('0'.charCodeAt(0) + i);
+          controller.enqueue(chunk);
+          i++;
+        }
+      })));
+    })
+  `)
   server = await runServer({ runtime })
+
+  const controller = new AbortController()
+  const response = await fetch(server.url, {
+    signal: controller.signal as any,
+  })
+
+  // There's a bug in pre-v3 node-fetch where aborting the fetch will never
+  // end the async-iteration.
+  response.body.on('error', (e) => (response.body as Readable).destroy(e))
+
+  try {
+    controller.abort()
+  } catch (e) {
+    // Swallow the AbortError, but throw anything else.
+    if ((e as Error).name !== 'AbortError') throw e
   }
+  await sleep(10)
+
+  expect(response).toBeTruthy()
+  // The error happens _after_ we begin streaming data, so this should still be
+  // a 200 response.
+  expect(response.status).toEqual(200)
+
+  // Because the client and server are in the same node process, if the server
+  // doesn't pause then it will have pulled all 10 iterations immediately.
+  expect(pulled).not.toContain(10)
+})
+
+test('allows long-running streams to be cancelled after partial read', async () => {
+  const runtime = new EdgeRuntime()
+
+  const pulled = (runtime.context.pulled = [])
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      let i = 0;
+
+      return event.respondWith(new Response(new ReadableStream({
+        async pull(controller) {
+          self.pulled.push(i);
+          if (i === 10) {
+            throw new Error('stream still connected: allows long-running streams to be cancelled after partial read');
+          }
+          const chunk = new Uint8Array(1024 * 1024).fill('0'.charCodeAt(0) + i);
+          controller.enqueue(chunk);
+          i++;
+        }
+      })));
+    })
+  `)
+
+  server = await runServer({ runtime })
+
+  const controller = new AbortController()
+  const response = await fetch(server.url, {
+    signal: controller.signal as any,
+  })
+
+  // There's a bug in pre-v3 node-fetch where aborting the fetch will never
+  // end the async-iteration.
+  response.body.on('error', (e) => (response.body as Readable).destroy(e))
+
+  // Read a few chunks so we can pause in the middle of the stream.
+  for await (const _ of response.body) {
+    break
+  }
+
+  try {
+    controller.abort()
+  } catch (e) {
+    // Swallow the AbortError, but throw anything else.
+    if ((e as Error).name !== 'AbortError') throw e
+  }
+  await sleep(10)
+
+  // Because the client and server are in the same node process, if the server
+  // doesn't pause then it will have pulled all 10 iterations immediately.
+  expect(pulled).not.toContain(10)
+})
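Usage sketch (illustrative, not part of the patch): the patch exports pipeBodyStreamToResponse so that other hosts (the commit notes mention reuse by Next) can adopt the same lazy piping instead of an eager read loop. The snippet below shows how a Node host might call it. Only the helper's name, its export from the edge-runtime package, and its (body, res) signature come from this patch; getEdgeResponse is a hypothetical stand-in for however the host turns an IncomingMessage into a WHATWG Response, for example by dispatching into an EdgeRuntime.

    import { createServer, type IncomingMessage } from 'http'
    import { pipeBodyStreamToResponse } from 'edge-runtime'

    // Hypothetical: however your host produces a fetch-style Response for a request.
    declare function getEdgeResponse(req: IncomingMessage): Promise<Response>

    createServer(async (req, res) => {
      const response = await getEdgeResponse(req)
      res.writeHead(response.status, Object.fromEntries(response.headers))

      // Reads the body chunk by chunk, pauses on 'drain' when the socket applies
      // back-pressure, and cancels the reader if the client disconnects ('close').
      await pipeBodyStreamToResponse(response.body, res)
      res.end()
    }).listen(3000)

This mirrors the change to create-handler.ts above, which replaces the earlier for-await loop over consumeUint8ArrayReadableStream with a single call to the helper.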