diff --git a/src/workerd/api/actor-alarms-delete-test.js b/src/workerd/api/actor-alarms-delete-test.js index 2fe2cdf32d0..d4379d834c2 100644 --- a/src/workerd/api/actor-alarms-delete-test.js +++ b/src/workerd/api/actor-alarms-delete-test.js @@ -14,12 +14,12 @@ export class DurableObjectExample { async waitForAlarm(scheduledTime) { let self = this; - let prom = new Promise((resolve) => { - self.resolve = resolve; - }); + const { promise, resolve, reject } = Promise.withResolvers(); + self.resolve = resolve; + self.reject = reject; try { - await prom; + await promise; if (Date.now() < scheduledTime.valueOf()) { throw new Error( `Date.now() is before scheduledTime! ${Date.now()} vs ${scheduledTime.valueOf()}` @@ -38,22 +38,26 @@ export class DurableObjectExample { } async alarm() { - this.state.alarmsTriggered++; - let time = await this.state.storage.getAlarm(); - if (time) { - throw new Error(`time not null inside alarm handler ${time}`); - } - // Deleting an alarm inside `alarm()` will not have any effect, unless there's another queued alarm - // already. - await this.state.storage.deleteAlarm(); + try { + this.state.alarmsTriggered++; + let time = await this.state.storage.getAlarm(); + if (time !== null) { + throw new Error(`time not null inside alarm handler ${time}`); + } + // Deleting an alarm inside `alarm()` will not have any effect, unless there's another queued alarm + // already. + await this.state.storage.deleteAlarm(); - // On the other hand, if we have an alarm queued, it will be deleted. If this is working properly, - // we'll only have one alarm triggered. - await this.state.storage.setAlarm(Date.now() + 50); - await this.state.storage.deleteAlarm(); + // On the other hand, if we have an alarm queued, it will be deleted. If this is working properly, + // we'll only have one alarm triggered. + await this.state.storage.setAlarm(Date.now() + 50); + await this.state.storage.deleteAlarm(); - // All done inside `alarm()`. 
- this.resolve(); + // All done inside `alarm()`. + this.resolve(); + } catch (e) { + this.reject(e); + } } async fetch() { diff --git a/src/workerd/api/actor-alarms-test.js b/src/workerd/api/actor-alarms-test.js index 7b7887ebbcb..40fe659a0b7 100644 --- a/src/workerd/api/actor-alarms-test.js +++ b/src/workerd/api/actor-alarms-test.js @@ -11,12 +11,12 @@ export class DurableObjectExample { async waitForAlarm(scheduledTime) { let self = this; - let prom = new Promise((resolve) => { - self.resolve = resolve; - }); + const { promise, resolve, reject } = Promise.withResolvers(); + self.resolve = resolve; + self.reject = reject; try { - await prom; + await promise; if (Date.now() < scheduledTime.valueOf()) { throw new Error( `Date.now() is before scheduledTime! ${Date.now()} vs ${scheduledTime.valueOf()}` @@ -45,11 +45,15 @@ export class DurableObjectExample { } async alarm() { - let time = await this.state.storage.getAlarm(); - if (time) { - throw new Error(`time not null inside alarm handler ${time}`); + try { + let time = await this.state.storage.getAlarm(); + if (time !== null) { + throw new Error(`time not null inside alarm handler ${time}`); + } + this.resolve(); + } catch (e) { + this.reject(e); } - this.resolve(); } } diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index f8e22ca7561..0afa3f81ef9 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -385,112 +385,73 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm(kj: auto& context = IoContext::current(); auto& actor = KJ_ASSERT_NONNULL(context.getActor()); auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); - auto maybeDeferredDelete = persistent.armAlarmHandler(scheduledTime); - KJ_IF_SOME(deferredDelete, maybeDeferredDelete) { - auto& handler = KJ_REQUIRE_NONNULL(exportedHandler); - if (handler.alarm == kj::none) { + KJ_SWITCH_ONEOF(persistent.armAlarmHandler(scheduledTime)) { + KJ_CASE_ONEOF(armResult, 
ActorCacheInterface::RunAlarmHandler) { + auto& handler = KJ_REQUIRE_NONNULL(exportedHandler); + if (handler.alarm == kj::none) { - lock.logWarningOnce("Attempted to run a scheduled alarm without a handler, " - "did you remember to export an alarm() function?"); - return WorkerInterface::AlarmResult{ - .retry = false, .outcome = EventOutcome::SCRIPT_NOT_FOUND}; - } + lock.logWarningOnce("Attempted to run a scheduled alarm without a handler, " + "did you remember to export an alarm() function?"); + return WorkerInterface::AlarmResult{ + .retry = false, .outcome = EventOutcome::SCRIPT_NOT_FOUND}; + } + + auto& alarm = KJ_ASSERT_NONNULL(handler.alarm); + + return context + .run([exportedHandler, &context, timeout, retryCount, &alarm, + maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]( + Worker::Lock& lock) mutable -> kj::Promise { + jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext); + // We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise + // completes we want to cancel the alarm handler. If the alarm handler promise completes first + // timeout will be canceled. + auto timeoutPromise = context.afterLimitTimeout(timeout).then( + [&context]() -> kj::Promise { + // We don't want to delete the alarm since we have not successfully completed the alarm + // execution. + auto& actor = KJ_ASSERT_NONNULL(context.getActor()); + auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); + persistent.cancelDeferredAlarmDeletion(); + + LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time"); + // Report alarm handler failure and log it. + auto e = KJ_EXCEPTION(OVERLOADED, + "broken.dropped; worker_do_not_log; jsg.Error: Alarm exceeded its allowed execution time"); + context.getMetrics().reportFailure(e); + + // We don't want the handler to keep running after timeout. + context.abort(kj::mv(e)); + // We want timed out alarms to be treated as user errors. 
As such, we'll mark them as + // retriable, and we'll count the retries against the alarm retries limit. This will ensure + // that the handler will attempt to run for a number of times before giving up and deleting + // the alarm. + return WorkerInterface::AlarmResult{ + .retry = true, .retryCountsAgainstLimit = true, .outcome = EventOutcome::EXCEEDED_CPU}; + }); - auto& alarm = KJ_ASSERT_NONNULL(handler.alarm); - - return context - .run([exportedHandler, &context, timeout, retryCount, &alarm, - maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]( - Worker::Lock& lock) mutable -> kj::Promise { - jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext); - // We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise - // completes we want to cancel the alarm handler. If the alarm handler promise completes first - // timeout will be canceled. - auto timeoutPromise = context.afterLimitTimeout(timeout).then( - [&context]() -> kj::Promise { - // We don't want to delete the alarm since we have not successfully completed the alarm - // execution. + return alarm(lock, jsg::alloc(retryCount)) + .then([]() -> kj::Promise { + return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::OK}; + }).exclusiveJoin(kj::mv(timeoutPromise)); + }) + .catch_([&context, deferredDelete = kj::mv(armResult.deferredDelete)]( + kj::Exception&& e) mutable { auto& actor = KJ_ASSERT_NONNULL(context.getActor()); auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); persistent.cancelDeferredAlarmDeletion(); - LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time"); - // Report alarm handler failure and log it. - auto e = KJ_EXCEPTION(OVERLOADED, - "broken.dropped; worker_do_not_log; jsg.Error: Alarm exceeded its allowed execution time"); context.getMetrics().reportFailure(e); - // We don't want the handler to keep running after timeout. 
- context.abort(kj::mv(e)); - // We want timed out alarms to be treated as user errors. As such, we'll mark them as - // retriable, and we'll count the retries against the alarm retries limit. This will ensure - // that the handler will attempt to run for a number of times before giving up and deleting - // the alarm. - return WorkerInterface::AlarmResult{ - .retry = true, .retryCountsAgainstLimit = true, .outcome = EventOutcome::EXCEEDED_CPU}; - }); - - return alarm(lock, jsg::alloc(retryCount)) - .then([]() -> kj::Promise { - return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::OK}; - }).exclusiveJoin(kj::mv(timeoutPromise)); - }) - .catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { - auto& actor = KJ_ASSERT_NONNULL(context.getActor()); - auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); - persistent.cancelDeferredAlarmDeletion(); + // This will include the error in inspector/tracers and log to syslog if internal. + context.logUncaughtExceptionAsync(UncaughtExceptionSource::ALARM_HANDLER, kj::mv(e)); - context.getMetrics().reportFailure(e); - - // This will include the error in inspector/tracers and log to syslog if internal. - context.logUncaughtExceptionAsync(UncaughtExceptionSource::ALARM_HANDLER, kj::mv(e)); - - EventOutcome outcome = EventOutcome::EXCEPTION; - KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) { - outcome = status; - } - - kj::String actorId; - KJ_SWITCH_ONEOF(actor.getId()) { - KJ_CASE_ONEOF(f, kj::Own) { - actorId = f->toString(); + EventOutcome outcome = EventOutcome::EXCEPTION; + KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) { + outcome = status; } - KJ_CASE_ONEOF(s, kj::String) { - actorId = kj::str(s); - } - } - // We only want to retry against limits if it's a user error. By default let's check if the - // output gate is broken. 
- auto shouldRetryCountsAgainstLimits = !context.isOutputGateBroken(); - - // We want to alert if we aren't going to count this alarm retry against limits - if (auto desc = e.getDescription(); !jsg::isTunneledException(desc) && - !jsg::isDoNotLogException(desc) && context.isOutputGateBroken()) { - LOG_NOSENTRY(ERROR, "output lock broke during alarm execution", actorId, e); - } else if (context.isOutputGateBroken()) { - // We don't usually log these messages, but it's useful to know the real reason we failed - // to correctly investigate stuck alarms. - LOG_NOSENTRY(ERROR, - "output lock broke during alarm execution without an interesting error description", - actorId, e); - if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) { - // The handler failed because the user overloaded the object. It's their fault, we'll not - // retry forever. - shouldRetryCountsAgainstLimits = true; - } - } - return WorkerInterface::AlarmResult{.retry = true, - .retryCountsAgainstLimit = shouldRetryCountsAgainstLimits, - .outcome = outcome}; - }) - .then( - [&context]( - WorkerInterface::AlarmResult result) -> kj::Promise { - return context.waitForOutputLocks().then( - [result]() { return kj::mv(result); }, [&context](kj::Exception&& e) { - auto& actor = KJ_ASSERT_NONNULL(context.getActor()); kj::String actorId; KJ_SWITCH_ONEOF(actor.getId()) { KJ_CASE_ONEOF(f, kj::Own) { @@ -500,21 +461,20 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm(kj: actorId = kj::str(s); } } - // We only want to retry against limits if it's a user error. By default let's assume it's our - // fault. - auto shouldRetryCountsAgainstLimits = false; - if (auto desc = e.getDescription(); - !jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) { - if (isInterestingException(e)) { - LOG_EXCEPTION("alarmOutputLock"_kj, e); - } else { - LOG_NOSENTRY(ERROR, "output lock broke after executing alarm", actorId, e); - } - } else { + + // We only want to retry against limits if it's a user error. 
By default let's check if the + // output gate is broken. + auto shouldRetryCountsAgainstLimits = !context.isOutputGateBroken(); + + // We want to alert if we aren't going to count this alarm retry against limits + if (auto desc = e.getDescription(); !jsg::isTunneledException(desc) && + !jsg::isDoNotLogException(desc) && context.isOutputGateBroken()) { + LOG_NOSENTRY(ERROR, "output lock broke during alarm execution", actorId, e); + } else if (context.isOutputGateBroken()) { // We don't usually log these messages, but it's useful to know the real reason we failed // to correctly investigate stuck alarms. LOG_NOSENTRY(ERROR, - "output lock broke after executing alarm without an interesting error description", + "output lock broke during alarm execution without an interesting error description", actorId, e); if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) { // The handler failed because the user overloaded the object. It's their fault, we'll not @@ -524,12 +484,57 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm(kj: } return WorkerInterface::AlarmResult{.retry = true, .retryCountsAgainstLimit = shouldRetryCountsAgainstLimits, - .outcome = EventOutcome::EXCEPTION}; + .outcome = outcome}; + }) + .then([&context](WorkerInterface::AlarmResult result) + -> kj::Promise { + return context.waitForOutputLocks().then( + [result]() { return kj::mv(result); }, [&context](kj::Exception&& e) { + auto& actor = KJ_ASSERT_NONNULL(context.getActor()); + kj::String actorId; + KJ_SWITCH_ONEOF(actor.getId()) { + KJ_CASE_ONEOF(f, kj::Own) { + actorId = f->toString(); + } + KJ_CASE_ONEOF(s, kj::String) { + actorId = kj::str(s); + } + } + // We only want to retry against limits if it's a user error. By default let's assume it's our + // fault. 
+ auto shouldRetryCountsAgainstLimits = false; + if (auto desc = e.getDescription(); + !jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) { + if (isInterestingException(e)) { + LOG_EXCEPTION("alarmOutputLock"_kj, e); + } else { + LOG_NOSENTRY(ERROR, "output lock broke after executing alarm", actorId, e); + } + } else { + // We don't usually log these messages, but it's useful to know the real reason we failed + // to correctly investigate stuck alarms. + LOG_NOSENTRY(ERROR, + "output lock broke after executing alarm without an interesting error description", + actorId, e); + if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) { + // The handler failed because the user overloaded the object. It's their fault, we'll not + // retry forever. + shouldRetryCountsAgainstLimits = true; + } + } + return WorkerInterface::AlarmResult{.retry = true, + .retryCountsAgainstLimit = shouldRetryCountsAgainstLimits, + .outcome = EventOutcome::EXCEPTION}; + }); }); - }); - } else { - return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::CANCELED}; + } + KJ_CASE_ONEOF(armResult, ActorCacheInterface::CancelAlarmHandler) { + return armResult.waitBeforeCancel.then([]() { + return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::CANCELED}; + }); + } } + KJ_UNREACHABLE; } jsg::Promise ServiceWorkerGlobalScope::test( diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index 8603582ef52..07f9b69721a 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -320,6 +320,16 @@ kj_test( ], ) +kj_test( + src = "actor-sqlite-test.c++", + deps = [ + ":actor", + ":io-gate", + "//src/workerd/util:test", + "//src/workerd/util:test-util", + ], +) + kj_test( src = "promise-wrapper-test.c++", deps = [":io"], diff --git a/src/workerd/io/actor-cache-test.c++ b/src/workerd/io/actor-cache-test.c++ index eaff1a42cdf..74fb0151a74 100644 --- a/src/workerd/io/actor-cache-test.c++ +++ 
b/src/workerd/io/actor-cache-test.c++ @@ -4977,8 +4977,14 @@ KJ_TEST("ActorCache alarm get/put") { KJ_ASSERT(time == kj::none); } - // we have a cached time == nullptr, so we should not attempt to run an alarm - KJ_ASSERT(test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, false) == kj::none); + { + // we have a cached time == nullptr, so we should not attempt to run an alarm + auto armResult = test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, false); + KJ_ASSERT(armResult.is()); + auto cancelResult = kj::mv(armResult.get()); + KJ_ASSERT(cancelResult.waitBeforeCancel.poll(ws)); + cancelResult.waitBeforeCancel.wait(ws); + } { test.setAlarm(oneMs); @@ -4990,7 +4996,10 @@ KJ_TEST("ActorCache alarm get/put") { { // Test that alarm handler handle clears alarm when dropped with no writes - { auto maybeWrite = KJ_ASSERT_NONNULL(test.cache.armAlarmHandler(oneMs, false)); } + { + auto armResult = test.cache.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + } mockStorage->expectCall("deleteAlarm", ws) .withParams(CAPNP(timeToDeleteMs = 1)) .thenReturn(CAPNP(deleted = true)); @@ -5001,7 +5010,8 @@ KJ_TEST("ActorCache alarm get/put") { // Test that alarm handler handle does not clear alarm when dropped with writes { - auto maybeWrite = KJ_ASSERT_NONNULL(test.cache.armAlarmHandler(oneMs, false)); + auto armResult = test.cache.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); test.setAlarm(twoMs); } mockStorage->expectCall("setAlarm", ws) @@ -5013,7 +5023,10 @@ KJ_TEST("ActorCache alarm get/put") { test.setAlarm(oneMs); // Test that alarm handler handle does not cache delete when it fails - { auto maybeWrite = KJ_ASSERT_NONNULL(test.cache.armAlarmHandler(oneMs, false)); } + { + auto armResult = test.cache.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + } mockStorage->expectCall("deleteAlarm", ws) .withParams(CAPNP(timeToDeleteMs = 1)) .thenReturn(CAPNP(deleted = false)); @@ -5022,7 +5035,10 @@ KJ_TEST("ActorCache alarm 
get/put") { { // Test that alarm handler handle does not cache alarm delete when noCache == true - { auto maybeWrite = KJ_ASSERT_NONNULL(test.cache.armAlarmHandler(twoMs, true)); } + { + auto armResult = test.cache.armAlarmHandler(twoMs, true); + KJ_ASSERT(armResult.is()); + } mockStorage->expectCall("deleteAlarm", ws) .withParams(CAPNP(timeToDeleteMs = 2)) .thenReturn(CAPNP(deleted = true)); diff --git a/src/workerd/io/actor-cache.c++ b/src/workerd/io/actor-cache.c++ index 28e8c1558b1..5da0c0c6c28 100644 --- a/src/workerd/io/actor-cache.c++ +++ b/src/workerd/io/actor-cache.c++ @@ -163,7 +163,8 @@ kj::Maybe> ActorCache::evictStale(kj::Date now) { return getBackpressure(); } -kj::Maybe> ActorCache::armAlarmHandler(kj::Date scheduledTime, bool noCache) { +kj::OneOf ActorCache::armAlarmHandler( + kj::Date scheduledTime, bool noCache) { noCache = noCache || lru.options.noCache; KJ_ASSERT(!currentAlarmTime.is()); @@ -173,7 +174,7 @@ kj::Maybe> ActorCache::armAlarmHandler(kj::Date scheduledTime, boo if (t.status == KnownAlarmTime::Status::CLEAN) { // If there's a clean scheduledTime that is different from ours, this run should be // canceled. 
- return kj::none; + return CancelAlarmHandler{.waitBeforeCancel = kj::READY_NOW}; } else { // There's a alarm write that hasn't been set yet pending for a time different than ours -- // We won't cancel the alarm because it hasn't been confirmed, but we shouldn't delete @@ -191,7 +192,7 @@ kj::Maybe> ActorCache::armAlarmHandler(kj::Date scheduledTime, boo }; } static const DeferredAlarmDeleter disposer; - return kj::Own(this, disposer); + return RunAlarmHandler{.deferredDelete = kj::Own(this, disposer)}; } void ActorCache::cancelDeferredAlarmDeletion() { diff --git a/src/workerd/io/actor-cache.h b/src/workerd/io/actor-cache.h index 4c29252157e..1ccf53a206a 100644 --- a/src/workerd/io/actor-cache.h +++ b/src/workerd/io/actor-cache.h @@ -209,12 +209,24 @@ class ActorCacheInterface: public ActorCacheOps { virtual void shutdown(kj::Maybe maybeException) = 0; - // Call when entering the alarm handler and attach the returned object to the promise representing - // the alarm handler's execution. + // Possible armAlarmHandler() return values: // - // The returned object will schedule a write to clear the alarm time if no alarm writes have been - // made while it exists. If kj::none is returned, the alarm run should be canceled. - virtual kj::Maybe> armAlarmHandler( + // Alarm should be canceled without retry (because alarm state has changed such that the + // requested alarm time is no longer valid). + struct CancelAlarmHandler { + // Caller should wait for this promise to complete before canceling. + kj::Promise waitBeforeCancel; + }; + // Alarm should be run. + struct RunAlarmHandler { + // RAII object to delete the alarm, if object is destroyed before setAlarm() or + // cancelDeferredAlarmDeletion() are called. Caller should attach it to a promise + // representing the alarm handler's execution. + kj::Own deferredDelete; + }; + + // Call when entering the alarm handler. 
+ virtual kj::OneOf armAlarmHandler( kj::Date scheduledTime, bool noCache = false) = 0; virtual void cancelDeferredAlarmDeletion() = 0; @@ -298,7 +310,9 @@ class ActorCache final: public ActorCacheInterface { DeleteAllResults deleteAll(WriteOptions options) override; kj::Maybe> evictStale(kj::Date now) override; void shutdown(kj::Maybe maybeException) override; - kj::Maybe> armAlarmHandler(kj::Date scheduledTime, bool noCache = false) override; + + kj::OneOf armAlarmHandler( + kj::Date scheduledTime, bool noCache = false) override; void cancelDeferredAlarmDeletion() override; kj::Maybe> onNoPendingFlush() override; // See ActorCacheInterface diff --git a/src/workerd/io/actor-sqlite-test.c++ b/src/workerd/io/actor-sqlite-test.c++ new file mode 100644 index 00000000000..b01af7e17f9 --- /dev/null +++ b/src/workerd/io/actor-sqlite-test.c++ @@ -0,0 +1,1140 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "actor-sqlite.h" +#include "io-gate.h" + +#include +#include + +#include +#include + +namespace workerd { +namespace { + +static constexpr kj::Date oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH; +static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH; +static constexpr kj::Date threeMs = 3 * kj::MILLISECONDS + kj::UNIX_EPOCH; +static constexpr kj::Date fourMs = 4 * kj::MILLISECONDS + kj::UNIX_EPOCH; +static constexpr kj::Date fiveMs = 5 * kj::MILLISECONDS + kj::UNIX_EPOCH; + +template +kj::Promise eagerlyReportExceptions(kj::Promise promise, kj::SourceLocation location = {}) { + return promise.eagerlyEvaluate([location](kj::Exception&& e) -> T { + KJ_LOG_AT(ERROR, location, e); + kj::throwFatalException(kj::mv(e)); + }); +} + +// Expect that a synchronous result is returned. 
+template +T expectSync(kj::OneOf> result, kj::SourceLocation location = {}) { + KJ_SWITCH_ONEOF(result) { + KJ_CASE_ONEOF(promise, kj::Promise) { + KJ_FAIL_ASSERT_AT(location, "result was unexpectedly asynchronous"); + } + KJ_CASE_ONEOF(value, T) { + return kj::mv(value); + } + } + KJ_UNREACHABLE; +} + +struct ActorSqliteTestOptions final { + bool monitorOutputGate = true; +}; + +struct ActorSqliteTest final { + kj::EventLoop loop; + kj::WaitScope ws; + + OutputGate gate; + kj::Own vfsDir; + SqliteDatabase::Vfs vfs; + SqliteDatabase db; + + struct Call final { + kj::String desc; + kj::Own> fulfiller; + }; + kj::Vector calls; + + struct ActorSqliteTestHooks final: public ActorSqlite::Hooks { + public: + explicit ActorSqliteTestHooks(ActorSqliteTest& parent): parent(parent) {} + + kj::Promise scheduleRun(kj::Maybe newAlarmTime) override { + KJ_IF_SOME(h, parent.scheduleRunHandler) { + return h(newAlarmTime); + } + auto desc = newAlarmTime.map([](auto& t) { + return kj::str("scheduleRun(", t, ")"); + }).orDefault(kj::str("scheduleRun(none)")); + auto [promise, fulfiller] = kj::newPromiseAndFulfiller(); + parent.calls.add(Call{kj::mv(desc), kj::mv(fulfiller)}); + return kj::mv(promise); + } + + ActorSqliteTest& parent; + }; + kj::Maybe(kj::Maybe)>> scheduleRunHandler; + ActorSqliteTestHooks hooks = ActorSqliteTestHooks(*this); + + ActorSqlite actor; + + kj::Promise gateBrokenPromise; + kj::UnwindDetector unwindDetector; + + explicit ActorSqliteTest(ActorSqliteTestOptions options = {}) + : ws(loop), + vfsDir(kj::newInMemoryDirectory(kj::nullClock())), + vfs(*vfsDir), + db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY), + actor(kj::attachRef(db), gate, KJ_BIND_METHOD(*this, commitCallback), hooks), + gateBrokenPromise(options.monitorOutputGate ? 
eagerlyReportExceptions(gate.onBroken()) + : kj::Promise(kj::READY_NOW)) {} + + ~ActorSqliteTest() noexcept(false) { + if (!unwindDetector.isUnwinding()) { + // Make sure if the output gate has been broken, the exception was reported. This is + // important to report errors thrown inside flush(), since those won't otherwise propagate + // into the test body. + gateBrokenPromise.poll(ws); + + // Make sure there's no outstanding async work we haven't considered: + pollAndExpectCalls({}, "unexpected calls at end of test"); + } + } + + kj::Promise commitCallback() { + auto [promise, fulfiller] = kj::newPromiseAndFulfiller(); + calls.add(Call{kj::str("commit"), kj::mv(fulfiller)}); + return kj::mv(promise); + } + + // Polls the event loop, then asserts that the description of calls up to this point match the + // expectation and returns their fulfillers. Also clears the call log. + // + // TODO(cleanup): Is there a better way to do mocks? capnp-mock looks nice, but seems a bit + // heavyweight for this test. + kj::Vector>> pollAndExpectCalls( + std::initializer_list expCallDescs, + kj::StringPtr message = ""_kj, + kj::SourceLocation location = {}) { + ws.poll(); + auto callDescs = KJ_MAP(c, calls) { return kj::str(c.desc); }; + KJ_ASSERT_AT(callDescs == heapArray(expCallDescs), location, kj::str(message)); + auto fulfillers = KJ_MAP(c, calls) { return kj::mv(c.fulfiller); }; + calls.clear(); + return kj::mv(fulfillers); + } + + // A few driver methods for convenience. 
+ auto get(kj::StringPtr key, ActorCache::ReadOptions options = {}) { + return actor.get(kj::str(key), options); + } + auto getAlarm(ActorCache::ReadOptions options = {}) { + return actor.getAlarm(options); + } + auto put(kj::StringPtr key, kj::StringPtr value, ActorCache::WriteOptions options = {}) { + return actor.put(kj::str(key), kj::heapArray(value.asBytes()), options); + } + auto setAlarm(kj::Maybe newTime, ActorCache::WriteOptions options = {}) { + return actor.setAlarm(newTime, options); + } +}; + +KJ_TEST("initial alarm value is unset") { + ActorSqliteTest test; + + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("can set and get alarm") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("alarm write happens transactionally with storage ops") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.put("foo", "bar"); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes()); +} + +KJ_TEST("storage op without alarm change does not wait on scheduler") { + ActorSqliteTest test; + + test.put("foo", "bar"); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes()); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("alarm scheduling starts synchronously before implicit local db commit") { + ActorSqliteTest test; + + // In workerd (unlike edgeworker), there is no remote storage, so there is no work done in + // commitCallback(); the local db is considered durably stored after the synchronous sqlite + // commit() call returns. 
If a commit includes an alarm state change that requires scheduling + // before the commit call, it needs to happen synchronously. Since workerd synchronously + // schedules alarms, we just need to ensure that the database is in a pre-commit state when + // scheduleRun() is called. + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + + bool startedScheduleRun = false; + test.scheduleRunHandler = [&](kj::Maybe) -> kj::Promise { + startedScheduleRun = true; + + KJ_EXPECT_THROW_MESSAGE( + "cannot start a transaction within a transaction", test.db.run("BEGIN TRANSACTION")); + + return kj::READY_NOW; + }; + + test.setAlarm(oneMs); + KJ_ASSERT(!startedScheduleRun); + test.ws.poll(); + KJ_ASSERT(startedScheduleRun); + + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("alarm scheduling starts synchronously before explicit local db commit") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + + bool startedScheduleRun = false; + test.scheduleRunHandler = [&](kj::Maybe) -> kj::Promise { + startedScheduleRun = true; + + // Not sure if there is a good way to detect savepoint presence without mutating the db state, + // but this is sufficient to verify the test properties: + + // Verify that we are not within a nested savepoint. + KJ_EXPECT_THROW_MESSAGE( + "no such savepoint: _cf_savepoint_1", test.db.run("RELEASE _cf_savepoint_1")); + + // Verify that we are within the root savepoint. 
+ test.db.run("RELEASE _cf_savepoint_0"); + KJ_EXPECT_THROW_MESSAGE( + "no such savepoint: _cf_savepoint_0", test.db.run("RELEASE _cf_savepoint_0")); + + // We don't actually care what happens in the test after this point, but it's slightly simpler + // to re-add the savepoint to allow the test to complete cleanly: + test.db.run("SAVEPOINT _cf_savepoint_0"); + + return kj::READY_NOW; + }; + + { + auto txn = test.actor.startTransaction(); + txn->setAlarm(oneMs, {}); + + KJ_ASSERT(!startedScheduleRun); + txn->commit(); + KJ_ASSERT(startedScheduleRun); + + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + } + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("alarm scheduling does not start synchronously before nested explicit local db commit") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + + bool startedScheduleRun = false; + test.scheduleRunHandler = [&](kj::Maybe) -> kj::Promise { + startedScheduleRun = true; + return kj::READY_NOW; + }; + + { + auto txn1 = test.actor.startTransaction(); + + { + auto txn2 = test.actor.startTransaction(); + txn2->setAlarm(oneMs, {}); + + txn2->commit(); + KJ_ASSERT(!startedScheduleRun); + } + + txn1->commit(); + KJ_ASSERT(startedScheduleRun); + + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + } + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("synchronous alarm scheduling failure causes local db commit to throw synchronously") { + ActorSqliteTest test({.monitorOutputGate = false}); + auto promise = test.gate.onBroken(); + + auto getLocalAlarm = [&]() -> kj::Maybe { + auto query = test.db.run("SELECT value FROM _cf_METADATA WHERE key = 1"); + if (query.isDone() || query.isNull(0)) { + return kj::none; + } else { + return kj::UNIX_EPOCH + query.getInt64(0) * kj::NANOSECONDS; + } + }; + + // Initialize alarm 
state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + + // Override scheduleRun handler with one that throws synchronously. + bool startedScheduleRun = false; + test.scheduleRunHandler = [&](kj::Maybe) -> kj::Promise { + startedScheduleRun = true; + // Must throw synchronously; returning an exception is insufficient. + kj::throwFatalException(KJ_EXCEPTION(FAILED, "a_sync_fail")); + }; + + KJ_ASSERT(!promise.poll(test.ws)); + test.setAlarm(oneMs); + + // Expect that polling will attempt to commit the implicit transaction, which should + // synchronously fail when attempting to call scheduleRun() before the db commit, and roll back the + // local db state to the 2ms alarm. + KJ_ASSERT(!startedScheduleRun); + KJ_ASSERT(KJ_REQUIRE_NONNULL(getLocalAlarm()) == oneMs); + test.ws.poll(); + KJ_ASSERT(startedScheduleRun); + KJ_ASSERT(KJ_REQUIRE_NONNULL(getLocalAlarm()) == twoMs); + + KJ_ASSERT(promise.poll(test.ws)); + KJ_EXPECT_THROW_MESSAGE("a_sync_fail", promise.wait(test.ws)); +} + +KJ_TEST("can clear alarm") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + test.setAlarm(kj::none); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("can set alarm twice") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); +} + +KJ_TEST("setting duplicate alarm is no-op") { + ActorSqliteTest test; + + test.setAlarm(kj::none); + test.pollAndExpectCalls({}); + + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + test.setAlarm(oneMs); + test.pollAndExpectCalls({}); +} + +KJ_TEST("tells alarm handler to cancel when committed alarm is empty") { + ActorSqliteTest test; + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + auto waitPromise = kj::mv(armResult.get().waitBeforeCancel); + KJ_ASSERT(waitPromise.poll(test.ws)); + waitPromise.wait(test.ws); + } +} + +KJ_TEST("tells alarm handler to cancel when handler alarm is later than committed alarm") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + // Request handler run at 2ms. Expect cancellation without rescheduling. 
+ auto armResult = test.actor.armAlarmHandler(twoMs, false); + KJ_ASSERT(armResult.is()); + auto cancelResult = kj::mv(armResult.get()); + KJ_ASSERT(cancelResult.waitBeforeCancel.poll(test.ws)); + cancelResult.waitBeforeCancel.wait(test.ws); +} + +KJ_TEST("tells alarm handler to reschedule when handler alarm is earlier than committed alarm") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + // Expect that armAlarmHandler() tells caller to cancel after rescheduling completes. + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + auto cancelResult = kj::mv(armResult.get()); + + // Expect rescheduling was requested and that returned promise resolves after fulfillment. + auto waitBeforeCancel = kj::mv(cancelResult.waitBeforeCancel); + auto rescheduleFulfiller = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]); + KJ_ASSERT(!waitBeforeCancel.poll(test.ws)); + rescheduleFulfiller->fulfill(); + KJ_ASSERT(waitBeforeCancel.poll(test.ws)); + waitBeforeCancel.wait(test.ws); +} + +KJ_TEST("does not cancel handler when local db alarm state is later than scheduled alarm") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + test.setAlarm(twoMs); + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + } + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); +} + +KJ_TEST("does not cancel handler when local db alarm state is earlier than scheduled alarm") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + test.setAlarm(oneMs); + { + auto armResult = test.actor.armAlarmHandler(twoMs, false); + KJ_ASSERT(armResult.is()); + } + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); +} + +KJ_TEST("getAlarm() returns null during handler") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + test.pollAndExpectCalls({}); + + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + } + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); +} + +KJ_TEST("alarm handler handle clears alarm when dropped with no writes") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + } + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("alarm deleter does not clear alarm when dropped with writes") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + test.setAlarm(twoMs); + } + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); +} + +KJ_TEST("can cancel deferred alarm deletion during handler") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + test.actor.cancelDeferredAlarmDeletion(); + } + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("canceling deferred alarm deletion outside handler has no effect") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + } + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + + test.actor.cancelDeferredAlarmDeletion(); + + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("canceling deferred alarm deletion outside handler edge case") { + // Presumably harmless to cancel deletion if the client requests it after the handler ends but + // before the event loop runs the commit code? Trying to cancel deletion outside the handler is + // a bit of a contract violation anyway -- maybe we should just assert against it? + ActorSqliteTest test; + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + } + test.actor.cancelDeferredAlarmDeletion(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("canceling deferred alarm deletion is idempotent") { + // Not sure if important, but matches ActorCache behavior. + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + test.actor.cancelDeferredAlarmDeletion(); + test.actor.cancelDeferredAlarmDeletion(); + } + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("handler alarm is not deleted when commit fails") { + ActorSqliteTest test({.monitorOutputGate = false}); + + auto promise = test.gate.onBroken(); + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + { + auto armResult = test.actor.armAlarmHandler(oneMs, false); + KJ_ASSERT(armResult.is()); + + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + } + test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit")); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws)); +} + +KJ_TEST("setting earlier alarm persists alarm scheduling before db") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + // Update alarm to be earlier. We expect the alarm scheduling to be persisted before the db. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("setting later alarm persists db before alarm scheduling") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + // Update alarm to be later. We expect the db to be persisted before the alarm scheduling. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); +} + +KJ_TEST("multiple set-earlier in-flight alarms wait for earliest before committing db") { + ActorSqliteTest test; + + // Initialize alarm state to 5ms. + test.setAlarm(fiveMs); + test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs); + + // Gate is not blocked. + auto gateWaitBefore = test.gate.wait(); + KJ_ASSERT(gateWaitBefore.poll(test.ws)); + + // Update alarm to be earlier (4ms). We expect the alarm scheduling to start. + test.setAlarm(fourMs); + auto fulfiller4Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(4ms)"})[0]); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == fourMs); + + // Gate as-of 4ms update is blocked. + auto gateWait4ms = test.gate.wait(); + KJ_ASSERT(!gateWait4ms.poll(test.ws)); + + // While 4ms scheduling request is in-flight, update alarm to be even earlier (3ms). We expect + // the 4ms request to block the 3ms scheduling request. + test.setAlarm(threeMs); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == threeMs); + + // Gate as-of 3ms update is blocked. + auto gateWait3ms = test.gate.wait(); + KJ_ASSERT(!gateWait3ms.poll(test.ws)); + + // Update alarm to be even earlier (2ms). We expect scheduling requests to still be blocked. 
+ test.setAlarm(twoMs); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + // Gate as-of 2ms update is blocked. + auto gateWait2ms = test.gate.wait(); + KJ_ASSERT(!gateWait2ms.poll(test.ws)); + + // Fulfill the 4ms request. We expect the 2ms scheduling to start, because that is the current + // alarm value. + fulfiller4Ms->fulfill(); + auto fulfiller2Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]); + test.pollAndExpectCalls({}); + + // While waiting for 2ms request, update alarm time to be 1ms. Expect scheduling to be blocked. + test.setAlarm(oneMs); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + // Gate as-of 1ms update is blocked. + auto gateWait1ms = test.gate.wait(); + KJ_ASSERT(!gateWait1ms.poll(test.ws)); + + // Fulfill the 2ms request. We expect the 1ms scheduling to start. + fulfiller2Ms->fulfill(); + auto fulfiller1Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]); + test.pollAndExpectCalls({}); + + // Fulfill the 1ms request. We expect a single db commit to start (coalescing all previous db + // commits together). + fulfiller1Ms->fulfill(); + auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + test.pollAndExpectCalls({}); + + // We expect all earlier gates to be blocked until commit completes. + KJ_ASSERT(!gateWait4ms.poll(test.ws)); + KJ_ASSERT(!gateWait3ms.poll(test.ws)); + KJ_ASSERT(!gateWait2ms.poll(test.ws)); + KJ_ASSERT(!gateWait1ms.poll(test.ws)); + commitFulfiller->fulfill(); + KJ_ASSERT(gateWait4ms.poll(test.ws)); + KJ_ASSERT(gateWait3ms.poll(test.ws)); + KJ_ASSERT(gateWait2ms.poll(test.ws)); + KJ_ASSERT(gateWait1ms.poll(test.ws)); + + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("setting later alarm times does scheduling after db commit") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + // Gate is not blocked. + auto gateWaitBefore = test.gate.wait(); + KJ_ASSERT(gateWaitBefore.poll(test.ws)); + + // Set alarm to 2ms. Expect 2ms db commit to start. + test.setAlarm(twoMs); + auto commit2MsFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + test.pollAndExpectCalls({}); + + // Gate as-of 2ms update is blocked. + auto gateWait2Ms = test.gate.wait(); + KJ_ASSERT(!gateWait2Ms.poll(test.ws)); + + // Set alarm to 3ms. Expect 3ms db commit to start. + test.setAlarm(threeMs); + auto commit3MsFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + test.pollAndExpectCalls({}); + + // Gate as-of 3ms update is blocked. + auto gateWait3Ms = test.gate.wait(); + KJ_ASSERT(!gateWait3Ms.poll(test.ws)); + + // Fulfill 2ms db commit. Expect 2ms alarm to be scheduled and 2ms gate to be unblocked. + KJ_ASSERT(!gateWait2Ms.poll(test.ws)); + commit2MsFulfiller->fulfill(); + KJ_ASSERT(gateWait2Ms.poll(test.ws)); + auto fulfiller2Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]); + test.pollAndExpectCalls({}); + + // Fulfill 3ms db commit. Expect 3ms alarm to be scheduled and 3ms gate to be unblocked. + KJ_ASSERT(!gateWait3Ms.poll(test.ws)); + commit3MsFulfiller->fulfill(); + KJ_ASSERT(gateWait3Ms.poll(test.ws)); + auto fulfiller3Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(3ms)"})[0]); + test.pollAndExpectCalls({}); + + // Outstanding alarm scheduling can complete asynchronously. + fulfiller2Ms->fulfill(); + fulfiller3Ms->fulfill(); +} + +KJ_TEST("in-flight later alarm times don't affect subsequent commits") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + // Set alarm to 5ms. Expect 5ms db commit and scheduling to start. + test.setAlarm(fiveMs); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + auto fulfiller5Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]); + + // While 5ms scheduling is still in-flight, set alarm to 2ms. Even though the last-confirmed + // alarm value was 1ms, we expect that setting the alarm to 2ms will be interpreted as setting + // the alarm earlier, so it will issue the schedule request before the commit request. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + auto commit2MsFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + + fulfiller5Ms->fulfill(); + commit2MsFulfiller->fulfill(); +} + +KJ_TEST("rejected alarm scheduling request breaks gate") { + ActorSqliteTest test({.monitorOutputGate = false}); + + auto promise = test.gate.onBroken(); + + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->reject( + KJ_EXCEPTION(FAILED, "a_rejected_scheduleRun")); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", promise.wait(test.ws)); +} + +KJ_TEST("an exception thrown during merged commits does not hang") { + ActorSqliteTest test({.monitorOutputGate = false}); + + auto promise = test.gate.onBroken(); + + // Initialize alarm state to 5ms. + test.setAlarm(fiveMs); + test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs); + + // Update alarm to be earlier (4ms). We expect the alarm scheduling to start. 
+ test.setAlarm(fourMs); + auto fulfiller4Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(4ms)"})[0]); + auto gateWait4ms = test.gate.wait(); + + // While 4ms scheduling request is in-flight, update alarm to be earlier (3ms). We expect + // the two commit requests to merge and be blocked on the alarm scheduling request. + test.setAlarm(threeMs); + test.pollAndExpectCalls({}); + auto gateWait3ms = test.gate.wait(); + + // Reject the 4ms request. We expect both gate waiting promises to unblock with exceptions. + KJ_ASSERT(!gateWait4ms.poll(test.ws)); + KJ_ASSERT(!gateWait3ms.poll(test.ws)); + fulfiller4Ms->reject(KJ_EXCEPTION(FAILED, "a_rejected_scheduleRun")); + KJ_ASSERT(gateWait4ms.poll(test.ws)); + KJ_ASSERT(gateWait3ms.poll(test.ws)); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", gateWait4ms.wait(test.ws)); + KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", gateWait3ms.wait(test.ws)); + KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", promise.wait(test.ws)); +} + +KJ_TEST("getAlarm/setAlarm check for brokenness") { + ActorSqliteTest test({.monitorOutputGate = false}); + + auto promise = test.gate.onBroken(); + + // Break gate + test.put("foo", "bar"); + test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit")); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws)); + + // Apparently we don't actually set brokenness until the taskFailed handler runs, but presumably + // this is OK? + test.getAlarm(); + + // Ensure taskFailed handler runs and notices brokenness: + test.ws.poll(); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", test.getAlarm()); + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", test.setAlarm(kj::none)); + test.pollAndExpectCalls({}); +} + +KJ_TEST("calling deleteAll() preserves alarm state if alarm is set") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. 
+ test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + ActorCache::DeleteAllResults results = test.actor.deleteAll({}); + KJ_ASSERT(results.backpressure == kj::none); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + KJ_ASSERT(results.count.wait(test.ws) == 0); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + commitFulfiller->fulfill(); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("calling deleteAll() preserves alarm state if alarm is not set") { + ActorSqliteTest test; + + // Initialize alarm state to empty value in metadata table. + test.setAlarm(oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + test.setAlarm(kj::none); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + + ActorCache::DeleteAllResults results = test.actor.deleteAll({}); + KJ_ASSERT(results.backpressure == kj::none); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + + auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + KJ_ASSERT(results.count.wait(test.ws) == 0); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + + commitFulfiller->fulfill(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + + // We can also assert that we leave the database empty, in case that turns out to be useful later: + auto q = test.db.run("SELECT name FROM sqlite_master WHERE type='table' AND name='_cf_METADATA'"); + 
KJ_ASSERT(q.isDone()); +} + +KJ_TEST("calling deleteAll() during an implicit transaction preserves alarm state") { + ActorSqliteTest test; + + // Initialize alarm state to 1ms. + test.setAlarm(oneMs); + + ActorCache::DeleteAllResults results = test.actor.deleteAll({}); + KJ_ASSERT(results.backpressure == kj::none); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + + auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]); + KJ_ASSERT(results.count.wait(test.ws) == 0); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + commitFulfiller->fulfill(); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("rolling back transaction leaves alarm in expected state") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + { + auto txn = test.actor.startTransaction(); + KJ_ASSERT(expectSync(txn->getAlarm({})) == twoMs); + txn->setAlarm(oneMs, {}); + KJ_ASSERT(expectSync(txn->getAlarm({})) == oneMs); + // Dropping transaction without committing; should roll back. + } + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); +} + +KJ_TEST("rolling back transaction leaves deferred alarm deletion in expected state") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. 
+ test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + { + auto armResult = test.actor.armAlarmHandler(twoMs, false); + KJ_ASSERT(armResult.is()); + + auto txn = test.actor.startTransaction(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + test.setAlarm(oneMs); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + txn->rollback().wait(test.ws); + + // After rollback, getAlarm() still returns the deferred deletion result. + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + + // After rollback, no changes committed, no change in scheduled alarm. + test.pollAndExpectCalls({}); + } + + // After handler, 2ms alarm is deleted. + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +KJ_TEST("committing transaction leaves deferred alarm deletion in expected state") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + { + auto armResult = test.actor.armAlarmHandler(twoMs, false); + KJ_ASSERT(armResult.is()); + + auto txn = test.actor.startTransaction(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + test.setAlarm(oneMs); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + txn->commit(); + + // After commit, getAlarm() returns the committed value. 
+ KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + } + + // Alarm not deleted + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); +} + +KJ_TEST("rolling back nested transaction leaves deferred alarm deletion in expected state") { + ActorSqliteTest test; + + // Initialize alarm state to 2ms. + test.setAlarm(twoMs); + test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill(); + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({}); + KJ_ASSERT(expectSync(test.getAlarm()) == twoMs); + + { + auto armResult = test.actor.armAlarmHandler(twoMs, false); + KJ_ASSERT(armResult.is()); + + auto txn1 = test.actor.startTransaction(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + { + // Rolling back nested transaction change leaves deferred deletion in place. + auto txn2 = test.actor.startTransaction(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + test.setAlarm(oneMs); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + txn2->rollback().wait(test.ws); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + } + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + { + // Committing nested transaction changes parent transaction state to dirty. + auto txn3 = test.actor.startTransaction(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + test.setAlarm(oneMs); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + txn3->commit(); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + } + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + { + // Nested transaction of dirty transaction is dirty, rollback has no effect. 
+ auto txn4 = test.actor.startTransaction(); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + txn4->rollback().wait(test.ws); + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + } + KJ_ASSERT(expectSync(test.getAlarm()) == oneMs); + txn1->rollback().wait(test.ws); + + // After root transaction rollback, getAlarm() still returns the deferred deletion result. + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); + + // After rollback, no changes committed, no change in scheduled alarm. + test.pollAndExpectCalls({}); + } + + // After handler, 2ms alarm is deleted. + test.pollAndExpectCalls({"commit"})[0]->fulfill(); + test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill(); + KJ_ASSERT(expectSync(test.getAlarm()) == kj::none); +} + +} // namespace +} // namespace workerd diff --git a/src/workerd/io/actor-sqlite.c++ b/src/workerd/io/actor-sqlite.c++ index f5de8fb57c0..f3fd3d39d1b 100644 --- a/src/workerd/io/actor-sqlite.c++ +++ b/src/workerd/io/actor-sqlite.c++ @@ -7,11 +7,22 @@ #include "io-gate.h" #include +#include #include namespace workerd { +namespace { + +// Returns true if a given (set or unset) alarm will fire earlier than another. +static bool willFireEarlier(kj::Maybe alarm1, kj::Maybe alarm2) { + // Intuitively, an unset alarm is effectively indistinguishable from an alarm set at infinity. 
+ return alarm1.orDefault(kj::maxValue) < alarm2.orDefault(kj::maxValue); +} + +} // namespace + ActorSqlite::ActorSqlite(kj::Own dbParam, OutputGate& outputGate, kj::Function()> commitCallback, @@ -21,8 +32,16 @@ ActorSqlite::ActorSqlite(kj::Own dbParam, commitCallback(kj::mv(commitCallback)), hooks(hooks), kv(*db), + metadata(*db), commitTasks(*this) { db->onWrite(KJ_BIND_METHOD(*this, onWrite)); + lastConfirmedAlarmDbState = metadata.getAlarm(); + + // Because we preserve an invariant that scheduled alarms are always at or earlier than + // persisted db alarm state, it should be OK to populate our idea of the latest scheduled alarm + // using the current db alarm state. At worst, it may perform one unnecessary scheduling + // request in cases where a previous alarm-state-altering transaction failed. + alarmScheduledNoLaterThan = metadata.getAlarm(); } ActorSqlite::ImplicitTxn::ImplicitTxn(ActorSqlite& parent): parent(parent) { @@ -42,6 +61,7 @@ ActorSqlite::ImplicitTxn::~ImplicitTxn() noexcept(false) { // This should only happen in cases of catastrophic error. Since this is rarely actually // executed, we don't prepare a statement for it. parent.db->run("ROLLBACK TRANSACTION"); + parent.metadata.invalidate(); } } @@ -62,6 +82,7 @@ void ActorSqlite::ImplicitTxn::rollback() { // bother. 
parent.db->run("ROLLBACK TRANSACTION"); committed = true; + parent.metadata.invalidate(); } } @@ -81,6 +102,7 @@ ActorSqlite::ExplicitTxn::ExplicitTxn(ActorSqlite& actorSqlite): actorSqlite(act parent = kj::addRef(*exp); exp->hasChild = true; depth = exp->depth + 1; + alarmDirty = exp->alarmDirty; } } actorSqlite.currentTxn = this; @@ -112,18 +134,41 @@ ActorSqlite::ExplicitTxn::~ExplicitTxn() noexcept(false) { } } +bool ActorSqlite::ExplicitTxn::getAlarmDirty() { + return alarmDirty; +} + +void ActorSqlite::ExplicitTxn::setAlarmDirty() { + alarmDirty = true; +} + kj::Maybe> ActorSqlite::ExplicitTxn::commit() { KJ_REQUIRE(!hasChild, "critical sections should have prevented committing transaction while " "nested txn is outstanding"); + // Start the schedule request before root transaction commit(), for correctness in workerd. + kj::Maybe precommitAlarmState; + if (parent == kj::none) { + precommitAlarmState = actorSqlite.startPrecommitAlarmScheduling(); + } + actorSqlite.db->run(SqliteDatabase::TRUSTED, kj::str("RELEASE _cf_savepoint_", depth)); committed = true; - if (parent == kj::none) { + KJ_IF_SOME(p, parent) { + if (alarmDirty) { + p->alarmDirty = true; + } + } else { + if (alarmDirty) { + actorSqlite.haveDeferredDelete = false; + } + // We committed the root transaction, so it's time to signal any replication layer and lock // the output gate in the meantime. - actorSqlite.commitTasks.add(actorSqlite.outputGate.lockWhile(actorSqlite.commitCallback())); + actorSqlite.commitTasks.add(actorSqlite.outputGate.lockWhile( + actorSqlite.commitImpl(kj::mv(KJ_ASSERT_NONNULL(precommitAlarmState))))); } // No backpressure for SQLite. 
@@ -143,6 +188,12 @@ kj::Promise ActorSqlite::ExplicitTxn::rollback() { void ActorSqlite::ExplicitTxn::rollbackImpl() noexcept(false) { actorSqlite.db->run(SqliteDatabase::TRUSTED, kj::str("ROLLBACK TO _cf_savepoint_", depth)); actorSqlite.db->run(SqliteDatabase::TRUSTED, kj::str("RELEASE _cf_savepoint_", depth)); + actorSqlite.metadata.invalidate(); + KJ_IF_SOME(p, parent) { + alarmDirty = p->alarmDirty; + } else { + alarmDirty = false; + } } void ActorSqlite::onWrite() { @@ -154,6 +205,9 @@ void ActorSqlite::onWrite() { // Don't commit if shutdown() has been called. requireNotBroken(); + // Start the schedule request before commit(), for correctness in workerd. + auto precommitAlarmState = startPrecommitAlarmScheduling(); + try { txn->commit(); } catch (...) { @@ -171,11 +225,104 @@ void ActorSqlite::onWrite() { // rather than after the callback. { auto drop = kj::mv(txn); } - return commitCallback(); + return commitImpl(kj::mv(precommitAlarmState)); }))); } } +kj::Promise ActorSqlite::requestScheduledAlarm(kj::Maybe requestedTime) { + // Not using coroutines here, because it's important for correctness in workerd that a + // synchronously thrown exception in scheduleRun() can escape synchronously to the caller. + + bool movingAlarmLater = willFireEarlier(alarmScheduledNoLaterThan, requestedTime); + if (movingAlarmLater) { + // Since we are setting the alarm to be later, we can update alarmScheduledNoLaterThan + // immediately and still preserve the invariant that the scheduled alarm time is equal to or + // earlier than the persisted db alarm value. Doing the immediate update ensures that + // subsequent invocations of commitImpl() will compare against the correct value in their + // precommit alarm checks, even if other later-setting requests are still in-flight, without + // needing to wait for them to complete. 
+ alarmScheduledNoLaterThan = requestedTime; + } + + return hooks.scheduleRun(requestedTime).then([this, movingAlarmLater, requestedTime]() { + if (!movingAlarmLater) { + alarmScheduledNoLaterThan = requestedTime; + } + }); +} + +ActorSqlite::PrecommitAlarmState ActorSqlite::startPrecommitAlarmScheduling() { + PrecommitAlarmState state; + if (pendingCommit == kj::none && + willFireEarlier(metadata.getAlarm(), alarmScheduledNoLaterThan)) { + // Basically, this is the first scheduling request that commitImpl() would make prior to + // commitCallback(). We start the request separately, ahead of calling sqlite functions that + // commit to local disk, for correctness in workerd, where alarm scheduling and db commits are + // both synchronous. + state.schedulingPromise = requestScheduledAlarm(metadata.getAlarm()); + } + return kj::mv(state); +} + +kj::Promise ActorSqlite::commitImpl(ActorSqlite::PrecommitAlarmState precommitAlarmState) { + // We assume that exceptions thrown during commit will propagate to the caller, such that they + // will ensure cancelDeferredAlarmDeletion() is called, if necessary. + + KJ_IF_SOME(pending, pendingCommit) { + // If an earlier commitImpl() invocation is already in the process of updating precommit + // alarms but has not yet made the commitCallback() call, it should be OK to wait on it to + // perform the precommit alarm update and db commit for this invocation, too. + co_await pending.addBranch(); + co_return; + } + + // There are no pending commits in-flight, so we set up a forked promise that other callers can + // wait on, to perform the alarm scheduling and database persistence work for all of them. Note + // that the fulfiller is owned by this coroutine context, so if an exception is thrown below, + // the fulfiller's destructor will detect that the stack is unwinding and will automatically + // propagate the thrown exception to the other waiters. 
+ auto [promise, fulfiller] = kj::newPromiseAndFulfiller(); + pendingCommit = promise.fork(); + + // Wait for the first precommit alarm scheduling request to complete, if any. This was set up + // in startPrecommitAlarmScheduling() and is essentially the first iteration of the below + // while() loop, but needed to be initiated synchronously before the local database commit to + // ensure correctness in workerd. + KJ_IF_SOME(p, precommitAlarmState.schedulingPromise) { + co_await p; + } + + // While the local db state requires an earlier alarm than is known might be scheduled, issue an + // alarm update request for the earlier time and wait for it to complete. This helps ensure + // that the successfully scheduled alarm time is always earlier or equal to the alarm state in + // the successfully persisted db. + while (willFireEarlier(metadata.getAlarm(), alarmScheduledNoLaterThan)) { + co_await requestScheduledAlarm(metadata.getAlarm()); + } + + // Issue the commitCallback() request to persist the db state, then synchronously clear the + // pending commit so that the next commitImpl() invocation starts its own set of precommit alarm + // updates and db commit. + auto alarmStateForCommit = metadata.getAlarm(); + auto commitCallbackPromise = commitCallback(); + pendingCommit = kj::none; + + // Wait for the db to persist. + co_await commitCallbackPromise; + lastConfirmedAlarmDbState = alarmStateForCommit; + + // Notify any merged commitImpl() requests that the db persistence completed. + fulfiller->fulfill(); + + // If the db state is now later than the in-flight scheduled alarms, issue a request to update + // it to match the db state. We don't need to hold open the output gate, so we add the + // scheduling request to commitTasks. 
+ if (willFireEarlier(alarmScheduledNoLaterThan, alarmStateForCommit)) { + commitTasks.add(requestScheduledAlarm(alarmStateForCommit)); + } +} + void ActorSqlite::taskFailed(kj::Exception&& exception) { // The output gate should already have been broken since it wraps all commits tasks. So, we // don't have to report anything here, the exception will already propagate elsewhere. We @@ -191,6 +338,19 @@ void ActorSqlite::requireNotBroken() { } } +void ActorSqlite::maybeDeleteDeferredAlarm() { + if (!inAlarmHandler) { + // Pretty sure this can't happen. + LOG_WARNING_ONCE("expected to be in alarm handler when trying to delete alarm"); + } + inAlarmHandler = false; + + if (haveDeferredDelete) { + metadata.setAlarm(kj::none); + haveDeferredDelete = false; + } +} + // ======================================================================================= // ActorCacheInterface implementation @@ -220,7 +380,19 @@ kj::OneOf, kj::Promise>> ActorSqlite::ge ReadOptions options) { requireNotBroken(); - return hooks.getAlarm(); + bool transactionAlarmDirty = false; + KJ_IF_SOME(exp, currentTxn.tryGet()) { + transactionAlarmDirty = exp->getAlarmDirty(); + } + + if (haveDeferredDelete && !transactionAlarmDirty) { + // If an alarm handler is currently running, and a new alarm time has not been set yet, we + // need to return that there is no alarm. + return kj::Maybe(kj::none); + } else { + return metadata.getAlarm(); + } + KJ_UNREACHABLE; } kj::OneOf> ActorSqlite:: @@ -285,7 +457,18 @@ kj::Maybe> ActorSqlite::setAlarm( kj::Maybe newAlarmTime, WriteOptions options) { requireNotBroken(); - return hooks.setAlarm(newAlarmTime); + // TODO(someday): When deleting alarm data in an otherwise empty database, clear the database to
+ + metadata.setAlarm(newAlarmTime); + + KJ_IF_SOME(exp, currentTxn.tryGet()) { + exp->setAlarmDirty(); + } else { + haveDeferredDelete = false; + } + + return kj::none; } kj::Own ActorSqlite::startTransaction() { @@ -297,6 +480,11 @@ kj::Own ActorSqlite::startTransaction() { ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions options) { requireNotBroken(); + // kv.deleteAll() clears the database, so we need to save and possibly restore alarm state in + // the metadata table, to try to match the behavior of ActorCache, which preserves the set alarm + // when running deleteAll(). + auto localAlarmState = metadata.getAlarm(); + // deleteAll() cannot be part of a transaction because it deletes the database altogether. So, // we have to close our transactions or fail. KJ_SWITCH_ONEOF(currentTxn) { @@ -325,8 +513,9 @@ ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions option } } - if (!deleteAllCommitScheduled) { - // We'll want to make sure the commit callback is called for the deleteAll(). + if (localAlarmState == kj::none && !deleteAllCommitScheduled) { + // If we're not going to perform a write to restore alarm state, we'll want to make sure the + // commit callback is called for the deleteAll(). commitTasks.add(outputGate.lockWhile(kj::evalLater([this]() mutable -> kj::Promise { // Don't commit if shutdown() has been called. requireNotBroken(); @@ -338,6 +527,18 @@ ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions option } uint count = kv.deleteAll(); + + // TODO(correctness): Since workerd doesn't have a separate durability step, in the unlikely + // event of a failure here, between deleteAll() and setAlarm(), we could theoretically lose the + // current alarm state when running under workerd. Not sure if there's a practical way to avoid + // this. + + // Reset alarm state, if necessary. If no alarm is set, OK to just leave metadata table + // uninitialized. 
+ if (localAlarmState != kj::none) { + metadata.setAlarm(localAlarmState); + } + return { .backpressure = kj::none, .count = count, @@ -384,12 +585,59 @@ void ActorSqlite::shutdown(kj::Maybe maybeException) { } } -kj::Maybe> ActorSqlite::armAlarmHandler(kj::Date scheduledTime, bool noCache) { - return hooks.armAlarmHandler(scheduledTime, noCache); +kj::OneOf ActorSqlite:: + armAlarmHandler(kj::Date scheduledTime, bool noCache) { + KJ_ASSERT(!inAlarmHandler); + + if (haveDeferredDelete) { + // Unlikely to happen, unless caller is starting new alarm handler before previous alarm + // handler cleanup has completed. + LOG_WARNING_ONCE("expected previous alarm handler to be cleaned up"); + } + + auto localAlarmState = metadata.getAlarm(); + if (localAlarmState != scheduledTime) { + if (localAlarmState == lastConfirmedAlarmDbState) { + // If there's a clean db time that differs from the requested handler's scheduled time, this + // run should be canceled. + if (willFireEarlier(scheduledTime, localAlarmState)) { + // If the handler's scheduled time is earlier than the clean scheduled time, we may be + // recovering from a failed db commit or scheduling request, so we need to request that + // the alarm be rescheduled for the current db time, and tell the caller to wait for + // successful rescheduling before cancelling the current handler invocation. + // + // TODO(perf): If we already have such a rescheduling request in-flight, might want to + // coalesce with the existing request? + if (localAlarmState == kj::none) { + // If clean scheduled time is unset, don't need to reschedule; just cancel the alarm. 
+ return CancelAlarmHandler{.waitBeforeCancel = kj::READY_NOW}; + } else { + return CancelAlarmHandler{.waitBeforeCancel = requestScheduledAlarm(localAlarmState)}; + } + } else { + return CancelAlarmHandler{.waitBeforeCancel = kj::READY_NOW}; + } + } else { + // There's an alarm write that hasn't been set yet pending for a time different than ours -- + // We won't cancel the alarm because it hasn't been confirmed, but we shouldn't delete + // the pending write. + haveDeferredDelete = false; + } + } else { + haveDeferredDelete = true; + } + inAlarmHandler = true; + + static const DeferredAlarmDeleter disposer; + return RunAlarmHandler{.deferredDelete = kj::Own(this, disposer)}; } void ActorSqlite::cancelDeferredAlarmDeletion() { - hooks.cancelDeferredAlarmDeletion(); + if (!inAlarmHandler) { + // Pretty sure this can't happen. + LOG_WARNING_ONCE("expected to be in alarm handler when trying to cancel deleted alarm"); + } + haveDeferredDelete = false; } kj::Maybe> ActorSqlite::onNoPendingFlush() { @@ -404,22 +652,10 @@ kj::Maybe> ActorSqlite::onNoPendingFlush() { const ActorSqlite::Hooks ActorSqlite::Hooks::DEFAULT = ActorSqlite::Hooks{}; -kj::Maybe> ActorSqlite::Hooks::armAlarmHandler(kj::Date scheduledTime, bool noCache) { - JSG_FAIL_REQUIRE(Error, "alarms are not yet implemented for SQLite-backed Durable Objects"); -} - -void ActorSqlite::Hooks::cancelDeferredAlarmDeletion() { +kj::Promise ActorSqlite::Hooks::scheduleRun(kj::Maybe newAlarmTime) { JSG_FAIL_REQUIRE(Error, "alarms are not yet implemented for SQLite-backed Durable Objects"); } -kj::Promise> ActorSqlite::Hooks::getAlarm() { - JSG_FAIL_REQUIRE(Error, "getAlarm() is not yet implemented for SQLite-backed Durable Objects"); -} - -kj::Promise ActorSqlite::Hooks::setAlarm(kj::Maybe) { - JSG_FAIL_REQUIRE(Error, "setAlarm() is not yet implemented for SQLite-backed Durable Objects"); -} - kj::OneOf, kj::Promise>> ActorSqlite::ExplicitTxn::get(Key key, ReadOptions options) { return actorSqlite.get(kj::mv(key),
options); diff --git a/src/workerd/io/actor-sqlite.h b/src/workerd/io/actor-sqlite.h index d9fe0bf621a..0e838f6d9c4 100644 --- a/src/workerd/io/actor-sqlite.h +++ b/src/workerd/io/actor-sqlite.h @@ -7,6 +7,7 @@ #include "actor-cache.h" #include +#include namespace workerd { @@ -23,10 +24,9 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH // for alarm operations. class Hooks { public: - virtual kj::Promise> getAlarm(); - virtual kj::Promise setAlarm(kj::Maybe newAlarmTime); - virtual kj::Maybe> armAlarmHandler(kj::Date scheduledTime, bool noCache); - virtual void cancelDeferredAlarmDeletion(); + // Makes a request to the alarm manager to run the alarm handler at the given time, returning + // a promise that resolves when the scheduling has succeeded. + virtual kj::Promise scheduleRun(kj::Maybe newAlarmTime); static const Hooks DEFAULT; }; @@ -76,7 +76,8 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH DeleteAllResults deleteAll(WriteOptions options) override; kj::Maybe> evictStale(kj::Date now) override; void shutdown(kj::Maybe maybeException) override; - kj::Maybe> armAlarmHandler(kj::Date scheduledTime, bool noCache = false) override; + kj::OneOf armAlarmHandler( + kj::Date scheduledTime, bool noCache = false) override; void cancelDeferredAlarmDeletion() override; kj::Maybe> onNoPendingFlush() override; // See ActorCacheInterface @@ -87,6 +88,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH kj::Function()> commitCallback; Hooks& hooks; SqliteKv kv; + SqliteMetadata metadata; SqliteDatabase::Statement beginTxn = db->prepare("BEGIN TRANSACTION"); SqliteDatabase::Statement commitTxn = db->prepare("COMMIT TRANSACTION"); @@ -116,6 +118,9 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH ~ExplicitTxn() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(ExplicitTxn); + bool getAlarmDirty(); + void setAlarmDirty(); + kj::Maybe> commit() 
override; kj::Promise rollback() override; // Implements ActorCacheInterface::Transaction. @@ -144,6 +149,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH uint depth = 0; bool hasChild = false; bool committed = false; + bool alarmDirty = false; void rollbackImpl(); }; @@ -161,13 +167,67 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH // If true, then a commit is scheduled as a result of deleteAll() having been called. bool deleteAllCommitScheduled = false; + // Backs the `kj::Own` returned by `armAlarmHandler()`. + class DeferredAlarmDeleter: public kj::Disposer { + public: + // The `Own` returned by `armAlarmHandler()` is actually set up to point to the + // `ActorSqlite` itself, but with an alternate disposer that deletes the alarm rather than + // the whole object. + void disposeImpl(void* pointer) const { + reinterpret_cast(pointer)->maybeDeleteDeferredAlarm(); + } + }; + + // We need to track some additional alarm state to guarantee at-least-once alarm delivery: + // Within an alarm handler, we want the observable alarm state to look like the running alarm + // was deleted at the start of the handler (when armAlarmHandler() is called), but we don't + // actually want to persist that deletion until after the handler has successfully completed. + bool haveDeferredDelete = false; + + // Some state only used for tracking calling invariants. + bool inAlarmHandler = false; + + // The alarm state for which we last received confirmation that the db was durably stored. + kj::Maybe lastConfirmedAlarmDbState; + + // The latest time we'd expect a scheduled alarm to fire, given the current set of in-flight + // scheduling requests, without yet knowing if any of them succeeded or failed. We use this + // value to maintain the invariant that the scheduled alarm is always equal to or earlier than + // the alarm value in the persisted database state. 
+ kj::Maybe alarmScheduledNoLaterThan; + + // A promise for an in-progress alarm notification update and database commit. + kj::Maybe> pendingCommit; + kj::TaskSet commitTasks; void onWrite(); + // Issues a request to the alarm scheduler for the given time, returning a promise that resolves + // when the request is confirmed. + kj::Promise requestScheduledAlarm(kj::Maybe requestedTime); + + struct PrecommitAlarmState { + // Promise for the completion of precommit alarm scheduling + kj::Maybe> schedulingPromise; + }; + + // To be called just before committing the local sqlite db, to synchronously start any necessary + // alarm scheduling: + PrecommitAlarmState startPrecommitAlarmScheduling(); + + // Performs the rest of the asynchronous commit, to be waited on after committing the local + // sqlite db. Should be called in the same turn of the event loop as + // startPrecommitAlarmScheduling() and passed the state that it returned. + kj::Promise commitImpl(PrecommitAlarmState precommitAlarmState); + void taskFailed(kj::Exception&& exception) override; void requireNotBroken(); + + // Called when DeferredAlarmDeleter is destroyed, to delete alarm if not reset or cancelled + // during handler. + void maybeDeleteDeferredAlarm(); }; } // namespace workerd diff --git a/src/workerd/server/alarm-scheduler.c++ b/src/workerd/server/alarm-scheduler.c++ index 3ef9f52e143..3c3c0b09b43 100644 --- a/src/workerd/server/alarm-scheduler.c++ +++ b/src/workerd/server/alarm-scheduler.c++ @@ -81,6 +81,8 @@ void AlarmScheduler::registerNamespace(kj::StringPtr uniqueKey, GetActorFn getAc } kj::Maybe AlarmScheduler::getAlarm(ActorKey actor) { + // TODO(someday): Might be able to simplify AlarmScheduler somewhat, now that ActorSqlite no + // longer relies on it for getAlarm()? 
KJ_IF_SOME(alarm, alarms.find(actor)) { if (alarm.status == AlarmStatus::STARTED) { // getAlarm() when the alarm handler is running should return null, diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index f89ac196db5..780693c0e79 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1867,11 +1867,7 @@ public: : alarmScheduler(alarmScheduler), actor(actor) {} - kj::Promise> getAlarm() override { - return alarmScheduler.getAlarm(actor); - } - - kj::Promise setAlarm(kj::Maybe newAlarmTime) override { + kj::Promise scheduleRun(kj::Maybe newAlarmTime) override { KJ_IF_SOME(scheduledTime, newAlarmTime) { alarmScheduler.setAlarm(actor, scheduledTime); } else { @@ -1880,17 +1876,6 @@ public: return kj::READY_NOW; } - // No-op -- armAlarmHandler() is normally used to schedule a delete after the alarm runs. - // But since alarm read/write operations happen on the same thread as the scheduler in - // workerd, we can just handle the delete in the scheduler instead. - kj::Maybe> armAlarmHandler(kj::Date, bool) override { - // We return this weird kj::Own to `this` since just doing kj::Own() creates an - // empty maybe. 
- return kj::Own(this, kj::NullDisposer::instance); - } - - void cancelDeferredAlarmDeletion() override {} - private: AlarmScheduler& alarmScheduler; ActorKey actor; diff --git a/src/workerd/util/BUILD.bazel b/src/workerd/util/BUILD.bazel index f74b9bea299..384936d42ef 100644 --- a/src/workerd/util/BUILD.bazel +++ b/src/workerd/util/BUILD.bazel @@ -87,10 +87,12 @@ wd_cc_library( srcs = [ "sqlite.c++", "sqlite-kv.c++", + "sqlite-metadata.c++", ], hdrs = [ "sqlite.h", "sqlite-kv.h", + "sqlite-metadata.h", ], implementation_deps = [ "@sqlite3", @@ -203,6 +205,13 @@ kj_test( ], ) +kj_test( + src = "sqlite-metadata-test.c++", + deps = [ + ":sqlite", + ], +) + kj_test( src = "test-test.c++", deps = [ diff --git a/src/workerd/util/sqlite-metadata-test.c++ b/src/workerd/util/sqlite-metadata-test.c++ new file mode 100644 index 00000000000..dddd9164b5f --- /dev/null +++ b/src/workerd/util/sqlite-metadata-test.c++ @@ -0,0 +1,63 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "sqlite-metadata.h" + +#include + +namespace workerd { +namespace { + +KJ_TEST("SQLite-METADATA") { + auto dir = kj::newInMemoryDirectory(kj::nullClock()); + SqliteDatabase::Vfs vfs(*dir); + SqliteDatabase db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + SqliteMetadata metadata(db); + + // Initial state has empty alarm + KJ_EXPECT(metadata.getAlarm() == kj::none); + + // Can set alarm to an explicit time + constexpr kj::Date anAlarmTime1 = + kj::UNIX_EPOCH + 1734099316 * kj::SECONDS + 987654321 * kj::NANOSECONDS; + metadata.setAlarm(anAlarmTime1); + + // Can get the set alarm time + KJ_EXPECT(metadata.getAlarm() == anAlarmTime1); + + // Can overwrite the alarm time + constexpr kj::Date anAlarmTime2 = anAlarmTime1 + 1 * kj::NANOSECONDS; + metadata.setAlarm(anAlarmTime2); + KJ_EXPECT(metadata.getAlarm() != anAlarmTime1); + KJ_EXPECT(metadata.getAlarm() 
== anAlarmTime2); + + // Can clear alarm + metadata.setAlarm(kj::none); + KJ_EXPECT(metadata.getAlarm() == kj::none); + + // Zero alarm is distinct from unset (probably not important, but just checking) + metadata.setAlarm(kj::UNIX_EPOCH); + KJ_EXPECT(metadata.getAlarm() == kj::UNIX_EPOCH); + + // Can recreate table after resetting database + metadata.setAlarm(anAlarmTime1); + KJ_EXPECT(metadata.getAlarm() == anAlarmTime1); + db.reset(); + KJ_EXPECT(metadata.getAlarm() == kj::none); + metadata.setAlarm(anAlarmTime2); + KJ_EXPECT(KJ_ASSERT_NONNULL(metadata.getAlarm()) == anAlarmTime2); + + // Can invalidate cache after rolling back. + metadata.setAlarm(anAlarmTime2); + db.run("BEGIN TRANSACTION"); + metadata.setAlarm(anAlarmTime1); + KJ_EXPECT(metadata.getAlarm() == anAlarmTime1); + db.run("ROLLBACK TRANSACTION"); + KJ_EXPECT(metadata.getAlarm() == anAlarmTime1); + metadata.invalidate(); + KJ_EXPECT(metadata.getAlarm() == anAlarmTime2); +} + +} // namespace +} // namespace workerd diff --git a/src/workerd/util/sqlite-metadata.c++ b/src/workerd/util/sqlite-metadata.c++ new file mode 100644 index 00000000000..95264b52198 --- /dev/null +++ b/src/workerd/util/sqlite-metadata.c++ @@ -0,0 +1,88 @@ +// Copyright (c) 2024 Cloudflare, Inc. 
+// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "sqlite-metadata.h" + +#include + +namespace workerd { + +SqliteMetadata::SqliteMetadata(SqliteDatabase& db): ResetListener(db) { + auto q = db.run("SELECT name FROM sqlite_master WHERE type='table' AND name='_cf_METADATA'"); + tableCreated = !q.isDone(); +} + +kj::Maybe SqliteMetadata::getAlarm() { + if (cacheState == kj::none) { + cacheState = Cache{.alarmTime = getAlarmUncached()}; + } + return KJ_ASSERT_NONNULL(cacheState).alarmTime; +} + +void SqliteMetadata::setAlarm(kj::Maybe currentTime) { + KJ_IF_SOME(c, cacheState) { + if (c.alarmTime == currentTime) { + return; + } + } + setAlarmUncached(currentTime); + cacheState = Cache{.alarmTime = currentTime}; +} + +void SqliteMetadata::invalidate() { + cacheState = kj::none; +} + +kj::Maybe SqliteMetadata::getAlarmUncached() { + if (!tableCreated) { + return kj::none; + } + + auto query = ensureInitialized().stmtGetAlarm.run(); + if (query.isDone() || query.isNull(0)) { + return kj::none; + } else { + return kj::UNIX_EPOCH + query.getInt64(0) * kj::NANOSECONDS; + } +} + +void SqliteMetadata::setAlarmUncached(kj::Maybe currentTime) { + KJ_IF_SOME(t, currentTime) { + ensureInitialized().stmtSetAlarm.run((t - kj::UNIX_EPOCH) / kj::NANOSECONDS); + } else { + // Our getter code also allows representing an empty alarm value as a + // missing row or table, but a null-value row seems efficient and simple. 
+ ensureInitialized().stmtSetAlarm.run(nullptr); + } +} + +SqliteMetadata::Initialized& SqliteMetadata::ensureInitialized() { + if (!tableCreated) { + db.run(R"( + CREATE TABLE IF NOT EXISTS _cf_METADATA ( + key INTEGER PRIMARY KEY, + value BLOB + ); + )"); + tableCreated = true; + } + + KJ_SWITCH_ONEOF(dbState) { + KJ_CASE_ONEOF(uninitialized, Uninitialized) { + return dbState.init(db); + } + KJ_CASE_ONEOF(initialized, Initialized) { + return initialized; + } + } + KJ_UNREACHABLE; +} + +void SqliteMetadata::beforeSqliteReset() { + // We'll need to recreate the table on the next operation. + tableCreated = false; + cacheState = kj::none; +} + +} // namespace workerd diff --git a/src/workerd/util/sqlite-metadata.h b/src/workerd/util/sqlite-metadata.h new file mode 100644 index 00000000000..e9fd73385eb --- /dev/null +++ b/src/workerd/util/sqlite-metadata.h @@ -0,0 +1,65 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include "sqlite.h" + +namespace workerd { + +// Class which implements a simple metadata kv storage and cache on top of SQLite. Currently only +// used to store Durable Object alarm times (hardcoded as key = 1), but could later be used for +// other properties. +// +// The table is named `_cf_METADATA`. The naming is designed so that if the application is allowed to +// perform direct SQL queries, we can block it from accessing any table prefixed with `_cf_`. +class SqliteMetadata final: private SqliteDatabase::ResetListener { +public: + explicit SqliteMetadata(SqliteDatabase& db); + + // Return currently set alarm time, or none. + kj::Maybe getAlarm(); + + // Sets current alarm time, or none. + void setAlarm(kj::Maybe currentTime); + + // Invalidates cached values. Needs to be called when rolling back db state. 
+ void invalidate(); + +private: + struct Uninitialized {}; + struct Initialized { + SqliteDatabase& db; + + SqliteDatabase::Statement stmtGetAlarm = db.prepare(R"( + SELECT value FROM _cf_METADATA WHERE key = 1 + )"); + SqliteDatabase::Statement stmtSetAlarm = db.prepare(R"( + INSERT INTO _cf_METADATA VALUES(1, ?) + ON CONFLICT DO UPDATE SET value = excluded.value; + )"); + + Initialized(SqliteDatabase& db): db(db) {} + }; + + bool tableCreated; + kj::OneOf dbState = Uninitialized{}; + + struct Cache { + kj::Maybe alarmTime; + }; + kj::Maybe cacheState; + + kj::Maybe getAlarmUncached(); + void setAlarmUncached(kj::Maybe currentTime); + + Initialized& ensureInitialized(); + // Make sure the metadata table is created and prepared statements are ready. Not called until the + // first write, or the first uncached read when the table already exists. + + // ResetListener interface: + void beforeSqliteReset() override; +}; + +} // namespace workerd