Skip to content

Commit

Permalink
Move workerd/api/actor-state.c++ and actor.c++ to KJ_IF_SOME
Browse files Browse the repository at this point in the history
Bug: EW-7618
Test: bazel test //...
  • Loading branch information
ohodson committed Sep 23, 2023
1 parent 2e0ee4f commit c56cdae
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 77 deletions.
144 changes: 72 additions & 72 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ uint32_t billingUnits(size_t bytes, BillAtLeastOne billAtLeastOne = BillAtLeastO

jsg::JsValue deserializeMaybeV8Value(
jsg::Lock& js, kj::ArrayPtr<const char> key, kj::Maybe<kj::ArrayPtr<const kj::byte>> buf) {
KJ_IF_MAYBE(b, buf) {
return deserializeV8Value(js, key, *b);
KJ_IF_SOME(b, buf) {
return deserializeV8Value(js, key, b);
} else {
return js.undefined();
}
Expand Down Expand Up @@ -98,15 +98,15 @@ template <typename Options>
jsg::Promise<void> transformMaybeBackpressure(
jsg::Lock& js, const Options& options,
kj::Maybe<kj::Promise<void>> maybeBackpressure) {
KJ_IF_MAYBE(backpressure, maybeBackpressure) {
KJ_IF_SOME(backpressure, maybeBackpressure) {
// Note: In practice `allowConcurrency` will have no effect on a backpressure promise since
// backpressure blocks everything anyway, but we pass the option through for consistency in
// case of future changes.
auto& context = IoContext::current();
if (options.allowConcurrency.orDefault(false)) {
return context.awaitIo(js, kj::mv(*backpressure));
return context.awaitIo(js, kj::mv(backpressure));
} else {
return context.awaitIoWithInputLock(js, kj::mv(*backpressure), [](jsg::Lock&) {});
return context.awaitIoWithInputLock(js, kj::mv(backpressure), [](jsg::Lock&) {});
}
} else {
return js.resolvedPromise();
Expand Down Expand Up @@ -236,8 +236,8 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::getOne(
return transformCacheResultWithCacheStatus(js, kj::mv(result), options,
[key = kj::mv(key)](jsg::Lock& js, kj::Maybe<ActorCacheOps::Value> value, bool cached) {
uint32_t units = 1;
KJ_IF_MAYBE(v, value) {
units = billingUnits(v->size());
KJ_IF_SOME(v, value) {
units = billingUnits(v.size());
}
auto& actorMetrics = currentActorMetrics();
if (cached) {
Expand Down Expand Up @@ -280,21 +280,21 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::list(
return js.resolvedPromise(jsg::JsValue(js.map()).addRef(js));
};

KJ_IF_MAYBE(o, maybeOptions) {
KJ_IF_MAYBE(s, o->start) {
KJ_IF_MAYBE(sa, o->startAfter) {
KJ_IF_SOME(o, maybeOptions) {
KJ_IF_SOME(s, o.start) {
if (o.startAfter != kj::none) {
KJ_FAIL_REQUIRE("jsg.TypeError: list() cannot be called with both start and startAfter values.");
}
start = kj::mv(*s);
start = kj::mv(s);
}
KJ_IF_MAYBE(sks, o->startAfter) {
KJ_IF_SOME(sks, o.startAfter) {
// Convert an exclusive startAfter into an inclusive start key here so that the implementation
// doesn't need to handle both. This can be done simply by adding two NULL bytes. One to the end of
// the startAfter and another to set the start key after startAfter.
auto startAfterKey = kj::heapArray<char>(sks->size() + 2);
auto startAfterKey = kj::heapArray<char>(sks.size() + 2);

// Copy over the original string.
memcpy(startAfterKey.begin(), sks->begin(), sks->size());
memcpy(startAfterKey.begin(), sks.begin(), sks.size());
// Add one additional null byte to set the new start as the key immediately
// after startAfter. This looks a little sketchy to be doing with strings rather
// than arrays, but kj::String explicitly allows for NULL bytes inside of strings.
Expand All @@ -304,32 +304,32 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::list(
startAfterKey[startAfterKey.size()-1] = '\0';
start = kj::String(kj::mv(startAfterKey));
}
KJ_IF_MAYBE(e, o->end) {
end = kj::mv(*e);
KJ_IF_SOME(e, o.end) {
end = kj::mv(e);
}
KJ_IF_MAYBE(r, o->reverse) {
reverse = *r;
KJ_IF_SOME(r, o.reverse) {
reverse = r;
}
KJ_IF_MAYBE(l, o->limit) {
JSG_REQUIRE(*l > 0, TypeError, "List limit must be positive.");
limit = *l;
KJ_IF_SOME(l, o.limit) {
JSG_REQUIRE(l > 0, TypeError, "List limit must be positive.");
limit = l;
}
KJ_IF_MAYBE(prefix, o->prefix) {
KJ_IF_SOME(prefix, o.prefix) {
// Let's clamp `start` and `end` to include only keys with the given prefix.
if (prefix->size() > 0) {
if (start < *prefix) {
if (prefix.size() > 0) {
if (start < prefix) {
// `start` is before `prefix`, so listing should actually start at `prefix`.
start = kj::str(*prefix);
} else if (start.startsWith(*prefix)) {
start = kj::str(prefix);
} else if (start.startsWith(prefix)) {
// `start` is within the prefix, so need not be modified.
} else {
// `start` comes after the last value with the prefix, so there's no overlap.
return makeEmptyResult();
}

// Calculate the first key that sorts after all keys with the given prefix.
kj::Vector<char> keyAfterPrefix(prefix->size());
keyAfterPrefix.addAll(*prefix);
kj::Vector<char> keyAfterPrefix(prefix.size());
keyAfterPrefix.addAll(prefix);
while (!keyAfterPrefix.empty() && (byte)keyAfterPrefix.back() == 0xff) {
keyAfterPrefix.removeLast();
}
Expand All @@ -342,11 +342,11 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::list(
keyAfterPrefix.add('\0');
auto keyAfterPrefixStr = kj::String(keyAfterPrefix.releaseAsArray());

KJ_IF_MAYBE(e, end) {
if (*e <= *prefix) {
KJ_IF_SOME(e, end) {
if (e <= prefix) {
// No keys could possibly match both the end and the prefix.
return makeEmptyResult();
} else if (e->startsWith(*prefix)) {
} else if (e.startsWith(prefix)) {
// `end` is within the prefix, so need not be modified.
} else {
// `end` comes after all keys with the prefix, so we should stop at the end of the
Expand All @@ -362,8 +362,8 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::list(
}
}

KJ_IF_MAYBE(e, end) {
if (*e <= start) {
KJ_IF_SOME(e, end) {
if (e <= start) {
// Key range is empty.
return makeEmptyResult();
}
Expand Down Expand Up @@ -391,16 +391,16 @@ jsg::Promise<void> DurableObjectStorageOperations::put(
auto options = configureOptions(kj::mv(maybeOptions).orDefault(PutOptions{}));
KJ_SWITCH_ONEOF(keyOrEntries) {
KJ_CASE_ONEOF(k, kj::String) {
KJ_IF_MAYBE(v, value) {
return putOne(js, kj::mv(k), *v, options);
KJ_IF_SOME(v, value) {
return putOne(js, kj::mv(k), v, options);
} else {
JSG_FAIL_REQUIRE(TypeError, "put() called with undefined value.");
}
}
KJ_CASE_ONEOF(o, jsg::Dict<jsg::JsValue>) {
KJ_IF_MAYBE(v, value) {
KJ_IF_MAYBE(opt, optionsTypeHandler.tryUnwrap(js, *v)) {
return putMultiple(js, kj::mv(o), configureOptions(kj::mv(*opt)));
KJ_IF_SOME(v, value) {
KJ_IF_SOME(opt, optionsTypeHandler.tryUnwrap(js, v)) {
return putMultiple(js, kj::mv(o), configureOptions(kj::mv(opt)));
} else {
JSG_FAIL_REQUIRE(
TypeError,
Expand Down Expand Up @@ -504,7 +504,7 @@ jsg::Promise<void> DurableObjectStorageOperations::deleteAlarm(
}).orDefault(PutOptions{}));

return transformMaybeBackpressure(js, options,
getCache(OP_DELETE_ALARM).setAlarm(nullptr, options));
getCache(OP_DELETE_ALARM).setAlarm(kj::none, options));
}

jsg::Promise<void> DurableObjectStorage::deleteAll(
Expand Down Expand Up @@ -657,20 +657,20 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorage::transaction(jsg::Lo
jsg::JsRef<jsg::JsValue> DurableObjectStorage::transactionSync(
jsg::Lock& js,
jsg::Function<jsg::JsRef<jsg::JsValue>()> callback) {
KJ_IF_MAYBE(sqlite, cache->getSqliteDatabase()) {
KJ_IF_SOME(sqlite, cache->getSqliteDatabase()) {
// SAVEPOINT is a readonly statement, but we need to trigger an outer TRANSACTION
sqlite->notifyWrite();
sqlite.notifyWrite();

uint depth = transactionSyncDepth++;
KJ_DEFER(--transactionSyncDepth);

sqlite->run(SqliteDatabase::TRUSTED, kj::str("SAVEPOINT _cf_sync_savepoint_", depth));
sqlite.run(SqliteDatabase::TRUSTED, kj::str("SAVEPOINT _cf_sync_savepoint_", depth));
return js.tryCatch([&]() {
auto result = callback(js);
sqlite->run(SqliteDatabase::TRUSTED, kj::str("RELEASE _cf_sync_savepoint_", depth));
sqlite.run(SqliteDatabase::TRUSTED, kj::str("RELEASE _cf_sync_savepoint_", depth));
return kj::mv(result);
}, [&](jsg::Value exception) -> jsg::JsRef<jsg::JsValue> {
sqlite->run(SqliteDatabase::TRUSTED, kj::str("ROLLBACK TO _cf_sync_savepoint_", depth));
sqlite.run(SqliteDatabase::TRUSTED, kj::str("ROLLBACK TO _cf_sync_savepoint_", depth));
js.throwException(kj::mv(exception));
});
} else {
Expand All @@ -679,7 +679,7 @@ jsg::JsRef<jsg::JsValue> DurableObjectStorage::transactionSync(
}

jsg::Promise<void> DurableObjectStorage::sync(jsg::Lock& js) {
KJ_IF_MAYBE(p, cache->onNoPendingFlush()) {
KJ_IF_SOME(p, cache->onNoPendingFlush()) {
// Note that we're not actually flushing since that will happen anyway once we go async. We're
// merely checking if we have any pending or in-flight operations, and providing a promise that
// resolves when they succeed. This promise only covers operations that were scheduled before
Expand All @@ -688,15 +688,15 @@ jsg::Promise<void> DurableObjectStorage::sync(jsg::Lock& js) {
// output gate will be broken first and the isolate will not resume synchronous execution.

auto& context = IoContext::current();
return context.awaitIo(js, kj::mv(*p));
return context.awaitIo(js, kj::mv(p));
} else {
return js.resolvedPromise();
}
}

jsg::Ref<SqlStorage> DurableObjectStorage::getSql(jsg::Lock& js) {
KJ_IF_MAYBE(db, cache->getSqliteDatabase()) {
return jsg::alloc<SqlStorage>(*db, JSG_THIS);
KJ_IF_SOME(db, cache->getSqliteDatabase()) {
return jsg::alloc<SqlStorage>(db, JSG_THIS);
} else {
JSG_FAIL_REQUIRE(Error, "Durable Object is not backed by SQL.");
}
Expand Down Expand Up @@ -725,28 +725,28 @@ ActorCacheOps& DurableObjectTransaction::getCache(OpName op) {
void DurableObjectTransaction::rollback() {
if (rolledBack) return; // allow multiple calls to rollback()
getCache(OP_ROLLBACK); // just for the checks
KJ_IF_MAYBE(t, cacheTxn) {
auto prom = (*t)->rollback();
KJ_IF_SOME(t, cacheTxn) {
auto prom = t->rollback();
IoContext::current().addWaitUntil(kj::mv(prom).attach(kj::mv(cacheTxn)));
cacheTxn = nullptr;
cacheTxn = kj::none;
}
rolledBack = true;
}

kj::Promise<void> DurableObjectTransaction::maybeCommit() {
// cacheTxn is null if rollback() was called, in which case we don't want to commit anything.
KJ_IF_MAYBE(t, cacheTxn) {
auto maybePromise = (*t)->commit();
cacheTxn = nullptr;
KJ_IF_MAYBE(promise, maybePromise) {
return kj::mv(*promise);
KJ_IF_SOME(t, cacheTxn) {
auto maybePromise = t->commit();
cacheTxn = kj::none;
KJ_IF_SOME(promise, maybePromise) {
return kj::mv(promise);
}
}
return kj::READY_NOW;
}

void DurableObjectTransaction::maybeRollback() {
cacheTxn = nullptr;
cacheTxn = kj::none;
rolledBack = true;
}

Expand Down Expand Up @@ -804,11 +804,11 @@ void DurableObjectState::abort(jsg::Optional<kj::String> reason) {

kj::Exception error(kj::Exception::Type::FAILED, __FILE__, __LINE__, kj::mv(description));

KJ_IF_MAYBE(s, storage) {
KJ_IF_SOME(s, storage) {
// Make sure we _synchronously_ break storage so that there's no chance our promise fulfilling
// will race against the output gate, possibly allowing writes to complete before being
// canceled.
s->get()->getActorCacheInterface().shutdown(error);
s.get()->getActorCacheInterface().shutdown(error);
}

IoContext::current().abort(kj::cp(error));
Expand All @@ -830,7 +830,7 @@ void DurableObjectState::acceptWebSocket(

// We need to get a HibernationManager to give the websocket to.
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
if (a.getHibernationManager() == nullptr) {
if (a.getHibernationManager() == kj::none) {
a.setHibernationManager(
kj::refcounted<HibernationManagerImpl>(
a.getLoopback(), KJ_REQUIRE_NONNULL(a.getHibernationEventType())));
Expand All @@ -839,10 +839,10 @@ void DurableObjectState::acceptWebSocket(
// Note that not providing a tag is equivalent to providing an empty tag array.
// Any duplicate tags will be ignored.
kj::Array<kj::String> distinctTags = [&]() -> kj::Array<kj::String> {
KJ_IF_MAYBE(t, tags) {
KJ_IF_SOME(t, tags) {
kj::HashSet<kj::String> seen;
size_t distinctTagCount = 0;
for (auto tag = t->begin(); tag < t->end(); tag++) {
for (auto tag = t.begin(); tag < t.end(); tag++) {
JSG_REQUIRE(distinctTagCount < MAX_TAGS_PER_CONNECTION, Error,
"a Hibernatable WebSocket cannot have more than ", MAX_TAGS_PER_CONNECTION, " tags");
JSG_REQUIRE(tag->size() <= MAX_TAG_LENGTH, Error,
Expand All @@ -865,8 +865,8 @@ kj::Array<jsg::Ref<api::WebSocket>> DurableObjectState::getWebSockets(
jsg::Lock& js,
jsg::Optional<kj::String> tag) {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
KJ_IF_MAYBE(manager, a.getHibernationManager()) {
return manager->getWebSockets(
KJ_IF_SOME(manager, a.getHibernationManager()) {
return manager.getWebSockets(
js, tag.map([](kj::StringPtr t) { return t; })).releaseAsArray();
}
return kj::Array<jsg::Ref<api::WebSocket>>();
Expand All @@ -876,11 +876,11 @@ void DurableObjectState::setWebSocketAutoResponse(
jsg::Optional<jsg::Ref<WebSocketRequestResponsePair>> maybeReqResp) {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());

if (maybeReqResp == nullptr) {
if (maybeReqResp == kj::none) {
// If there's no request/response pair, we unset any current set auto response configuration.
KJ_IF_MAYBE(manager, a.getHibernationManager()) {
KJ_IF_SOME(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
manager->unsetWebSocketAutoResponse();
manager.unsetWebSocketAutoResponse();
}
return;
}
Expand All @@ -896,7 +896,7 @@ void DurableObjectState::setWebSocketAutoResponse(
"Response cannot be larger than ", maxRequestOrResponseSize, " bytes. ",
"A response of size ", reqResp->getResponse().size(), " was provided."));

if (a.getHibernationManager() == nullptr) {
if (a.getHibernationManager() == kj::none) {
a.setHibernationManager(kj::refcounted<HibernationManagerImpl>(
a.getLoopback(), KJ_REQUIRE_NONNULL(a.getHibernationEventType())));
// If there's no hibernation manager created yet, we should create one and
Expand All @@ -907,12 +907,12 @@ void DurableObjectState::setWebSocketAutoResponse(

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> DurableObjectState::getWebSocketAutoResponse() {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
KJ_IF_MAYBE(manager, a.getHibernationManager()) {
KJ_IF_SOME(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
auto r = manager->getWebSocketAutoResponse();
auto r = manager.getWebSocketAutoResponse();
return r;
}
return nullptr;
return kj::none;
}

kj::Maybe<kj::Date> DurableObjectState::getWebSocketAutoResponseTimestamp(jsg::Ref<WebSocket> ws) {
Expand Down Expand Up @@ -950,7 +950,7 @@ jsg::JsValue deserializeV8Value(jsg::Lock& js,
options.readHeader = false;
}

jsg::Deserializer deserializer(js, buf, nullptr, nullptr, options);
jsg::Deserializer deserializer(js, buf, kj::none, kj::none, options);

return deserializer.readValue(js);
}, [&](jsg::Value&& exception) mutable -> jsg::JsValue {
Expand Down
10 changes: 5 additions & 5 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public:
}

// Lazily initialize actorChannel
if (actorChannel == nullptr) {
if (actorChannel == kj::none) {
actorChannel = context.getColoLocalActorChannel(channelId, actorId, span);
}

Expand Down Expand Up @@ -73,7 +73,7 @@ public:
}

// Lazily initialize actorChannel
if (actorChannel == nullptr) {
if (actorChannel == kj::none) {
actorChannel = context.getGlobalActorChannel(channelId, id->getInner(), kj::mv(locationHint),
mode, span);
}
Expand Down Expand Up @@ -154,9 +154,9 @@ jsg::Ref<DurableObject> DurableObjectNamespace::getImpl(
"get called on jurisdictional subnamespace with an ID from a different jurisdiction");

auto& context = IoContext::current();
kj::Maybe<kj::String> locationHint = nullptr;
KJ_IF_MAYBE(o, options) {
locationHint = kj::mv(o->locationHint);
kj::Maybe<kj::String> locationHint = kj::none;
KJ_IF_SOME(o, options) {
locationHint = kj::mv(o.locationHint);
}

auto outgoingFactory = context.addObject<Fetcher::OutgoingFactory>(
Expand Down

0 comments on commit c56cdae

Please sign in to comment.