Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Clear WritableStreamSink queue on abort with compat flag" #2520

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 36 additions & 69 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -878,11 +878,6 @@ void ReadableStreamInternalController::releaseReader(
}
}

void WritableStreamInternalController::Writable::abort(kj::Exception&& ex) {
canceler.cancel(kj::cp(ex));
sink->abort(kj::mv(ex));
}

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {
if (writeState.is<WriterLocked>()) {
writeState.init<Unlocked>();
Expand Down Expand Up @@ -917,7 +912,7 @@ jsg::Promise<void> WritableStreamInternalController::write(
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
return js.rejectedPromise<void>(errored.addRef(js));
}
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_CASE_ONEOF(writable, Writable) {
if (value == kj::none) {
return js.resolvedPromise();
}
Expand Down Expand Up @@ -1057,7 +1052,7 @@ jsg::Promise<void> WritableStreamInternalController::closeImpl(jsg::Lock& js, bo
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_CASE_ONEOF(writable, Writable) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled(js);
Expand Down Expand Up @@ -1120,7 +1115,7 @@ jsg::Promise<void> WritableStreamInternalController::flush(
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_CASE_ONEOF(writable, Writable) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled(js);
Expand Down Expand Up @@ -1165,22 +1160,8 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
return kj::mv(promise);
}

KJ_IF_SOME(writable, state.tryGet<IoOwn<Writable>>()) {
KJ_IF_SOME(writable, state.tryGet<Writable>()) {
auto exception = js.exceptionToKj(js.v8Ref(reason));

if (FeatureFlags::get(js).getInternalWritableStreamAbortClearsQueue()) {
// If this flag is set, we will clear the queue proactively and immediately
// error the stream rather than handling the abort lazily. In this case, the
// stream will be put into an errored state immediately after draining the
// queue. All pending writes and other operations in the queue will be rejected
// immediately and an immediately resolved or rejected promise will be returned.
writable->abort(kj::cp(exception));
drain(js, reason);
return options.reject ?
rejectedMaybeHandledPromise<void>(js, reason, options.handled) :
js.resolvedPromise();
}

if (queue.empty()) {
writable->abort(kj::cp(exception));
doError(js, reason);
Expand Down Expand Up @@ -1246,7 +1227,7 @@ kj::Maybe<jsg::Promise<void>> WritableStreamInternalController::tryPipeFrom(
KJ_IF_SOME(errored, sourceLock.tryGetErrored(js)) {
sourceLock.release(js);
if (!preventAbort) {
if (state.tryGet<IoOwn<Writable>>() != kj::none) {
if (state.tryGet<Writable>() != kj::none) {
return doAbort(js, errored, { .reject = true, .handled = pipeThrough });
}
}
Expand Down Expand Up @@ -1346,8 +1327,8 @@ kj::Maybe<kj::Own<WritableStreamSink>> WritableStreamInternalController::removeS
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto result = kj::mv(writable->sink);
KJ_CASE_ONEOF(writable, Writable) {
auto result = kj::mv(writable);
state.init<StreamStates::Closed>();
return kj::Maybe<kj::Own<WritableStreamSink>>(kj::mv(result));
}
Expand All @@ -1371,7 +1352,7 @@ void WritableStreamInternalController::detach(jsg::Lock& js) {
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_CASE_ONEOF(writable, Writable) {
state.init<StreamStates::Closed>();
return;
}
Expand All @@ -1384,7 +1365,7 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) { return 0; }
KJ_CASE_ONEOF(errored, StreamStates::Errored) { return kj::none; }
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_CASE_ONEOF(writable, Writable) {
KJ_IF_SOME(highWaterMark, maybeHighWaterMark) {
return highWaterMark - currentWriteBufferSize;
}
Expand Down Expand Up @@ -1417,7 +1398,7 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
maybeRejectPromise<void>(js, lock.getClosedFulfiller(), errored.getHandle(js));
maybeRejectPromise<void>(js, lock.getReadyFulfiller(), errored.getHandle(js));
}
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_CASE_ONEOF(writable, Writable) {
maybeResolvePromise(js, lock.getReadyFulfiller());
}
}
Expand Down Expand Up @@ -1458,7 +1439,7 @@ bool WritableStreamInternalController::isClosedOrClosing() {
}

bool WritableStreamInternalController::isPiping() {
return state.is<IoOwn<Writable>>() && !queue.empty() && queue.back().event.is<Pipe>();
return state.is<Writable>() && !queue.empty() && queue.back().event.is<Pipe>();
}

bool WritableStreamInternalController::isErrored() {
Expand Down Expand Up @@ -1584,10 +1565,9 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
};

const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool {
auto& writable = KJ_ASSERT_NONNULL(state.tryGet<IoOwn<Writable>>());
auto& writable = state.get<Writable>();
KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) {
auto ex = js.exceptionToKj(pendingAbort.reason.addRef(js));
writable->abort(kj::mv(ex));
writable->abort(js.exceptionToKj(pendingAbort.reason.addRef(js)));
drain(js, pendingAbort.reason.getHandle(js));
pendingAbort.complete(js);
return true;
Expand All @@ -1611,12 +1591,12 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}

// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<IoOwn<Writable>>();
auto& writable = state.get<Writable>();
auto check = makeChecker(request);

auto amountToWrite = request.bytes.size();

auto promise = writable->sink->write(request.bytes).attach(kj::mv(request.ownBytes));
auto promise = writable->write(request.bytes).attach(kj::mv(request.ownBytes));

// TODO(soon): We use awaitIoLegacy() here because if the stream terminates in JavaScript in
// this same isolate, then the promise may actually be waiting on JavaScript to do something,
Expand All @@ -1626,7 +1606,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// jsg::Promises and not kj::Promises, so that it doesn't look like I/O at all, and there's
// no need to drop the isolate lock and take it again every time some data is read/written.
// That's a larger refactor, though.
return ioContext.awaitIoLegacy(js, writable->canceler.wrap(kj::mv(promise))).then(js,
return ioContext.awaitIoLegacy(js, kj::mv(promise)).then(js,
ioContext.addFunctor(
[this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise<void> {
auto& request = check();
Expand All @@ -1640,13 +1620,12 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
-> jsg::Promise<void> {
auto handle = reason.getHandle(js);
auto& request = check();
auto& writable = state.get<IoOwn<Writable>>();
auto& writable = state.get<Writable>();
decreaseCurrentWriteBufferSize(js, amountToWrite);
maybeRejectPromise<void>(js, request.promise, handle);
queue.pop_front();
if (!maybeAbort(js, request)) {
auto ex = js.exceptionToKj(reason.addRef(js));
writable->abort(kj::mv(ex));
writable->abort(js.exceptionToKj(reason.addRef(js)));
drain(js, handle);
}
return js.resolvedPromise();
Expand All @@ -1657,7 +1636,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// The destination should still be Writable, because the only way to transition to an
// errored state would have been if a write request in the queue ahead of us encountered an
// error. But in that case, the queue would already have been drained and we wouldn't be here.
auto& writable = state.get<IoOwn<Writable>>();
auto& writable = state.get<Writable>();

if (request.checkSignal(js)) {
// If the signal is triggered, checkSignal will handle erroring the source and destination.
Expand All @@ -1683,8 +1662,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// If the source is errored, the spec requires us to error the destination unless the
// preventAbort option is true.
if (!request.preventAbort) {
auto ex = js.exceptionToKj(js.v8Ref(errored));
writable->abort(kj::mv(ex));
writable->abort(js.exceptionToKj(js.v8Ref(errored)));
drain(js, errored);
} else {
writeState.init<Unlocked>();
Expand Down Expand Up @@ -1761,10 +1739,9 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}));
};

KJ_IF_SOME(promise, request.source.tryPumpTo(*writable->sink, !request.preventClose)) {
KJ_IF_SOME(promise, request.source.tryPumpTo(*writable, !request.preventClose)) {
return handlePromise(js, ioContext.awaitIo(js,
writable->canceler.wrap(
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise)))));
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise))));
}

// The ReadableStream is JavaScript-backed. We can still pipe the data but it's going to be
Expand All @@ -1776,11 +1753,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}
KJ_CASE_ONEOF(request, Close) {
// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<IoOwn<Writable>>();
auto& writable = state.get<Writable>();
auto check = makeChecker(request);

return ioContext.awaitIo(js, writable->canceler.wrap(
writable->sink->end())).then(js,
return ioContext.awaitIo(js, writable->end()).then(js,
ioContext.addFunctor([this, check](jsg::Lock& js) {
auto& request = check();
maybeResolvePromise(js, request.promise);
Expand Down Expand Up @@ -1826,9 +1802,8 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) {
auto promise = kj::mv(this->promise);

if (!preventAbort) {
KJ_IF_SOME(writable, parent.state.tryGet<IoOwn<Writable>>()) {
auto ex = js.exceptionToKj(reason);
writable->abort(kj::mv(ex));
if (parent.state.tryGet<Writable>() != kj::none) {
parent.state.get<Writable>()->abort(js.exceptionToKj(reason));
parent.drain(js, reason);
} else {
parent.writeState.init<Unlocked>();
Expand All @@ -1849,7 +1824,7 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) {
}

jsg::Promise<void> WritableStreamInternalController::Pipe::write(v8::Local<v8::Value> handle) {
auto& writable = parent.state.get<IoOwn<Writable>>();
auto& writable = parent.state.get<Writable>();
// TODO(soon): Once jsg::BufferSource lands and we're able to use it, this can be simplified.
KJ_ASSERT(handle->IsArrayBuffer() || handle->IsArrayBufferView());
std::shared_ptr<v8::BackingStore> store;
Expand All @@ -1870,7 +1845,7 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::write(v8::Local<v8::V
// v8::Isolate::GetCurrent();
jsg::Lock& js = jsg::Lock::from(v8::Isolate::GetCurrent());
return IoContext::current().awaitIo(js,
writable->canceler.wrap(writable->sink->write(kj::arrayPtr(data, byteLength)))
writable->write(kj::arrayPtr(data, byteLength))
.attach(js.v8Ref(v8::ArrayBuffer::New(js.v8Isolate, store))), [](jsg::Lock&){});
}

Expand Down Expand Up @@ -1900,9 +1875,8 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
KJ_IF_SOME(errored, source.tryGetErrored(js)) {
source.release(js);
if (!preventAbort) {
KJ_IF_SOME(writable, parent.state.tryGet<IoOwn<Writable>>()) {
auto ex = js.exceptionToKj(js.v8Ref(errored));
writable->abort(kj::mv(ex));
if (parent.state.tryGet<Writable>() != kj::none) {
parent.state.get<Writable>()->abort(js.exceptionToKj(js.v8Ref(errored)));
return js.rejectedPromise<void>(errored);
}
}
Expand Down Expand Up @@ -1931,8 +1905,7 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
if (!parent.isClosedOrClosing()) {
// We'll only be here if the sink is in the Writable state.
auto& ioContext = IoContext::current();
return ioContext.awaitIo(js,
parent.state.get<IoOwn<Writable>>()->sink->end(), [](jsg::Lock&){}).then(js,
return ioContext.awaitIo(js, parent.state.get<Writable>()->end(), [](jsg::Lock&){}).then(js,
ioContext.addFunctor([this](jsg::Lock& js) { parent.finishClose(js); }),
ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) {
parent.finishError(js, reason.getHandle(js));
Expand Down Expand Up @@ -1982,9 +1955,7 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
// Undefined and null are perfectly valid values to pass through a ReadableStream,
// but we can't interpret them as bytes so if we get them here, we error the pipe.
auto error = js.v8TypeError("This WritableStream only supports writing byte types."_kj);
auto& writable = parent.state.get<IoOwn<Writable>>();
auto ex = js.exceptionToKj(js.v8Ref(error));
writable->abort(kj::mv(ex));
parent.state.get<Writable>()->abort(js.exceptionToKj(js.v8Ref(error)));
// The error condition will be handled at the start of the next iteration.
return pipeLoop(js);
}), ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
Expand Down Expand Up @@ -2362,18 +2333,14 @@ void IdentityTransformStreamImpl::abort(kj::Exception reason) {
request.fulfiller->reject(kj::cp(reason));
}
KJ_CASE_ONEOF(request, WriteRequest) {
// IF the fulfiller is not waiting, the write promise was already
// canceled and no one is waiting on it.
KJ_ASSERT(!request.fulfiller->isWaiting(),
"abort() is supposed to wait for any pending write() to finish");
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish");
}
KJ_CASE_ONEOF(exception, kj::Exception) {
// Already errored.
return;
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// If we're in the pending close state... it should be ok to just switch
// the state to errored below.
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending close() to finish");
}
}

Expand Down Expand Up @@ -2503,7 +2470,7 @@ kj::Own<WritableStreamController> newWritableStreamInternalController(
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable) {
return kj::heap<WritableStreamInternalController>(
kj::mv(sink),
ioContext.addObject(kj::mv(sink)),
maybeHighWaterMark,
kj::mv(maybeClosureWaitable));
}
Expand All @@ -2521,7 +2488,7 @@ void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& trac
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
tracker.trackField("error", errored);
}
KJ_CASE_ONEOF(_, IoOwn<Writable>) {
KJ_CASE_ONEOF(_, Writable) {
// Ideally we'd be able to track the size of any pending writes held in the sink's
// queue but since it is behind an IoOwn and we won't be holding the IoContext here,
// we can't.
Expand Down
18 changes: 6 additions & 12 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,17 @@ class ReadableStreamInternalController: public ReadableStreamController {

class WritableStreamInternalController: public WritableStreamController {
public:
struct Writable {
kj::Own<WritableStreamSink> sink;
kj::Canceler canceler;
Writable(kj::Own<WritableStreamSink> sink) : sink(kj::mv(sink)) {}
void abort(kj::Exception&& ex);
};
using Writable = IoOwn<WritableStreamSink>;

explicit WritableStreamInternalController(StreamStates::Closed closed)
: state(closed) {}
explicit WritableStreamInternalController(StreamStates::Errored errored)
: state(kj::mv(errored)) {}
explicit WritableStreamInternalController(kj::Own<WritableStreamSink> writable,
explicit WritableStreamInternalController(Writable writable,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none)
: state(IoContext::current().addObject(kj::heap<Writable>(kj::mv(writable)))),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none) : state(kj::mv(writable)),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {
}

WritableStreamInternalController(WritableStreamInternalController&& other) = default;
Expand Down Expand Up @@ -275,7 +269,7 @@ class WritableStreamInternalController: public WritableStreamController {
};

kj::Maybe<WritableStream&> owner;
kj::OneOf<StreamStates::Closed, StreamStates::Errored, IoOwn<Writable>> state;
kj::OneOf<StreamStates::Closed, StreamStates::Errored, Writable> state;
kj::OneOf<Unlocked, Locked, PipeLocked, WriterLocked> writeState = Unlocked();

kj::Maybe<PendingAbort> maybePendingAbort;
Expand Down
23 changes: 0 additions & 23 deletions src/workerd/api/tests/abort-internal-streams-test.js

This file was deleted.

15 changes: 0 additions & 15 deletions src/workerd/api/tests/abort-internal-streams-test.wd-test

This file was deleted.

Loading
Loading