Skip to content

Commit

Permalink
Clear WritableStreamSink queue on abort with compat flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Aug 9, 2024
1 parent 16c7353 commit 9591fe7
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/workerd/api/eventsource.c++
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public:
return kj::READY_NOW;
}

void abort(kj::Exception reason) override {
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override {
// There's really nothing to do here.
clear();
}
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/html-rewriter.c++
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public:
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override;
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override;
kj::Promise<void> end() override;
void abort(kj::Exception reason) override;
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override;

// Implementation for `Element::onEndTag` to avoid exposing private details of Rewriter.
void onEndTag(lol_html_element_t *element, ElementCallbackFunction&& callback);
Expand Down Expand Up @@ -472,11 +472,11 @@ kj::Promise<void> Rewriter::end() {
});
}

void Rewriter::abort(kj::Exception reason) {
void Rewriter::abort(kj::Exception reason, AbortOption option) {
// End the rewriter and forward the error to the wrapped output stream.
maybeException = kj::cp(reason);

inner->abort(kj::mv(reason));
inner->abort(kj::mv(reason), option);
}

kj::Promise<void> Rewriter::finishWrite() {
Expand Down
9 changes: 8 additions & 1 deletion src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,14 @@ class WritableStreamSink {
virtual kj::Maybe<kj::Promise<DeferredProxy<void>>> tryPumpFrom(
ReadableStreamSource& input, bool end);

virtual void abort(kj::Exception reason) = 0;
enum class AbortOption {
NONE,
// Instructs the sink to reject pending writes when aborting. This is used when
// the InternalWritableStreamAbortClearsQueue flag is set.
DRAIN,
};

virtual void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) = 0;
// TODO(conform): abort() should return a promise after which closed fulfillers should be
// rejected. This may necessitate an "erroring" state.

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/compression.c++
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public:
return writeInternal(Z_FINISH);
}

void abort(kj::Exception reason) override {
void abort(kj::Exception reason, AbortOption ignored) override {
cancelInternal(kj::mv(reason));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/internal-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ KJ_TEST("WritableStreamInternalController queue size assertion") {
return kj::READY_NOW;
}
kj::Promise<void> end() override { return kj::READY_NOW; }
void abort(kj::Exception reason) override {}
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override {}
};

fixture.runInIoContext([&](const TestFixture::Environment& env) {
Expand Down
30 changes: 27 additions & 3 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,20 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(

KJ_IF_SOME(writable, state.tryGet<Writable>()) {
auto exception = js.exceptionToKj(js.v8Ref(reason));

if (FeatureFlags::get(js).getInternalWritableStreamAbortClearsQueue()) {
// If this flag is set, we will clear the queue proactively and immediately
// error the stream rather than handling the abort lazily. In this case, the
// stream will be put into an errored state immediately after draining the
// queue. All pending writes and other operations in the queue will be rejected
// immediately and an immediately resolved or rejected promise will be returned.
writable->abort(kj::cp(exception), WritableStreamSink::AbortOption::DRAIN);
drain(js, reason);
return options.reject ?
rejectedMaybeHandledPromise<void>(js, reason, options.handled) :
js.resolvedPromise();
}

if (queue.empty()) {
writable->abort(kj::cp(exception));
doError(js, reason);
Expand Down Expand Up @@ -2324,7 +2338,7 @@ kj::Promise<void> IdentityTransformStreamImpl::end() {
return writeHelper(kj::ArrayPtr<const kj::byte>());
}

void IdentityTransformStreamImpl::abort(kj::Exception reason) {
void IdentityTransformStreamImpl::abort(kj::Exception reason, AbortOption option) {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(idle, Idle) {
// This is fine.
Expand All @@ -2333,14 +2347,24 @@ void IdentityTransformStreamImpl::abort(kj::Exception reason) {
request.fulfiller->reject(kj::cp(reason));
}
KJ_CASE_ONEOF(request, WriteRequest) {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish");
switch (option) {
case AbortOption::NONE: {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish");
}
case AbortOption::DRAIN: {
request.fulfiller->reject(kj::cp(reason));
break;
}
}
}
KJ_CASE_ONEOF(exception, kj::Exception) {
// Already errored.
return;
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending close() to finish");
if (option == AbortOption::NONE) {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending close() to finish");
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class IdentityTransformStreamImpl: public kj::Refcounted,

kj::Promise<void> end() override;

void abort(kj::Exception reason) override;
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override;

private:
kj::Promise<size_t> readHelper(kj::ArrayPtr<kj::byte> bytes);
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/system-streams.c++
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public:

kj::Promise<void> end() override;

void abort(kj::Exception reason) override;
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override;

StreamEncoding disownEncodingResponsibility() override;

Expand Down Expand Up @@ -299,7 +299,7 @@ kj::Promise<void> EncodedAsyncOutputStream::end() {
return promise.attach(ioContext.registerPendingEvent());
}

void EncodedAsyncOutputStream::abort(kj::Exception reason) {
void EncodedAsyncOutputStream::abort(kj::Exception reason, AbortOption ignored) {
inner.init<Ended>();
}

Expand Down
23 changes: 23 additions & 0 deletions src/workerd/api/tests/abort-internal-streams-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { strictEqual } from 'assert';

export const abortInternalStreamsTest = {
async test() {
const { writable } = new IdentityTransformStream();

const writer = writable.getWriter();

const promise = writer.write(new Uint8Array(10));

await writer.abort();

// The write promise should abort proactively without waiting for a read,
// indicating that the queue was drained proactively when the abort was
// called.
try {
await promise;
throw new Error('The promise should have been rejected');
} catch (err) {
strictEqual(err, undefined);
}
}
};
15 changes: 15 additions & 0 deletions src/workerd/api/tests/abort-internal-streams-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "abort-internal-streams-test",
worker = (
modules = [
(name = "worker", esModule = embed "abort-internal-streams-test.js")
],
compatibilityDate = "2024-07-01",
compatibilityFlags = ["nodejs_compat_v2", "internal_writable_stream_abort_clears_queue"],
)
),
],
);
3 changes: 2 additions & 1 deletion src/workerd/io/compatibility-date-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ KJ_TEST("compatibility flag parsing") {
" fetchStandardUrl = true,"
" nodeJsCompatV2 = true,"
" globalFetchStrictlyPublic = false,"
" newModuleRegistry = false)", {},
" newModuleRegistry = false,"
" internalWritableStreamAbortClearsQueue = true)", {},
CompatibilityDateValidation::FUTURE_FOR_TEST, false, false);
expectCompileCompatibilityFlags("2024-09-01", {"nodejs_compat"},
"(formDataParserSupportsFiles = true,"
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -562,4 +562,13 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# For local development purposes only, increase the message size limit to 128MB.
# This is not expected ever to be made available in production, as large messages are inefficient.

internalWritableStreamAbortClearsQueue @57 :Bool
$compatEnableFlag("internal_writable_stream_abort_clears_queue")
$compatDisableFlag("internal_writable_stream_abort_does_not_clear_queue")
$compatEnableDate("2024-09-02");
# When using the original WritableStream implementation ("internal" streams), the
# abort() operation would be handled lazily, meaning that the queue of pending writes
# would not be cleared until the next time the queue was processed. This behavior leads
# to a situtation where the stream can hang if the consumer stops consuming. When set,
# this flag changes the behavior to clear the queue immediately upon abort.
}

0 comments on commit 9591fe7

Please sign in to comment.