From ce719290de766452eb83a88de377e6feca20aa9d Mon Sep 17 00:00:00 2001 From: Daeyeon Jeong Date: Fri, 23 Sep 2022 21:01:52 +0900 Subject: [PATCH] stream: handle a pending pull request from a released reader In order to meet the specification, this includes mainly the followings: - Adding the 'release steps' to ReadableStreamController - Responding to a pull request from a released reader in ReadableByteStreamController Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com PR-URL: https://github.com/nodejs/node/pull/44702 Refs: https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state Reviewed-By: Matteo Collina Reviewed-By: Minwoo Jung --- doc/api/webstreams.md | 4 ++ lib/internal/webstreams/readablestream.js | 82 +++++++++++++++++++++++ test/wpt/status/streams.json | 8 --- 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md index f7294693abe64c..8fc7ceab3188d4 100644 --- a/doc/api/webstreams.md +++ b/doc/api/webstreams.md @@ -651,6 +651,10 @@ Signals an error that causes the {ReadableStream} to error and close. Every {ReadableStream} has a controller that is responsible for diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 0d2c8e55eda2c9..51e6ca149b1a88 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -139,6 +139,7 @@ const kClose = Symbol('kClose'); const kChunk = Symbol('kChunk'); const kError = Symbol('kError'); const kPull = Symbol('kPull'); +const kRelease = Symbol('kRelease'); /** * @typedef {import('../abort_controller').AbortSignal} AbortSignal @@ -1019,6 +1020,8 @@ class ReadableStreamDefaultController { readableStreamDefaultControllerPullSteps(this, readRequest); } + [kRelease]() {} + [kInspect](depth, options) { return customInspect(depth, options, this[kType], { }); } @@ -1143,6 +1146,17 @@ class ReadableByteStreamController { readableByteStreamControllerPullSteps(this, readRequest); } + [kRelease]() { + const { + pendingPullIntos, + } = this[kState]; + if (pendingPullIntos.length > 0) { + const firstPendingPullInto = pendingPullIntos[0]; + firstPendingPullInto.type = 'none'; + this[kState].pendingPullIntos = [firstPendingPullInto]; + } + } + [kInspect](depth, options) { return customInspect(depth, options, this[kType], { }); } @@ -2060,6 +2074,9 @@ function readableStreamReaderGenericRelease(reader) { }; } setPromiseHandled(reader[kState].close.promise); + + stream[kState].controller[kRelease](); + stream[kState].reader = undefined; reader[kState].stream = undefined; } @@ -2365,6 +2382,8 @@ function readableByteStreamControllerClose(controller) { function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) { assert(stream[kState].state !== 'errored'); + assert(desc.type !== 'none'); + let done = false; if (stream[kState].state === 'closed') { desc.bytesFilled = 0; @@ -2574,6 +2593,9 @@ function readableByteStreamControllerRespond(controller, bytesWritten) { function readableByteStreamControllerRespondInClosedState(controller, desc) { assert(!desc.bytesFilled); + if (desc.type === 'none') { + readableByteStreamControllerShiftPendingPullInto(controller); + } const { stream, } = controller[kState]; @@ -2663,6 +2685,31 @@ function readableByteStreamControllerEnqueue(controller, chunk) { readableByteStreamControllerCallPullIfNeeded(controller); } +function readableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + buffer, + byteOffset, + byteLength +) { + let cloneResult; + try { + cloneResult = ArrayBufferPrototypeSlice( + buffer, + byteOffset, + byteOffset + byteLength + ); + } catch (error) { + readableByteStreamControllerError(controller, error); + throw error; + } + readableByteStreamControllerEnqueueChunkToQueue( + controller, + cloneResult, + 0, + byteLength + ); +} + function readableByteStreamControllerEnqueueChunkToQueue( controller, buffer, @@ -2678,6 +2725,29 @@ function readableByteStreamControllerEnqueueChunkToQueue( controller[kState].queueTotalSize += byteLength; } +function readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + desc +) { + const { + buffer, + byteOffset, + bytesFilled, + type, + } = desc; + assert(type === 'none'); + + if (bytesFilled > 0) { + readableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + buffer, + byteOffset, + bytesFilled + ); + } + readableByteStreamControllerShiftPendingPullInto(controller); +} + function readableByteStreamControllerFillPullIntoDescriptorFromQueue( controller, desc) { @@ -2773,6 +2843,7 @@ function readableByteStreamControllerRespondInReadableState( buffer, bytesFilled, byteLength, + type, } = desc; if (bytesFilled + bytesWritten > byteLength) @@ -2783,6 +2854,17 @@ function readableByteStreamControllerRespondInReadableState( bytesWritten, desc); + if (type === 'none') { + readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + desc + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller + ); + return; + } + if (desc.bytesFilled < desc.elementSize) return; diff --git a/test/wpt/status/streams.json b/test/wpt/status/streams.json index 08a7d4514ebfb5..2e8e931e697247 100644 --- a/test/wpt/status/streams.json +++ b/test/wpt/status/streams.json @@ -16,17 +16,9 @@ "fail": { "expected": [ "ReadableStream with byte source: enqueue() discards auto-allocated BYOB request", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()", "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()", "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()", "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()", - "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)", "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()" ] }