Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy streaming #51330

Open
wants to merge 7 commits into
base: canary
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/next/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@
"debug": "4.1.1",
"devalue": "2.0.1",
"domain-browser": "4.19.0",
"edge-runtime": "2.3.2",
"edge-runtime": "2.4.3",
"events": "3.3.0",
"find-cache-dir": "3.3.1",
"find-up": "4.1.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/next/src/compiled/edge-runtime/index.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/next/src/compiled/sass-loader/cjs.js

Large diffs are not rendered by default.

72 changes: 71 additions & 1 deletion packages/next/src/server/body-streams.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { IncomingMessage } from 'http'
import { PassThrough, Readable } from 'stream'
import { PassThrough, pipeline, type Readable, Writable } from 'stream'

export function requestToBodyStream(
context: { ReadableStream: typeof ReadableStream },
Expand Down Expand Up @@ -89,3 +89,73 @@ export function getCloneableBody<T extends IncomingMessage>(
},
}
}

/**
 * Pipes a Node.js Readable into a Writable, resolving once the transfer has
 * settled. A premature close on either side (e.g. the client disconnecting
 * mid-response) is treated as a normal finish rather than a failure; any
 * other pipeline error rejects the returned promise.
 */
export function pipeNodeToNode(src: Readable, dest: Writable) {
  return new Promise<void>((resolve, reject) => {
    pipeline(src, dest, (error) => {
      // Ensure the destination is finalized even when the pipeline was torn
      // down early.
      dest.end()
      if (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE') {
        resolve()
      } else {
        reject(error)
      }
    })
  })
}

/**
 * FlushStream acts as a Writable interface for another Writable. Every time
 * data is pushed into the stream, it is immediately written through to the
 * destination and the destination is flushed. This allows us to send small
 * chunks of data to the browser even when GZIP encoding is enabled (a
 * compressing stream would otherwise buffer them).
 */
export class FlushStream extends Writable {
// The wrapped destination; must expose a `flush()` method (e.g. a zlib
// compression stream).
declare _dest: Writable & { flush: () => void }

// In order to signal that the dest is full, we need to return the last
// result of calling `dest.write()` to the caller of `FlushStream#write()`.
// While non-null, the destination has signalled backpressure and we are
// holding the pending `_writev` callback until its 'drain' event fires.
_writeCb: null | ((e?: Error | null) => void) = null

constructor(dest: Writable & { flush: () => void }) {
super()
this._dest = dest

// Mirror lifecycle events in both directions so that closing or erroring
// either stream tears down the other.
this.once('close', () => dest.end())
this.once('error', (e) => dest.destroy(e))
dest.once('close', () => this.end())
dest.once('error', (e) => this.destroy(e))
dest.on('drain', () => {
// The destination drained: complete the pending write (if any) and
// re-emit 'drain' so our own caller resumes writing.
const cb = this._writeCb
this._writeCb = null
cb?.()
this.emit('drain')
})
}

// NOTE(review): this override ignores `super.write()`'s own return value —
// backpressure is reported solely from the destination via `_writeCb`.
// Confirm the base class's internal buffer cannot grow unbounded while the
// destination keeps accepting writes.
write(chunk: any, encoding?: any, callback?: any): boolean {
super.write(chunk, encoding, callback)
// If there's no pending writeCb, then the caller is free to continue
// writing.
return this._writeCb === null
}

// Batched write implementation: forward every buffered chunk to the
// destination, then flush it so (compressed) output is emitted immediately.
_writev(
chunks: { chunk: any; encoding: BufferEncoding }[],
callback: (error?: Error | null | undefined) => void
): void {
let writable = true
for (const { chunk, encoding } of chunks) {
writable = this._dest.write(chunk, encoding)
}
this._dest.flush()

// If the last call to dest.write returned true, then our caller can
// continue writing. If not, then we need to wait until the dest emits a
// drain event.
if (writable) {
callback()
} else {
this._writeCb = callback
}
}
}
14 changes: 14 additions & 0 deletions packages/next/src/server/lib/render-server-standalone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ export const createServerHandler = async ({
return
}
const proxyServer = getProxyServer(req.url || '/')

// http-proxy does not properly detect a client disconnect in newer
// versions of Node.js. This is caused because it only listens for the
// `aborted` event on our request object, but it also fully reads and
// closes the request object. Node **will not** fire `aborted` when the
// request is already closed. Listening for `close` on our response object
// will detect the disconnect, and we can abort the proxy's connection.
proxyServer.on('proxyReq', (proxyReq) => {
res.on('close', () => proxyReq.destroy())
})
proxyServer.on('proxyRes', (proxyRes) => {
res.on('close', () => proxyRes.destroy())
})

proxyServer.web(req, res)
proxyServer.on('error', (err) => {
res.statusCode = 500
Expand Down
15 changes: 15 additions & 0 deletions packages/next/src/server/lib/start-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,21 @@ export async function startServer({
return
}
const proxyServer = getProxyServer(req.url || '/')

// http-proxy does not properly detect a client disconnect in newer
// versions of Node.js. This is caused because it only listens for the
// `aborted` event on our request object, but it also fully reads
// and closes the request object. Node **will not** fire `aborted` when
// the request is already closed. Listening for `close` on our response
// object will detect the disconnect, and we can abort the proxy's
// connection.
proxyServer.on('proxyReq', (proxyReq) => {
res.on('close', () => proxyReq.destroy())
})
proxyServer.on('proxyRes', (proxyRes) => {
res.on('close', () => proxyRes.destroy())
})

proxyServer.web(req, res)
}
upgradeHandler = async (req, socket, head) => {
Expand Down
64 changes: 34 additions & 30 deletions packages/next/src/server/next-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
} from '../shared/lib/router/utils/route-matcher'
import type { MiddlewareRouteMatch } from '../shared/lib/router/utils/middleware-route-matcher'
import type { RouteMatch } from './future/route-matches/route-match'
import type { Writable } from 'stream'

import fs from 'fs'
import { join, relative, resolve, sep, isAbsolute } from 'path'
Expand Down Expand Up @@ -86,7 +87,7 @@ import { getCustomRoute, stringifyQuery } from './server-route-utils'
import { urlQueryToSearchParams } from '../shared/lib/router/utils/querystring'
import { removeTrailingSlash } from '../shared/lib/router/utils/remove-trailing-slash'
import { getNextPathnameInfo } from '../shared/lib/router/utils/get-next-pathname-info'
import { getCloneableBody } from './body-streams'
import { FlushStream, getCloneableBody, pipeNodeToNode } from './body-streams'
import { checkIsOnDemandRevalidate } from './api-utils'
import ResponseCache from './response-cache'
import { IncrementalCache } from './lib/incremental-cache'
Expand Down Expand Up @@ -986,14 +987,16 @@ export default class NextNodeServer extends BaseServer {
)
}

protected streamResponseChunk(res: NodeNextResponse, chunk: any) {
res.originalResponse.write(chunk)
private flushingStreamForCompression(res: NodeNextResponse): Writable {
const { originalResponse } = res

// When both compression and streaming are enabled, we need to explicitly
// flush the response to avoid it being buffered by gzip.
if (this.compression && 'flush' in res.originalResponse) {
;(res.originalResponse as any).flush()
if (this.compression && 'flush' in originalResponse) {
return new FlushStream(originalResponse as any)
}

return originalResponse
}

protected async imageOptimizer(
Expand Down Expand Up @@ -1525,10 +1528,10 @@ export default class NextNodeServer extends BaseServer {
res.statusCode = invokeRes.statusCode
res.statusMessage = invokeRes.statusMessage

for await (const chunk of invokeRes) {
this.streamResponseChunk(res as NodeNextResponse, chunk)
}
;(res as NodeNextResponse).originalResponse.end()
await pipeNodeToNode(
invokeRes,
this.flushingStreamForCompression(res as NodeNextResponse)
)
return {
finished: true,
}
Expand Down Expand Up @@ -2497,6 +2500,8 @@ export default class NextNodeServer extends BaseServer {
})

if (isMiddlewareInvoke && 'response' in result) {
const { pipeBodyStreamToResponse } =
require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime')
for (const [key, value] of Object.entries(
toNodeOutgoingHttpHeaders(result.response.headers)
)) {
Expand All @@ -2505,10 +2510,10 @@ export default class NextNodeServer extends BaseServer {
}
}
res.statusCode = result.response.status
for await (const chunk of result.response.body ||
([] as any)) {
this.streamResponseChunk(res as NodeNextResponse, chunk)
}
await pipeBodyStreamToResponse(
result.response.body,
this.flushingStreamForCompression(res as NodeNextResponse)
)
res.send()
return {
finished: true,
Expand Down Expand Up @@ -2679,14 +2684,17 @@ export default class NextNodeServer extends BaseServer {
res.statusCode = result.response.status

if ((result.response as any).invokeRes) {
for await (const chunk of (result.response as any).invokeRes) {
this.streamResponseChunk(res as NodeNextResponse, chunk)
}
;(res as NodeNextResponse).originalResponse.end()
await pipeNodeToNode(
(result.response as any).invokeRes,
this.flushingStreamForCompression(res as NodeNextResponse)
)
} else {
for await (const chunk of result.response.body || ([] as any)) {
this.streamResponseChunk(res as NodeNextResponse, chunk)
}
const { pipeBodyStreamToResponse } =
require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime')
await pipeBodyStreamToResponse(
result.response.body,
this.flushingStreamForCompression(res as NodeNextResponse)
)
res.send()
}
return {
Expand Down Expand Up @@ -2864,22 +2872,18 @@ export default class NextNodeServer extends BaseServer {
}
})

// TODO(gal): not sure that we always need to stream
const nodeResStream = (params.res as NodeNextResponse).originalResponse
if (result.response.body) {
// TODO(gal): not sure that we always need to stream
const nodeResStream = (params.res as NodeNextResponse).originalResponse
const { consumeUint8ArrayReadableStream } =
require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime')
try {
for await (const chunk of consumeUint8ArrayReadableStream(
result.response.body
)) {
nodeResStream.write(chunk)
}
const { pipeBodyStreamToResponse } =
require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime')
await pipeBodyStreamToResponse(result.response.body, nodeResStream)
} finally {
nodeResStream.end()
}
} else {
;(params.res as NodeNextResponse).originalResponse.end()
nodeResStream.end()
}

return result
Expand Down
Loading