Skip to content

Commit

Permalink
fixup! Clear WritableStreamSink queue on abort with compat flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Aug 9, 2024
1 parent 9591fe7 commit 58fc961
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 43 deletions.
2 changes: 1 addition & 1 deletion src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class WritableStreamSink {
NONE,
// Instructs the sink to reject pending writes when aborting. This is used when
// the InternalWritableStreamAbortClearsQueue flag is set.
DRAIN,
ABORT_QUEUED_WRITES,
};

virtual void abort(kj::Exception reason, AbortOption option = AbortOption::NONE) = 0;
Expand Down
88 changes: 52 additions & 36 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,12 @@ void ReadableStreamInternalController::releaseReader(
}
}

void WritableStreamInternalController::Writable::abort(
kj::Exception&& ex, WritableStreamSink::AbortOption option) {
canceler.cancel(kj::cp(ex));
sink->abort(kj::mv(ex), option);
}

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {
if (writeState.is<WriterLocked>()) {
writeState.init<Unlocked>();
Expand Down Expand Up @@ -912,7 +918,7 @@ jsg::Promise<void> WritableStreamInternalController::write(
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
return js.rejectedPromise<void>(errored.addRef(js));
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
if (value == kj::none) {
return js.resolvedPromise();
}
Expand Down Expand Up @@ -1052,7 +1058,7 @@ jsg::Promise<void> WritableStreamInternalController::closeImpl(jsg::Lock& js, bo
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled(js);
Expand Down Expand Up @@ -1115,7 +1121,7 @@ jsg::Promise<void> WritableStreamInternalController::flush(
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled(js);
Expand Down Expand Up @@ -1160,7 +1166,7 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
return kj::mv(promise);
}

KJ_IF_SOME(writable, state.tryGet<Writable>()) {
KJ_IF_SOME(writable, state.tryGet<IoOwn<Writable>>()) {
auto exception = js.exceptionToKj(js.v8Ref(reason));

if (FeatureFlags::get(js).getInternalWritableStreamAbortClearsQueue()) {
Expand All @@ -1169,7 +1175,7 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
// stream will be put into an errored state immediately after draining the
// queue. All pending writes and other operations in the queue will be rejected
// immediately and an immediately resolved or rejected promise will be returned.
writable->abort(kj::cp(exception), WritableStreamSink::AbortOption::DRAIN);
writable->abort(kj::cp(exception), WritableStreamSink::AbortOption::ABORT_QUEUED_WRITES);
drain(js, reason);
return options.reject ?
rejectedMaybeHandledPromise<void>(js, reason, options.handled) :
Expand Down Expand Up @@ -1241,7 +1247,7 @@ kj::Maybe<jsg::Promise<void>> WritableStreamInternalController::tryPipeFrom(
KJ_IF_SOME(errored, sourceLock.tryGetErrored(js)) {
sourceLock.release(js);
if (!preventAbort) {
if (state.tryGet<Writable>() != kj::none) {
if (state.tryGet<IoOwn<Writable>>() != kj::none) {
return doAbort(js, errored, { .reject = true, .handled = pipeThrough });
}
}
Expand Down Expand Up @@ -1341,8 +1347,8 @@ kj::Maybe<kj::Own<WritableStreamSink>> WritableStreamInternalController::removeS
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, Writable) {
auto result = kj::mv(writable);
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
auto result = kj::mv(writable->sink);
state.init<StreamStates::Closed>();
return kj::Maybe<kj::Own<WritableStreamSink>>(kj::mv(result));
}
Expand All @@ -1366,7 +1372,7 @@ void WritableStreamInternalController::detach(jsg::Lock& js) {
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
state.init<StreamStates::Closed>();
return;
}
Expand All @@ -1379,7 +1385,7 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) { return 0; }
KJ_CASE_ONEOF(errored, StreamStates::Errored) { return kj::none; }
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
KJ_IF_SOME(highWaterMark, maybeHighWaterMark) {
return highWaterMark - currentWriteBufferSize;
}
Expand Down Expand Up @@ -1412,7 +1418,7 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
maybeRejectPromise<void>(js, lock.getClosedFulfiller(), errored.getHandle(js));
maybeRejectPromise<void>(js, lock.getReadyFulfiller(), errored.getHandle(js));
}
KJ_CASE_ONEOF(writable, Writable) {
KJ_CASE_ONEOF(writable, IoOwn<Writable>) {
maybeResolvePromise(js, lock.getReadyFulfiller());
}
}
Expand Down Expand Up @@ -1453,7 +1459,7 @@ bool WritableStreamInternalController::isClosedOrClosing() {
}

bool WritableStreamInternalController::isPiping() {
return state.is<Writable>() && !queue.empty() && queue.back().event.is<Pipe>();
return state.is<IoOwn<Writable>>() && !queue.empty() && queue.back().event.is<Pipe>();
}

bool WritableStreamInternalController::isErrored() {
Expand Down Expand Up @@ -1579,9 +1585,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
};

const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool {
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) {
writable->abort(js.exceptionToKj(pendingAbort.reason.addRef(js)));
auto ex = js.exceptionToKj(pendingAbort.reason.addRef(js));
writable->abort(kj::mv(ex));
drain(js, pendingAbort.reason.getHandle(js));
pendingAbort.complete(js);
return true;
Expand All @@ -1605,12 +1612,12 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}

// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
auto check = makeChecker(request);

auto amountToWrite = request.bytes.size();

auto promise = writable->write(request.bytes).attach(kj::mv(request.ownBytes));
auto promise = writable->sink->write(request.bytes).attach(kj::mv(request.ownBytes));

// TODO(soon): We use awaitIoLegacy() here because if the stream terminates in JavaScript in
// this same isolate, then the promise may actually be waiting on JavaScript to do something,
Expand All @@ -1620,7 +1627,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// jsg::Promises and not kj::Promises, so that it doesn't look like I/O at all, and there's
// no need to drop the isolate lock and take it again every time some data is read/written.
// That's a larger refactor, though.
return ioContext.awaitIoLegacy(js, kj::mv(promise)).then(js,
return ioContext.awaitIoLegacy(js, writable->canceler.wrap(kj::mv(promise))).then(js,
ioContext.addFunctor(
[this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise<void> {
auto& request = check();
Expand All @@ -1634,12 +1641,13 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
-> jsg::Promise<void> {
auto handle = reason.getHandle(js);
auto& request = check();
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
decreaseCurrentWriteBufferSize(js, amountToWrite);
maybeRejectPromise<void>(js, request.promise, handle);
queue.pop_front();
if (!maybeAbort(js, request)) {
writable->abort(js.exceptionToKj(reason.addRef(js)));
auto ex = js.exceptionToKj(reason.addRef(js));
writable->abort(kj::mv(ex));
drain(js, handle);
}
return js.resolvedPromise();
Expand All @@ -1650,7 +1658,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// The destination should still be Writable, because the only way to transition to an
// errored state would have been if a write request in the queue ahead of us encountered an
// error. But in that case, the queue would already have been drained and we wouldn't be here.
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();

if (request.checkSignal(js)) {
// If the signal is triggered, checkSignal will handle erroring the source and destination.
Expand All @@ -1676,7 +1684,8 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// If the source is errored, the spec requires us to error the destination unless the
// preventAbort option is true.
if (!request.preventAbort) {
writable->abort(js.exceptionToKj(js.v8Ref(errored)));
auto ex = js.exceptionToKj(js.v8Ref(errored));
writable->abort(kj::mv(ex));
drain(js, errored);
} else {
writeState.init<Unlocked>();
Expand Down Expand Up @@ -1753,9 +1762,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}));
};

KJ_IF_SOME(promise, request.source.tryPumpTo(*writable, !request.preventClose)) {
KJ_IF_SOME(promise, request.source.tryPumpTo(*writable->sink, !request.preventClose)) {
return handlePromise(js, ioContext.awaitIo(js,
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise))));
writable->canceler.wrap(
AbortSignal::maybeCancelWrap(request.maybeSignal, kj::mv(promise)))));
}

// The ReadableStream is JavaScript-backed. We can still pipe the data but it's going to be
Expand All @@ -1767,10 +1777,11 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
}
KJ_CASE_ONEOF(request, Close) {
// writeLoop() is only called with the sink in the Writable state.
auto& writable = state.get<Writable>();
auto& writable = state.get<IoOwn<Writable>>();
auto check = makeChecker(request);

return ioContext.awaitIo(js, writable->end()).then(js,
return ioContext.awaitIo(js, writable->canceler.wrap(
writable->sink->end())).then(js,
ioContext.addFunctor([this, check](jsg::Lock& js) {
auto& request = check();
maybeResolvePromise(js, request.promise);
Expand Down Expand Up @@ -1816,8 +1827,9 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) {
auto promise = kj::mv(this->promise);

if (!preventAbort) {
if (parent.state.tryGet<Writable>() != kj::none) {
parent.state.get<Writable>()->abort(js.exceptionToKj(reason));
KJ_IF_SOME(writable, parent.state.tryGet<IoOwn<Writable>>()) {
auto ex = js.exceptionToKj(reason);
writable->abort(kj::mv(ex));
parent.drain(js, reason);
} else {
parent.writeState.init<Unlocked>();
Expand All @@ -1838,7 +1850,7 @@ bool WritableStreamInternalController::Pipe::checkSignal(jsg::Lock& js) {
}

jsg::Promise<void> WritableStreamInternalController::Pipe::write(v8::Local<v8::Value> handle) {
auto& writable = parent.state.get<Writable>();
auto& writable = parent.state.get<IoOwn<Writable>>();
// TODO(soon): Once jsg::BufferSource lands and we're able to use it, this can be simplified.
KJ_ASSERT(handle->IsArrayBuffer() || handle->IsArrayBufferView());
std::shared_ptr<v8::BackingStore> store;
Expand All @@ -1859,7 +1871,7 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::write(v8::Local<v8::V
// v8::Isolate::GetCurrent();
jsg::Lock& js = jsg::Lock::from(v8::Isolate::GetCurrent());
return IoContext::current().awaitIo(js,
writable->write(kj::arrayPtr(data, byteLength))
writable->canceler.wrap(writable->sink->write(kj::arrayPtr(data, byteLength)))
.attach(js.v8Ref(v8::ArrayBuffer::New(js.v8Isolate, store))), [](jsg::Lock&){});
}

Expand Down Expand Up @@ -1889,8 +1901,9 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
KJ_IF_SOME(errored, source.tryGetErrored(js)) {
source.release(js);
if (!preventAbort) {
if (parent.state.tryGet<Writable>() != kj::none) {
parent.state.get<Writable>()->abort(js.exceptionToKj(js.v8Ref(errored)));
KJ_IF_SOME(writable, parent.state.tryGet<IoOwn<Writable>>()) {
auto ex = js.exceptionToKj(js.v8Ref(errored));
writable->abort(kj::mv(ex));
return js.rejectedPromise<void>(errored);
}
}
Expand Down Expand Up @@ -1919,7 +1932,8 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
if (!parent.isClosedOrClosing()) {
// We'll only be here if the sink is in the Writable state.
auto& ioContext = IoContext::current();
return ioContext.awaitIo(js, parent.state.get<Writable>()->end(), [](jsg::Lock&){}).then(js,
return ioContext.awaitIo(js,
parent.state.get<IoOwn<Writable>>()->sink->end(), [](jsg::Lock&){}).then(js,
ioContext.addFunctor([this](jsg::Lock& js) { parent.finishClose(js); }),
ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) {
parent.finishError(js, reason.getHandle(js));
Expand Down Expand Up @@ -1969,7 +1983,9 @@ jsg::Promise<void> WritableStreamInternalController::Pipe::pipeLoop(jsg::Lock& j
// Undefined and null are perfectly valid values to pass through a ReadableStream,
// but we can't interpret them as bytes so if we get them here, we error the pipe.
auto error = js.v8TypeError("This WritableStream only supports writing byte types."_kj);
parent.state.get<Writable>()->abort(js.exceptionToKj(js.v8Ref(error)));
auto& writable = parent.state.get<IoOwn<Writable>>();
auto ex = js.exceptionToKj(js.v8Ref(error));
writable->abort(kj::mv(ex));
// The error condition will be handled at the start of the next iteration.
return pipeLoop(js);
}), ioContext.addFunctor([this](jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> {
Expand Down Expand Up @@ -2351,7 +2367,7 @@ void IdentityTransformStreamImpl::abort(kj::Exception reason, AbortOption option
case AbortOption::NONE: {
KJ_FAIL_ASSERT("abort() is supposed to wait for any pending write() to finish");
}
case AbortOption::DRAIN: {
case AbortOption::ABORT_QUEUED_WRITES: {
request.fulfiller->reject(kj::cp(reason));
break;
}
Expand Down Expand Up @@ -2494,7 +2510,7 @@ kj::Own<WritableStreamController> newWritableStreamInternalController(
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable) {
return kj::heap<WritableStreamInternalController>(
ioContext.addObject(kj::mv(sink)),
kj::mv(sink),
maybeHighWaterMark,
kj::mv(maybeClosureWaitable));
}
Expand All @@ -2512,7 +2528,7 @@ void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& trac
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
tracker.trackField("error", errored);
}
KJ_CASE_ONEOF(_, Writable) {
KJ_CASE_ONEOF(_, IoOwn<Writable>) {
// Ideally we'd be able to track the size of any pending writes held in the sink's
// queue but since it is behind an IoOwn and we won't be holding the IoContext here,
// we can't.
Expand Down
19 changes: 13 additions & 6 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,24 @@ class ReadableStreamInternalController: public ReadableStreamController {

class WritableStreamInternalController: public WritableStreamController {
public:
using Writable = IoOwn<WritableStreamSink>;
struct Writable {
kj::Own<WritableStreamSink> sink;
kj::Canceler canceler;
Writable(kj::Own<WritableStreamSink> sink) : sink(kj::mv(sink)) {}
void abort(kj::Exception&& ex, WritableStreamSink::AbortOption option =
WritableStreamSink::AbortOption::NONE);
};

explicit WritableStreamInternalController(StreamStates::Closed closed)
: state(closed) {}
explicit WritableStreamInternalController(StreamStates::Errored errored)
: state(kj::mv(errored)) {}
explicit WritableStreamInternalController(Writable writable,
explicit WritableStreamInternalController(kj::Own<WritableStreamSink> writable,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none) : state(kj::mv(writable)),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none)
: state(IoContext::current().addObject(kj::heap<Writable>(kj::mv(writable)))),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {
}

WritableStreamInternalController(WritableStreamInternalController&& other) = default;
Expand Down Expand Up @@ -269,7 +276,7 @@ class WritableStreamInternalController: public WritableStreamController {
};

kj::Maybe<WritableStream&> owner;
kj::OneOf<StreamStates::Closed, StreamStates::Errored, Writable> state;
kj::OneOf<StreamStates::Closed, StreamStates::Errored, IoOwn<Writable>> state;
kj::OneOf<Unlocked, Locked, PipeLocked, WriterLocked> writeState = Unlocked();

kj::Maybe<PendingAbort> maybePendingAbort;
Expand Down

0 comments on commit 58fc961

Please sign in to comment.