Skip to content

Commit

Permalink
JSRPC: waitUntil() tasks should run even if the client cancels the …
Browse files Browse the repository at this point in the history
…RPC session.
  • Loading branch information
kentonv committed Mar 27, 2024
1 parent 0c9056f commit 9e9dea7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
33 changes: 28 additions & 5 deletions src/workerd/api/tests/js-rpc-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ export let nonClass = {
// to fail).
let globalRpcPromise;

// Promise initialized by testWaitUntil() and then resolved shortly later, in a waitUntil task.
let globalWaitUntilPromise;

export class MyService extends WorkerEntrypoint {
constructor(ctx, env) {
super(ctx, env);
Expand Down Expand Up @@ -354,6 +357,19 @@ export class MyService extends WorkerEntrypoint {
cf: {foo: 123, bar: "def"},
});
}

testWaitUntil() {
// Initialize globalWaitUntilPromise to a promise that will be resolved in a waitUntil task
// later on. We'll perform a cross-context wait to verify that the waitUntil task actually
// completes and resolves the promise.
let resolve;
globalWaitUntilPromise = new Promise(r => { resolve = r; });

this.ctx.waitUntil((async () => {
await scheduler.wait(100);
resolve();
})());
}
}

export class MyActor extends DurableObject {
Expand Down Expand Up @@ -796,11 +812,8 @@ export let disposal = {

// If we abort the server's I/O context, though, then the counter is disposed.
await assert.rejects(obj.abort(), {
// TODO(someday): This ought to propagate the abort exception, but that requires a bunch
// more work...
name: "Error",
message: "The destination execution context for this RPC was canceled while the " +
"call was still running."
name: "RangeError",
message: "foo bar abort reason"
});

await counter.onDisposed();
Expand Down Expand Up @@ -859,6 +872,16 @@ export let crossContextSharingDoesntWork = {
},
}

export let waitUntilWorks = {
async test(controller, env, ctx) {
globalWaitUntilPromise = null;
await env.MyService.testWaitUntil();

assert.strictEqual(globalWaitUntilPromise instanceof Promise, true);
await globalWaitUntilPromise;
}
}

function stripDispose(obj) {
assert.deepEqual(!!obj[Symbol.dispose], true);
delete obj[Symbol.dispose];
Expand Down
11 changes: 8 additions & 3 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1686,11 +1686,16 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
mapAddRef(incomingRequest->getWorkerTracer())),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));

KJ_DEFER({
// waitUntil() should allow extending execution on the server side even when the client
// disconnects.
waitUntilTasks.add(incomingRequest->drain().attach(kj::mv(incomingRequest)));
});

// `donePromise` resolves once there are no longer any capabilities pointing between the client
// and server as part of this session.
co_await donePromise
.then([&ir = *incomingRequest]() { return ir.drain(); })
.exclusiveJoin(ioctx.onAbort());
co_await donePromise.exclusiveJoin(ioctx.onAbort());

co_return WorkerInterface::CustomEvent::Result {
.outcome = EventOutcome::OK
};
Expand Down

0 comments on commit 9e9dea7

Please sign in to comment.