Skip to content

Commit

Permalink
Migrate some src/workerd/io files to use KJ_IF_SOME
Browse files Browse the repository at this point in the history
This change leaves worker.c++ as the only file remaining to be
converted in src/workerd/io. That has dangling-else issues when
converted.

Bug: EW-7618
Test: bazel test //...
  • Loading branch information
ohodson committed Sep 20, 2023
1 parent 5c975dd commit ca559b1
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 130 deletions.
38 changes: 19 additions & 19 deletions src/workerd/io/actor-cache-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,21 @@ struct ActorCacheConvenienceWrappers {
}

auto list(kj::StringPtr begin, kj::StringPtr end,
kj::Maybe<uint> limit = nullptr, ActorCache::ReadOptions options = {}) {
kj::Maybe<uint> limit = kj::none, ActorCache::ReadOptions options = {}) {
return stringifyValues(target.list(kj::str(begin), kj::str(end), limit, options));
}
auto listReverse(kj::StringPtr begin, kj::StringPtr end,
kj::Maybe<uint> limit = nullptr, ActorCache::ReadOptions options = {}) {
kj::Maybe<uint> limit = kj::none, ActorCache::ReadOptions options = {}) {
return stringifyValues(target.listReverse(kj::str(begin), kj::str(end), limit, options));
}

auto list(kj::StringPtr begin, decltype(nullptr),
kj::Maybe<uint> limit = nullptr, ActorCache::ReadOptions options = {}) {
return stringifyValues(target.list(kj::str(begin), nullptr, limit, options));
kj::Maybe<uint> limit = kj::none, ActorCache::ReadOptions options = {}) {
return stringifyValues(target.list(kj::str(begin), kj::none, limit, options));
}
auto listReverse(kj::StringPtr begin, decltype(nullptr),
kj::Maybe<uint> limit = nullptr, ActorCache::ReadOptions options = {}) {
return stringifyValues(target.listReverse(kj::str(begin), nullptr, limit, options));
kj::Maybe<uint> limit = kj::none, ActorCache::ReadOptions options = {}) {
return stringifyValues(target.listReverse(kj::str(begin), kj::none, limit, options));
}

auto put(kj::StringPtr key, kj::StringPtr value, ActorCache::WriteOptions options = {}) {
Expand Down Expand Up @@ -4210,7 +4210,7 @@ KJ_TEST("ActorCache skip cache") {

// Do an uncached list.
{
auto promise = expectUncached(test.list("bar", "qux", nullptr, {.noCache = true}));
auto promise = expectUncached(test.list("bar", "qux", kj::none, {.noCache = true}));

mockStorage->expectCall("list", ws)
.withParams(CAPNP(start = "bar", end = "qux"), "stream"_kj)
Expand Down Expand Up @@ -4238,7 +4238,7 @@ KJ_TEST("ActorCache skip cache") {

// Again, but reverse list.
{
auto promise = expectUncached(test.listReverse("bar", "qux", nullptr, {.noCache = true}));
auto promise = expectUncached(test.listReverse("bar", "qux", kj::none, {.noCache = true}));

mockStorage->expectCall("list", ws)
.withParams(CAPNP(start = "bar", end = "qux", reverse = true), "stream"_kj)
Expand Down Expand Up @@ -4881,7 +4881,7 @@ KJ_TEST("ActorCache alarm get/put") {

{
// Test clearing alarm
test.setAlarm(nullptr);
test.setAlarm(kj::none);

// When there are no other storage operations to be flushed, alarm modifications can be flushed
// without a wrapping txn.
Expand Down Expand Up @@ -5087,16 +5087,16 @@ KJ_TEST("ActorCache can wait for flush") {
scheduledPromise.wait(ws);
inFlightPromise.wait(ws);

KJ_IF_MAYBE(secondOperation, maybeSecondOperation) {
KJ_IF_SOME(secondOperation, maybeSecondOperation) {
// This promise is for a later flush, so it should not have resolved yet.
KJ_ASSERT(!secondOperation->scheduledPromise.poll(ws));
KJ_ASSERT(!secondOperation.scheduledPromise.poll(ws));

// Finish our secondary put and observe the second flush resolving.
auto params =
kj::str(R"((entries = [(key = ")", secondOperation->key, R"(", value = "bar")]))");
kj::str(R"((entries = [(key = ")", secondOperation.key, R"(", value = "bar")]))");
mockStorage->expectCall("put", ws).withParams(params).thenReturn(CAPNP());

secondOperation->scheduledPromise.wait(ws);
secondOperation.scheduledPromise.wait(ws);
}

// We finished our flush, nothing left to do.
Expand Down Expand Up @@ -5270,10 +5270,10 @@ KJ_TEST("ActorCache can shutdown") {
KJ_EXPECT_THROW_MESSAGE(errorMessage, shutdownPromise.wait(ws));
KJ_EXPECT(test.cache.onNoPendingFlush() == nullptr);
KJ_EXPECT_THROW_MESSAGE(errorMessage, test.gate.wait().wait(ws));
} else KJ_IF_MAYBE(promise, maybeShutdownPromise) {
} else KJ_IF_SOME(promise, maybeShutdownPromise) {
// The in-flight flush should resolve cleanly without any follow on or breaking the output
// gate.
promise->wait(ws);
promise.wait(ws);
KJ_EXPECT(test.cache.onNoPendingFlush() == nullptr);
test.gate.wait().wait(ws);
}
Expand All @@ -5294,14 +5294,14 @@ KJ_TEST("ActorCache can shutdown") {
};

auto verify = [&](auto&& beforeShutdown, auto&& afterShutdown) {
verifyWithOptions(beforeShutdown, afterShutdown, {.maybeError = nullptr});
verifyWithOptions(beforeShutdown, afterShutdown, {.maybeError = kj::none});
verifyWithOptions(beforeShutdown, afterShutdown, {.maybeError = KJ_EXCEPTION(FAILED, "Nope.")});
};

verify([](ActorCacheTest& test){
// Do nothing and expect nothing!
return BeforeShutdownResult{
.maybeReq = nullptr,
.maybeReq = kj::none,
.shouldBreakOutputGate = false,
};
}, [](ActorCacheTest& test, kj::Maybe<InFlightRequest>){
Expand All @@ -5315,7 +5315,7 @@ KJ_TEST("ActorCache can shutdown") {

// Expect the put to be cancelled and break the gate.
return BeforeShutdownResult{
.maybeReq = nullptr,
.maybeReq = kj::none,
.shouldBreakOutputGate = true,
};
}, [](ActorCacheTest& test, kj::Maybe<InFlightRequest>){
Expand All @@ -5329,7 +5329,7 @@ KJ_TEST("ActorCache can shutdown") {

// Expect the put to be cancelled and break the gate.
return BeforeShutdownResult{
.maybeReq = nullptr,
.maybeReq = kj::none,
.shouldBreakOutputGate = true,
};
}, [](ActorCacheTest& test, kj::Maybe<InFlightRequest>){
Expand Down
36 changes: 18 additions & 18 deletions src/workerd/io/actor-sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ ActorSqlite::ImplicitTxn::ImplicitTxn(ActorSqlite& parent)
parent.currentTxn = this;
}
ActorSqlite::ImplicitTxn::~ImplicitTxn() noexcept(false) {
KJ_IF_MAYBE(c, parent.currentTxn.tryGet<ImplicitTxn*>()) {
if (*c == this) {
KJ_IF_SOME(c, parent.currentTxn.tryGet<ImplicitTxn*>()) {
if (c == this) {
parent.currentTxn.init<NoTxn>();
}
}
if (!committed && parent.broken == nullptr) {
if (!committed && parent.broken == kj::none) {
// Failed to commit, so roll back.
//
// This should only happen in cases of catastrophic error. Since this is rarely actually
Expand Down Expand Up @@ -81,9 +81,9 @@ ActorSqlite::ExplicitTxn::~ExplicitTxn() noexcept(false) {
KJ_ASSERT(!hasChild);
auto cur = KJ_ASSERT_NONNULL(actorSqlite.currentTxn.tryGet<ExplicitTxn*>());
KJ_ASSERT(cur == this);
KJ_IF_MAYBE(p, parent) {
p->get()->hasChild = false;
actorSqlite.currentTxn = p->get();
KJ_IF_SOME(p, parent) {
p.get()->hasChild = false;
actorSqlite.currentTxn = p.get();
} else {
actorSqlite.currentTxn.init<NoTxn>();
}
Expand All @@ -103,15 +103,15 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::ExplicitTxn::commit() {
kj::str("RELEASE _cf_savepoint_", depth));
committed = true;

if (parent == nullptr) {
if (parent == kj::none) {
// We committed the root transaction, so it's time to signal any replication layer and lock
// the output gate in the meantime.
actorSqlite.commitTasks.add(
actorSqlite.outputGate.lockWhile(actorSqlite.commitCallback()));
}

// No backpressure for SQLite.
return nullptr;
return kj::none;
}

kj::Promise<void> ActorSqlite::ExplicitTxn::rollback() {
Expand Down Expand Up @@ -166,14 +166,14 @@ void ActorSqlite::taskFailed(kj::Exception&& exception) {
// The output gate should already have been broken since it wraps all commits tasks. So, we
// don't have to report anything here, the exception will already propagate elsewhere. We
// should block further operations, though.
if (broken == nullptr) {
if (broken == kj::none) {
broken = kj::mv(exception);
}
}

void ActorSqlite::requireNotBroken() {
KJ_IF_MAYBE(e, broken) {
kj::throwFatalException(kj::cp(*e));
KJ_IF_SOME(e, broken) {
kj::throwFatalException(kj::cp(e));
}
}

Expand Down Expand Up @@ -245,7 +245,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::put(Key key, Value value, WriteOptions
requireNotBroken();

kv.put(key, value);
return nullptr;
return kj::none;
}

kj::Maybe<kj::Promise<void>> ActorSqlite::put(
Expand All @@ -255,7 +255,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::put(
for (auto& pair: pairs) {
kv.put(pair.key, pair.value);
}
return nullptr;
return kj::none;
}

kj::OneOf<bool, kj::Promise<bool>> ActorSqlite::delete_(Key key, WriteOptions options) {
Expand Down Expand Up @@ -293,24 +293,24 @@ ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions option

uint count = kv.deleteAll();
return {
.backpressure = nullptr,
.backpressure = kj::none,
.count = count,
};
}

kj::Maybe<kj::Promise<void>> ActorSqlite::evictStale(kj::Date now) {
// This implementation never needs to apply backpressure.
return nullptr;
return kj::none;
}

void ActorSqlite::shutdown(kj::Maybe<const kj::Exception&> maybeException) {
// TODO(cleanup): Logic copied from ActorCache::shutdown(). Should they share somehow?

if (broken == nullptr) {
if (broken == kj::none) {
auto exception = [&]() {
KJ_IF_MAYBE(e, maybeException) {
KJ_IF_SOME(e, maybeException) {
// We were given an exception, use it.
return kj::cp(*e);
return kj::cp(e);
}

// Use the direct constructor so that we can reuse the constexpr message variable for testing.
Expand Down
42 changes: 21 additions & 21 deletions src/workerd/io/hibernation-manager.c++
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ kj::Vector<jsg::Ref<api::WebSocket>> HibernationManagerImpl::getWebSockets(
jsg::Lock& js,
kj::Maybe<kj::StringPtr> maybeTag) {
kj::Vector<jsg::Ref<api::WebSocket>> matches;
KJ_IF_MAYBE(tag, maybeTag) {
KJ_IF_MAYBE(item, tagToWs.find(*tag)) {
auto& list = *((*item)->list);
KJ_IF_SOME(tag, maybeTag) {
KJ_IF_SOME(item, tagToWs.find(tag)) {
auto& list = *((item)->list);
for (auto& entry: list) {
auto& hibWS = KJ_REQUIRE_NONNULL(entry.hibWS);
matches.add(hibWS.getActiveOrUnhibernate(js));
Expand All @@ -104,14 +104,14 @@ void HibernationManagerImpl::setWebSocketAutoResponse(
}

void HibernationManagerImpl::unsetWebSocketAutoResponse() {
autoResponsePair = nullptr;
autoResponsePair = kj::none;
}

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> HibernationManagerImpl::getWebSocketAutoResponse() {
KJ_IF_MAYBE(ar, autoResponsePair) {
return ar->addRef();
KJ_IF_SOME(ar, autoResponsePair) {
return ar.addRef();
} else {
return nullptr;
return kj::none;
}
}

Expand All @@ -124,11 +124,11 @@ void HibernationManagerImpl::hibernateWebSockets(Worker::Lock& lock) {
js.withinHandleScope([&] {
js.enterContextScope(lock.getContext());
for (auto& ws : allWs) {
KJ_IF_MAYBE(active, ws->activeOrPackage.tryGet<jsg::Ref<api::WebSocket>>()) {
KJ_IF_SOME(active, ws->activeOrPackage.tryGet<jsg::Ref<api::WebSocket>>()) {
// Transfers ownership of properties from api::WebSocket to HibernatableWebSocket via the
// HibernationPackage.
ws->activeOrPackage.init<api::WebSocket::HibernationPackage>(
active->get()->buildPackageForHibernation());
active.get()->buildPackageForHibernation());
}
}
});
Expand All @@ -146,12 +146,12 @@ inline void HibernationManagerImpl::removeFromAllWs(HibernatableWebSocket& hib)
kj::Promise<void> HibernationManagerImpl::handleSocketTermination(
HibernatableWebSocket& hib, kj::Maybe<kj::Exception>& maybeError) {
kj::Maybe<kj::Promise<void>> event;
KJ_IF_MAYBE(error, maybeError) {
KJ_IF_SOME(error, maybeError) {

auto websocketId = randomUUID(nullptr);
auto websocketId = randomUUID(kj::none);
webSocketsForEventHandler.insert(kj::str(websocketId), &hib);
kj::Maybe<api::HibernatableSocketParams> params;
if (!hib.hasDispatchedClose && (error->getType() == kj::Exception::Type::DISCONNECTED)) {
if (!hib.hasDispatchedClose && (error.getType() == kj::Exception::Type::DISCONNECTED)) {
// If premature disconnect/cancel, dispatch a close event if we haven't already.
hib.hasDispatchedClose = true;
params = api::HibernatableSocketParams(
Expand All @@ -161,7 +161,7 @@ kj::Promise<void> HibernationManagerImpl::handleSocketTermination(
kj::mv(websocketId));
} else {
// Otherwise, we need to dispatch an error event!
params = api::HibernatableSocketParams(kj::mv(*error), kj::mv(websocketId));
params = api::HibernatableSocketParams(kj::mv(error), kj::mv(websocketId));
}
// Dispatch the event.
auto workerInterface = loopback->getWorker(IoChannelFactory::SubrequestMetadata{});
Expand All @@ -172,8 +172,8 @@ kj::Promise<void> HibernationManagerImpl::handleSocketTermination(

// Returning the event promise will store it in readLoopTasks.
// After the task completes, we want to drop the websocket since we've closed the connection.
KJ_IF_MAYBE(promise, event) {
co_await *promise;
KJ_IF_SOME(promise, event) {
co_await promise;
}

dropHibernatableWebSocket(hib);
Expand All @@ -188,10 +188,10 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {

auto skip = false;

KJ_IF_MAYBE (reqResp, autoResponsePair) {
KJ_IF_SOME (reqResp, autoResponsePair) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
if (text == (*reqResp)->getRequest()) {
if (text == (reqResp)->getRequest()) {
// If the received message matches the one set for auto-response, we must
// short-circuit readLoop, store the current timestamp and and automatically respond
// with the expected response.
Expand All @@ -202,12 +202,12 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
// We'll store the current timestamp in the HibernatableWebSocket to assure it gets
// stored even if the WebSocket is currently hibernating. In that scenario, the timestamp
// value will be loaded into the WebSocket during unhibernation.
KJ_IF_MAYBE(active, hib.activeOrPackage.tryGet<jsg::Ref<api::WebSocket>>()) {
KJ_IF_SOME(active, hib.activeOrPackage.tryGet<jsg::Ref<api::WebSocket>>()) {
// If the actor is not hibernated/If the WebSocket is active, we need to update
// autoResponseTimestamp on the active websocket.
(*active)->setAutoResponseTimestamp(hib.autoResponseTimestamp);
(active)->setAutoResponseTimestamp(hib.autoResponseTimestamp);
}
ws.send((*reqResp)->getResponse().asArray());
ws.send((reqResp)->getResponse().asArray());
skip = true;
// If we've sent an auto response message, we should not unhibernate or deliver the
// received message to the actor
Expand All @@ -221,7 +221,7 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
continue;
}

auto websocketId = randomUUID(nullptr);
auto websocketId = randomUUID(kj::none);
webSocketsForEventHandler.insert(kj::str(websocketId), &hib);

// Build the event params depending on what type of message we got.
Expand Down
10 changes: 5 additions & 5 deletions src/workerd/io/hibernation-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
// This removal is fast because we have direct access to each kj::List, as well as direct
// access to each TagListItem we want to remove.
for (auto& item: tagItems) {
KJ_IF_MAYBE(list, item.list) {
KJ_IF_SOME(list, item.list) {
// The list reference is non-null, so we still have a valid reference to this
// TagListItem in the list, which we will now remove.
list->remove(item);
if (list->empty()) {
list.remove(item);
if (list.empty()) {
// Remove the bucket in tagToWs if the tag has no more websockets.
manager.tagToWs.erase(kj::mv(item.tag));
}
Expand All @@ -107,9 +107,9 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
// we have to unhibernate it first. The process moves values from the HibernatableWebSocket
// to the api::WebSocket.
jsg::Ref<api::WebSocket> getActiveOrUnhibernate(jsg::Lock& js) {
KJ_IF_MAYBE(package, activeOrPackage.tryGet<api::WebSocket::HibernationPackage>()) {
KJ_IF_SOME(package, activeOrPackage.tryGet<api::WebSocket::HibernationPackage>()) {
activeOrPackage.init<jsg::Ref<api::WebSocket>>(
api::WebSocket::hibernatableFromNative(js, *KJ_REQUIRE_NONNULL(ws), kj::mv(*package))
api::WebSocket::hibernatableFromNative(js, *KJ_REQUIRE_NONNULL(ws), kj::mv(package))
)->setAutoResponseTimestamp(autoResponseTimestamp);
// Now that we unhibernated the WebSocket, we can set the last received autoResponse timestamp
// that was stored in the corresponding HibernatableWebSocket.
Expand Down
Loading

0 comments on commit ca559b1

Please sign in to comment.