Skip to content

Commit

Permalink
Lazily stream responses (vercel#396)
Browse files Browse the repository at this point in the history
* Use a lazy stream implementation

* Add proper test for unhandled rejection in pull

* Use consumeUint8ArrayReadableStream

* Remove module method mocking

* Extract stream helper so it can be reused by Next

* Fix test

* Export method

* Create spicy-toys-teach.md

---------

Co-authored-by: Kiko Beats <[email protected]>
  • Loading branch information
jridgewell and Kikobeats committed Jun 23, 2023
1 parent 071cef9 commit 835453c
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 76 deletions.
5 changes: 5 additions & 0 deletions .changeset/spicy-toys-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"edge-runtime": minor
---

Lazily stream responses
1 change: 1 addition & 0 deletions packages/runtime/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export {
consumeUint8ArrayReadableStream,
pipeBodyStreamToResponse,
createHandler,
runServer,
} from './server'
Expand Down
70 changes: 67 additions & 3 deletions packages/runtime/src/server/body-streams.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IncomingMessage } from 'http'
import type { IncomingMessage, ServerResponse } from 'http'
import { Readable } from 'stream'

type BodyStream = ReadableStream<Uint8Array>
Expand Down Expand Up @@ -94,6 +94,10 @@ function replaceRequestBody<T extends IncomingMessage>(
return base
}

function isUint8ArrayChunk(value: any): value is Uint8Array {
return value?.constructor?.name == 'Uint8Array'
}

/**
* Creates an async iterator from a ReadableStream that ensures that every
* emitted chunk is a `Uint8Array`. If there is some invalid chunk it will
Expand All @@ -108,11 +112,71 @@ export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) {
return
}

if (value?.constructor?.name !== 'Uint8Array') {
if (!isUint8ArrayChunk(value)) {
throw new TypeError('This ReadableStream did not return bytes.')
}
yield value
}
}
}

/**
* Pipes the chunks of a BodyStream into a Response. This optimizes for
* laziness, pauses reading if we experience back-pressure, and handles early
* disconnects by the client on the other end of the server response.
*/
export async function pipeBodyStreamToResponse(
body: BodyStream | null,
res: ServerResponse
) {
if (!body) return

// If the client has already disconnected, then we don't need to pipe anything.
if (res.destroyed) return body.cancel()

// When the server pushes more data than the client reads, then we need to
// wait for the client to catch up before writing more data. We register this
// generic handler once so that we don't incur constant register/unregister
// calls.
let drainResolve: () => void
res.on('drain', () => drainResolve?.())

// If the user aborts, then we'll receive a close event before the
// body closes. In that case, we want to end the streaming.
let open = true
res.on('close', () => {
open = false
drainResolve?.()
})

const reader = body.getReader()
while (open) {
const { done, value } = await reader.read()
if (done) break

if (!isUint8ArrayChunk(value)) {
const error = new TypeError('This ReadableStream did not return bytes.')
reader.cancel(error)
throw error
}

if (open) {
const bufferSpaceAvailable = res.write(value)

// If there's no more space in the buffer, then we need to wait on the
// client to read data before pushing again.
if (!bufferSpaceAvailable) {
await new Promise<void>((res) => {
drainResolve = res
})
}
}

yield value as Uint8Array
// If the client disconnected early, then we need to cleanup the stream.
// This cannot be joined with the above if-statement, because the client may
// have disconnected while waiting for a drain signal.
if (!open) {
return reader.cancel()
}
}
}
11 changes: 2 additions & 9 deletions packages/runtime/src/server/create-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import type { EdgeRuntime } from '../edge-runtime'
import type { IncomingMessage, ServerResponse } from 'http'
import type { Logger, NodeHeaders } from '../types'
import type { EdgeContext } from '@edge-runtime/vm'
import { consumeUint8ArrayReadableStream } from './body-streams'
import { getClonableBodyStream } from './body-streams'
import { getClonableBodyStream, pipeBodyStreamToResponse } from './body-streams'
import prettyMs from 'pretty-ms'
import timeSpan from 'time-span'

Expand Down Expand Up @@ -68,13 +67,7 @@ export function createHandler<T extends EdgeContext>(options: Options<T>) {
}
}

if (response.body) {
for await (const chunk of consumeUint8ArrayReadableStream(
response.body
)) {
res.write(chunk)
}
}
await pipeBodyStreamToResponse(response.body, res)

const subject = `${req.socket.remoteAddress} ${req.method} ${req.url}`
const time = `${prettyMs(start())
Expand Down
5 changes: 4 additions & 1 deletion packages/runtime/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export { consumeUint8ArrayReadableStream } from './body-streams'
export {
consumeUint8ArrayReadableStream,
pipeBodyStreamToResponse,
} from './body-streams'
export { createHandler } from './create-handler'
export { runServer, EdgeRuntimeServer } from './run-server'
52 changes: 52 additions & 0 deletions packages/runtime/tests/fixtures/pull-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { EdgeRuntime, runServer } from '../../src'
import assert from 'assert'
import fetch from 'node-fetch'

async function main() {
const runtime = new EdgeRuntime()
const deferred = new Promise<PromiseRejectionEvent>((resolve) => {
runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
resolve(event)
}
})

runtime.evaluate(`
addEventListener('fetch', event => {
const stream = new ReadableStream({
pull(controller) {
throw new Error('expected pull error');
}
});
return event.respondWith(
new Response(stream, {
status: 200,
})
)
})
addEventListener('unhandledrejection', (event) => {
globalThis.handleRejection(event)
})
`)

const server = await runServer({ runtime })

try {
const url = new URL(server.url)
const response = await fetch(String(url))
assert.strictEqual(response.status, 200)
assert.strictEqual(await response.text(), '')
const event = await deferred
assert.strictEqual(event.reason.message, 'expected pull error')
return 'TEST PASSED!'
} finally {
await server.close()
}
}

main()
.then(console.log)
.catch((error) => {
console.log('TEST FAILED!')
console.log(error)
})
39 changes: 24 additions & 15 deletions packages/runtime/tests/fixtures/unhandled-rejection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ import fetch from 'node-fetch'

async function main() {
const runtime = new EdgeRuntime()
const deferred = new Promise<PromiseRejectionEvent>((resolve) => {
runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
resolve(event)
}
})
function waitForReject() {
return new Promise<PromiseRejectionEvent>((resolve) => {
runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
resolve(event)
}
})
}

runtime.evaluate(`
addEventListener('fetch', event => {
const url = new URL(event.request.url)
const chunk = url.searchParams.get('chunk')
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('hi there'));
controller.enqueue('wrong chunk');
controller.enqueue(JSON.parse(chunk));
controller.close();
}
});
Expand All @@ -33,16 +37,21 @@ async function main() {

const server = await runServer({ runtime })

const chunks = [1, 'String', true, { b: 1 }, [1], Buffer.from('Buffer')]

try {
const url = new URL(server.url)
const response = await fetch(String(url))
assert.strictEqual(response.status, 200)
assert.strictEqual(await response.text(), 'hi there')
const event = await deferred
assert.strictEqual(
event.reason.message,
'This ReadableStream did not return bytes.'
)
for (const chunk of chunks) {
const deferred = waitForReject()
const url = new URL(`${server.url}?chunk=${JSON.stringify(chunk)}`)
const response = await fetch(String(url))
assert.strictEqual(response.status, 200)
assert.strictEqual(await response.text(), 'hi there')
const event = await deferred
assert.strictEqual(
event.reason.message,
'This ReadableStream did not return bytes.'
)
}
return 'TEST PASSED!'
} finally {
await server.close()
Expand Down
14 changes: 14 additions & 0 deletions packages/runtime/tests/rejections-and-errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,17 @@ it('handles correctly unhandled rejections', async () => {
stderr: '',
})
})

it('reports unhandled rejection for pull errors', async () => {
const result = await execAsync(
`ts-node --transpile-only ${resolve(
__dirname,
'./fixtures/pull-error.ts'
)}`,
{ encoding: 'utf8' }
)
expect(result).toMatchObject({
stdout: expect.stringContaining('TEST PASSED!'),
stderr: '',
})
})
Loading

0 comments on commit 835453c

Please sign in to comment.