From f213388c0b05d5df90869212d9b4b99ff18e3f0d Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Sat, 10 Jun 2023 00:01:33 -0400
Subject: [PATCH 1/8] Use a lazy stream implementation

---
 packages/runtime/src/server/create-handler.ts |  54 ++++-
 packages/runtime/tests/server.test.ts         | 188 +++++++++++++++++-
 2 files changed, 233 insertions(+), 9 deletions(-)

diff --git a/packages/runtime/src/server/create-handler.ts b/packages/runtime/src/server/create-handler.ts
index 9660ef99..81b79bc4 100644
--- a/packages/runtime/src/server/create-handler.ts
+++ b/packages/runtime/src/server/create-handler.ts
@@ -2,7 +2,6 @@ import type { EdgeRuntime } from '../edge-runtime'
 import type { IncomingMessage, ServerResponse } from 'http'
 import type { Logger, NodeHeaders } from '../types'
 import type { EdgeContext } from '@edge-runtime/vm'
-import { consumeUint8ArrayReadableStream } from './body-streams'
 import { getClonableBodyStream } from './body-streams'
 import prettyMs from 'pretty-ms'
 import timeSpan from 'time-span'
@@ -68,13 +67,7 @@ export function createHandler(options: Options) {
       }
     }
 
-    if (response.body) {
-      for await (const chunk of consumeUint8ArrayReadableStream(
-        response.body
-      )) {
-        res.write(chunk)
-      }
-    }
+    await stream(response.body, res)
 
     const subject = `${req.socket.remoteAddress} ${req.method} ${req.url}`
     const time = `${prettyMs(start())
@@ -95,6 +88,51 @@
   }
 }
 
+async function stream(
+  body: ReadableStream | null,
+  res: ServerResponse
+) {
+  if (!body) return
+
+  // If the client has already disconnected, then we don't need to pipe anything.
+  if (res.destroyed) return body.cancel()
+
+  const reader = body.getReader()
+
+  // When the server pushes more data than the client reads, then we need to
+  // wait for the client to catch up before writing more data. We register this
+  // generic handler once so that we don't incur constant register/unregister
+  // calls.
+  let drainResolve: () => void
+  res.on('drain', () => drainResolve?.())
+
+  // If the user aborts, then we'll receive a close event before the
+  // body closes. In that case, we want to end the streaming.
+  let open = true
+  res.on('close', () => {
+    open = false
+    drainResolve?.()
+  })
+
+  while (open) {
+    const { done, value } = await reader.read()
+    if (done) break
+
+    const bufferSpaceAvailable = res.write(value)
+
+    // If there's no more space in the buffer, then we need to wait on the
+    // client to read data before pushing again.
+    if (!bufferSpaceAvailable) {
+      await new Promise((res) => {
+        drainResolve = res
+      })
+    }
+  }
+
+  // If the client disconnected early, then we need to cleanup the stream.
+  if (!open) return reader.cancel()
+}
+
 /**
  * Builds a full URL from the provided incoming message. Note this function
  * is not safe as one can set has a host anything based on headers. It is
diff --git a/packages/runtime/tests/server.test.ts b/packages/runtime/tests/server.test.ts
index a96fd3e4..c9d41c14 100644
--- a/packages/runtime/tests/server.test.ts
+++ b/packages/runtime/tests/server.test.ts
@@ -1,6 +1,7 @@
 import { EdgeRuntime } from '../src/edge-runtime'
 import { runServer } from '../src/server'
 import fetch from 'node-fetch'
+import type { Readable } from 'stream'
 
 let server: Awaited<ReturnType<typeof runServer>>
 
@@ -26,6 +27,10 @@ afterEach(() => {
   chunkErrorFn.mockReset()
 })
 
+function sleep(ms: number) {
+  return new Promise((res) => setTimeout(res, ms))
+}
+
 test('starts an http server', async () => {
   const runtime = new EdgeRuntime()
   runtime.evaluate(`
@@ -129,7 +134,7 @@ test('works with POST HTTP method', async () => {
   expect(content).toStrictEqual({ body })
 })
 
-test(`allows to wait for effects created with waitUntil`, async () => {
+test('allows to wait for effects created with waitUntil', async () => {
   const runtime = new EdgeRuntime()
   runtime.evaluate(`
     async function doAsyncStuff (event) {
@@ -217,3 +222,184 @@ test(`fails when writing to the response socket a wrong chunk`, async () => {
     chunkErrorFn.mockReset()
   }
 })
+
+test('streamable sanity test', async () => {
+  const runtime = new EdgeRuntime()
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      let i = 0;
+      const encoder = new TextEncoder();
+
+      return event.respondWith(new Response(new ReadableStream({
+        async pull(controller) {
+          if (i === 10) {
+            controller.close()
+          } else {
+            controller.enqueue(encoder.encode(i++));
+          }
+        }
+      })));
+    })
+  `)
+
+  server = await runServer({ runtime })
+
+  const response = await fetch(server.url)
+  let data = ''
+
+  for await (const chunk of response.body) {
+    data += chunk.toString()
+  }
+
+  expect(response).toBeTruthy()
+  expect(response.status).toEqual(200)
+  expect(data).toContain('9')
+})
+
+// Jest's process context is a nightmare. It's impossible to test for
+// unhandledrejection events, because they unregister all handlers in the main
+// process and tests are run in a child process that won't receive the event.
+// https://github.com/jestjs/jest/issues/10361
+test.skip('ends response when ReadableStream errors', async () => {
+  const runtime = new EdgeRuntime()
+
+  const handled = new Promise<{ error: Error; promise: Promise }>(
+    (resolve) => {
+      runtime.context.handle = resolve
+    }
+  )
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      return event.respondWith(new Response(new ReadableStream({
+        pull(controller) {
+          throw new Error('Boom')
+        }
+      })))
+    })
+
+    addEventListener("unhandledrejection", (error, promise) => {
+      self.handle({ error, promise })
+    })
+  `)
+
+  server = await runServer({ runtime })
+
+  const response = await fetch(server.url)
+
+  expect(response).toBeTruthy()
+  // The error happens _after_ we begin streaming data, so this should still be
+  // a 200 response.
+  expect(response.status).toEqual(200)
+
+  for await (const chunk of response.body) {
+    throw new Error(`should never read chunk "${chunk}"`)
+  }
+
+  const { error, promise } = await handled
+  expect(error.message).toBe('Boom')
+  promise.catch(() => {
+    // noop, this is just to "handle" to the rejection.
+  })
+})
+
+test('allows long-running streams to be cancelled immediately', async () => {
+  const runtime = new EdgeRuntime()
+
+  const pulled = (runtime.context.pulled = [])
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      let i = 0;
+
+      return event.respondWith(new Response(new ReadableStream({
+        async pull(controller) {
+          self.pulled.push(i);
+          if (i === 10) {
+            throw new Error('stream still connected: allows long-running streams to be cancelled immediately');
+          }
+          const chunk = new Uint8Array(1024 * 1024).fill('0'.charCodeAt(0) + i);
+          controller.enqueue(chunk);
+          i++;
+        }
+      })));
+    })
+  `)
+
+  server = await runServer({ runtime })
+
+  const controller = new AbortController()
+  const response = await fetch(server.url, {
+    signal: controller.signal as any,
+  })
+
+  // There's a bug in pre-v3 node-fetch where aborting the fetch will never end
+  // end the async-iteration.
+  response.body.on('error', (e) => (response.body as Readable).destroy(e))
+
+  try {
+    controller.abort()
+  } catch (e) {
+    // Swallow the AbortError, but throw anything else.
+    if ((e as Error).name !== 'AbortError') throw e
+  }
+  await sleep(10)
+
+  expect(response).toBeTruthy()
+  // The error happens _after_ we begin streaming data, so this should still be
+  // a 200 response.
+  expect(response.status).toEqual(200)
+
+  // Because the client and server are in the same node process, if the server
+  // doesn't pause then it will have pulled all 10 iterations immediately.
+  expect(pulled).toEqual([0, 1])
+})
+
+test('allows long-running streams to be cancelled after partial read', async () => {
+  const runtime = new EdgeRuntime()
+
+  const pulled = (runtime.context.pulled = [])
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      let i = 0;
+
+      return event.respondWith(new Response(new ReadableStream({
+        async pull(controller) {
+          self.pulled.push(i);
+          if (i === 10) {
+            throw new Error('stream still connected: allows long-running streams to be cancelled immediately');
+          }
+          const chunk = new Uint8Array(1024 * 1024).fill('0'.charCodeAt(0) + i);
+          controller.enqueue(chunk);
+          i++;
+        }
+      })));
+    })
+  `)
+
+  server = await runServer({ runtime })
+
+  const controller = new AbortController()
+  const response = await fetch(server.url, {
+    signal: controller.signal as any,
+  })
+
+  // There's a bug in pre-v3 node-fetch where aborting the fetch will never end
+  // end the async-iteration.
+  response.body.on('error', (e) => (response.body as Readable).destroy(e))
+
+  // Read a few chunks so we can pause in the middle of the stream.
+  for await (const _ of response.body) {
+    break
+  }
+
+  try {
+    controller.abort()
+  } catch (e) {
+    // Swallow the AbortError, but throw anything else.
+    if ((e as Error).name !== 'AbortError') throw e
+  }
+  await sleep(10)
+
+  // Because the client and server are in the same node process, if the server
+  // doesn't pause then it will have pulled all 10 iterations immediately.
+  expect(pulled).not.toContain(10)
+})

From 45cc08f79f40b7fce77e80fb906bb99b170821c2 Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Mon, 12 Jun 2023 22:50:08 -0400
Subject: [PATCH 2/8] Add proper test for unhandled rejection in pull

---
 packages/runtime/tests/fixtures/pull-error.ts | 52 +++++++++++++++++++
 .../tests/rejections-and-errors.test.ts       | 14 +++++
 packages/runtime/tests/server.test.ts         | 46 ----------------
 3 files changed, 66 insertions(+), 46 deletions(-)
 create mode 100644 packages/runtime/tests/fixtures/pull-error.ts

diff --git a/packages/runtime/tests/fixtures/pull-error.ts b/packages/runtime/tests/fixtures/pull-error.ts
new file mode 100644
index 00000000..a5960b73
--- /dev/null
+++ b/packages/runtime/tests/fixtures/pull-error.ts
@@ -0,0 +1,52 @@
+import { EdgeRuntime, runServer } from '../../src'
+import assert from 'assert'
+import fetch from 'node-fetch'
+
+async function main() {
+  const runtime = new EdgeRuntime()
+  const deferred = new Promise((resolve) => {
+    runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
+      resolve(event)
+    }
+  })
+
+  runtime.evaluate(`
+    addEventListener('fetch', event => {
+      const stream = new ReadableStream({
+        pull(controller) {
+          throw new Error('expected pull error');
+        }
+      });
+      return event.respondWith(
+        new Response(stream, {
+          status: 200,
+        })
+      )
+    })
+
+    addEventListener('unhandledrejection', (event) => {
+      globalThis.handleRejection(event)
+    })
+  `)
+
+  const server = await runServer({ runtime })
+
+  try {
+    const url = new URL(server.url)
+    const response = await fetch(String(url))
+    assert.strictEqual(response.status, 200)
+    assert.strictEqual(await response.text(), '')
+    const event = await deferred
+    assert.strictEqual(event.reason.message, 'expected pull error')
+    return 'TEST PASSED!'
+  } finally {
+    await server.close()
+  }
+}
+
+main()
+  .then(console.log)
+  .catch((error) => {
+    console.log('TEST FAILED!')
+    console.log(error)
+  })
diff --git a/packages/runtime/tests/rejections-and-errors.test.ts b/packages/runtime/tests/rejections-and-errors.test.ts
index 6c70c0a7..aa30d4cc 100644
--- a/packages/runtime/tests/rejections-and-errors.test.ts
+++ b/packages/runtime/tests/rejections-and-errors.test.ts
@@ -18,3 +18,17 @@ it('handles correctly unhandled rejections', async () => {
     stderr: '',
   })
 })
+
+it('reports unhandled rejection for pull errors', async () => {
+  const result = await execAsync(
+    `ts-node --transpile-only ${resolve(
+      __dirname,
+      './fixtures/pull-error.ts'
+    )}`,
+    { encoding: 'utf8' }
+  )
+  expect(result).toMatchObject({
+    stdout: expect.stringContaining('TEST PASSED!'),
+    stderr: '',
+  })
+})
diff --git a/packages/runtime/tests/server.test.ts b/packages/runtime/tests/server.test.ts
index c9d41c14..bbf60de1 100644
--- a/packages/runtime/tests/server.test.ts
+++ b/packages/runtime/tests/server.test.ts
@@ -256,52 +256,6 @@ test('streamable sanity test', async () => {
   expect(data).toContain('9')
 })
 
-// Jest's process context is a nightmare. It's impossible to test for
-// unhandledrejection events, because they unregister all handlers in the main
-// process and tests are run in a child process that won't receive the event.
-// https://github.com/jestjs/jest/issues/10361
-test.skip('ends response when ReadableStream errors', async () => {
-  const runtime = new EdgeRuntime()
-
-  const handled = new Promise<{ error: Error; promise: Promise }>(
-    (resolve) => {
-      runtime.context.handle = resolve
-    }
-  )
-  runtime.evaluate(`
-    addEventListener('fetch', event => {
-      return event.respondWith(new Response(new ReadableStream({
-        pull(controller) {
-          throw new Error('Boom')
-        }
-      })))
-    })
-
-    addEventListener("unhandledrejection", (error, promise) => {
-      self.handle({ error, promise })
-    })
-  `)
-
-  server = await runServer({ runtime })
-
-  const response = await fetch(server.url)
-
-  expect(response).toBeTruthy()
-  // The error happens _after_ we begin streaming data, so this should still be
-  // a 200 response.
-  expect(response.status).toEqual(200)
-
-  for await (const chunk of response.body) {
-    throw new Error(`should never read chunk "${chunk}"`)
-  }
-
-  const { error, promise } = await handled
-  expect(error.message).toBe('Boom')
-  promise.catch(() => {
-    // noop, this is just to "handle" to the rejection.
-  })
-})
-
 test('allows long-running streams to be cancelled immediately', async () => {
   const runtime = new EdgeRuntime()
 

From 0ef6d2d99ee2577c667c2dcd53e785e12617efdf Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Mon, 12 Jun 2023 22:50:42 -0400
Subject: [PATCH 3/8] Use consumeUint8ArrayReadableStream

---
 packages/runtime/src/server/body-streams.ts   | 40 +++++++++++++++++---
 packages/runtime/src/server/create-handler.ts | 25 +++++++-----
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/packages/runtime/src/server/body-streams.ts b/packages/runtime/src/server/body-streams.ts
index 76c941bb..331ce0b8 100644
--- a/packages/runtime/src/server/body-streams.ts
+++ b/packages/runtime/src/server/body-streams.ts
@@ -100,19 +100,53 @@ function replaceRequestBody(
  * throw.
  */
 export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) {
-  const reader = body?.getReader()
-  if (reader) {
+  if (!body) {
+    return
+  }
+
+  const reader = body.getReader()
+
+  // If the consumer calls `it.return()`, our generator code's `yield` will
+  // perform an AbruptCompletion and behave as if this was a `return` statement.
+  // To ensure we perform cleanup, we need to guard the yield statement and
+  // detect this condition with a try-finally.
+  let needsCleanup = false
+
+  // If we detect an invalid chunk, we store an error to be thrown as part of
+  // the cleanup phase.
+  let invalidChunkError
+
+  try {
     while (true) {
+      // If the read errors, or we are done reading, we do not need to cleanup
+      // further.
       const { done, value } = await reader.read()
       if (done) {
         return
       }
 
+      // We also need to cleanup if the user returned an invalid type. The loop
+      // isn't done yet, but we're not going to be reading any more.
+      needsCleanup = true
+
       if (value?.constructor?.name !== 'Uint8Array') {
-        throw new TypeError('This ReadableStream did not return bytes.')
+        invalidChunkError = new TypeError(
+          'This ReadableStream did not return bytes.'
+        )
+        break
       }
 
       yield value as Uint8Array
+      needsCleanup = false
+    }
+  } finally {
+    // The reader either returned an invalid chunk, or our consumer early
+    // exited. In either case, we need to cleanup the stream's resources.
+    if (needsCleanup) {
+      reader.cancel(invalidChunkError)
+    }
+    if (invalidChunkError) {
+      throw invalidChunkError
     }
   }
 }
diff --git a/packages/runtime/src/server/create-handler.ts b/packages/runtime/src/server/create-handler.ts
index 81b79bc4..8cc9bded 100644
--- a/packages/runtime/src/server/create-handler.ts
+++ b/packages/runtime/src/server/create-handler.ts
@@ -2,7 +2,10 @@ import type { EdgeRuntime } from '../edge-runtime'
 import type { IncomingMessage, ServerResponse } from 'http'
 import type { Logger, NodeHeaders } from '../types'
 import type { EdgeContext } from '@edge-runtime/vm'
-import { getClonableBodyStream } from './body-streams'
+import {
+  consumeUint8ArrayReadableStream,
+  getClonableBodyStream,
+} from './body-streams'
 import prettyMs from 'pretty-ms'
 import timeSpan from 'time-span'
 
@@ -97,8 +100,6 @@ async function stream(
   // If the client has already disconnected, then we don't need to pipe anything.
   if (res.destroyed) return body.cancel()
 
-  const reader = body.getReader()
-
   // When the server pushes more data than the client reads, then we need to
   // wait for the client to catch up before writing more data. We register this
   // generic handler once so that we don't incur constant register/unregister
@@ -114,11 +115,9 @@ async function stream(
     drainResolve?.()
   })
 
-  while (open) {
-    const { done, value } = await reader.read()
-    if (done) break
-
-    const bufferSpaceAvailable = res.write(value)
+  const stream = consumeUint8ArrayReadableStream(body)
+  for await (const chunk of stream) {
+    const bufferSpaceAvailable = res.write(chunk)
 
     // If there's no more space in the buffer, then we need to wait on the
     // client to read data before pushing again.
@@ -127,10 +126,14 @@ async function stream(
       drainResolve = res
     })
   }
-  }
 
-  // If the client disconnected early, then we need to cleanup the stream.
-  if (!open) return reader.cancel()
+    // If the client disconnects, then we need to manually cleanup the stream
+    // iterator so the body can release its resources.
+    if (!open) {
+      stream.return()
+      break
+    }
+  }
 }

From f37e7eb8075cb2643d5b6350d8d56a4d8fe3c2dd Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Mon, 12 Jun 2023 23:37:15 -0400
Subject: [PATCH 4/8] Remove module method mocking

---
 .../tests/fixtures/unhandled-rejection.ts | 39 ++++++++-----
 packages/runtime/tests/server.test.ts     | 55 -------------------
 2 files changed, 24 insertions(+), 70 deletions(-)

diff --git a/packages/runtime/tests/fixtures/unhandled-rejection.ts b/packages/runtime/tests/fixtures/unhandled-rejection.ts
index 15c846d9..97b1f989 100644
--- a/packages/runtime/tests/fixtures/unhandled-rejection.ts
+++ b/packages/runtime/tests/fixtures/unhandled-rejection.ts
@@ -4,18 +4,22 @@ import fetch from 'node-fetch'
 
 async function main() {
   const runtime = new EdgeRuntime()
-  const deferred = new Promise((resolve) => {
-    runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
-      resolve(event)
-    }
-  })
+  function waitForReject() {
+    return new Promise((resolve) => {
+      runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
+        resolve(event)
+      }
+    })
+  }
 
   runtime.evaluate(`
     addEventListener('fetch', event => {
+      const url = new URL(event.request.url)
+      const chunk = url.searchParams.get('chunk')
       const stream = new ReadableStream({
         start(controller) {
           controller.enqueue(new TextEncoder().encode('hi there'));
-          controller.enqueue('wrong chunk');
+          controller.enqueue(JSON.parse(chunk));
           controller.close();
         }
       });
@@ -33,16 +37,21 @@ async function main() {
 
   const server = await runServer({ runtime })
 
+  const chunks = [1, 'String', true, { b: 1 }, [1], Buffer.from('Buffer')]
+
   try {
-    const url = new URL(server.url)
-    const response = await fetch(String(url))
-    assert.strictEqual(response.status, 200)
-    assert.strictEqual(await response.text(), 'hi there')
-    const event = await deferred
-    assert.strictEqual(
-      event.reason.message,
-      'This ReadableStream did not return bytes.'
-    )
+    for (const chunk of chunks) {
+      const deferred = waitForReject()
+      const url = new URL(`${server.url}?chunk=${JSON.stringify(chunk)}`)
+      const response = await fetch(String(url))
+      assert.strictEqual(response.status, 200)
+      assert.strictEqual(await response.text(), 'hi there')
+      const event = await deferred
+      assert.strictEqual(
+        event.reason.message,
+        'This ReadableStream did not return bytes.'
+      )
+    }
     return 'TEST PASSED!'
   } finally {
     await server.close()
diff --git a/packages/runtime/tests/server.test.ts b/packages/runtime/tests/server.test.ts
index bbf60de1..00995335 100644
--- a/packages/runtime/tests/server.test.ts
+++ b/packages/runtime/tests/server.test.ts
@@ -5,26 +5,8 @@ import type { Readable } from 'stream'
 
 let server: Awaited<ReturnType<typeof runServer>>
 
-const chunkErrorFn = jest.fn()
-jest.mock('../src/server/body-streams', () => {
-  const utils = jest.requireActual('../src/server/body-streams')
-  return {
-    ...utils,
-    consumeUint8ArrayReadableStream: async function* (body?: ReadableStream) {
-      try {
-        for await (const chunk of utils.consumeUint8ArrayReadableStream(body)) {
-          yield chunk
-        }
-      } catch (error) {
-        chunkErrorFn(error)
-      }
-    },
-  }
-})
-
 afterEach(() => {
   server.close()
-  chunkErrorFn.mockReset()
 })
 
 function sleep(ms: number) {
@@ -184,43 +166,6 @@ test(`do not fail writing to the response socket Uint8Array`, async () => {
   expect(response.status).toEqual(200)
   const text = await response.text()
   expect(text).toEqual('hi there1\nhi there2\nhi there3\n')
-  expect(chunkErrorFn).toHaveBeenCalledTimes(0)
-})
-
-test(`fails when writing to the response socket a wrong chunk`, async () => {
-  const chunks = [1, 'String', true, { b: 1 }, [1], Buffer.from('Buffer')]
-  const runtime = new EdgeRuntime()
-  runtime.evaluate(`
-    addEventListener('fetch', event => {
-      const url = new URL(event.request.url)
-      const chunk = url.searchParams.get('chunk')
-      const stream = new ReadableStream({
-        start(controller) {
-          controller.enqueue(new TextEncoder().encode('hi there'));
-          controller.enqueue(JSON.parse(chunk));
-          controller.close();
-        }
-      });
-      return event.respondWith(
-        new Response(stream, {
-          status: 200,
-        })
-      )
-    })
-  `)
-
-  server = await runServer({ runtime })
-  for (const chunk of chunks) {
-    const response = await fetch(`${server.url}?chunk=${JSON.stringify(chunk)}`)
-    expect(response.status).toEqual(200)
-    expect(await response.text()).toEqual('hi there')
-    expect(chunkErrorFn).toHaveBeenCalledTimes(1)
-    expect(chunkErrorFn.mock.calls[0][0]).toBeInstanceOf(TypeError)
-    expect(chunkErrorFn.mock.calls[0][0].message).toEqual(
-      'This ReadableStream did not return bytes.'
-    )
-    chunkErrorFn.mockReset()
-  }
-})
 
 test('streamable sanity test', async () => {

From 575850dc81a09d6eaaad2f0d7b5d7be572b7614c Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Mon, 12 Jun 2023 23:38:34 -0400
Subject: [PATCH 5/8] Extract stream helper so it can be reused by Next

---
 packages/runtime/src/server/body-streams.ts   | 106 +++++++++++-------
 packages/runtime/src/server/create-handler.ts |  52 +--------
 2 files changed, 70 insertions(+), 88 deletions(-)

diff --git a/packages/runtime/src/server/body-streams.ts b/packages/runtime/src/server/body-streams.ts
index 331ce0b8..7d497d5e 100644
--- a/packages/runtime/src/server/body-streams.ts
+++ b/packages/runtime/src/server/body-streams.ts
@@ -1,4 +1,4 @@
-import type { IncomingMessage } from 'http'
+import type { IncomingMessage, ServerResponse } from 'http'
 import { Readable } from 'stream'
 
 type BodyStream = ReadableStream
@@ -94,59 +94,89 @@ function replaceRequestBody(
   return base
 }
 
+function isUint8ArrayChunk(value: any): value is Uint8Array {
+  return value?.constructor?.name == 'Uint8Array'
+}
+
 /**
  * Creates an async iterator from a ReadableStream that ensures that every
  * emitted chunk is a `Uint8Array`. If there is some invalid chunk it will
  * throw.
  */
 export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) {
-  if (!body) {
-    return
-  }
-
-  const reader = body.getReader()
-
-  // If the consumer calls `it.return()`, our generator code's `yield` will
-  // perform an AbruptCompletion and behave as if this was a `return` statement.
-  // To ensure we perform cleanup, we need to guard the yield statement and
-  // detect this condition with a try-finally.
-  let needsCleanup = false
-
-  // If we detect an invalid chunk, we store an error to be thrown as part of
-  // the cleanup phase.
-  let invalidChunkError
-
-  try {
+  const reader = body?.getReader()
+  if (reader) {
     while (true) {
-      // If the read errors, or we are done reading, we do not need to cleanup
-      // further.
       const { done, value } = await reader.read()
       if (done) {
         return
       }
 
-      // We also need to cleanup if the user returned an invalid type. The loop
-      // isn't done yet, but we're not going to be reading any more.
-      needsCleanup = true
-
-      if (value?.constructor?.name !== 'Uint8Array') {
-        invalidChunkError = new TypeError(
-          'This ReadableStream did not return bytes.'
-        )
-        break
+      if (!isUint8ArrayChunk(value)) {
+        throw new TypeError('This ReadableStream did not return bytes.')
       }
 
-      yield value as Uint8Array
-      needsCleanup = false
+      yield value
     }
-  } finally {
-    // The reader either returned an invalid chunk, or our consumer early
-    // exited. In either case, we need to cleanup the stream's resources.
-    if (needsCleanup) {
-      reader.cancel(invalidChunkError)
-    }
-    if (invalidChunkError) {
-      throw invalidChunkError
-    }
   }
 }
+
+/**
+ * Pipes the chunks of a BodyStream into a Response. This optimizes for
+ * laziness, pauses reading if we experience back-pressure, and handles early
+ * disconnects by the client on the other end of the server response.
+ */
+export async function pipeBodyStreamToResponse(
+  body: BodyStream | null,
+  res: ServerResponse
+) {
+  if (!body) return
+
+  // If the client has already disconnected, then we don't need to pipe anything.
+  if (res.destroyed) return body.cancel()
+
+  // When the server pushes more data than the client reads, then we need to
+  // wait for the client to catch up before writing more data. We register this
+  // generic handler once so that we don't incur constant register/unregister
+  // calls.
+  let drainResolve: () => void
+  res.on('drain', () => drainResolve?.())
+
+  // If the user aborts, then we'll receive a close event before the
+  // body closes. In that case, we want to end the streaming.
+  let open = true
+  res.on('close', () => {
+    open = false
+    drainResolve?.()
+  })
+
+  const reader = body.getReader()
+  while (open) {
+    const { done, value } = await reader.read()
+    if (done) break
+
+    if (!isUint8ArrayChunk(value)) {
+      const error = new TypeError('This ReadableStream did not return bytes.')
+      reader.cancel(error)
+      throw error
+    }
+
+    if (open) {
+      const bufferSpaceAvailable = res.write(value)
+
+      // If there's no more space in the buffer, then we need to wait on the
+      // client to read data before pushing again.
+      if (!bufferSpaceAvailable) {
+        await new Promise((res) => {
+          drainResolve = res
+        })
+      }
+    }
+
+    // If the client disconnected early, then we need to cleanup the stream.
+    // This cannot be joined with the above if-statement, because the client may
+    // have disconnected while waiting for a drain signal.
+    if (!open) {
+      return reader.cancel()
+    }
+  }
+}
diff --git a/packages/runtime/src/server/create-handler.ts b/packages/runtime/src/server/create-handler.ts
index 8cc9bded..8f7babf2 100644
--- a/packages/runtime/src/server/create-handler.ts
+++ b/packages/runtime/src/server/create-handler.ts
@@ -2,10 +2,7 @@ import type { EdgeRuntime } from '../edge-runtime'
 import type { IncomingMessage, ServerResponse } from 'http'
 import type { Logger, NodeHeaders } from '../types'
 import type { EdgeContext } from '@edge-runtime/vm'
-import {
-  consumeUint8ArrayReadableStream,
-  getClonableBodyStream,
-} from './body-streams'
+import { getClonableBodyStream, pipeBodyStreamToResponse } from './body-streams'
 import prettyMs from 'pretty-ms'
 import timeSpan from 'time-span'
 
@@ -70,7 +67,7 @@ export function createHandler(options: Options) {
       }
     }
 
-    await stream(response.body, res)
+    await pipeBodyStreamToResponse(response.body, res)
 
     const subject = `${req.socket.remoteAddress} ${req.method} ${req.url}`
     const time = `${prettyMs(start())
@@ -91,51 +88,6 @@ export function createHandler(options: Options) {
   }
 }
 
-async function stream(
-  body: ReadableStream | null,
-  res: ServerResponse
-) {
-  if (!body) return
-
-  // If the client has already disconnected, then we don't need to pipe anything.
-  if (res.destroyed) return body.cancel()
-
-  // When the server pushes more data than the client reads, then we need to
-  // wait for the client to catch up before writing more data. We register this
-  // generic handler once so that we don't incur constant register/unregister
-  // calls.
-  let drainResolve: () => void
-  res.on('drain', () => drainResolve?.())
-
-  // If the user aborts, then we'll receive a close event before the
-  // body closes. In that case, we want to end the streaming.
-  let open = true
-  res.on('close', () => {
-    open = false
-    drainResolve?.()
-  })
-
-  const stream = consumeUint8ArrayReadableStream(body)
-  for await (const chunk of stream) {
-    const bufferSpaceAvailable = res.write(chunk)
-
-    // If there's no more space in the buffer, then we need to wait on the
-    // client to read data before pushing again.
-    if (!bufferSpaceAvailable) {
-      await new Promise((res) => {
-        drainResolve = res
-      })
-    }
-
-    // If the client disconnects, then we need to manually cleanup the stream
-    // iterator so the body can release its resources.
-    if (!open) {
-      stream.return()
-      break
-    }
-  }
-}
-
 /**
  * Builds a full URL from the provided incoming message. Note this function
  * is not safe as one can set has a host anything based on headers. It is

From a40bc96a08fcf39261de34694ff74352c302ceef Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Mon, 12 Jun 2023 23:54:04 -0400
Subject: [PATCH 6/8] Fix test

---
 packages/runtime/tests/server.test.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/runtime/tests/server.test.ts b/packages/runtime/tests/server.test.ts
index 00995335..601e95ab 100644
--- a/packages/runtime/tests/server.test.ts
+++ b/packages/runtime/tests/server.test.ts
@@ -249,7 +249,7 @@ test('allows long-running streams to be cancelled immediately', async () => {
   expect(response.status).toEqual(200)
 
   // Because the client and server are in the same node process, if the server
   // doesn't pause then it will have pulled all 10 iterations immediately.
-  expect(pulled).toEqual([0, 1])
+  expect(pulled).not.toContain(10)
 })
 
 test('allows long-running streams to be cancelled after partial read', async () => {

From daa9511ed8ac0a0387947ad9c2590cb71257457e Mon Sep 17 00:00:00 2001
From: Justin Ridgewell
Date: Tue, 13 Jun 2023 00:18:29 -0400
Subject: [PATCH 7/8] Export method

---
 packages/runtime/src/index.ts        | 1 +
 packages/runtime/src/server/index.ts | 5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts
index 75adff1e..5c3f5c61 100644
--- a/packages/runtime/src/index.ts
+++ b/packages/runtime/src/index.ts
@@ -1,5 +1,6 @@
 export {
   consumeUint8ArrayReadableStream,
+  pipeBodyStreamToResponse,
   createHandler,
   runServer,
 } from './server'
diff --git a/packages/runtime/src/server/index.ts b/packages/runtime/src/server/index.ts
index 9ae205c4..f1c47778 100644
--- a/packages/runtime/src/server/index.ts
+++ b/packages/runtime/src/server/index.ts
@@ -1,3 +1,6 @@
-export { consumeUint8ArrayReadableStream } from './body-streams'
+export {
+  consumeUint8ArrayReadableStream,
+  pipeBodyStreamToResponse,
+} from './body-streams'
 export { createHandler } from './create-handler'
 export { runServer, EdgeRuntimeServer } from './run-server'

From 8ccfe46e8dd3d852a24bcf055acad1ca5878e7a1 Mon Sep 17 00:00:00 2001
From: Kiko Beats
Date: Tue, 13 Jun 2023 09:44:45 +0200
Subject: [PATCH 8/8] Create spicy-toys-teach.md

---
 .changeset/spicy-toys-teach.md | 5 +++++
 1 file changed, 5 insertions(+)
 create mode 100644 .changeset/spicy-toys-teach.md

diff --git a/.changeset/spicy-toys-teach.md b/.changeset/spicy-toys-teach.md
new file mode 100644
index 00000000..6414aecb
--- /dev/null
+++ b/.changeset/spicy-toys-teach.md
@@ -0,0 +1,5 @@
+---
+"edge-runtime": minor
+---
+
+Lazily stream responses