Skip to content

Commit

Permalink
Clear WritableStreamSink queue on abort with compat flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Aug 9, 2024
1 parent f07cd8e commit 346fe52
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 43 deletions.
105 changes: 69 additions & 36 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,11 @@ void ReadableStreamInternalController::releaseReader(
}
}

void WritableStreamInternalController::Writable::abort(kj::Exception&& ex) {
canceler.cancel(kj::cp(ex));
sink->abort(kj::mv(ex));
}

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {
if (writeState.is<WriterLocked>()) {
writeState.init<Unlocked>();
Expand Down Expand Up @@ -912,7 +917,7 @@ jsg::Promise<void> WritableStreamInternalController::write(
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
return js.rejectedPromise<void>(errored.addRef(js));
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
if (value == kj::none) {
return js.resolvedPromise();
}
Expand Down Expand Up @@ -1052,7 +1057,7 @@ jsg::Promise<void> WritableStreamInternalController::closeImpl(jsg::Lock& js, bo
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled(js);
Expand Down Expand Up @@ -1115,7 +1120,7 @@ jsg::Promise<void> WritableStreamInternalController::flush(
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled(js);
Expand Down Expand Up @@ -1160,8 +1165,22 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
return kj::mv(promise);
}

KJ_IF_SOME(writable, state.tryGet<Writable>()) {
KJ_IF_SOME(writable, state.tryGet<IoOwn<Writable>>()) {
auto exception = js.exceptionToKj(js.v8Ref(reason));

if (FeatureFlags::get(js).getInternalWritableStreamAbortClearsQueue()) {
// If this flag is set, we will clear the queue proactively and immediately
// error the stream rather than handling the abort lazily. In this case, the
// stream will be put into an errored state immediately after draining the
// queue. All pending writes and other operations in the queue will be rejected
// immediately and an immediately resolved or rejected promise will be returned.
writable->abort(kj::cp(exception));
drain(js, reason);
return options.reject ?
rejectedMaybeHandledPromise<void>(js, reason, options.handled) :
js.resolvedPromise();
}

if (queue.empty()) {
writable->abort(kj::cp(exception));
doError(js, reason);
Expand Down Expand Up @@ -1227,7 +1246,7 @@ kj::Maybe<jsg::Promise<void>> WritableStreamInternalController::tryPipeFrom(
KJ_IF_SOME(errored, sourceLock.tryGetErrored(js)) {
sourceLock.release(js);
if (!preventAbort) {
if (state.tryGet<Writable>() != kj::none) {
if (state.tryGet<IoOwn<Writable>>() != kj::none) {
return doAbort(js, errored, { .reject = true, .handled = pipeThrough });
}
}
Expand Down Expand Up @@ -1327,8 +1346,8 @@ kj::Maybe<kj::Own<WritableStreamSink>> WritableStreamInternalController::removeS
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, Writable) {
auto result = kj::mv(writable);
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto result = kj::mv(writable->sink);
state.init<StreamStates::Closed>();
return kj::Maybe<kj::Own<WritableStreamSink>>(kj::mv(result));
}
Expand All @@ -1352,7 +1371,7 @@ void WritableStreamInternalController::detach(jsg::Lock& js) {
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
state.init<StreamStates::Closed>();
return;
}
Expand All @@ -1365,7 +1384,7 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) { return 0; }
KJ_CASE_ONEOF(errored, StreamStates::Errored) { return kj::none; }
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_IF_SOME(highWaterMark, maybeHighWaterMark) {
return highWaterMark - currentWriteBufferSize;
}
Expand Down Expand Up @@ -1398,7 +1417,7 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
maybeRejectPromise<void>(js, lock.getClosedFulfiller(), errored.getHandle(js));
maybeRejectPromise<void>(js, lock.getReadyFulfiller(), errored.getHandle(js));
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
maybeResolvePromise(js, lock.getReadyFulfiller());
}
}
Expand Down Expand Up @@ -1439,7 +1458,7 @@ bool WritableStreamInternalController::isClosedOrClosing() {
}

bool WritableStreamInternalController::isPiping() {
return state.is<Writable>() && !queue.empty() && queue.back().event.is<Pipe>();
return state.is<IoOwn<Writable>>() && !queue.empty() && queue.back().event.is<Pipe>();
}

bool WritableStreamInternalController::isErrored() {
Expand Down Expand Up @@ -1565,9 +1584,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
};

const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool {
auto& writable = state.get<Writable>();
auto& writable = KJ_ASSERT_NONNULL(state.tryGet<IoOwn<Writable>>());
KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) {
writable->abort(js.exceptionToKj(pendingAbort.reason.addRef(js)));
auto ex = js.exceptionToKj(pendingAbort.reason.addRef(js));
writable->abort(kj::mv(ex));
drain(js, pendingAbort.reason.getHandle(js));
pendingAbort.complete(js);
return true;
Expand All @@ -1591,12 +1611,12 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}

// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
auto check = makeChecker(request);

auto amountToWrite = request.bytes.size();

auto promise = writable->write(request.bytes).attach(kj::mv(request.ownBytes));
auto promise = writable->sink->write(request.bytes).attach(kj::mv(request.ownBytes));

// TODO(soon): We use awaitIoLegacy() here because if the stream terminates in JavaScript in
// this same isolate, then the promise may actually be waiting on JavaScript to do something,
Expand All @@ -1606,7 +1626,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// jsg::Promises and not kj::Promises, so that it doesn't look like I/O at all, and there's
// no need to drop the isolate lock and take it again every time some data is read/written.
// That's a larger refactor, though.
return ioContext.awaitIoLegacy(js, kj::mv(promise)).then(js,
return ioContext.awaitIoLegacy(js, writable->canceler.wrap(kj::mv(promise))).then(js,
ioContext.addFunctor(
[this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise<void> {
auto& request = check();
Expand All @@ -1620,12 +1640,13 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
-> jsg::Promise<void> {
auto handle = reason.getHandle(js);
auto& request = check();
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
decreaseCurrentWriteBufferSize(js, amountToWrite);
maybeRejectPromise<void>(js, request.promise, handle);
queue.pop_front();
if (!maybeAbort(js, request)) {
writable->abort(js.exceptionToKj(reason.addRef(js)));
auto ex = js.exceptionToKj(reason.addRef(js));
writable->abort(kj::mv(ex));
drain(js, handle);
}
return js.resolvedPromise();
Expand All @@ -1636,7 +1657,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// The destination should still be Writable, because the only way to transition to an
// errored state would have been if a write request in the queue ahead of us encountered an
// error. But in that case, the queue would already have been drained and we wouldn't be here.
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();

if (request.checkSignal(js)) {
// If the signal is triggered, checkSignal will handle erroring the source and destination.
Expand All @@ -1662,7 +1683,8 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// If the source is errored, the spec requires us to error the destination unless the
// preventAbort option is true.
if (!request.preventAbort) {
writable->abort(js.exceptionToKj(js.v8Ref(errored)));
auto ex = js.exceptionToKj(js.v8Ref(errored));
writable->abort(kj::mv(ex));
drain(js, errored);
} else {
writeState.init<Unlocked>();
Expand Down Expand Up @@ -1739,9 +1761,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}));
};

KJ_IF_SOME(promise, request.source.tryPumpTo(*writable, !request.preventClose)) {
KJ_IF_SOME(promise, request.source.tryPumpTo(*writable->sink, !request.preventClose)) {
return handlePromise(js, ioContext.awaitIo(js,
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise))));
writable->canceler.wrap(
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise)))));
}

// The ReadableStream is JavaScript-backed. We can still pipe the data but it's going to be
Expand All @@ -1753,10 +1776,11 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}
KJ_CASE_ONEOF(request, Close) {
// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
auto check = makeChecker(request);

return ioContext.awaitIo(js, writable->end()).then(js,
return ioContext.awaitIo(js, writable->canceler.wrap(
writable->sink->end())).then(js,
ioContext.addFunctor([this, check](jsg::Lock& js) {
auto& request = check();
maybeResolvePromise(js, request.promise);
Expand Down Expand Up @@ -1802,8 +1826,9 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) {
auto promise = kj::mv(this->promise);

if (!preventAbort) {
if (parent.state.tryGet<Writable>() != kj::none) {
parent.state.get<Writable>()->abort(js.exceptionToKj(reason));
KJ_IF_SOME(writable, parent.state.tryGet<IoOwn<Writable>>()) {
auto ex = js.exceptionToKj(reason);
writable->abort(kj::mv(ex));
parent.drain(js, reason);
} else {
parent.writeState.init<Unlocked>();
Expand All @@ -1824,7 +1849,7 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) {
}

jsg::Promise<void> WritableStreamInternalController::Pipe::write(v8::Local<v8::Value> handle) {
auto& writable = parent.state.get<Writable>();
auto& writable = parent.state.get<IoOwn<Writable>>();
// TODO(soon): Once jsg::BufferSource lands and we're able to use it, this can be simplified.
KJ_ASSERT(handle->IsArrayBuffer() || handle->IsArrayBufferView());
std::shared_ptr<v8::BackingStore> store;
Expand All @@ -1845,7 +1870,7 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::write(v8::Local<v8::V
// v8::Isolate::GetCurrent();
jsg::Lock& js = jsg::Lock::from(v8::Isolate::GetCurrent());
return IoContext::current().awaitIo(js,
writable->write(kj::arrayPtr(data, byteLength))
writable->canceler.wrap(writable->sink->write(kj::arrayPtr(data, byteLength)))
.attach(js.v8Ref(v8::ArrayBuffer::New(js.v8Isolate, store))), [](jsg::Lock&){});
}

Expand Down Expand Up @@ -1875,8 +1900,9 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
KJ_IF_SOME(errored, source.tryGetErrored(js)) {
source.release(js);
if (!preventAbort) {
if (parent.state.tryGet<Writable>() != kj::none) {
parent.state.get<Writable>()->abort(js.exceptionToKj(js.v8Ref(errored)));
KJ_IF_SOME(writable, parent.state.tryGet<IoOwn<Writable>>()) {
auto ex = js.exceptionToKj(js.v8Ref(errored));
writable->abort(kj::mv(ex));
return js.rejectedPromise<void>(errored);
}
}
Expand Down Expand Up @@ -1905,7 +1931,8 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
if (!parent.isClosedOrClosing()) {
// We'll only be here if the sink is in the Writable state.
auto& ioContext = IoContext::current();
return ioContext.awaitIo(js, parent.state.get<Writable>()->end(), [](jsg::Lock&){}).then(js,
return ioContext.awaitIo(js,
parent.state.get<IoOwn<Writable>>()->sink->end(), [](jsg::Lock&){}).then(js,
ioContext.addFunctor([this](jsg::Lock& js) { parent.finishClose(js); }),
ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) {
parent.finishError(js, reason.getHandle(js));
Expand Down Expand Up @@ -1955,7 +1982,9 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
// Undefined and null are perfectly valid values to pass through a ReadableStream,
// but we can't interpret them as bytes so if we get them here, we error the pipe.
auto error = js.v8TypeError("This WritableStream only supports writing byte types."_kj);
parent.state.get<Writable>()->abort(js.exceptionToKj(js.v8Ref(error)));
auto& writable = parent.state.get<IoOwn<Writable>>();
auto ex = js.exceptionToKj(js.v8Ref(error));
writable->abort(kj::mv(ex));
// The error condition will be handled at the start of the next iteration.
return pipeLoop(js);
}), ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
Expand Down Expand Up @@ -2333,14 +2362,18 @@ void IdentityTransformStreamImpl::abort(kj::Exception reason) {
request.fulfiller->reject(kj::cp(reason));
}
KJ_CASE_ONEOF(request, WriteRequest) {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish");
// IF the fulfiller is not waiting, the write promise was already
// canceled and no one is waiting on it.
KJ_ASSERT(!request.fulfiller->isWaiting(),
"abort() is supposed to wait for any pending write() to finish");
}
KJ_CASE_ONEOF(exception, kj::Exception) {
// Already errored.
return;
}
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending close() to finish");
// If we're in the pending close state... it should be ok to just switch
// the state to errored below.
}
}

Expand Down Expand Up @@ -2470,7 +2503,7 @@ kj::Own<WritableStreamController> newWritableStreamInternalController(
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable) {
return kj::heap<WritableStreamInternalController>(
ioContext.addObject(kj::mv(sink)),
kj::mv(sink),
maybeHighWaterMark,
kj::mv(maybeClosureWaitable));
}
Expand All @@ -2488,7 +2521,7 @@ void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& trac
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
tracker.trackField("error", errored);
}
KJ_CASE_ONEOF(_, Writable) {
KJ_CASE_ONEOF(_, IoOwn<Writable>) {
// Ideally we'd be able to track the size of any pending writes held in the sink's
// queue but since it is behind an IoOwn and we won't be holding the IoContext here,
// we can't.
Expand Down
18 changes: 12 additions & 6 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,23 @@ class ReadableStreamInternalController: public ReadableStreamController {

class WritableStreamInternalController: public WritableStreamController {
public:
using Writable = IoOwn<WritableStreamSink>;
struct Writable {
kj::Own<WritableStreamSink> sink;
kj::Canceler canceler;
Writable(kj::Own<WritableStreamSink> sink) : sink(kj::mv(sink)) {}
void abort(kj::Exception&& ex);
};

explicit WritableStreamInternalController(StreamStates::Closed closed)
: state(closed) {}
explicit WritableStreamInternalController(StreamStates::Errored errored)
: state(kj::mv(errored)) {}
explicit WritableStreamInternalController(Writable writable,
explicit WritableStreamInternalController(kj::Own<WritableStreamSink> writable,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none) : state(kj::mv(writable)),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none)
: state(IoContext::current().addObject(kj::heap<Writable>(kj::mv(writable)))),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {
}

WritableStreamInternalController(WritableStreamInternalController&& other) = default;
Expand Down Expand Up @@ -269,7 +275,7 @@ class WritableStreamInternalController: public WritableStreamController {
};

kj::Maybe<WritableStream&> owner;
kj::OneOf<StreamStates::Closed, StreamStates::Errored, Writable> state;
kj::OneOf<StreamStates::Closed, StreamStates::Errored, IoOwn<Writable>> state;
kj::OneOf<Unlocked, Locked, PipeLocked, WriterLocked> writeState = Unlocked();

kj::Maybe<PendingAbort> maybePendingAbort;
Expand Down
23 changes: 23 additions & 0 deletions src/workerd/api/tests/abort-internal-streams-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { strictEqual } from 'assert';

export const abortInternalStreamsTest = {
async test() {
const { writable } = new IdentityTransformStream();

const writer = writable.getWriter();

const promise = writer.write(new Uint8Array(10));

await writer.abort();

// The write promise should abort proactively without waiting for a read,
// indicating that the queue was drained proactively when the abort was
// called.
try {
await promise;
throw new Error('The promise should have been rejected');
} catch (err) {
strictEqual(err, undefined);
}
}
};
15 changes: 15 additions & 0 deletions src/workerd/api/tests/abort-internal-streams-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "abort-internal-streams-test",
worker = (
modules = [
(name = "worker", esModule = embed "abort-internal-streams-test.js")
],
compatibilityDate = "2024-07-01",
compatibilityFlags = ["nodejs_compat_v2", "internal_writable_stream_abort_clears_queue"],
)
),
],
);
Loading

0 comments on commit 346fe52

Please sign in to comment.