From 346fe5239637518ef1e8ec6996f24968a0d39e21 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 31 Jul 2024 07:11:05 -0700 Subject: [PATCH] Clear WritableStreamSink queue on abort with compat flag --- src/workerd/api/streams/internal.c++ | 105 ++++++++++++------ src/workerd/api/streams/internal.h | 18 ++- .../api/tests/abort-internal-streams-test.js | 23 ++++ .../tests/abort-internal-streams-test.wd-test | 15 +++ src/workerd/io/compatibility-date-test.c++ | 3 +- src/workerd/io/compatibility-date.capnp | 9 ++ 6 files changed, 130 insertions(+), 43 deletions(-) create mode 100644 src/workerd/api/tests/abort-internal-streams-test.js create mode 100644 src/workerd/api/tests/abort-internal-streams-test.wd-test diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 29663d798f7..2a02129cce1 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -878,6 +878,11 @@ void ReadableStreamInternalController::releaseReader( } } +void WritableStreamInternalController::Writable::abort(kj::Exception&& ex) { + canceler.cancel(kj::cp(ex)); + sink->abort(kj::mv(ex)); +} + WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) { if (writeState.is()) { writeState.init(); @@ -912,7 +917,7 @@ jsg::Promise WritableStreamInternalController::write( KJ_CASE_ONEOF(errored, StreamStates::Errored) { return js.rejectedPromise(errored.addRef(js)); } - KJ_CASE_ONEOF(writable, Writable) { + KJ_CASE_ONEOF(writable, IoOwn) { if (value == kj::none) { return js.resolvedPromise(); } @@ -1052,7 +1057,7 @@ jsg::Promise WritableStreamInternalController::closeImpl(jsg::Lock& js, bo auto reason = errored.getHandle(js); return rejectedMaybeHandledPromise(js, reason, markAsHandled); } - KJ_CASE_ONEOF(writable, Writable) { + KJ_CASE_ONEOF(writable, IoOwn) { auto prp = js.newPromiseAndResolver(); if (markAsHandled) { prp.promise.markAsHandled(js); @@ -1115,7 +1120,7 @@ jsg::Promise WritableStreamInternalController::flush( auto reason = errored.getHandle(js); return rejectedMaybeHandledPromise(js, reason, markAsHandled); } - KJ_CASE_ONEOF(writable, Writable) { + KJ_CASE_ONEOF(writable, IoOwn) { auto prp = js.newPromiseAndResolver(); if (markAsHandled) { prp.promise.markAsHandled(js); @@ -1160,8 +1165,22 @@ jsg::Promise WritableStreamInternalController::doAbort( return kj::mv(promise); } - KJ_IF_SOME(writable, state.tryGet()) { + KJ_IF_SOME(writable, state.tryGet>()) { auto exception = js.exceptionToKj(js.v8Ref(reason)); + + if (FeatureFlags::get(js).getInternalWritableStreamAbortClearsQueue()) { + // If this flag is set, we will clear the queue proactively and immediately + // error the stream rather than handling the abort lazily. In this case, the + // stream will be put into an errored state immediately after draining the + // queue. All pending writes and other operations in the queue will be rejected + // immediately and an immediately resolved or rejected promise will be returned. + writable->abort(kj::cp(exception)); + drain(js, reason); + return options.reject ? + rejectedMaybeHandledPromise(js, reason, options.handled) : + js.resolvedPromise(); + } + if (queue.empty()) { writable->abort(kj::cp(exception)); doError(js, reason); @@ -1227,7 +1246,7 @@ kj::Maybe> WritableStreamInternalController::tryPipeFrom( KJ_IF_SOME(errored, sourceLock.tryGetErrored(js)) { sourceLock.release(js); if (!preventAbort) { - if (state.tryGet() != kj::none) { + if (state.tryGet>() != kj::none) { return doAbort(js, errored, { .reject = true, .handled = pipeThrough }); } } @@ -1327,8 +1346,8 @@ kj::Maybe> WritableStreamInternalController::removeS KJ_CASE_ONEOF(errored, StreamStates::Errored) { kj::throwFatalException(js.exceptionToKj(errored.addRef(js))); } - KJ_CASE_ONEOF(writable, Writable) { - auto result = kj::mv(writable); + KJ_CASE_ONEOF(writable, IoOwn) { + auto result = kj::mv(writable->sink); state.init(); return kj::Maybe>(kj::mv(result)); } @@ -1352,7 +1371,7 @@ void WritableStreamInternalController::detach(jsg::Lock& js) { KJ_CASE_ONEOF(errored, StreamStates::Errored) { kj::throwFatalException(js.exceptionToKj(errored.addRef(js))); } - KJ_CASE_ONEOF(writable, Writable) { + KJ_CASE_ONEOF(writable, IoOwn) { state.init(); return; } @@ -1365,7 +1384,7 @@ kj::Maybe WritableStreamInternalController::getDesiredSize() { KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return 0; } KJ_CASE_ONEOF(errored, StreamStates::Errored) { return kj::none; } - KJ_CASE_ONEOF(writable, Writable) { + KJ_CASE_ONEOF(writable, IoOwn) { KJ_IF_SOME(highWaterMark, maybeHighWaterMark) { return highWaterMark - currentWriteBufferSize; } @@ -1398,7 +1417,7 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) maybeRejectPromise(js, lock.getClosedFulfiller(), errored.getHandle(js)); maybeRejectPromise(js, lock.getReadyFulfiller(), errored.getHandle(js)); } - KJ_CASE_ONEOF(writable, Writable) { + KJ_CASE_ONEOF(writable, IoOwn) { maybeResolvePromise(js, lock.getReadyFulfiller()); } } @@ -1439,7 +1458,7 @@ bool WritableStreamInternalController::isClosedOrClosing() { } bool WritableStreamInternalController::isPiping() { - return state.is() && !queue.empty() && queue.back().event.is(); + return state.is>() && !queue.empty() && queue.back().event.is(); } bool WritableStreamInternalController::isErrored() { @@ -1565,9 +1584,10 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo }; const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool { - auto& writable = state.get(); + auto& writable = KJ_ASSERT_NONNULL(state.tryGet>()); KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) { - writable->abort(js.exceptionToKj(pendingAbort.reason.addRef(js))); + auto ex = js.exceptionToKj(pendingAbort.reason.addRef(js)); + writable->abort(kj::mv(ex)); drain(js, pendingAbort.reason.getHandle(js)); pendingAbort.complete(js); return true; @@ -1591,12 +1611,12 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo } // writeLoop() is only called with the sink in the Writable state. - auto& writable = state.get(); + auto& writable = state.get>(); auto check = makeChecker(request); auto amountToWrite = request.bytes.size(); - auto promise = writable->write(request.bytes).attach(kj::mv(request.ownBytes)); + auto promise = writable->sink->write(request.bytes).attach(kj::mv(request.ownBytes)); // TODO(soon): We use awaitIoLegacy() here because if the stream terminates in JavaScript in // this same isolate, then the promise may actually be waiting on JavaScript to do something, @@ -1606,7 +1626,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo // jsg::Promises and not kj::Promises, so that it doesn't look like I/O at all, and there's // no need to drop the isolate lock and take it again every time some data is read/written. // That's a larger refactor, though. - return ioContext.awaitIoLegacy(js, kj::mv(promise)).then(js, + return ioContext.awaitIoLegacy(js, writable->canceler.wrap(kj::mv(promise))).then(js, ioContext.addFunctor( [this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise { auto& request = check(); @@ -1620,12 +1640,13 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo -> jsg::Promise { auto handle = reason.getHandle(js); auto& request = check(); - auto& writable = state.get(); + auto& writable = state.get>(); decreaseCurrentWriteBufferSize(js, amountToWrite); maybeRejectPromise(js, request.promise, handle); queue.pop_front(); if (!maybeAbort(js, request)) { - writable->abort(js.exceptionToKj(reason.addRef(js))); + auto ex = js.exceptionToKj(reason.addRef(js)); + writable->abort(kj::mv(ex)); drain(js, handle); } return js.resolvedPromise(); @@ -1636,7 +1657,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo // The destination should still be Writable, because the only way to transition to an // errored state would have been if a write request in the queue ahead of us encountered an // error. But in that case, the queue would already have been drained and we wouldn't be here. - auto& writable = state.get(); + auto& writable = state.get>(); if (request.checkSignal(js)) { // If the signal is triggered, checkSignal will handle erroring the source and destination. @@ -1662,7 +1683,8 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo // If the source is errored, the spec requires us to error the destination unless the // preventAbort option is true. if (!request.preventAbort) { - writable->abort(js.exceptionToKj(js.v8Ref(errored))); + auto ex = js.exceptionToKj(js.v8Ref(errored)); + writable->abort(kj::mv(ex)); drain(js, errored); } else { writeState.init(); @@ -1739,9 +1761,10 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo })); }; - KJ_IF_SOME(promise, request.source.tryPumpTo(*writable, !request.preventClose)) { + KJ_IF_SOME(promise, request.source.tryPumpTo(*writable->sink, !request.preventClose)) { return handlePromise(js, ioContext.awaitIo(js, - AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise)))); + writable->canceler.wrap( + AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise))))); } // The ReadableStream is JavaScript-backed. We can still pipe the data but it's going to be @@ -1753,10 +1776,11 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo } KJ_CASE_ONEOF(request, Close) { // writeLoop() is only called with the sink in the Writable state. - auto& writable = state.get(); + auto& writable = state.get>(); auto check = makeChecker(request); - return ioContext.awaitIo(js, writable->end()).then(js, + return ioContext.awaitIo(js, writable->canceler.wrap( + writable->sink->end())).then(js, ioContext.addFunctor([this, check](jsg::Lock& js) { auto& request = check(); maybeResolvePromise(js, request.promise); @@ -1802,8 +1826,9 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) { auto promise = kj::mv(this->promise); if (!preventAbort) { - if (parent.state.tryGet() != kj::none) { - parent.state.get()->abort(js.exceptionToKj(reason)); + KJ_IF_SOME(writable, parent.state.tryGet>()) { + auto ex = js.exceptionToKj(reason); + writable->abort(kj::mv(ex)); parent.drain(js, reason); } else { parent.writeState.init(); @@ -1824,7 +1849,7 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) { } jsg::Promise WritableStreamInternalController::Pipe::write(v8::Local handle) { - auto& writable = parent.state.get(); + auto& writable = parent.state.get>(); // TODO(soon): Once jsg::BufferSource lands and we're able to use it, this can be simplified. KJ_ASSERT(handle->IsArrayBuffer() || handle->IsArrayBufferView()); std::shared_ptr store; @@ -1845,7 +1870,7 @@ jsg::Promise WritableStreamInternalController::Pipe::write(v8::Localwrite(kj::arrayPtr(data, byteLength)) + writable->canceler.wrap(writable->sink->write(kj::arrayPtr(data, byteLength))) .attach(js.v8Ref(v8::ArrayBuffer::New(js.v8Isolate, store))), [](jsg::Lock&){}); } @@ -1875,8 +1900,9 @@ jsg::Promise WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j KJ_IF_SOME(errored, source.tryGetErrored(js)) { source.release(js); if (!preventAbort) { - if (parent.state.tryGet() != kj::none) { - parent.state.get()->abort(js.exceptionToKj(js.v8Ref(errored))); + KJ_IF_SOME(writable, parent.state.tryGet>()) { + auto ex = js.exceptionToKj(js.v8Ref(errored)); + writable->abort(kj::mv(ex)); return js.rejectedPromise(errored); } } @@ -1905,7 +1931,8 @@ jsg::Promise WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j if (!parent.isClosedOrClosing()) { // We'll only be here if the sink is in the Writable state. auto& ioContext = IoContext::current(); - return ioContext.awaitIo(js, parent.state.get()->end(), [](jsg::Lock&){}).then(js, + return ioContext.awaitIo(js, + parent.state.get>()->sink->end(), [](jsg::Lock&){}).then(js, ioContext.addFunctor([this](jsg::Lock& js) { parent.finishClose(js); }), ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) { parent.finishError(js, reason.getHandle(js)); @@ -1955,7 +1982,9 @@ jsg::Promise WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j // Undefined and null are perfectly valid values to pass through a ReadableStream, // but we can't interpret them as bytes so if we get them here, we error the pipe. auto error = js.v8TypeError("This WritableStream only supports writing byte types."_kj); - parent.state.get()->abort(js.exceptionToKj(js.v8Ref(error))); + auto& writable = parent.state.get>(); + auto ex = js.exceptionToKj(js.v8Ref(error)); + writable->abort(kj::mv(ex)); // The error condition will be handled at the start of the next iteration. return pipeLoop(js); }), ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) -> jsg::Promise { @@ -2333,14 +2362,18 @@ void IdentityTransformStreamImpl::abort(kj::Exception reason) { request.fulfiller->reject(kj::cp(reason)); } KJ_CASE_ONEOF(request, WriteRequest) { - KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish"); + // IF the fulfiller is not waiting, the write promise was already + // canceled and no one is waiting on it. + KJ_ASSERT(!request.fulfiller->isWaiting(), + "abort() is supposed to wait for any pending write() to finish"); } KJ_CASE_ONEOF(exception, kj::Exception) { // Already errored. return; } KJ_CASE_ONEOF(closed, StreamStates::Closed) { - KJ_FAIL_ASSERT("abort() is supposed to wait for any pending close() to finish"); + // If we're in the pending close state... it should be ok to just switch + // the state to errored below. } } @@ -2470,7 +2503,7 @@ kj::Own newWritableStreamInternalController( kj::Maybe maybeHighWaterMark, kj::Maybe> maybeClosureWaitable) { return kj::heap( - ioContext.addObject(kj::mv(sink)), + kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable)); } @@ -2488,7 +2521,7 @@ void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& trac KJ_CASE_ONEOF(errored, StreamStates::Errored) { tracker.trackField("error", errored); } - KJ_CASE_ONEOF(_, Writable) { + KJ_CASE_ONEOF(_, IoOwn) { // Ideally we'd be able to track the size of any pending writes held in the sink's // queue but since it is behind an IoOwn and we won't be holding the IoContext here, // we can't. diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 1e6eeda475b..8cd5eb85940 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -168,17 +168,23 @@ class ReadableStreamInternalController: public ReadableStreamController { class WritableStreamInternalController: public WritableStreamController { public: - using Writable = IoOwn; + struct Writable { + kj::Own sink; + kj::Canceler canceler; + Writable(kj::Own sink) : sink(kj::mv(sink)) {} + void abort(kj::Exception&& ex); + }; explicit WritableStreamInternalController(StreamStates::Closed closed) : state(closed) {} explicit WritableStreamInternalController(StreamStates::Errored errored) : state(kj::mv(errored)) {} - explicit WritableStreamInternalController(Writable writable, + explicit WritableStreamInternalController(kj::Own writable, kj::Maybe maybeHighWaterMark = kj::none, - kj::Maybe> maybeClosureWaitable = kj::none) : state(kj::mv(writable)), - maybeHighWaterMark(maybeHighWaterMark), - maybeClosureWaitable(kj::mv(maybeClosureWaitable)) { + kj::Maybe> maybeClosureWaitable = kj::none) + : state(IoContext::current().addObject(kj::heap(kj::mv(writable)))), + maybeHighWaterMark(maybeHighWaterMark), + maybeClosureWaitable(kj::mv(maybeClosureWaitable)) { } WritableStreamInternalController(WritableStreamInternalController&& other) = default; @@ -269,7 +275,7 @@ class WritableStreamInternalController: public WritableStreamController { }; kj::Maybe owner; - kj::OneOf state; + kj::OneOf> state; kj::OneOf writeState = Unlocked(); kj::Maybe maybePendingAbort; diff --git a/src/workerd/api/tests/abort-internal-streams-test.js b/src/workerd/api/tests/abort-internal-streams-test.js new file mode 100644 index 00000000000..84fa756f230 --- /dev/null +++ b/src/workerd/api/tests/abort-internal-streams-test.js @@ -0,0 +1,23 @@ +import { strictEqual } from 'assert'; + +export const abortInternalStreamsTest = { + async test() { + const { writable } = new IdentityTransformStream(); + + const writer = writable.getWriter(); + + const promise = writer.write(new Uint8Array(10)); + + await writer.abort(); + + // The write promise should abort proactively without waiting for a read, + // indicating that the queue was drained proactively when the abort was + // called. + try { + await promise; + throw new Error('The promise should have been rejected'); + } catch (err) { + strictEqual(err, undefined); + } + } +}; diff --git a/src/workerd/api/tests/abort-internal-streams-test.wd-test b/src/workerd/api/tests/abort-internal-streams-test.wd-test new file mode 100644 index 00000000000..a7182e7074b --- /dev/null +++ b/src/workerd/api/tests/abort-internal-streams-test.wd-test @@ -0,0 +1,15 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "abort-internal-streams-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "abort-internal-streams-test.js") + ], + compatibilityDate = "2024-07-01", + compatibilityFlags = ["nodejs_compat_v2", "internal_writable_stream_abort_clears_queue"], + ) + ), + ], +); diff --git a/src/workerd/io/compatibility-date-test.c++ b/src/workerd/io/compatibility-date-test.c++ index 1ee7678b396..a9a90c6cc08 100644 --- a/src/workerd/io/compatibility-date-test.c++ +++ b/src/workerd/io/compatibility-date-test.c++ @@ -235,7 +235,8 @@ KJ_TEST("compatibility flag parsing") { " fetchStandardUrl = true," " nodeJsCompatV2 = true," " globalFetchStrictlyPublic = false," - " newModuleRegistry = false)", {}, + " newModuleRegistry = false," + " internalWritableStreamAbortClearsQueue = true)", {}, CompatibilityDateValidation::FUTURE_FOR_TEST, false, false); expectCompileCompatibilityFlags("2024-09-01", {"nodejs_compat"}, "(formDataParserSupportsFiles = true," diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index 606dab2ac46..caf7039347c 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -562,4 +562,13 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { # For local development purposes only, increase the message size limit to 128MB. # This is not expected ever to be made available in production, as large messages are inefficient. + internalWritableStreamAbortClearsQueue @57 :Bool + $compatEnableFlag("internal_writable_stream_abort_clears_queue") + $compatDisableFlag("internal_writable_stream_abort_does_not_clear_queue") + $compatEnableDate("2024-09-02"); + # When using the original WritableStream implementation ("internal" streams), the + # abort() operation would be handled lazily, meaning that the queue of pending writes + # would not be cleared until the next time the queue was processed. This behavior leads + # to a situtation where the stream can hang if the consumer stops consuming. When set, + # this flag changes the behavior to clear the queue immediately upon abort. }