Skip to content

Commit

Permalink
Extract stream helper so it can be reused by Next
Browse files Browse the repository at this point in the history
  • Loading branch information
jridgewell committed Jun 13, 2023
1 parent f37e7eb commit 575850d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 88 deletions.
106 changes: 68 additions & 38 deletions packages/runtime/src/server/body-streams.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IncomingMessage } from 'http'
import type { IncomingMessage, ServerResponse } from 'http'
import { Readable } from 'stream'

type BodyStream = ReadableStream<Uint8Array>
Expand Down Expand Up @@ -94,59 +94,89 @@ function replaceRequestBody<T extends IncomingMessage>(
return base
}

function isUint8ArrayChunk(value: any): value is Uint8Array {
return value?.constructor?.name == 'Uint8Array'
}

/**
* Creates an async iterator from a ReadableStream that ensures that every
* emitted chunk is a `Uint8Array`. If there is some invalid chunk it will
* throw.
*/
export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) {
if (!body) {
return
}

const reader = body.getReader()

// If the consumer calls `it.return()`, our generator code's `yield` will
// perform an AbruptCompletion and behave as if this was a `return` statement.
// To ensure we perform cleanup, we need to guard the yield statement and
// detect this condition with a try-finally.
let needsCleanup = false

// If we detect an invalid chunk, we store an error to be thrown as part of
// the cleanup phase.
let invalidChunkError

try {
const reader = body?.getReader()
if (reader) {
while (true) {
// If the read errors, or we are done reading, we do not need to cleanup
// further.
const { done, value } = await reader.read()
if (done) {
return
}

// We also need to cleanup if the user returned an invalid type. The loop
// isn't done yet, but we're not going to be reading any more.
needsCleanup = true

if (value?.constructor?.name !== 'Uint8Array') {
invalidChunkError = new TypeError(
'This ReadableStream did not return bytes.'
)
break
if (!isUint8ArrayChunk(value)) {
throw new TypeError('This ReadableStream did not return bytes.')
}
yield value
}
}
}

/**
* Pipes the chunks of a BodyStream into a Response. This optimizes for
* laziness, pauses reading if we experience back-pressure, and handles early
* disconnects by the client on the other end of the server response.
*/
export async function pipeBodyStreamToResponse(
body: BodyStream | null,
res: ServerResponse
) {
if (!body) return

// If the client has already disconnected, then we don't need to pipe anything.
if (res.destroyed) return body.cancel()

// When the server pushes more data than the client reads, then we need to
// wait for the client to catch up before writing more data. We register this
// generic handler once so that we don't incur constant register/unregister
// calls.
let drainResolve: () => void
res.on('drain', () => drainResolve?.())

// If the user aborts, then we'll receive a close event before the
// body closes. In that case, we want to end the streaming.
let open = true
res.on('close', () => {
open = false
drainResolve?.()
})

yield value as Uint8Array
needsCleanup = false
const reader = body.getReader()
while (open) {
const { done, value } = await reader.read()
if (done) break

if (!isUint8ArrayChunk(value)) {
const error = new TypeError('This ReadableStream did not return bytes.')
reader.cancel(error)
throw error
}
} finally {
// The reader either returned an invalid chunk, or our consumer early
// exited. In either case, we need to cleanup the stream's resources.
if (needsCleanup) {
reader.cancel(invalidChunkError)

if (open) {
const bufferSpaceAvailable = res.write(value)

// If there's no more space in the buffer, then we need to wait on the
// client to read data before pushing again.
if (!bufferSpaceAvailable) {
await new Promise<void>((res) => {
drainResolve = res
})
}
}
if (invalidChunkError) {
throw invalidChunkError

// If the client disconnected early, then we need to cleanup the stream.
// This cannot be joined with the above if-statement, because the client may
// have disconnected while waiting for a drain signal.
if (!open) {
return reader.cancel()
}
}
}
52 changes: 2 additions & 50 deletions packages/runtime/src/server/create-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ import type { EdgeRuntime } from '../edge-runtime'
import type { IncomingMessage, ServerResponse } from 'http'
import type { Logger, NodeHeaders } from '../types'
import type { EdgeContext } from '@edge-runtime/vm'
import {
consumeUint8ArrayReadableStream,
getClonableBodyStream,
} from './body-streams'
import { getClonableBodyStream, pipeBodyStreamToResponse } from './body-streams'
import prettyMs from 'pretty-ms'
import timeSpan from 'time-span'

Expand Down Expand Up @@ -70,7 +67,7 @@ export function createHandler<T extends EdgeContext>(options: Options<T>) {
}
}

await stream(response.body, res)
await pipeBodyStreamToResponse(response.body, res)

const subject = `${req.socket.remoteAddress} ${req.method} ${req.url}`
const time = `${prettyMs(start())
Expand All @@ -91,51 +88,6 @@ export function createHandler<T extends EdgeContext>(options: Options<T>) {
}
}

async function stream(
body: ReadableStream<Uint8Array> | null,
res: ServerResponse
) {
if (!body) return

// If the client has already disconnected, then we don't need to pipe anything.
if (res.destroyed) return body.cancel()

// When the server pushes more data than the client reads, then we need to
// wait for the client to catch up before writing more data. We register this
// generic handler once so that we don't incur constant register/unregister
// calls.
let drainResolve: () => void
res.on('drain', () => drainResolve?.())

// If the user aborts, then we'll receive a close event before the
// body closes. In that case, we want to end the streaming.
let open = true
res.on('close', () => {
open = false
drainResolve?.()
})

const stream = consumeUint8ArrayReadableStream(body)
for await (const chunk of stream) {
const bufferSpaceAvailable = res.write(chunk)

// If there's no more space in the buffer, then we need to wait on the
// client to read data before pushing again.
if (!bufferSpaceAvailable) {
await new Promise<void>((res) => {
drainResolve = res
})
}

// If the client disconnects, then we need to manually cleanup the stream
// iterator so the body can release its resources.
if (!open) {
stream.return()
break
}
}
}

/**
* Builds a full URL from the provided incoming message. Note this function
* is not safe as one can set has a host anything based on headers. It is
Expand Down

0 comments on commit 575850d

Please sign in to comment.