Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulate reads in streams PumpToReader #1264

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 105 additions & 20 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2911,22 +2911,69 @@ private:
kj::Canceler canceler;
bool end;

// The intent with the accumulator is to attempt to hopefully improve performance
// a bit when the source stream is made up of a bunch of small writes. Instead of
// passing a bunch of individual small writes down to the sink, we'll accumulate
// those and pass them in a batch.
struct Accumulator {
// We will accumulate until we either queue at least THRESHOLD bytes or until
// we have at least MAX_COUNT pieces.
static constexpr size_t THRESHOLD = 4096;
static constexpr size_t MAX_COUNT = 16;
kj::Vector<kj::Array<kj::byte>> pieces;
size_t total = 0;
bool pendingClose = false;

// Adds a piece to the collection, adjusting the total buffered size accordingly.
void add(kj::Array<kj::byte> piece) {
total += piece.size();
pieces.add(kj::mv(piece));
}

kj::Array<kj::Array<kj::byte>> release() {
auto ret = pieces.releaseAsArray();
total = 0;
return kj::mv(ret);
}
};

bool isErroredOrClosed() {
return state.template is<kj::Exception>() ||
state.template is<StreamStates::Closed>();
}

kj::Promise<void> write(WritableStreamSink& sink,
kj::Array<kj::Array<kj::byte>> pieces,
bool end = false) {
KJ_STACK_ARRAY(kj::ArrayPtr<const kj::byte>, views,
pieces.size(), Accumulator::MAX_COUNT, Accumulator::MAX_COUNT);
for (int n = 0; n < pieces.size(); n++) {
views[n] = pieces[n].asBytes();
}
co_await sink.write(views);
if (end) co_await sink.end();
}

jsg::Promise<void> pumpLoop(
jsg::Lock& js,
IoContext& ioContext,
Readable readable,
IoOwn<WeakRef<AllReaderBase>> pumpToReader) {
IoOwn<WeakRef<AllReaderBase>> pumpToReader,
kj::Own<Accumulator> accumulator = kj::heap<Accumulator>()) {
ioContext.requireCurrentOrThrowJs();
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(ready, Readable) {
KJ_UNREACHABLE;
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// If the accumulator has bytes here, it means we hit a sync end
// while trying to wrap around. We'll want to write the pending
// bytes before closing out...
if (accumulator->total > 0) {
return ioContext.awaitIoLegacy(
write(*sink, accumulator->release(), end).attach(kj::mv(sink)));
}

return end ?
ioContext.awaitIoLegacy(sink->end().attach(kj::mv(sink))) :
js.resolvedPromise();
Expand All @@ -2952,10 +2999,10 @@ private:
}
};

using Result = kj::OneOf<Pumping, // Continue with next read.
kj::Array<kj::byte>, // Bytes to write were returned.
StreamStates::Closed, // Readable indicated done.
jsg::Value>; // There was an error.
using Result = kj::OneOf<Pumping, // Continue with next read.
kj::Array<kj::Array<kj::byte>>, // Bytes to write were returned.
StreamStates::Closed, // Readable indicated done.
jsg::Value>; // There was an error.

// The flow here is relatively straightforward but the ownership of
// readable/pumpToReader is fairly complicated.
Expand Down Expand Up @@ -2996,19 +3043,28 @@ private:
// case and handle appropriately (generally by canceling the readable
// and exiting the loop).
return read(js).then(js, ioContext.addFunctor(
[](auto& js, ReadResult result) mutable -> Result {
[&accumulator=*accumulator](auto& js, ReadResult result) mutable -> Result {
ohodson marked this conversation as resolved.
Show resolved Hide resolved

KJ_REQUIRE(!js.v8Isolate->IsExecutionTerminating(),
"Attempting to continue pump after isolate execution terminating.");

if (result.done) {
if (accumulator.total > 0) {
// There is data in the accumulator that needs to be written. We have
// to continue the write loop to flush that last bit of data before
// we close.
accumulator.pendingClose = true;
return accumulator.release();
}
// Indicate to the outer promise that the readable is done.
// There's nothing further to do.
return StreamStates::Closed ();
}

// If we're not done, the result value must be interpretable as
// bytes for the read to make any sense.
// TODO(later): If the stream produces a string then we ought to be able
// to interpret that as bytes here also.
auto handle = KJ_ASSERT_NONNULL(result.value).getHandle(js);
if (!handle->IsArrayBufferView() && !handle->IsArrayBuffer()) {
return js.v8Ref(js.v8TypeError("This ReadableStream did not return bytes."));
Expand All @@ -3022,15 +3078,30 @@ private:

if constexpr (kj::isSameType<T, ByteReadable>()) {
jsg::BackingStore backing = bufferSource.detach(js);
return backing.asArrayPtr().attach(kj::mv(backing));
accumulator.add(backing.asArrayPtr().attach(kj::mv(backing)));
} else if constexpr (kj::isSameType<T, ValueReadable>()) {
// We do not detach in this case because, as bad as an idea as it is,
// the stream spec does allow a single typedarray/arraybuffer instance
// to be queued multiple times when using value-oriented streams.
return bufferSource.asArrayPtr().attach(kj::mv(bufferSource));
accumulator.add(bufferSource.asArrayPtr().attach(kj::mv(bufferSource)));
} else {
KJ_FAIL_ASSERT("incorrect readable type. how did that happen?");
}

KJ_UNREACHABLE;
// We accumulate the bytes we've read in the accumulator. Once we've read
// at least Accumulator::THRESHOLD, or have Accumulator::MAX_COUNT buffers
// collected, we'll release the collected bytes to the destination, otherwise
// we'll skip to another read to pull more bytes.
if (accumulator.total >= Accumulator::THRESHOLD ||
jasnell marked this conversation as resolved.
Show resolved Hide resolved
accumulator.pieces.size() == Accumulator::MAX_COUNT) {
// We've read our minimum number of bytes... yay! Release those to the
// write phase of the loop.
return accumulator.release();
}

// Just keep reading...Just keep reading.. Just keep reading, reading, reading...
// What do we do? We read, read.
return Pumping {};
}), [](auto& js, jsg::Value exception) mutable -> Result {
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
Expand All @@ -3039,7 +3110,9 @@ private:
// readable has already been gc visited, it will continue to be reachable
// once it is passed off to the read loop here.
JSG_VISITABLE_LAMBDA(
(readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)),
(readable=kj::mv(readable),
pumpToReader=kj::mv(pumpToReader),
accumulator=kj::mv(accumulator)),
(readable),
(jsg::Lock& js, Result result) mutable {
KJ_IF_SOME(reader, tryGetAs<PumpToReader>(pumpToReader)) {
Expand All @@ -3048,13 +3121,12 @@ private:
reader.ioContext.requireCurrentOrThrowJs();
auto& ioContext = IoContext::current();
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
KJ_CASE_ONEOF(pieces, kj::Array<kj::Array<kj::byte>>) {
// We received bytes to write. Do so...
// (It's safe to directly access reader.sink here --it's an kj::Own--
// because we accessed the reader through an IoOwn, proving that
// we're in the correct IoContext...)
auto promise = reader.sink->write(bytes.begin(), bytes.size())
.attach(kj::mv(bytes));
auto promise = reader.write(*reader.sink, kj::mv(pieces));
// Wrap the write promise in a canceler that will be triggered when the
// PumpToReader is dropped. While the write promise is pending, it is
// possible for the promise that is holding the PumpToReader to be
Expand All @@ -3070,20 +3142,29 @@ private:
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
JSG_VISITABLE_LAMBDA(
(readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)),
(readable=kj::mv(readable),
pumpToReader=kj::mv(pumpToReader),
accumulator=kj::mv(accumulator)),
(readable),
(jsg::Lock& js, kj::Maybe<jsg::Value> maybeException) mutable {
KJ_IF_SOME(reader, tryGetAs<PumpToReader>(pumpToReader)) {
auto& ioContext = reader.ioContext;
ioContext.requireCurrentOrThrowJs();
// Oh good, if we got here it means we're in the right IoContext and
// the PumpToReader is still alive.
KJ_IF_SOME(exception, maybeException) {
reader.doError(js, exception.getHandle(js));
if (accumulator->pendingClose) {
reader.doClose(js);
} else {
// Else block to avert dangling else compiler warning.
KJ_IF_SOME(exception, maybeException) {
reader.doError(js, exception.getHandle(js));
} else {
// Else block to avert dangling else compiler warning.
jasnell marked this conversation as resolved.
Show resolved Hide resolved
}
}
return reader.pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));
return reader.pumpLoop(js, ioContext,
kj::mv(readable),
kj::mv(pumpToReader),
kj::mv(accumulator));
} else {
// If we got here, we're in the right IoContext but the PumpToReader
// has been destroyed. Let's cancel the readable as the last step.
Expand All @@ -3109,12 +3190,16 @@ private:
reader.doError(js, exception.getHandle(js));
}
}
return reader.pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));

return reader.pumpLoop(js, ioContext,
kj::mv(readable),
kj::mv(pumpToReader),
kj::mv(accumulator));
} else {
// If we got here, we're in the right IoContext but the PumpToReader has been
// freed. There's nothing we can do except cleanup.
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::Array<kj::byte>>) {
return readable->cancel(js, nullptr);
}
KJ_CASE_ONEOF(pumping, Pumping) {
Expand Down
Loading