Skip to content

Commit

Permalink
JSRPC: Fix returning RPC stream as HTTP response body.
Browse files Browse the repository at this point in the history
If, in an HTTP handler, you fetched a ReadableStream over RPC, and then used that ReadableStream as your HTTP response, the stream would be prematurely canceled. The problem is that the system thinks deferred proxying is possible, since we're pumping from a system stream. So, it shuts down the IoContext, thinking that no more JavaScript needs to run. However, the IoContext itself holds open the RPC context; canceling it breaks the stream.

In the future we should solve this properly s othat deferred proxying is possible. For now, we simply block deferred proxying of RPC streams.
  • Loading branch information
kentonv committed Mar 27, 2024
1 parent d127570 commit 0c9056f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
55 changes: 54 additions & 1 deletion src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,57 @@ private:
kj::Maybe<uint64_t> expectedLength;
};

// Wrapper around ReadableStreamSource that prevents deferred proxying. We need this for RPC
// streams because although they are "system streams", they become disconnected when the IoContext
// is destroyed, due to the JsRpcCustomEventImpl being canceled.
//
// TODO(someday): Devise a better way for RPC streams to extend the lifetime of the RPC session
// beyond the destruction of the IoContext, if it is being used for deferred proxying.
class NoDeferredProxyReadableStream: public ReadableStreamSource {
public:
NoDeferredProxyReadableStream(kj::Own<ReadableStreamSource> inner, IoContext& ioctx)
: inner(kj::mv(inner)), ioctx(ioctx) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}

kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override {
// Move the deferred proxy part of the task over to the non-deferred part. To do this,
// we use `ioctx.waitForDeferredProxy()`, which returns a single promise covering both parts
// (and, importantly, registering pending events where needed). Then, we add a noop deferred
// proxy to the end of that.
return addNoopDeferredProxy(ioctx.waitForDeferredProxy(inner->pumpTo(output, end)));
}

StreamEncoding getPreferredEncoding() override {
return inner->getPreferredEncoding();
}

kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) override {
return inner->tryGetLength(encoding);
}

void cancel(kj::Exception reason) override {
return inner->cancel(kj::mv(reason));
}

kj::Maybe<Tee> tryTee(uint64_t limit) override {
return inner->tryTee(limit).map([&](Tee tee) {
return Tee {
.branches = {
kj::heap<NoDeferredProxyReadableStream>(kj::mv(tee.branches[0]), ioctx),
kj::heap<NoDeferredProxyReadableStream>(kj::mv(tee.branches[1]), ioctx),
}
};
});
}

private:
kj::Own<ReadableStreamSource> inner;
IoContext& ioctx;
};

} // namespace

void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
Expand Down Expand Up @@ -693,7 +744,9 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(

externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));

return jsg::alloc<ReadableStream>(ioctx, newSystemStream(kj::mv(in), encoding, ioctx));
return jsg::alloc<ReadableStream>(ioctx,
kj::heap<NoDeferredProxyReadableStream>(
newSystemStream(kj::mv(in), encoding, ioctx), ioctx));
}

kj::StringPtr ReaderImpl::jsgGetMemoryName() const { return "ReaderImpl"_kjc; }
Expand Down
26 changes: 20 additions & 6 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,18 @@ export let nonClass = {

async fetch(req, env, ctx) {
// This is used in the stream test to fetch some gziped data.
return new Response("this text was gzipped", {
headers: {
"Content-Encoding": "gzip"
}
});
if (req.url.endsWith("/gzip")) {
return new Response("this text was gzipped", {
headers: {
"Content-Encoding": "gzip"
}
});
} else if (req.url.endsWith("/stream-from-rpc")) {
let stream = await env.MyService.returnReadableStream();
return new Response(stream);
} else {
throw new Error("unknown route");
}
}
}

Expand Down Expand Up @@ -1062,7 +1069,7 @@ export let streams = {

// Send an encoded ReadableStream
{
let gzippedResp = await env.self.fetch("http://foo");
let gzippedResp = await env.self.fetch("http://foo/gzip");

let text = await env.MyService.readFromStream(gzippedResp.body);

Expand All @@ -1087,6 +1094,13 @@ export let streams = {

assert.strictEqual(await readPromise, "foo, bar, baz!");
}

// Perform an HTTP request whose response uses a ReadableStream obtained over RPC.
{
let resp = await env.self.fetch("http://foo/stream-from-rpc");

assert.strictEqual(await resp.text(), "foo, bar, baz!");
}
}
}

Expand Down

0 comments on commit 0c9056f

Please sign in to comment.