Skip to content

Commit

Permalink
Coroutines Conversion: update io/io-gate.c++ wait()
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Sep 6, 2023
1 parent e0c00a7 commit c98f593
Showing 1 changed file with 51 additions and 48 deletions.
99 changes: 51 additions & 48 deletions src/workerd/io/io-gate.c++
Original file line number Diff line number Diff line change
Expand Up @@ -166,59 +166,62 @@ InputGate::CriticalSection::~CriticalSection() noexcept(false) {
}

kj::Promise<InputGate::Lock> InputGate::CriticalSection::wait() {
switch (state) {
case NOT_STARTED: {
state = INITIAL_WAIT;

auto& target = parentAsInputGate();
KJ_IF_SOME(e, target.brokenState.tryGet<kj::Exception>()) {
// Oops, we're broken.
setBroken(e);
return kj::cp(e);
}

// Add ourselves to this parent's child waiter list.
if (target.lockCount == 0) {
state = RUNNING;
parentLock = Lock(target);
return wait();
} else {
return kj::newAdaptedPromise<Lock, Waiter>(target, true)
.then([this](Lock lock) {
state = RUNNING;
parentLock = kj::mv(lock);
return wait();
}, [this](kj::Exception&& e) -> kj::Promise<InputGate::Lock> {
state = RUNNING;
for (;;) {
switch (state) {
case NOT_STARTED: {
state = INITIAL_WAIT;

auto& target = parentAsInputGate();
KJ_IF_SOME(e, target.brokenState.tryGet<kj::Exception>()) {
// Oops, we're broken.
setBroken(e);
return kj::mv(e);
});
}
}
case INITIAL_WAIT:
// To avoid the need for a ForkedPromise, we assume wait() is called once initially to
// get things started. This is the case in practice because any further tasks would be
// started only after some code runs under the initial lock.
KJ_FAIL_REQUIRE("CriticalSection::wait() should be called once initially");
case RUNNING:
// CriticalSection is active, so defer to InputGate implementation.
return InputGate::wait();
case REPARENTED:
// Once the CriticalSection has declared itself done, then any straggler tasks it initiated
// are adopted by the parent.
// WARNING: Don't use parentAsInputGate() here as that'll bypass the override of wait() if
// the parent is a CriticalSection itself.
KJ_SWITCH_ONEOF(parent) {
KJ_CASE_ONEOF(p, InputGate*) {
return p->wait();
kj::throwFatalException(kj::cp(e));
}
KJ_CASE_ONEOF(c, kj::Own<CriticalSection>) {
return c->wait();

// Add ourselves to this parent's child waiter list.
if (target.lockCount == 0) {
state = RUNNING;
parentLock = Lock(target);
continue;
} else {
try {
auto lock = co_await kj::newAdaptedPromise<Lock, Waiter>(target, true);
state = RUNNING;
parentLock = kj::mv(lock);
continue;
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
state = RUNNING;
setBroken(exception);
kj::throwFatalException(kj::mv(exception));
}
}
}
KJ_UNREACHABLE;
case INITIAL_WAIT:
// To avoid the need for a ForkedPromise, we assume wait() is called once initially to
// get things started. This is the case in practice because any further tasks would be
// started only after some code runs under the initial lock.
KJ_FAIL_REQUIRE("CriticalSection::wait() should be called once initially");
case RUNNING:
// CriticalSection is active, so defer to InputGate implementation.
co_return co_await InputGate::wait();
case REPARENTED:
// Once the CriticalSection has declared itself done, then any straggler tasks it initiated
// are adopted by the parent.
// WARNING: Don't use parentAsInputGate() here as that'll bypass the override of wait() if
// the parent is a CriticalSection itself.
KJ_SWITCH_ONEOF(parent) {
KJ_CASE_ONEOF(p, InputGate*) {
co_return co_await p->wait();
}
KJ_CASE_ONEOF(c, kj::Own<CriticalSection>) {
co_return co_await c->wait();
}
}
KJ_UNREACHABLE;
}
KJ_UNREACHABLE;
}
KJ_UNREACHABLE;
}

InputGate::Lock InputGate::CriticalSection::succeeded() {
Expand Down

0 comments on commit c98f593

Please sign in to comment.