Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have internal streams return empty Uint8Array on end of byob stream #2045

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <kj/vector.h>
#include <workerd/api/util.h>
#include <workerd/util/string-buffer.h>
#include <workerd/io/features.h>

namespace workerd::api {

Expand Down Expand Up @@ -543,6 +544,15 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
if (maybeByobOptions != kj::none && FeatureFlags::get(js).getInternalStreamByobReturn()) {
// When using the BYOB reader, we must return a sized-0 Uint8Array that is backed
// by the ArrayBuffer passed in the options.
auto u8 = v8::Uint8Array::New(v8::ArrayBuffer::New(js.v8Isolate, store), 0, 0);
return js.resolvedPromise(ReadResult {
.value = js.v8Ref(u8.As<v8::Value>()),
.done = true,
});
}
return js.resolvedPromise(ReadResult { .done = true });
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
Expand Down Expand Up @@ -582,8 +592,10 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
// That's a larger refactor, though.
auto& ioContext = IoContext::current();
return ioContext.awaitIoLegacy(js, kj::mv(promise)).then(js,
ioContext.addFunctor([this,store = kj::mv(store), byteOffset, byteLength]
(jsg::Lock& js, size_t amount) mutable -> jsg::Promise<ReadResult> {
ioContext.addFunctor(
[this,store = kj::mv(store), byteOffset, byteLength,
isByob = maybeByobOptions != kj::none]
(jsg::Lock& js, size_t amount) mutable -> jsg::Promise<ReadResult> {
readPending = false;
KJ_ASSERT(amount <= byteLength);
if (amount == 0) {
Expand All @@ -593,6 +605,15 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
KJ_IF_SOME(o, owner) {
o.signalEof(js);
}
if (isByob && FeatureFlags::get(js).getInternalStreamByobReturn()) {
// When using the BYOB reader, we must return a sized-0 Uint8Array that is backed
// by the ArrayBuffer passed in the options.
auto u8 = v8::Uint8Array::New(v8::ArrayBuffer::New(js.v8Isolate, store), 0, 0);
return js.resolvedPromise(ReadResult {
.value = js.v8Ref(u8.As<v8::Value>()),
.done = true,
});
}
return js.resolvedPromise(ReadResult { .done = true });
}
// Return a slice so the script can see how many bytes were read.
Expand Down
21 changes: 21 additions & 0 deletions src/workerd/api/tests/streams-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,27 @@ export const abortWriterAfterGc = {
}
};

export const finalReadOnInternalStreamReturnsBuffer = {
async test() {
const { readable, writable } = new IdentityTransformStream();
const writer = writable.getWriter();
await writer.close();

const reader = readable.getReader({ mode: 'byob' });
let result = await reader.read(new Uint8Array(10));
strictEqual(result.done, true);
ok(result.value instanceof Uint8Array);
strictEqual(result.value.byteLength, 0);
strictEqual(result.value.buffer.byteLength, 10);

result = await reader.read(new Uint8Array(10));
strictEqual(result.done, true);
ok(result.value instanceof Uint8Array);
strictEqual(result.value.byteLength, 0);
strictEqual(result.value.buffer.byteLength, 10);
}
};

export default {
async fetch(request, env) {
strictEqual(request.headers.get('content-length'), '10');
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/tests/streams-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const unitTests :Workerd.Config = (
(name = "worker", esModule = embed "streams-test.js")
],
compatibilityDate = "2023-01-15",
compatibilityFlags = ["nodejs_compat"],
compatibilityFlags = ["nodejs_compat", "internal_stream_byob_return_view"],
bindings = [
(name = "subrequest", service = "streams-test")
]
Expand Down
11 changes: 11 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,15 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# type of Durable Object stubs -- support RPC. If so, this type will have a wildcard method, so
# it will appear that all possible property names are present on any fetcher instance. This could
# break code that tries to infer types based on the presence or absence of methods.

internalStreamByobReturn @47 :Bool
$compatEnableFlag("internal_stream_byob_return_view")
$compatDisableFlag("internal_stream_byob_return_undefined")
$compatEnableDate("2024-05-13");
# Sadly, the original implementation of ReadableStream (now called "internal" streams), did not
# properly implement the result of ReadableStreamBYOBReader's read method. When done = true,
# per the spec, the result `value` must be an empty ArrayBufferView whose underlying ArrayBuffer
# is the same as the one passed to the read method. Our original implementation returned
# undefined instead. This flag changes the behavior to match the spec and to match the behavior
# implemented by the JS-backed ReadableStream implementation.
}
Loading