Skip to content

Commit

Permalink
Accumulate reads in streams PumpToReader (#1264)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Oct 3, 2023
1 parent 39e4450 commit 7c05696
Showing 1 changed file with 105 additions and 20 deletions.
125 changes: 105 additions & 20 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2911,22 +2911,69 @@ private:
kj::Canceler canceler;
bool end;

// The intent with the accumulator is to attempt to hopefully improve performance
// a bit when the source stream is made up of a bunch of small writes. Instead of
// passing a bunch of individual small writes down to the sink, we'll accumulate
// those and pass them in a batch.
struct Accumulator {
// We will accumulate until we either queue at least THRESHOLD bytes or until
// we have at least MAX_COUNT pieces.
static constexpr size_t THRESHOLD = 4096;
static constexpr size_t MAX_COUNT = 16;
kj::Vector<kj::Array<kj::byte>> pieces;
size_t total = 0;
bool pendingClose = false;

// Adds a piece to the collection, adjusting the total buffered size accordingly.
void add(kj::Array<kj::byte> piece) {
total += piece.size();
pieces.add(kj::mv(piece));
}

kj::Array<kj::Array<kj::byte>> release() {
auto ret = pieces.releaseAsArray();
total = 0;
return kj::mv(ret);
}
};

// Reports whether `state` currently holds a terminal alternative: either a
// stored kj::Exception or StreamStates::Closed.
bool isErroredOrClosed() {
  if (state.template is<kj::Exception>()) {
    return true;
  }
  return state.template is<StreamStates::Closed>();
}

// Writes a batch of accumulated pieces to `sink` with a single gathered write,
// then optionally ends the sink.
//
// sink:   destination for the bytes. It is held by reference for the lifetime
//         of the coroutine, so the caller must keep it alive until the returned
//         promise completes (e.g. by attaching its owner to the promise).
// pieces: the buffers to write, in order. Taken by value so the coroutine owns
//         the backing memory that the views below point into.
// end:    when true, sink.end() is awaited after the write completes.
kj::Promise<void> write(WritableStreamSink& sink,
                        kj::Array<kj::Array<kj::byte>> pieces,
                        bool end = false) {
  // Build non-owning views over each piece for the gathered write. The stack
  // array bounds are both set to Accumulator::MAX_COUNT, the most pieces the
  // accumulator is expected to batch.
  KJ_STACK_ARRAY(kj::ArrayPtr<const kj::byte>, views,
                 pieces.size(), Accumulator::MAX_COUNT, Accumulator::MAX_COUNT);
  // Index with size_t to match pieces.size() and avoid a signed/unsigned
  // comparison.
  for (size_t n = 0; n < pieces.size(); n++) {
    views[n] = pieces[n].asBytes();
  }
  co_await sink.write(views);
  if (end) co_await sink.end();
}

jsg::Promise<void> pumpLoop(
jsg::Lock& js,
IoContext& ioContext,
Readable readable,
IoOwn<WeakRef<AllReaderBase>> pumpToReader) {
IoOwn<WeakRef<AllReaderBase>> pumpToReader,
kj::Own<Accumulator> accumulator = kj::heap<Accumulator>()) {
ioContext.requireCurrentOrThrowJs();
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(ready, Readable) {
KJ_UNREACHABLE;
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// If the accumulator has bytes here, it means we hit a sync end
// while trying to wrap around. We'll want to write the pending
// bytes before closing out...
if (accumulator->total > 0) {
return ioContext.awaitIoLegacy(
write(*sink, accumulator->release(), end).attach(kj::mv(sink)));
}

return end ?
ioContext.awaitIoLegacy(sink->end().attach(kj::mv(sink))) :
js.resolvedPromise();
Expand All @@ -2952,10 +2999,10 @@ private:
}
};

using Result = kj::OneOf<Pumping, // Continue with next read.
kj::Array<kj::byte>, // Bytes to write were returned.
StreamStates::Closed, // Readable indicated done.
jsg::Value>; // There was an error.
using Result = kj::OneOf<Pumping, // Continue with next read.
kj::Array<kj::Array<kj::byte>>, // Bytes to write were returned.
StreamStates::Closed, // Readable indicated done.
jsg::Value>; // There was an error.

// The flow here is relatively straightforward but the ownership of
// readable/pumpToReader is fairly complicated.
Expand Down Expand Up @@ -2996,19 +3043,28 @@ private:
// case and handle appropriately (generally by canceling the readable
// and exiting the loop).
return read(js).then(js, ioContext.addFunctor(
[](auto& js, ReadResult result) mutable -> Result {
[&accumulator=*accumulator](auto& js, ReadResult result) mutable -> Result {

KJ_REQUIRE(!js.v8Isolate->IsExecutionTerminating(),
"Attempting to continue pump after isolate execution terminating.");

if (result.done) {
if (accumulator.total > 0) {
// There is data in the accumulator that needs to be written. We have
// to continue the write loop to flush that last bit of data before
// we close.
accumulator.pendingClose = true;
return accumulator.release();
}
// Indicate to the outer promise that the readable is done.
// There's nothing further to do.
return StreamStates::Closed ();
}

// If we're not done, the result value must be interpretable as
// bytes for the read to make any sense.
// TODO(later): If the stream produces a string then we ought to be able
// to interpret that as bytes here also.
auto handle = KJ_ASSERT_NONNULL(result.value).getHandle(js);
if (!handle->IsArrayBufferView() && !handle->IsArrayBuffer()) {
return js.v8Ref(js.v8TypeError("This ReadableStream did not return bytes."));
Expand All @@ -3022,15 +3078,30 @@ private:

if constexpr (kj::isSameType<T, ByteReadable>()) {
jsg::BackingStore backing = bufferSource.detach(js);
return backing.asArrayPtr().attach(kj::mv(backing));
accumulator.add(backing.asArrayPtr().attach(kj::mv(backing)));
} else if constexpr (kj::isSameType<T, ValueReadable>()) {
// We do not detach in this case because, as bad an idea as it is,
// the stream spec does allow a single typedarray/arraybuffer instance
// to be queued multiple times when using value-oriented streams.
return bufferSource.asArrayPtr().attach(kj::mv(bufferSource));
accumulator.add(bufferSource.asArrayPtr().attach(kj::mv(bufferSource)));
} else {
KJ_FAIL_ASSERT("incorrect readable type. how did that happen?");
}

KJ_UNREACHABLE;
// We accumulate the bytes we've read in the accumulator. Once we've read
// at least Accumulator::THRESHOLD, or have Accumulator::MAX_COUNT buffers
// collected, we'll release the collected bytes to the destination, otherwise
// we'll skip to another read to pull more bytes.
if (accumulator.total >= Accumulator::THRESHOLD ||
accumulator.pieces.size() == Accumulator::MAX_COUNT) {
// We've read our minimum number of bytes... yay! Release those to the
// write phase of the loop.
return accumulator.release();
}

// Just keep reading...Just keep reading.. Just keep reading, reading, reading...
// What do we do? We read, read.
return Pumping {};
}), [](auto& js, jsg::Value exception) mutable -> Result {
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
Expand All @@ -3039,7 +3110,9 @@ private:
// readable has already been gc visited, it will continue to be reachable
// once it is passed off to the read loop here.
JSG_VISITABLE_LAMBDA(
(readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)),
(readable=kj::mv(readable),
pumpToReader=kj::mv(pumpToReader),
accumulator=kj::mv(accumulator)),
(readable),
(jsg::Lock& js, Result result) mutable {
KJ_IF_SOME(reader, tryGetAs<PumpToReader>(pumpToReader)) {
Expand All @@ -3048,13 +3121,12 @@ private:
reader.ioContext.requireCurrentOrThrowJs();
auto& ioContext = IoContext::current();
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
KJ_CASE_ONEOF(pieces, kj::Array<kj::Array<kj::byte>>) {
// We received bytes to write. Do so...
// (It's safe to directly access reader.sink here -- it's a kj::Own --
// because we accessed the reader through an IoOwn, proving that
// we're in the correct IoContext...)
auto promise = reader.sink->write(bytes.begin(), bytes.size())
.attach(kj::mv(bytes));
auto promise = reader.write(*reader.sink, kj::mv(pieces));
// Wrap the write promise in a canceler that will be triggered when the
// PumpToReader is dropped. While the write promise is pending, it is
// possible for the promise that is holding the PumpToReader to be
Expand All @@ -3070,20 +3142,29 @@ private:
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
JSG_VISITABLE_LAMBDA(
(readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)),
(readable=kj::mv(readable),
pumpToReader=kj::mv(pumpToReader),
accumulator=kj::mv(accumulator)),
(readable),
(jsg::Lock& js, kj::Maybe<jsg::Value> maybeException) mutable {
KJ_IF_SOME(reader, tryGetAs<PumpToReader>(pumpToReader)) {
auto& ioContext = reader.ioContext;
ioContext.requireCurrentOrThrowJs();
// Oh good, if we got here it means we're in the right IoContext and
// the PumpToReader is still alive.
KJ_IF_SOME(exception, maybeException) {
reader.doError(js, exception.getHandle(js));
if (accumulator->pendingClose) {
reader.doClose(js);
} else {
// Else block to avert dangling else compiler warning.
KJ_IF_SOME(exception, maybeException) {
reader.doError(js, exception.getHandle(js));
} else {
// Else block to avert dangling else compiler warning.
}
}
return reader.pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));
return reader.pumpLoop(js, ioContext,
kj::mv(readable),
kj::mv(pumpToReader),
kj::mv(accumulator));
} else {
// If we got here, we're in the right IoContext but the PumpToReader
// has been destroyed. Let's cancel the readable as the last step.
Expand All @@ -3109,12 +3190,16 @@ private:
reader.doError(js, exception.getHandle(js));
}
}
return reader.pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));

return reader.pumpLoop(js, ioContext,
kj::mv(readable),
kj::mv(pumpToReader),
kj::mv(accumulator));
} else {
// If we got here, we're in the right IoContext but the PumpToReader has been
// freed. There's nothing we can do except cleanup.
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::Array<kj::byte>>) {
return readable->cancel(js, nullptr);
}
KJ_CASE_ONEOF(pumping, Pumping) {
Expand Down

0 comments on commit 7c05696

Please sign in to comment.