Skip to content

Commit

Permalink
Handle synchronous JS exceptions in PumpToReader
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed May 24, 2023
1 parent a42b96c commit 70d4499
Showing 1 changed file with 119 additions and 131 deletions.
250 changes: 119 additions & 131 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2977,145 +2977,133 @@ private:
// be freed. When the JS promise resolves, we make sure we detect that
// case and handle appropriately (generally by canceling the readable
// and exiting the loop).
return js.tryCatch([&] {
// The call to .then can throw a JsExceptionThrown in some cases.
// We need to handle those here by wrapping in the js.tryCatch.)
return read(js).then(js, ioContext.addFunctor(
[](auto& js, ReadResult result) mutable -> Result {

KJ_REQUIRE(!js.v8Isolate->IsExecutionTerminating(),
"Attempting to continue pump after isolate execution terminating.");

if (result.done) {
// Indicate to the outer promise that the readable is done.
// There's nothing further to do.
return StreamStates::Closed ();
}
return read(js).then(js, ioContext.addFunctor(
[](auto& js, ReadResult result) mutable -> Result {

// If we're not done, the result value must be interpretable as
// bytes for the read to make any sense.
auto handle = KJ_ASSERT_NONNULL(result.value).getHandle(js);
if (!handle->IsArrayBufferView() && !handle->IsArrayBuffer()) {
return js.v8Ref(js.v8TypeError("This ReadableStream did not return bytes."));
}
KJ_REQUIRE(!js.v8Isolate->IsExecutionTerminating(),
"Attempting to continue pump after isolate execution terminating.");

jsg::BufferSource bufferSource(js, handle);
if (bufferSource.size() == 0) {
// Weird, but allowed. We'll skip it.
return Pumping {};
}
if (result.done) {
// Indicate to the outer promise that the readable is done.
// There's nothing further to do.
return StreamStates::Closed ();
}

if constexpr (kj::isSameType<T, ByteReadable>()) {
jsg::BackingStore backing = bufferSource.detach(js);
return backing.asArrayPtr().attach(kj::mv(backing));
} else if constexpr (kj::isSameType<T, ValueReadable>()) {
// We do not detach in this case because, as bad as an idea as it is,
// the stream spec does allow a single typedarray/arraybuffer instance
// to be queued multiple times when using value-oriented streams.
return bufferSource.asArrayPtr().attach(kj::mv(bufferSource));
}
// If we're not done, the result value must be interpretable as
// bytes for the read to make any sense.
auto handle = KJ_ASSERT_NONNULL(result.value).getHandle(js);
if (!handle->IsArrayBufferView() && !handle->IsArrayBuffer()) {
return js.v8Ref(js.v8TypeError("This ReadableStream did not return bytes."));
}

jsg::BufferSource bufferSource(js, handle);
if (bufferSource.size() == 0) {
// Weird, but allowed. We'll skip it.
return Pumping {};
}

KJ_UNREACHABLE;
}), [](auto& js, jsg::Value exception) mutable -> Result {
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
[readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)]
(jsg::Lock& js, Result result) mutable {
KJ_IF_MAYBE(reader, tryGetAs<PumpToReader>(pumpToReader)) {
// Oh good, if we got here it means we're in the right IoContext and
// the PumpToReader is still alive. Let's process the result.
reader->ioContext.requireCurrentOrThrowJs();
auto& ioContext = IoContext::current();
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
// We received bytes to write. Do so...
// (It's safe to directly access reader->sink here --it's an kj::Own--
// because we accessed the reader through an IoOwn, proving that
// we're in the correct IoContext...)
auto promise = reader->sink->write(bytes.begin(), bytes.size())
.attach(kj::mv(bytes));
// Wrap the write promise in a canceler that will be triggered when the
// PumpToReader is dropped. While the write promise is pending, it is
// possible for the promise that is holding the PumpToReader to be
// dropped causing the hold on the sink to be released. If that is
// released while the write is still pending we can end up with an
// error further up the destruct chain.
return ioContext.awaitIo(js, reader->canceler.wrap(kj::mv(promise)),
[](jsg::Lock& js) -> kj::Maybe<jsg::Value> {
// The write completed successfully.
return kj::Maybe<jsg::Value>(nullptr);
}, [](jsg::Lock& js, jsg::Value exception) mutable -> kj::Maybe<jsg::Value> {
// The write failed.
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
[readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)]
(jsg::Lock& js, kj::Maybe<jsg::Value> maybeException) mutable {
KJ_IF_MAYBE(reader, tryGetAs<PumpToReader>(pumpToReader)) {
auto& ioContext = reader->ioContext;
ioContext.requireCurrentOrThrowJs();
// Oh good, if we got here it means we're in the right IoContext and
// the PumpToReader is still alive.
KJ_IF_MAYBE(exception, maybeException) {
reader->doError(js, exception->getHandle(js));
}
return reader->pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));
} else {
// If we got here, we're in the right IoContext but the PumpToReader
// has been destroyed. Let's cancel the readable as the last step.
return readable->cancel(js, maybeException.map([&](jsg::Value& ex) {
return ex.getHandle(js);
}));
if constexpr (kj::isSameType<T, ByteReadable>()) {
jsg::BackingStore backing = bufferSource.detach(js);
return backing.asArrayPtr().attach(kj::mv(backing));
} else if constexpr (kj::isSameType<T, ValueReadable>()) {
// We do not detach in this case because, as bad as an idea as it is,
// the stream spec does allow a single typedarray/arraybuffer instance
// to be queued multiple times when using value-oriented streams.
return bufferSource.asArrayPtr().attach(kj::mv(bufferSource));
}

KJ_UNREACHABLE;
}), [](auto& js, jsg::Value exception) mutable -> Result {
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
[readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)]
(jsg::Lock& js, Result result) mutable {
KJ_IF_MAYBE(reader, tryGetAs<PumpToReader>(pumpToReader)) {
// Oh good, if we got here it means we're in the right IoContext and
// the PumpToReader is still alive. Let's process the result.
reader->ioContext.requireCurrentOrThrowJs();
auto& ioContext = IoContext::current();
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
// We received bytes to write. Do so...
// (It's safe to directly access reader->sink here --it's an kj::Own--
// because we accessed the reader through an IoOwn, proving that
// we're in the correct IoContext...)
auto promise = reader->sink->write(bytes.begin(), bytes.size())
.attach(kj::mv(bytes));
// Wrap the write promise in a canceler that will be triggered when the
// PumpToReader is dropped. While the write promise is pending, it is
// possible for the promise that is holding the PumpToReader to be
// dropped causing the hold on the sink to be released. If that is
// released while the write is still pending we can end up with an
// error further up the destruct chain.
return ioContext.awaitIo(js, reader->canceler.wrap(kj::mv(promise)),
[](jsg::Lock& js) -> kj::Maybe<jsg::Value> {
// The write completed successfully.
return kj::Maybe<jsg::Value>(nullptr);
}, [](jsg::Lock& js, jsg::Value exception) mutable -> kj::Maybe<jsg::Value> {
// The write failed.
return kj::mv(exception);
}).then(js, ioContext.addFunctor(
[readable=kj::mv(readable),pumpToReader=kj::mv(pumpToReader)]
(jsg::Lock& js, kj::Maybe<jsg::Value> maybeException) mutable {
KJ_IF_MAYBE(reader, tryGetAs<PumpToReader>(pumpToReader)) {
auto& ioContext = reader->ioContext;
ioContext.requireCurrentOrThrowJs();
// Oh good, if we got here it means we're in the right IoContext and
// the PumpToReader is still alive.
KJ_IF_MAYBE(exception, maybeException) {
reader->doError(js, exception->getHandle(js));
}
}));
}
KJ_CASE_ONEOF(pumping, Pumping) {
// If we got here, a zero-length buffer was provided by the read and we're
// just going to ignore it and keep going.
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// If we got here, the read signaled that we're done. Close the reader and
// pump one more time to shut things down.
reader->doClose();
}
KJ_CASE_ONEOF(exception, jsg::Value) {
// If we got here, the read signaled an exception. Either the read failed or
// provided something other than bytes. Error the reader and pump one more
// time to shut things down.
reader->doError(js, exception.getHandle(js));
}
return reader->pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));
} else {
// If we got here, we're in the right IoContext but the PumpToReader
// has been destroyed. Let's cancel the readable as the last step.
return readable->cancel(js, maybeException.map([&](jsg::Value& ex) {
return ex.getHandle(js);
}));
}
}));
}
KJ_CASE_ONEOF(pumping, Pumping) {
// If we got here, a zero-length buffer was provided by the read and we're
// just going to ignore it and keep going.
}
return reader->pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));
} else {
// If we got here, we're in the right IoContext but the PumpToReader has been
// freed. There's nothing we can do except cleanup.
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
return readable->cancel(js, nullptr);
}
KJ_CASE_ONEOF(pumping, Pumping) {
return readable->cancel(js, nullptr);
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// We do not have to cancel the readable in this case because it has already
// signaled that it is done. There's nothing to cancel.
return js.resolvedPromise();
}
KJ_CASE_ONEOF(exception, jsg::Value) {
return readable->cancel(js, exception.getHandle(js));
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// If we got here, the read signaled that we're done. Close the reader and
// pump one more time to shut things down.
reader->doClose();
}
KJ_CASE_ONEOF(exception, jsg::Value) {
// If we got here, the read signaled an exception. Either the read failed or
// provided something other than bytes. Error the reader and pump one more
// time to shut things down.
reader->doError(js, exception.getHandle(js));
}
}
KJ_UNREACHABLE;
}));
}, [&](jsg::Value exception) {
// Exceptions here should be rare, and fairly odd. We would get here,
// for instance, if the call to .Then inside read().then(...) failed.
// We need to understand more about this case so we're going to temporarily
// add logging to see if we can catch it in the act.
KJ_LOG(ERROR, "Unexpected exception in PumpToReader pump loop: ",
exception.getHandle(js));
return js.rejectedPromise<void>(kj::mv(exception));
});
return reader->pumpLoop(js, ioContext, kj::mv(readable), kj::mv(pumpToReader));
} else {
// If we got here, we're in the right IoContext but the PumpToReader has been
// freed. There's nothing we can do except cleanup.
KJ_SWITCH_ONEOF(result) {
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
return readable->cancel(js, nullptr);
}
KJ_CASE_ONEOF(pumping, Pumping) {
return readable->cancel(js, nullptr);
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// We do not have to cancel the readable in this case because it has already
// signaled that it is done. There's nothing to cancel.
return js.resolvedPromise();
}
KJ_CASE_ONEOF(exception, jsg::Value) {
return readable->cancel(js, exception.getHandle(js));
}
}
}
KJ_UNREACHABLE;
}));
}
}
KJ_UNREACHABLE;
Expand Down

0 comments on commit 70d4499

Please sign in to comment.