Skip to content

Commit

Permalink
Resolve BYOB reads immediately on cancel
Browse files Browse the repository at this point in the history
Previously, if you cancel the stream while a BYOB read is pending, the stream has to wait for the underlying byte source to call respond(0) before it can return the BYOB request's buffer to the caller. This makes underlying byte sources difficult to write in a robust way. After this change, the contract changes so that canceling a stream while a BYOB read is pending will always lead to the stream not giving you back your buffer. This means that cancel() can immediately resolve all pending BYOB reads with the classic { done: true, value: undefined }, without having to wait for the underlying byte source. This solves the problem, and would make it easier to implement an underlying byte source.

To make this work, an additional change was required: when the stream is canceled, any pending BYOB request is now immediately invalidated, so the underlying byte source doesn't erroneously think that it still needs to provide a response. (This aligns with the behavior of controller.enqueue(), which throws if the stream is already closed.)

See #1114 (comment) and #1123 (comment) for some discussion and background.
  • Loading branch information
MattiasBuelens authored Jun 2, 2021
1 parent 033c6d9 commit 8a7d92b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
24 changes: 16 additions & 8 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -1322,17 +1322,19 @@ the following [=struct/items=]:
: <dfn export for="read-into request">chunk steps</dfn>
:: An algorithm taking a [=chunk=], called when a chunk is available for reading
: <dfn export for="read-into request">close steps</dfn>
:: An algorithm taking a [=chunk=], called when no chunks are available because the stream is
closed
:: An algorithm taking a [=chunk=] or undefined, called when no chunks are available because
the stream is closed
: <dfn export for="read-into request">error steps</dfn>
:: An algorithm taking a JavaScript value, called when no [=chunks=] are available because the
stream is errored

<p class="note">The [=read-into request/close steps=] take a [=chunk=] so that it can return the
backing memory to the caller if possible. For example,
{{ReadableStreamBYOBReader/read()|byobReader.read(chunk)}} will fulfill with <code highlight="js">{
value: newViewOnSameMemory, done: true }</code> for closed streams, instead of the more traditional
<code highlight="js">{ value: undefined, done: true }</code>.
value: newViewOnSameMemory, done: true }</code> for closed streams. If the stream is
[=cancel a readable stream|canceled=], the backing memory is discarded and
{{ReadableStreamBYOBReader/read()|byobReader.read(chunk)}} fulfills with the more traditional
<code highlight="js">{ value: undefined, done: true }</code> instead.

<h4 id="byob-reader-prototype">Constructor, methods, and properties</h4>

Expand Down Expand Up @@ -1369,6 +1371,10 @@ value: newViewOnSameMemory, done: true }</code> for closed streams, instead of t
the same type) onto the same backing memory region, with no modifications, to ensure the memory
is returned to the caller.

<li>If the reader is [=cancel a readable stream|canceled=], the promise will be fulfilled with
an object of the form <code highlight="js">{ value: undefined, done: true }</code>. In this case,
the backing memory region of |view| is discarded and not returned to the caller.

<li>If the stream becomes errored, the promise will be rejected with the relevant error.
</ul>

Expand Down Expand Up @@ -1849,10 +1855,7 @@ counterparts for default controllers, as discussed in [[#rs-abstract-ops-used-by
id="rbs-controller-private-cancel">\[[CancelSteps]](|reason|)</dfn> implements the
[$ReadableStreamController/[[CancelSteps]]$] contract. It performs the following steps:

1. If [=this=].[=ReadableByteStreamController/[[pendingPullIntos]]=] is not [=list/is
empty|empty=],
1. Let |firstDescriptor| be [=this=].[=ReadableByteStreamController/[[pendingPullIntos]]=][0].
1. Set |firstDescriptor|'s [=pull-into descriptor/bytes filled=] to 0.
1. Perform ! [$ReadableByteStreamControllerClearPendingPullIntos$]([=this=]).
1. Perform ! [$ResetQueue$]([=this=]).
1. Let |result| be the result of performing
[=this=].[=ReadableByteStreamController/[[cancelAlgorithm]]=], passing in |reason|.
Expand Down Expand Up @@ -2361,6 +2364,11 @@ the {{ReadableStream}}'s public API.
1. If |stream|.[=ReadableStream/[[state]]=] is "`errored`", return [=a promise rejected with=]
|stream|.[=ReadableStream/[[storedError]]=].
1. Perform ! [$ReadableStreamClose$](|stream|).
1. Let |reader| be |stream|.[=ReadableStream/[[reader]]=].
1. If |reader| is not undefined and |reader| [=implements=] {{ReadableStreamBYOBReader}},
1. [=list/For each=] |readIntoRequest| of |reader|.[=ReadableStreamBYOBReader/[[readIntoRequests]]=],
1. Perform |readIntoRequest|'s [=read-into request/close steps=], given undefined.
1. Set |reader|.[=ReadableStreamBYOBReader/[[readIntoRequests]]=] to an empty [=list=].
1. Let |sourceCancelPromise| be !
|stream|.[=ReadableStream/[[controller]]=].[$ReadableStreamController/[[CancelSteps]]$](|reason|).
1. Return the result of [=reacting=] to |sourceCancelPromise| with a fulfillment step that returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ exports.implementation = class ReadableByteStreamControllerImpl {
}

[CancelSteps](reason) {
if (this._pendingPullIntos.length > 0) {
const firstDescriptor = this._pendingPullIntos[0];
firstDescriptor.bytesFilled = 0;
}
aos.ReadableByteStreamControllerClearPendingPullIntos(this);

ResetQueue(this);

Expand Down
9 changes: 9 additions & 0 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Object.assign(exports, {
IsReadableStreamLocked,
ReadableByteStreamControllerCallPullIfNeeded,
ReadableByteStreamControllerClearAlgorithms,
ReadableByteStreamControllerClearPendingPullIntos,
ReadableByteStreamControllerClose,
ReadableByteStreamControllerEnqueue,
ReadableByteStreamControllerError,
Expand Down Expand Up @@ -453,6 +454,14 @@ function ReadableStreamCancel(stream, reason) {

ReadableStreamClose(stream);

const reader = stream._reader;
if (reader !== undefined && ReadableStreamBYOBReader.isImpl(reader)) {
for (const readIntoRequest of reader._readIntoRequests) {
readIntoRequest.closeSteps(undefined);
}
reader._readIntoRequests = [];
}

const sourceCancelPromise = stream._controller[CancelSteps](reason);
return transformPromiseWith(sourceCancelPromise, () => undefined);
}
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests

0 comments on commit 8a7d92b

Please sign in to comment.