Skip to content

Commit

Permalink
Fix suffix ordering while streaming (#34011)
Browse files Browse the repository at this point in the history
* Fix suffix ordering

* Don't start reading until after resolving

* More yak shaving
  • Loading branch information
devknoll authored Feb 5, 2022
1 parent 6814ca7 commit 1aeb230
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 74 deletions.
179 changes: 105 additions & 74 deletions packages/next/server/render.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,8 @@ export async function renderToHTML(
ReactDOMServer,
content,
suffix,
serverComponentsInlinedTransformStream?.readable ?? null,
serverComponentsInlinedTransformStream?.readable ??
streamFromArray([]),
generateStaticHTML
)
}
Expand Down Expand Up @@ -1382,7 +1383,7 @@ export async function renderToHTML(
ReactDOMServer,
document,
null,
null,
streamFromArray([]),
true
)
documentHTML = await streamToString(documentStream)
Expand Down Expand Up @@ -1523,8 +1524,8 @@ function createTransformStream({
flush,
transform,
}: {
flush: (controller: TransformStreamDefaultController) => Promise<void> | void
transform: (
flush?: (controller: TransformStreamDefaultController) => Promise<void> | void
transform?: (
chunk: Uint8Array,
controller: TransformStreamDefaultController
) => void
Expand Down Expand Up @@ -1560,15 +1561,19 @@ function createTransformStream({
const { done, value } = await reader.read()

if (done) {
const maybePromise = flush(controller)
const maybePromise = flush?.(controller)
if (maybePromise) {
await maybePromise
}
writer.close()
return
}

transform(value, controller)
if (transform) {
transform(value, controller)
} else {
controller.enqueue(value)
}
}
} catch (err) {
writer.abort(err)
Expand Down Expand Up @@ -1620,7 +1625,7 @@ function renderToStream(
ReactDOMServer: typeof import('react-dom/server'),
element: React.ReactElement,
suffix: string | null,
dataStream: ReadableStream | null,
dataStream: ReadableStream,
generateStaticHTML: boolean
): Promise<ReadableStream> {
return new Promise((resolve, reject) => {
Expand All @@ -1635,83 +1640,109 @@ function renderToStream(
}
: null

const { readable, writable } = createTransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
},

flush(controller) {
if (suffixState) {
controller.enqueue(suffixState.closeTag)
}
},
})

const doResolve = () => {
if (!resolved) {
resolved = true
let dataStreamFinished: Promise<void> | null = null
let shellFlushed = false

resolve(
readable.pipeThrough(createBufferedTransformStream()).pipeThrough(
createTransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)

if (!shellFlushed) {
shellFlushed = true
if (suffixState) {
controller.enqueue(suffixState.suffixUnclosed)
}
}

if (!dataStreamFinished && dataStream) {
const dataStreamReader = dataStream.getReader()
dataStreamFinished = (async () => {
try {
while (true) {
const { done, value } = await dataStreamReader.read()
if (done) {
return
}
controller.enqueue(value)
}
} catch (err) {
controller.error(err)
}
})()
}
},
flush() {
if (dataStreamFinished) {
return dataStreamFinished
}
},
})
// React will call our callbacks synchronously, so we need to
// defer to a microtask to ensure `stream` is set.
Promise.resolve().then(() =>
resolve(
stream
.pipeThrough(createBufferedTransformStream())
.pipeThrough(
createInlineDataStream(
dataStream.pipeThrough(
createPrefixStream(suffixState?.suffixUnclosed ?? null)
)
)
)
.pipeThrough(createSuffixStream(suffixState?.closeTag ?? null))
)
)
}
}

;(ReactDOMServer as any)
.renderToReadableStream(element, {
onError(err: Error) {
if (!resolved) {
resolved = true
reject(err)
}
},
onCompleteShell() {
if (!generateStaticHTML) {
doResolve()
}
},
onCompleteAll() {
const stream: ReadableStream = (
ReactDOMServer as any
).renderToReadableStream(element, {
onError(err: Error) {
if (!resolved) {
resolved = true
reject(err)
}
},
onCompleteShell() {
if (!generateStaticHTML) {
doResolve()
},
})
.pipeTo(writable)
}
},
onCompleteAll() {
doResolve()
},
})
})
}

function createSuffixStream(suffix: Uint8Array | null) {
return createTransformStream({
flush(controller) {
if (suffix) {
controller.enqueue(suffix)
}
},
})
}

function createPrefixStream(prefix: Uint8Array | null) {
let prefixFlushed = false
return createTransformStream({
transform(chunk, controller) {
if (!prefixFlushed && prefix) {
prefixFlushed = true
controller.enqueue(prefix)
}
controller.enqueue(chunk)
},
flush(controller) {
if (!prefixFlushed && prefix) {
prefixFlushed = true
controller.enqueue(prefix)
}
},
})
}

function createInlineDataStream(
dataStream: ReadableStream | null
): TransformStream {
let dataStreamFinished: Promise<void> | null = null
return createTransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)

if (!dataStreamFinished && dataStream) {
const dataStreamReader = dataStream.getReader()
dataStreamFinished = (async () => {
try {
while (true) {
const { done, value } = await dataStreamReader.read()
if (done) {
return
}
controller.enqueue(value)
}
} catch (err) {
controller.error(err)
}
})()
}
},
flush() {
if (dataStreamFinished) {
return dataStreamFinished
}
},
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,11 @@ export default function (context) {
'count: 1'
)
})

it('should flush the suffix at the very end', async () => {
await fetchViaHTTP(context.appPort, '/').then(async (response) => {
const result = await resolveStreamResponse(response)
expect(result).toMatch(/<\/body><\/html>/)
})
})
}

0 comments on commit 1aeb230

Please sign in to comment.