Skip to content

Commit

Permalink
Merge pull request #1131 from cloudflare/jsnell/coroutine-conversion-…
Browse files Browse the repository at this point in the history
…io-gate
  • Loading branch information
jasnell authored Sep 6, 2023
2 parents c6c5445 + c98f593 commit 5f0d006
Showing 1 changed file with 67 additions and 64 deletions.
131 changes: 67 additions & 64 deletions src/workerd/io/io-gate.c++
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ InputGate::Waiter::~Waiter() noexcept(false) {
}

kj::Promise<InputGate::Lock> InputGate::wait() {
KJ_IF_MAYBE(e, brokenState.tryGet<kj::Exception>()) {
return kj::cp(*e);
KJ_IF_SOME(e, brokenState.tryGet<kj::Exception>()) {
return kj::cp(e);
} else if (lockCount == 0) {
return Lock(*this);
} else {
Expand All @@ -55,8 +55,8 @@ kj::Promise<InputGate::Lock> InputGate::wait() {
}

kj::Promise<void> InputGate::onBroken() {
KJ_IF_MAYBE(e, brokenState.tryGet<kj::Exception>()) {
return kj::cp(*e);
KJ_IF_SOME(e, brokenState.tryGet<kj::Exception>()) {
return kj::cp(e);
} else {
return brokenPromise.addBranch();
}
Expand All @@ -66,12 +66,12 @@ InputGate::Lock::Lock(InputGate& gate)
: gate(&gate),
cs(gate.isCriticalSection
? kj::Maybe(kj::addRef(static_cast<CriticalSection&>(gate)))
: nullptr) {
: kj::none) {
InputGate* gateToLock = &gate;

KJ_IF_MAYBE(c, cs) {
if (c->get()->state == CriticalSection::REPARENTED) {
gateToLock = &c->get()->parentAsInputGate();
KJ_IF_SOME(c, cs) {
if (c.get()->state == CriticalSection::REPARENTED) {
gateToLock = &c.get()->parentAsInputGate();
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ kj::Maybe<InputGate::CriticalSection&> InputGate::Lock::getCriticalSection() {
if (gate->isCriticalSection) {
return static_cast<CriticalSection&>(*gate);
} else {
return nullptr;
return kj::none;
}
}

Expand Down Expand Up @@ -166,59 +166,62 @@ InputGate::CriticalSection::~CriticalSection() noexcept(false) {
}

kj::Promise<InputGate::Lock> InputGate::CriticalSection::wait() {
switch (state) {
case NOT_STARTED: {
state = INITIAL_WAIT;

auto& target = parentAsInputGate();
KJ_IF_MAYBE(e, target.brokenState.tryGet<kj::Exception>()) {
// Oops, we're broken.
setBroken(*e);
return kj::cp(*e);
}

// Add ourselves to this parent's child waiter list.
if (target.lockCount == 0) {
state = RUNNING;
parentLock = Lock(target);
return wait();
} else {
return kj::newAdaptedPromise<Lock, Waiter>(target, true)
.then([this](Lock lock) {
state = RUNNING;
parentLock = kj::mv(lock);
return wait();
}, [this](kj::Exception&& e) -> kj::Promise<InputGate::Lock> {
state = RUNNING;
for (;;) {
switch (state) {
case NOT_STARTED: {
state = INITIAL_WAIT;

auto& target = parentAsInputGate();
KJ_IF_SOME(e, target.brokenState.tryGet<kj::Exception>()) {
// Oops, we're broken.
setBroken(e);
return kj::mv(e);
});
}
}
case INITIAL_WAIT:
// To avoid the need for a ForkedPromise, we assume wait() is called once initially to
// get things started. This is the case in practice because any further tasks would be
// started only after some code runs under the initial lock.
KJ_FAIL_REQUIRE("CriticalSection::wait() should be called once initially");
case RUNNING:
// CriticalSection is active, so defer to InputGate implementation.
return InputGate::wait();
case REPARENTED:
// Once the CriticalSection has declared itself done, then any straggler tasks it initiated
// are adopted by the parent.
// WARNING: Don't use parentAsInputGate() here as that'll bypass the override of wait() if
// the parent is a CriticalSection itself.
KJ_SWITCH_ONEOF(parent) {
KJ_CASE_ONEOF(p, InputGate*) {
return p->wait();
kj::throwFatalException(kj::cp(e));
}
KJ_CASE_ONEOF(c, kj::Own<CriticalSection>) {
return c->wait();

// Add ourselves to this parent's child waiter list.
if (target.lockCount == 0) {
state = RUNNING;
parentLock = Lock(target);
continue;
} else {
try {
auto lock = co_await kj::newAdaptedPromise<Lock, Waiter>(target, true);
state = RUNNING;
parentLock = kj::mv(lock);
continue;
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
state = RUNNING;
setBroken(exception);
kj::throwFatalException(kj::mv(exception));
}
}
}
KJ_UNREACHABLE;
case INITIAL_WAIT:
// To avoid the need for a ForkedPromise, we assume wait() is called once initially to
// get things started. This is the case in practice because any further tasks would be
// started only after some code runs under the initial lock.
KJ_FAIL_REQUIRE("CriticalSection::wait() should be called once initially");
case RUNNING:
// CriticalSection is active, so defer to InputGate implementation.
co_return co_await InputGate::wait();
case REPARENTED:
// Once the CriticalSection has declared itself done, then any straggler tasks it initiated
// are adopted by the parent.
// WARNING: Don't use parentAsInputGate() here as that'll bypass the override of wait() if
// the parent is a CriticalSection itself.
KJ_SWITCH_ONEOF(parent) {
KJ_CASE_ONEOF(p, InputGate*) {
co_return co_await p->wait();
}
KJ_CASE_ONEOF(c, kj::Own<CriticalSection>) {
co_return co_await c->wait();
}
}
KJ_UNREACHABLE;
}
KJ_UNREACHABLE;
}
KJ_UNREACHABLE;
}

InputGate::Lock InputGate::CriticalSection::succeeded() {
Expand All @@ -242,7 +245,7 @@ InputGate::Lock InputGate::CriticalSection::succeeded() {

state = REPARENTED;
auto result = KJ_ASSERT_NONNULL(kj::mv(parentLock));
parentLock = nullptr;
parentLock = kj::none;
return result;
}

Expand Down Expand Up @@ -272,8 +275,8 @@ void InputGate::setBroken(const kj::Exception& e) {
waiter.fulfiller.reject(kj::cp(e));
waiters.remove(waiter);
}
KJ_IF_MAYBE(f, brokenState.tryGet<kj::Own<kj::PromiseFulfiller<void>>>()) {
f->get()->reject(kj::cp(e));
KJ_IF_SOME(f, brokenState.tryGet<kj::Own<kj::PromiseFulfiller<void>>>()) {
f.get()->reject(kj::cp(e));
}
brokenState = kj::cp(e);
}
Expand Down Expand Up @@ -323,8 +326,8 @@ kj::Promise<void> OutputGate::onBroken() {
KJ_REQUIRE(!brokenState.is<kj::Own<kj::PromiseFulfiller<void>>>(),
"onBroken() can only be called once");

KJ_IF_MAYBE(e, brokenState.tryGet<kj::Exception>()) {
return kj::cp(*e);
KJ_IF_SOME(e, brokenState.tryGet<kj::Exception>()) {
return kj::cp(e);
} else {
auto paf = kj::newPromiseAndFulfiller<void>();
brokenState = kj::mv(paf.fulfiller);
Expand Down Expand Up @@ -352,8 +355,8 @@ kj::Exception OutputGate::makeUnfulfilledException() {
void OutputGate::setBroken(const kj::Exception& e) {
// We assume the exception is already propagated into `pastLocksPromise`, so all we need to do
// is handle onBroken().
KJ_IF_MAYBE(f, brokenState.tryGet<kj::Own<kj::PromiseFulfiller<void>>>()) {
f->get()->reject(kj::cp(e));
KJ_IF_SOME(f, brokenState.tryGet<kj::Own<kj::PromiseFulfiller<void>>>()) {
f.get()->reject(kj::cp(e));
}
brokenState = kj::cp(e);
}
Expand Down

0 comments on commit 5f0d006

Please sign in to comment.