Skip to content

Commit

Permalink
fixup! fixup! Clear WritableStreamSink queue on abort with compat flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Aug 9, 2024
1 parent 58fc961 commit 8165500
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/workerd/api/eventsource.c++
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public:
return kj::READY_NOW;
}

void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override {
void abort(kj::Exception reason) override {
// There's really nothing to do here.
clear();
}
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/html-rewriter.c++
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public:
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override;
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override;
kj::Promise<void> end() override;
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override;
void abort(kj::Exception reason) override;

// Implementation for `Element::onEndTag` to avoid exposing private details of Rewriter.
void onEndTag(lol_html_element_t *element, ElementCallbackFunction&& callback);
Expand Down Expand Up @@ -472,11 +472,11 @@ kj::Promise<void> Rewriter::end() {
});
}

void Rewriter::abort(kj::Exception reason, AbortOption option) {
void Rewriter::abort(kj::Exception reason) {
// End the rewriter and forward the error to the wrapped output stream.
maybeException = kj::cp(reason);

inner->abort(kj::mv(reason), option);
inner->abort(kj::mv(reason));
}

kj::Promise<void> Rewriter::finishWrite() {
Expand Down
9 changes: 1 addition & 8 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,7 @@ class WritableStreamSink {
virtual kj::Maybe<kj::Promise<DeferredProxy<void>>> tryPumpFrom(
ReadableStreamSource& input, bool end);

enum class AbortOption {
NONE,
// Instructs the sink to reject pending writes when aborting. This is used when
// the InternalWritableStreamAbortClearsQueue flag is set.
ABORT_QUEUED_WRITES,
};

virtual void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) = 0;
virtual void abort(kj::Exception reason) = 0;
// TODO(conform): abort() should return a promise after which closed fulfillers should be
// rejected. This may necessitate an "erroring" state.

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/compression.c++
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public:
return writeInternal(Z_FINISH);
}

void abort(kj::Exception reason, AbortOption ignored) override {
void abort(kj::Exception reason) override {
cancelInternal(kj::mv(reason));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/internal-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ KJ_TEST("WritableStreamInternalController queue size assertion") {
return kj::READY_NOW;
}
kj::Promise<void> end() override { return kj::READY_NOW; }
void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override {}
void abort(kj::Exception reason) override {}
};

fixture.runInIoContext([&](const TestFixture::Environment& env) {
Expand Down
29 changes: 11 additions & 18 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -878,10 +878,9 @@ void ReadableStreamInternalController::releaseReader(
}
}

void WritableStreamInternalController::Writable::abort(
kj::Exception&& ex, WritableStreamSink::AbortOption option) {
void WritableStreamInternalController::Writable::abort(kj::Exception&& ex) {
canceler.cancel(kj::cp(ex));
sink->abort(kj::mv(ex), option);
sink->abort(kj::mv(ex));
}

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {
Expand Down Expand Up @@ -1175,7 +1174,7 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
// stream will be put into an errored state immediately after draining the
// queue. All pending writes and other operations in the queue will be rejected
// immediately and an immediately resolved or rejected promise will be returned.
writable->abort(kj::cp(exception), WritableStreamSink::AbortOption::ABORT_QUEUED_WRITES);
writable->abort(kj::cp(exception));
drain(js, reason);
return options.reject ?
rejectedMaybeHandledPromise<void>(js, reason, options.handled) :
Expand Down Expand Up @@ -1585,7 +1584,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
};

const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool {
auto& writable = state.get<IoOwn<Writable>>();
auto& writable = KJ_ASSERT_NONNULL(state.tryGet<IoOwn<Writable>>());
KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) {
auto ex = js.exceptionToKj(pendingAbort.reason.addRef(js));
writable->abort(kj::mv(ex));
Expand Down Expand Up @@ -2354,7 +2353,7 @@ kj::Promise<void> IdentityTransformStreamImpl::end() {
return writeHelper(kj::ArrayPtr<const kj::byte>());
}

void IdentityTransformStreamImpl::abort(kj::Exception reason, AbortOption option) {
void IdentityTransformStreamImpl::abort(kj::Exception reason) {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(idle, Idle) {
// This is fine.
Expand All @@ -2363,24 +2362,18 @@ void IdentityTransformStreamImpl::abort(kj::Exception reason, AbortOption option
request.fulfiller->reject(kj::cp(reason));
}
KJ_CASE_ONEOF(request, WriteRequest) {
switch (option) {
case AbortOption::NONE: {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish");
}
case AbortOption::ABORT_QUEUED_WRITES: {
request.fulfiller->reject(kj::cp(reason));
break;
}
}
// IF the fulfiller is not waiting, the write promise was already
// canceled and no one is waiting on it.
KJ_ASSERT(!request.fulfiller->isWaiting(),
"abort() is supposed to wait for any pending write() to finish");
}
KJ_CASE_ONEOF(exception, kj::Exception) {
// Already errored.
return;
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
if (option == AbortOption::NONE) {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending close() to finish");
}
// If we're in the pending close state... it should be ok to just switch
// the state to errored below.
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ class WritableStreamInternalController: public WritableStreamController {
kj::Own<WritableStreamSink> sink;
kj::Canceler canceler;
Writable(kj::Own<WritableStreamSink> sink) : sink(kj::mv(sink)) {}
void abort(kj::Exception&& ex, WritableStreamSink::AbortOption option =
WritableStreamSink::AbortOption::NONE);
void abort(kj::Exception&& ex);
};

explicit WritableStreamInternalController(StreamStates::Closed closed)
Expand Down Expand Up @@ -412,7 +411,7 @@ class IdentityTransformStreamImpl: public kj::Refcounted,

kj::Promise<void> end() override;

void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override;
void abort(kj::Exception reason) override;

private:
kj::Promise<size_t> readHelper(kj::ArrayPtr<kj::byte> bytes);
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/system-streams.c++
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public:

kj::Promise<void> end() override;

void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) override;
void abort(kj::Exception reason) override;

StreamEncoding disownEncodingResponsibility() override;

Expand Down Expand Up @@ -299,7 +299,7 @@ kj::Promise<void> EncodedAsyncOutputStream::end() {
return promise.attach(ioContext.registerPendingEvent());
}

void EncodedAsyncOutputStream::abort(kj::Exception reason, AbortOption ignored) {
void EncodedAsyncOutputStream::abort(kj::Exception reason) {
inner.init<Ended>();
}

Expand Down

0 comments on commit 8165500

Please sign in to comment.