From 2b7a1b35a8e695c2d29921eb9fb201ccda32a0a5 Mon Sep 17 00:00:00 2001 From: Daeyeon Jeong Date: Sun, 4 Sep 2022 00:00:28 +0900 Subject: [PATCH 1/2] stream: add `ReadableByteStream.tee()` This supports teeing readable byte streams to meet the latest web streams standards. Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com --- lib/internal/webstreams/readablestream.js | 304 ++++++++++++++++++- lib/internal/webstreams/util.js | 11 + test/parallel/test-whatwg-readablestream.js | 2 +- test/parallel/test-whatwg-readablestream.mjs | 34 +++ test/wpt/status/streams.json | 41 --- 5 files changed, 339 insertions(+), 53 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 4577d791d65229..0d2c8e55eda2c9 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -95,6 +95,7 @@ const { ArrayBufferViewGetByteOffset, ArrayBufferGetByteLength, AsyncIterator, + cloneAsUint8Array, copyArrayBuffer, customInspect, dequeueValue, @@ -215,6 +216,7 @@ class ReadableStream { throw new ERR_INVALID_ARG_VALUE('source', 'Object', source); this[kState] = { disturbed: false, + reader: undefined, state: 'readable', storedError: undefined, stream: undefined, @@ -1111,7 +1113,6 @@ class ReadableByteStreamController { chunk); } const chunkByteLength = ArrayBufferViewGetByteLength(chunk); - const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk); const chunkBuffer = ArrayBufferViewGetBuffer(chunk); const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer); if (chunkByteLength === 0 || chunkBufferByteLength === 0) { @@ -1122,11 +1123,7 @@ class ReadableByteStreamController { throw new ERR_INVALID_STATE.TypeError('Controller is already closed'); if (this[kState].stream[kState].state !== 'readable') throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed'); - readableByteStreamControllerEnqueue( - this, - chunkBuffer, - chunkByteLength, - chunkByteOffset); + readableByteStreamControllerEnqueue(this, chunk); } /** @@ -1430,6 +1427,13 @@ function readableStreamPipeTo( } function readableStreamTee(stream, cloneForBranch2) { + if (isReadableByteStreamController(stream[kState].controller)) { + return readableByteStreamTee(stream); + } + return readableStreamDefaultTee(stream, cloneForBranch2); +} + +function readableStreamDefaultTee(stream, cloneForBranch2) { const reader = new ReadableStreamDefaultReader(stream); let reading = false; let canceled1 = false; @@ -1524,6 +1528,284 @@ function readableStreamTee(stream, cloneForBranch2) { return [branch1, branch2]; } +function readableByteStreamTee(stream) { + assert(isReadableStream(stream)); + assert(isReadableByteStreamController(stream[kState].controller)); + + let reader = new ReadableStreamDefaultReader(stream); + let reading = false; + let readAgainForBranch1 = false; + let readAgainForBranch2 = false; + let canceled1 = false; + let canceled2 = false; + let reason1; + let reason2; + let branch1; + let branch2; + const cancelDeferred = createDeferredPromise(); + + function forwardReaderError(thisReader) { + PromisePrototypeThen( + thisReader[kState].close.promise, + undefined, + (error) => { + if (thisReader !== reader) { + return; + } + readableStreamDefaultControllerError(branch1[kState].controller, error); + readableStreamDefaultControllerError(branch2[kState].controller, error); + if (!canceled1 || !canceled2) { + cancelDeferred.resolve(); + } + } + ); + } + + function pullWithDefaultReader() { + if (isReadableStreamBYOBReader(reader)) { + readableStreamBYOBReaderRelease(reader); + reader = new ReadableStreamDefaultReader(stream); + forwardReaderError(reader); + } + + const readRequest = { + [kChunk](chunk) { + queueMicrotask(() => { + readAgainForBranch1 = false; + readAgainForBranch2 = false; + const chunk1 = chunk; + let chunk2 = chunk; + + if (!canceled1 && !canceled2) { + try { + chunk2 = cloneAsUint8Array(chunk); + } catch (error) { + readableByteStreamControllerError( + branch1[kState].controller, + error + ); + readableByteStreamControllerError( + branch2[kState].controller, + error + ); + cancelDeferred.resolve(readableStreamCancel(stream, error)); + return; + } + } + if (!canceled1) { + readableByteStreamControllerEnqueue( + branch1[kState].controller, + chunk1 + ); + } + if (!canceled2) { + readableByteStreamControllerEnqueue( + branch2[kState].controller, + chunk2 + ); + } + reading = false; + + if (readAgainForBranch1) { + pull1Algorithm(); + } else if (readAgainForBranch2) { + pull2Algorithm(); + } + }); + }, + [kClose]() { + reading = false; + + if (!canceled1) { + readableByteStreamControllerClose(branch1[kState].controller); + } + if (!canceled2) { + readableByteStreamControllerClose(branch2[kState].controller); + } + if (branch1[kState].controller[kState].pendingPullIntos.length > 0) { + readableByteStreamControllerRespond(branch1[kState].controller, 0); + } + if (branch2[kState].controller[kState].pendingPullIntos.length > 0) { + readableByteStreamControllerRespond(branch2[kState].controller, 0); + } + if (!canceled1 || !canceled2) { + cancelDeferred.resolve(); + } + }, + [kError]() { + reading = false; + }, + }; + + readableStreamDefaultReaderRead(reader, readRequest); + } + + function pullWithBYOBReader(view, forBranch2) { + if (isReadableStreamDefaultReader(reader)) { + readableStreamDefaultReaderRelease(reader); + reader = new ReadableStreamBYOBReader(stream); + forwardReaderError(reader); + } + + const byobBranch = forBranch2 === true ? branch2 : branch1; + const otherBranch = forBranch2 === false ? branch2 : branch1; + const readIntoRequest = { + [kChunk](chunk) { + queueMicrotask(() => { + readAgainForBranch1 = false; + readAgainForBranch2 = false; + const byobCanceled = forBranch2 === true ? canceled2 : canceled1; + const otherCanceled = forBranch2 === false ? canceled2 : canceled1; + + if (!otherCanceled) { + let clonedChunk; + + try { + clonedChunk = cloneAsUint8Array(chunk); + } catch (error) { + readableByteStreamControllerError( + byobBranch[kState].controller, + error + ); + readableByteStreamControllerError( + otherBranch[kState].controller, + error + ); + cancelDeferred.resolve(readableStreamCancel(stream, error)); + return; + } + if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[kState].controller, + chunk + ); + } + + readableByteStreamControllerEnqueue( + otherBranch[kState].controller, + clonedChunk + ); + } else if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[kState].controller, + chunk + ); + } + reading = false; + + if (readAgainForBranch1) { + pull1Algorithm(); + } else if (readAgainForBranch2) { + pull2Algorithm(); + } + }); + }, + [kClose](chunk) { + reading = false; + + const byobCanceled = forBranch2 === true ? canceled2 : canceled1; + const otherCanceled = forBranch2 === false ? canceled2 : canceled1; + + if (!byobCanceled) { + readableByteStreamControllerClose(byobBranch[kState].controller); + } + if (!otherCanceled) { + readableByteStreamControllerClose(otherBranch[kState].controller); + } + if (chunk !== undefined) { + if (!byobCanceled) { + readableByteStreamControllerRespondWithNewView( + byobBranch[kState].controller, + chunk + ); + } + if ( + !otherCanceled && + otherBranch[kState].controller[kState].pendingPullIntos.length > 0 + ) { + readableByteStreamControllerRespond( + otherBranch[kState].controller, + 0 + ); + } + } + if (!byobCanceled || !otherCanceled) { + cancelDeferred.resolve(); + } + }, + [kError]() { + reading = false; + }, + }; + readableStreamBYOBReaderRead(reader, view, readIntoRequest); + } + + function pull1Algorithm() { + if (reading) { + readAgainForBranch1 = true; + return PromiseResolve(); + } + reading = true; + + const byobRequest = branch1[kState].controller.byobRequest; + if (byobRequest === null) { + pullWithDefaultReader(); + } else { + pullWithBYOBReader(byobRequest[kState].view, false); + } + return PromiseResolve(); + } + + function pull2Algorithm() { + if (reading) { + readAgainForBranch2 = true; + return PromiseResolve(); + } + reading = true; + + const byobRequest = branch2[kState].controller.byobRequest; + if (byobRequest === null) { + pullWithDefaultReader(); + } else { + pullWithBYOBReader(byobRequest[kState].view, true); + } + return PromiseResolve(); + } + + function cancel1Algorithm(reason) { + canceled1 = true; + reason1 = reason; + if (canceled2) { + cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2])); + } + return cancelDeferred.promise; + } + + function cancel2Algorithm(reason) { + canceled2 = true; + reason2 = reason; + if (canceled1) { + cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2])); + } + return cancelDeferred.promise; + } + + branch1 = new ReadableStream({ + type: 'bytes', + pull: pull1Algorithm, + cancel: cancel1Algorithm, + }); + branch2 = new ReadableStream({ + type: 'bytes', + pull: pull2Algorithm, + cancel: cancel2Algorithm, + }); + + forwardReaderError(reader); + + return [branch1, branch2]; +} + function readableByteStreamControllerConvertPullIntoDescriptor(desc) { const { buffer, @@ -2317,11 +2599,7 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor( desc.bytesFilled += size; } -function readableByteStreamControllerEnqueue( - controller, - buffer, - byteLength, - byteOffset) { +function readableByteStreamControllerEnqueue(controller, chunk) { const { closeRequested, pendingPullIntos, @@ -2329,6 +2607,10 @@ function readableByteStreamControllerEnqueue( stream, } = controller[kState]; + const buffer = ArrayBufferViewGetBuffer(chunk); + const byteOffset = ArrayBufferViewGetByteOffset(chunk); + const byteLength = ArrayBufferViewGetByteLength(chunk); + if (closeRequested || stream[kState].state !== 'readable') return; diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index d8e63b5faaa280..0e260d074c73c2 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -2,6 +2,7 @@ const { ArrayBufferPrototype, + ArrayBufferPrototypeSlice, ArrayPrototypePush, ArrayPrototypeShift, AsyncIteratorPrototype, @@ -112,6 +113,15 @@ function ArrayBufferGetByteLength(view) { return ReflectGet(ArrayBufferPrototype, 'byteLength', view); } +function cloneAsUint8Array(view) { + const buffer = ArrayBufferViewGetBuffer(view); + const byteOffset = ArrayBufferViewGetByteOffset(view); + const byteLength = ArrayBufferViewGetByteLength(view); + return new Uint8Array( + ArrayBufferPrototypeSlice(buffer, byteOffset, byteOffset + byteLength) + ); +} + function isBrandCheck(brand) { return (value) => { return value != null && @@ -236,6 +246,7 @@ module.exports = { ArrayBufferViewGetByteOffset, ArrayBufferGetByteLength, AsyncIterator, + cloneAsUint8Array, copyArrayBuffer, customInspect, dequeueValue, diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js index 7f5880939354f7..85096286d3e613 100644 --- a/test/parallel/test-whatwg-readablestream.js +++ b/test/parallel/test-whatwg-readablestream.js @@ -1561,7 +1561,7 @@ class Source { assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller)); readableStreamDefaultControllerEnqueue(controller); readableByteStreamControllerClose(controller); - readableByteStreamControllerEnqueue(controller); + readableByteStreamControllerEnqueue(controller, new Uint8Array(1)); } { diff --git a/test/parallel/test-whatwg-readablestream.mjs b/test/parallel/test-whatwg-readablestream.mjs index a3693f62439ce7..57ebed604542a3 100644 --- a/test/parallel/test-whatwg-readablestream.mjs +++ b/test/parallel/test-whatwg-readablestream.mjs @@ -34,3 +34,37 @@ import assert from 'assert'; assert.strictEqual(dataReader2, 'foobar'); })().then(mustCall()); } + +{ + // Test ReadableByteStream.tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + type: 'bytes', + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])); + + process.nextTick(() => { + controller.close(); + }); + }); + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(mustCall()); +} diff --git a/test/wpt/status/streams.json b/test/wpt/status/streams.json index 6f9a806a33abf1..6aa0d17cce935a 100644 --- a/test/wpt/status/streams.json +++ b/test/wpt/status/streams.json @@ -1,11 +1,4 @@ { - "piping/abort.any.js": { - "fail": { - "expected": [ - "pipeTo on a teed readable byte stream should only be aborted when both branches are aborted" - ] - } - }, "queuing-strategies-size-function-per-global.window.js": { "skip": "Browser-specific test" }, @@ -38,40 +31,6 @@ ] } }, - "readable-byte-streams/tee.any.js": { - "fail": { - "expected": [ - "ReadableStream teeing with byte source: should be able to read one branch to the end without affecting the other", - "ReadableStream teeing with byte source: chunks should be cloned for each branch", - "ReadableStream teeing with byte source: chunks for BYOB requests from branch 1 should be cloned to branch 2", - "ReadableStream teeing with byte source: errors in the source should propagate to both branches", - "ReadableStream teeing with byte source: closing the original should close the branches", - "ReadableStream teeing with byte source: erroring the original should immediately error the branches", - "ReadableStream teeing with byte source: erroring the original should error pending reads from BYOB reader", - "ReadableStream teeing with byte source: canceling branch1 should finish when branch2 reads until end of stream", - "ReadableStream teeing with byte source: canceling branch1 should finish when original stream errors", - "ReadableStream teeing with byte source: should not pull any chunks if no branches are reading", - "ReadableStream teeing with byte source: should only pull enough to fill the emptiest queue", - "ReadableStream teeing with byte source: should not pull when original is already errored", - "ReadableStream teeing with byte source: stops pulling when original stream errors while branch 1 is reading", - "ReadableStream teeing with byte source: stops pulling when original stream errors while branch 2 is reading", - "ReadableStream teeing with byte source: stops pulling when original stream errors while both branches are reading", - "ReadableStream teeing with byte source: canceling both branches in sequence with delay", - "ReadableStream teeing with byte source: failing to cancel when canceling both branches in sequence with delay", - "ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, cancel branch2", - "ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, cancel branch1", - "ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, enqueue to branch1", - "ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, respond to branch2", - "ReadableStream teeing with byte source: pull with BYOB reader, then pull with default reader", - "ReadableStream teeing with byte source: pull with default reader, then pull with BYOB reader", - "ReadableStream teeing with byte source: read from branch2, then read from branch1", - "ReadableStream teeing with byte source: read from branch1 with default reader, then close while branch2 has pending BYOB read", - "ReadableStream teeing with byte source: read from branch2 with default reader, then close while branch1 has pending BYOB read", - "ReadableStream teeing with byte source: close when both branches have pending BYOB reads", - "ReadableStream teeing with byte source: respond() and close() while both branches are pulling" - ] - } - }, "readable-streams/cross-realm-crash.window.js": { "skip": "Browser-specific test" }, From af41bf0ed51a94575c70584d4e6f653f11fbcdb0 Mon Sep 17 00:00:00 2001 From: Daeyeon Jeong Date: Wed, 7 Sep 2022 21:00:07 +0900 Subject: [PATCH 2/2] fixup: update the doc Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com --- doc/api/webstreams.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md index f1e5b742799958..b415113a80975b 100644 --- a/doc/api/webstreams.md +++ b/doc/api/webstreams.md @@ -299,6 +299,10 @@ is active. * Returns: {ReadableStream\[]}