Skip to content

Commit

Permalink
Deprecate WritableStream::removeSink (#2064)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored May 2, 2024
1 parent a2a20a6 commit 9b385aa
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ jsg::Ref<Socket> Socket::startTls(jsg::Lock& js, jsg::Optional<TlsOptions> tlsOp
auto secureStreamPromise = context.awaitJs(js, writable->flush(js).then(js,
[this, domain = kj::heapString(domain), tlsOptions = kj::mv(tlsOptions),
tlsStarter = kj::mv(tlsStarter)](jsg::Lock& js) mutable {
writable->removeSink(js);
writable->detach(js);
readable = readable->detach(js, true);
closedResolver.resolve(js);

Expand Down
11 changes: 6 additions & 5 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,6 @@ kj::Own<ReadableStreamController> newReadableStreamInternalController(
//
// As such, it exists within the V8 heap (it's allocated directly as a member of the
// WritableStream) and will always execute within the V8 isolate lock.
// Both the WritableStreamDefaultController and WritableStreamInternalController will support
// the removeSink() method that can be used to acquire a kj heap object that can be used to
// write data from outside of the isolate lock, however, when using the
// WritableStreamDefaultController, each write operation will require acquiring the isolate lock.
//
// The methods here return jsg::Promise rather than kj::Promise because the controller
// operations here do not always require passing through the kj mechanisms or kj event loop.
Expand Down Expand Up @@ -689,7 +685,12 @@ class WritableStreamController {
// does not support removing the sink. After the WritableStreamSink has been released, all other
// methods on the controller should fail with an exception as the WritableStreamSink should be
// the only way to interact with the underlying sink.
virtual kj::Maybe<kj::Own<WritableStreamSink>> removeSink(jsg::Lock& js) = 0;
virtual kj::Maybe<kj::Own<WritableStreamSink>> removeSink(
jsg::Lock& js) = 0 ;

// Detaches the WritableStreamController from it's underlying implementation, leaving the
// writable stream locked and in a state where no further writes can be made.
virtual void detach(jsg::Lock& js) = 0;

virtual kj::Maybe<int> getDesiredSize() = 0;

Expand Down
23 changes: 23 additions & 0 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,29 @@ kj::Maybe<kj::Own<WritableStreamSink>> WritableStreamInternalController::removeS
KJ_UNREACHABLE;
}

void WritableStreamInternalController::detach(jsg::Lock& js) {
JSG_REQUIRE(!isLockedToWriter(), TypeError,
"This WritableStream is currently locked to a writer.");
JSG_REQUIRE(!isClosedOrClosing(), TypeError, "This WritableStream is closed.");

writeState.init<Locked>();

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// Handled by the isClosedOrClosing() check above;
KJ_UNREACHABLE;
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
kj::throwFatalException(js.exceptionToKj(errored.addRef(js)));
}
KJ_CASE_ONEOF(writable, Writable) {
state.init<StreamStates::Closed>();
}
}

KJ_UNREACHABLE;
}

kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) { return 0; }
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class WritableStreamInternalController: public WritableStreamController {
PipeToOptions options) override;

kj::Maybe<kj::Own<WritableStreamSink>> removeSink(jsg::Lock& js) override;
void detach(jsg::Lock& js) override;

kj::Maybe<int> getDesiredSize() override;

Expand Down
8 changes: 4 additions & 4 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -791,10 +791,6 @@ private:
// The WritableStreamJsController provides the implementation of custom
// WritableStream's backed by a user-code provided Underlying Sink. The implementation
// is fairly complicated and defined entirely by the streams specification.
//
// Importantly, the controller is designed to operate entirely within the JavaScript
// isolate lock. It is possible to call removeSink() to acquire a WritableStreamSink
// implementation that delegates to the WritableStreamDefaultController.
class WritableStreamJsController: public WritableStreamController {
public:
using WritableLockImpl = WritableLockImpl<WritableStreamJsController>;
Expand Down Expand Up @@ -847,6 +843,7 @@ public:
void releaseWriter(Writer& writer, kj::Maybe<jsg::Lock&> maybeJs) override;

kj::Maybe<kj::Own<WritableStreamSink>> removeSink(jsg::Lock& js) override;
void detach(jsg::Lock& js) override;

void setOwnerRef(WritableStream& stream) override;

Expand Down Expand Up @@ -3462,6 +3459,9 @@ void WritableStreamJsController::releaseWriter(
kj::Maybe<kj::Own<WritableStreamSink>> WritableStreamJsController::removeSink(jsg::Lock& js) {
KJ_UNIMPLEMENTED("WritableStreamJsController::removeSink is not implemented");
}
void WritableStreamJsController::detach(jsg::Lock& js) {
KJ_UNIMPLEMENTED("WritableStreamJsController::detach is not implemented");
}

void WritableStreamJsController::setOwnerRef(WritableStream& stream) {
owner = stream;
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ kj::Own<WritableStreamSink> WritableStream::removeSink(jsg::Lock& js) {
"This WritableStream does not have a WritableStreamSink");
}

void WritableStream::detach(jsg::Lock& js) {
getController().detach(js);
}

jsg::Promise<void> WritableStream::abort(
jsg::Lock& js,
jsg::Optional<v8::Local<v8::Value>> reason) {
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/api/streams/writable.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ class WritableStream: public jsg::Object {
// Remove and return the underlying implementation of this WritableStream. Throw a TypeError if
// this WritableStream is locked or closed, otherwise this WritableStream becomes immediately
// locked and closed. If this writable stream is errored, throw the stored error.
virtual kj::Own<WritableStreamSink> removeSink(jsg::Lock& js);
// TODO(cleanup): There are a couple of places where we need to convert to using detach()
// or the inner removeSink (on WritableStreamController) before we can remove this method.
virtual KJ_DEPRECATED("Use detach() instead") kj::Own<WritableStreamSink> removeSink(
jsg::Lock& js);
virtual void detach(jsg::Lock& js);

// ---------------------------------------------------------------------------
// JS interface
Expand Down

0 comments on commit 9b385aa

Please sign in to comment.