Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazily stream responses #396

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/spicy-toys-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"edge-runtime": minor
---

Lazily stream responses
1 change: 1 addition & 0 deletions packages/runtime/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export {
consumeUint8ArrayReadableStream,
pipeBodyStreamToResponse,
createHandler,
runServer,
} from './server'
Expand Down
70 changes: 67 additions & 3 deletions packages/runtime/src/server/body-streams.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IncomingMessage } from 'http'
import type { IncomingMessage, ServerResponse } from 'http'
import { Readable } from 'stream'

type BodyStream = ReadableStream<Uint8Array>
Expand Down Expand Up @@ -94,6 +94,10 @@ function replaceRequestBody<T extends IncomingMessage>(
return base
}

function isUint8ArrayChunk(value: any): value is Uint8Array {
return value?.constructor?.name == 'Uint8Array'
}

/**
* Creates an async iterator from a ReadableStream that ensures that every
* emitted chunk is a `Uint8Array`. If there is some invalid chunk it will
Expand All @@ -108,11 +112,71 @@ export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) {
return
}

if (value?.constructor?.name !== 'Uint8Array') {
if (!isUint8ArrayChunk(value)) {
throw new TypeError('This ReadableStream did not return bytes.')
}
yield value
}
}
}

/**
* Pipes the chunks of a BodyStream into a Response. This optimizes for
* laziness, pauses reading if we experience back-pressure, and handles early
* disconnects by the client on the other end of the server response.
*/
export async function pipeBodyStreamToResponse(
body: BodyStream | null,
res: ServerResponse
) {
if (!body) return

// If the client has already disconnected, then we don't need to pipe anything.
if (res.destroyed) return body.cancel()

// When the server pushes more data than the client reads, then we need to
// wait for the client to catch up before writing more data. We register this
// generic handler once so that we don't incur constant register/unregister
// calls.
let drainResolve: () => void
res.on('drain', () => drainResolve?.())

// If the user aborts, then we'll receive a close event before the
// body closes. In that case, we want to end the streaming.
let open = true
res.on('close', () => {
open = false
drainResolve?.()
})

const reader = body.getReader()
while (open) {
const { done, value } = await reader.read()
if (done) break

if (!isUint8ArrayChunk(value)) {
const error = new TypeError('This ReadableStream did not return bytes.')
reader.cancel(error)
throw error
}

if (open) {
const bufferSpaceAvailable = res.write(value)

// If there's no more space in the buffer, then we need to wait on the
// client to read data before pushing again.
if (!bufferSpaceAvailable) {
await new Promise<void>((res) => {
drainResolve = res
})
}
}

yield value as Uint8Array
// If the client disconnected early, then we need to cleanup the stream.
// This cannot be joined with the above if-statement, because the client may
// have disconnected while waiting for a drain signal.
if (!open) {
return reader.cancel()
}
}
}
11 changes: 2 additions & 9 deletions packages/runtime/src/server/create-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import type { EdgeRuntime } from '../edge-runtime'
import type { IncomingMessage, ServerResponse } from 'http'
import type { Logger, NodeHeaders } from '../types'
import type { EdgeContext } from '@edge-runtime/vm'
import { consumeUint8ArrayReadableStream } from './body-streams'
import { getClonableBodyStream } from './body-streams'
import { getClonableBodyStream, pipeBodyStreamToResponse } from './body-streams'
import prettyMs from 'pretty-ms'
import timeSpan from 'time-span'

Expand Down Expand Up @@ -68,13 +67,7 @@ export function createHandler<T extends EdgeContext>(options: Options<T>) {
}
}

if (response.body) {
for await (const chunk of consumeUint8ArrayReadableStream(
response.body
)) {
res.write(chunk)
}
}
await pipeBodyStreamToResponse(response.body, res)

const subject = `${req.socket.remoteAddress} ${req.method} ${req.url}`
const time = `${prettyMs(start())
Expand Down
5 changes: 4 additions & 1 deletion packages/runtime/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export { consumeUint8ArrayReadableStream } from './body-streams'
export {
consumeUint8ArrayReadableStream,
pipeBodyStreamToResponse,
} from './body-streams'
export { createHandler } from './create-handler'
export { runServer, EdgeRuntimeServer } from './run-server'
52 changes: 52 additions & 0 deletions packages/runtime/tests/fixtures/pull-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { EdgeRuntime, runServer } from '../../src'
import assert from 'assert'
import fetch from 'node-fetch'

async function main() {
const runtime = new EdgeRuntime()
const deferred = new Promise<PromiseRejectionEvent>((resolve) => {
runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
resolve(event)
}
})

runtime.evaluate(`
addEventListener('fetch', event => {
const stream = new ReadableStream({
pull(controller) {
throw new Error('expected pull error');
}
});
return event.respondWith(
new Response(stream, {
status: 200,
})
)
})

addEventListener('unhandledrejection', (event) => {
globalThis.handleRejection(event)
})
`)

const server = await runServer({ runtime })

try {
const url = new URL(server.url)
const response = await fetch(String(url))
assert.strictEqual(response.status, 200)
assert.strictEqual(await response.text(), '')
const event = await deferred
assert.strictEqual(event.reason.message, 'expected pull error')
return 'TEST PASSED!'
} finally {
await server.close()
}
}

main()
.then(console.log)
.catch((error) => {
console.log('TEST FAILED!')
console.log(error)
})
39 changes: 24 additions & 15 deletions packages/runtime/tests/fixtures/unhandled-rejection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ import fetch from 'node-fetch'

async function main() {
const runtime = new EdgeRuntime()
const deferred = new Promise<PromiseRejectionEvent>((resolve) => {
runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
resolve(event)
}
})
function waitForReject() {
return new Promise<PromiseRejectionEvent>((resolve) => {
runtime.context.handleRejection = (event: PromiseRejectionEvent) => {
resolve(event)
}
})
}

runtime.evaluate(`
addEventListener('fetch', event => {
const url = new URL(event.request.url)
const chunk = url.searchParams.get('chunk')
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('hi there'));
controller.enqueue('wrong chunk');
controller.enqueue(JSON.parse(chunk));
controller.close();
}
});
Expand All @@ -33,16 +37,21 @@ async function main() {

const server = await runServer({ runtime })

const chunks = [1, 'String', true, { b: 1 }, [1], Buffer.from('Buffer')]

try {
const url = new URL(server.url)
const response = await fetch(String(url))
assert.strictEqual(response.status, 200)
assert.strictEqual(await response.text(), 'hi there')
const event = await deferred
assert.strictEqual(
event.reason.message,
'This ReadableStream did not return bytes.'
)
for (const chunk of chunks) {
const deferred = waitForReject()
const url = new URL(`${server.url}?chunk=${JSON.stringify(chunk)}`)
const response = await fetch(String(url))
assert.strictEqual(response.status, 200)
assert.strictEqual(await response.text(), 'hi there')
const event = await deferred
assert.strictEqual(
event.reason.message,
'This ReadableStream did not return bytes.'
)
}
return 'TEST PASSED!'
} finally {
await server.close()
Expand Down
14 changes: 14 additions & 0 deletions packages/runtime/tests/rejections-and-errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,17 @@ it('handles correctly unhandled rejections', async () => {
stderr: '',
})
})

it('reports unhandled rejection for pull errors', async () => {
const result = await execAsync(
`ts-node --transpile-only ${resolve(
__dirname,
'./fixtures/pull-error.ts'
)}`,
{ encoding: 'utf8' }
)
expect(result).toMatchObject({
stdout: expect.stringContaining('TEST PASSED!'),
stderr: '',
})
})
Loading