Skip to content

Commit

Permalink
Implement cross-request promise waits using promise context tagging
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jun 26, 2023
1 parent cb8554d commit c7d3b47
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 2 deletions.
13 changes: 12 additions & 1 deletion src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,8 @@ public:
threadScope(context),
workerLock(*context.worker, lockType),
handleScope(workerLock.getIsolate()),
jsContextScope(workerLock.getContext()) {
jsContextScope(workerLock.getContext()),
promiseContextScope(workerLock.getIsolate(), context.getPromiseContextTag(workerLock)) {
KJ_REQUIRE(context.currentInputLock == nullptr);
KJ_REQUIRE(context.currentLock == nullptr);
context.currentInputLock = kj::mv(inputLock);
Expand Down Expand Up @@ -1125,6 +1126,7 @@ private:
Worker::Lock workerLock;
v8::HandleScope handleScope;
v8::Context::Scope jsContextScope;
v8::Isolate::PromiseContextScope promiseContextScope;
};

void IoContext::runImpl(Runnable& runnable, bool takePendingEvent,
Expand Down Expand Up @@ -1285,6 +1287,8 @@ void IoContext::runFinalizers(Worker::AsyncLock& asyncLock) {
RunnableImpl runnable(*this, kj::mv(warnings));
runImpl(runnable, false, asyncLock, nullptr, true);
}

promiseContextTag = nullptr;
}

#ifdef KJ_DEBUG
Expand Down Expand Up @@ -1409,4 +1413,11 @@ void IoContext::requireCurrentOrThrowJs() {
"of Cloudflare Workers which allows us to improve overall performance.");
}

v8::Local<v8::Object> IoContext::getPromiseContextTag(jsg::Lock& js) {
if (promiseContextTag == nullptr) {
promiseContextTag = js.v8Ref(v8::Object::New(js.v8Isolate));
}
return KJ_REQUIRE_NONNULL(promiseContextTag).getHandle(js);
}

} // namespace workerd
4 changes: 4 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler

void writeLogfwdr(uint channel, kj::FunctionParam<void(capnp::AnyPointer::Builder)> buildMessage);

v8::Local<v8::Object> getPromiseContextTag(jsg::Lock& js);

private:
ThreadContext& thread;

Expand Down Expand Up @@ -1004,6 +1006,8 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
void requireCurrent();
void checkFarGet(const DeleteQueue* expectedQueue);

kj::Maybe<jsg::V8Ref<v8::Object>> promiseContextTag;

class Runnable {
public:
virtual void run(Worker::Lock& lock) = 0;
Expand Down
70 changes: 70 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,45 @@ kj::Maybe<kj::String> makeCompatJson(kj::ArrayPtr<kj::StringPtr> enableFlags) {
return kj::String(json.releaseAsArray());
}

jsg::Promise<void> addCrossThreadPromiseWaiter(jsg::Lock& js,
v8::Local<v8::Promise>& promise) {
// When a promise is created in a different IoContext, we need to use a
// kj::CrossThreadFulfiller in order to wait on it. The Waiter instance will
// be held on the Promise itself, and will be fulfilled/rejected when the
// promise is resolved or rejected. This will signal all of the waiters
// from other IoContexts.

auto waiter = kj::newPromiseAndCrossThreadFulfiller<void>();

struct Waiter: public kj::Refcounted {
kj::Maybe<kj::Own<kj::CrossThreadPromiseFulfiller<void>>> fulfiller;
void done() {
KJ_IF_MAYBE(f, fulfiller) {
// Done this way so that the fulfiller is released as soon as possible
// when done as the JS promise may not clean up reactions right away.
(*f)->fulfill();
fulfiller = nullptr;
}
}
Waiter(kj::Own<kj::CrossThreadPromiseFulfiller<void>> fulfiller)
: fulfiller(kj::mv(fulfiller)) {}
};

auto fulfiller = kj::refcounted<Waiter>(kj::mv(waiter.fulfiller));

auto onSuccess = [waiter=kj::addRef(*fulfiller)](jsg::Lock& js, jsg::Value value) mutable {
waiter->done();
};

auto onFailure = [waiter=kj::mv(fulfiller)](jsg::Lock& js, jsg::Value exception) mutable {
waiter->done();
};

js.toPromise(promise).then(js, kj::mv(onSuccess), kj::mv(onFailure));

return IoContext::current().awaitIo(js, kj::mv(waiter.promise));
}

} // namespace

Worker::Isolate::Isolate(kj::Own<ApiIsolate> apiIsolateParam,
Expand Down Expand Up @@ -1060,6 +1099,37 @@ Worker::Isolate::Isolate(kj::Own<ApiIsolate> apiIsolateParam,
}
}
});

// The PromiseCrossContextCallback is used to allow cross-IoContext promise following.
// When the IoContext::Scope is entered, we set the "promise context tag" associated
// with the IoContext on the Isolate that is locked. Any Promise that is created within
// that scope will be tagged with the same promise context tag. When an attempt to
// follow a promise occurs (e.g. either using Promise.prototype.then() or await, etc)
// our patched v8 logic will check to see if the followed promise's tag matches the
// current Isolate tag. If they do not, then v8 will invoke this callback. The promise
// here is the promise that belongs to a different IoContext.
lock->v8Isolate->SetPromiseCrossContextCallback([](v8::Local<v8::Context> context,
v8::Local<v8::Promise> promise,
v8::Local<v8::Object> tag) ->
v8::MaybeLocal<v8::Promise> {
auto& js = jsg::Lock::from(context->GetIsolate());

// Generally this condition is only going to happen when using dynamic imports.
// It should not be common.
JSG_REQUIRE(IoContext::hasCurrent(), Error,
"Unable to wait on a promise created within a request when not running within a "
"request.");

return js.wrapSimplePromise(addCrossThreadPromiseWaiter(js, promise).then(js,
[promise=js.v8Ref(promise.As<v8::Value>())](auto& js) mutable {
// Once the waiter has been resolved, return the now settled promise.
// Since the promise has been settled, it is now safe to access from
// other requests. Note that the resolved value of the promise still
// might not be safe to access! (e.g. if it contains any IoOwns attached
// to the other request IoContext).
return kj::mv(promise);
}));
});
}

Worker::Script::Script(kj::Own<const Isolate> isolateParam, kj::StringPtr id,
Expand Down
4 changes: 3 additions & 1 deletion src/workerd/jsg/jsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,6 @@ class Lock {
Promise<T> rejectedPromise(jsg::Value exception);
template <typename T>
Promise<T> rejectedPromise(kj::Exception&& exception);
// Construct an immediately-rejected promise throwing the given exception.

template <class Func>
PromiseForResult<Func, void, false> evalNow(Func&& func);
Expand Down Expand Up @@ -1919,11 +1918,14 @@ class Lock {
// TODO(later): See if we can easily combine wrapSimpleFunction and wrapReturningFunction
// into one.

virtual v8::Local<v8::Promise> wrapSimplePromise(Promise<Value> promise) = 0;

bool toBool(v8::Local<v8::Value> value);
virtual kj::String toString(v8::Local<v8::Value> value) = 0;
virtual jsg::Dict<v8::Local<v8::Value>> toDict(v8::Local<v8::Value> value) = 0;
// Convenience methods to unwrap various types of V8 values. All of these could be done manually
// via the V8 API, but these methods are much easier.
virtual Promise<Value> toPromise(v8::Local<v8::Promise> promise) = 0;

// ---------------------------------------------------------------------------
// Setup stuff
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/jsg/setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,13 @@ class Isolate: public IsolateBase {
return jsgIsolate.wrapper->template unwrap<jsg::Dict<v8::Local<v8::Value>>>(
v8Isolate->GetCurrentContext(), value, jsg::TypeErrorContext::other());
}
v8::Local<v8::Promise> wrapSimplePromise(jsg::Promise<jsg::Value> promise) override {
return jsgIsolate.wrapper->wrap(v8Context(), nullptr, kj::mv(promise));
}
jsg::Promise<jsg::Value> toPromise(v8::Local<v8::Promise> promise) override {
return jsgIsolate.wrapper->template unwrap<jsg::Promise<jsg::Value>>(
v8Isolate->GetCurrentContext(), promise, jsg::TypeErrorContext::other());
}

template <typename T, typename... Args>
JsContext<T> newContext(Args&&... args) {
Expand Down

0 comments on commit c7d3b47

Please sign in to comment.