Merge pull request #2648 from cloudflare/jlee/srs-alarms
Store alarm value in sqlite database, for sqlite-backed DOs
jclee authored Sep 23, 2024
2 parents b3d98e0 + 4f9e6aa commit e1eb9b6
Showing 16 changed files with 1,897 additions and 195 deletions.
40 changes: 22 additions & 18 deletions src/workerd/api/actor-alarms-delete-test.js
@@ -14,12 +14,12 @@ export class DurableObjectExample {
 
   async waitForAlarm(scheduledTime) {
     let self = this;
-    let prom = new Promise((resolve) => {
-      self.resolve = resolve;
-    });
+    const { promise, resolve, reject } = Promise.withResolvers();
+    self.resolve = resolve;
+    self.reject = reject;
 
     try {
-      await prom;
+      await promise;
       if (Date.now() < scheduledTime.valueOf()) {
         throw new Error(
           `Date.now() is before scheduledTime! ${Date.now()} vs ${scheduledTime.valueOf()}`
@@ -38,22 +38,26 @@ export class DurableObjectExample {
   }
 
   async alarm() {
-    this.state.alarmsTriggered++;
-    let time = await this.state.storage.getAlarm();
-    if (time) {
-      throw new Error(`time not null inside alarm handler ${time}`);
-    }
-    // Deleting an alarm inside `alarm()` will not have any effect, unless there's another queued alarm
-    // already.
-    await this.state.storage.deleteAlarm();
+    try {
+      this.state.alarmsTriggered++;
+      let time = await this.state.storage.getAlarm();
+      if (time !== null) {
+        throw new Error(`time not null inside alarm handler ${time}`);
+      }
+      // Deleting an alarm inside `alarm()` will not have any effect, unless there's another queued alarm
+      // already.
+      await this.state.storage.deleteAlarm();
 
-    // On the other hand, if we have an alarm queued, it will be deleted. If this is working properly,
-    // we'll only have one alarm triggered.
-    await this.state.storage.setAlarm(Date.now() + 50);
-    await this.state.storage.deleteAlarm();
+      // On the other hand, if we have an alarm queued, it will be deleted. If this is working properly,
+      // we'll only have one alarm triggered.
+      await this.state.storage.setAlarm(Date.now() + 50);
+      await this.state.storage.deleteAlarm();
 
-    // All done inside `alarm()`.
-    this.resolve();
+      // All done inside `alarm()`.
+      this.resolve();
+    } catch (e) {
+      this.reject(e);
+    }
   }
 
   async fetch() {
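
The test change above swaps a hand-rolled `new Promise((resolve) => ...)` for `Promise.withResolvers()` and wires up a `reject` path, so an assertion failure inside `alarm()` rejects the waiting request instead of leaving it hanging. A minimal sketch of that pattern, outside this diff (class name and timings are illustrative, not from the commit):

// Sketch: signalling alarm completion/failure back to a waiting request.
export class AlarmSignalExample {
  constructor(state) {
    this.state = state;
  }

  async fetch() {
    // Park a promise that alarm() will settle later.
    const { promise, resolve, reject } = Promise.withResolvers();
    this.resolve = resolve;
    this.reject = reject;

    await this.state.storage.setAlarm(Date.now() + 100);
    await promise; // resolved or rejected from inside alarm()
    return new Response('alarm ran');
  }

  async alarm() {
    try {
      // ...assertions about alarm state go here...
      this.resolve();
    } catch (e) {
      // Without this, a failure inside alarm() would only be logged and the
      // waiting fetch() would never settle.
      this.reject(e);
    }
  }
}
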
20 changes: 12 additions & 8 deletions src/workerd/api/actor-alarms-test.js
@@ -11,12 +11,12 @@ export class DurableObjectExample {
 
   async waitForAlarm(scheduledTime) {
     let self = this;
-    let prom = new Promise((resolve) => {
-      self.resolve = resolve;
-    });
+    const { promise, resolve, reject } = Promise.withResolvers();
+    self.resolve = resolve;
+    self.reject = reject;
 
     try {
-      await prom;
+      await promise;
       if (Date.now() < scheduledTime.valueOf()) {
         throw new Error(
           `Date.now() is before scheduledTime! ${Date.now()} vs ${scheduledTime.valueOf()}`
@@ -45,11 +45,15 @@ export class DurableObjectExample {
   }
 
   async alarm() {
-    let time = await this.state.storage.getAlarm();
-    if (time) {
-      throw new Error(`time not null inside alarm handler ${time}`);
-    }
-    this.resolve();
+    try {
+      let time = await this.state.storage.getAlarm();
+      if (time !== null) {
+        throw new Error(`time not null inside alarm handler ${time}`);
+      }
+      this.resolve();
+    } catch (e) {
+      this.reject(e);
+    }
   }
 }
 
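
The `if (time)` → `if (time !== null)` change makes the check explicit: storage.getAlarm() resolves to `null` when no alarm is pending, so comparing against `null` avoids conflating other falsy values with "no alarm". A hedged sketch of the storage alarm API as these tests exercise it (class name and delay are illustrative):

// Sketch, assuming the documented DurableObjectStorage alarm API:
//   setAlarm(scheduledTime) - schedule the alarm() handler
//   getAlarm()              - resolves to the scheduled time in ms, or null if none is pending
//   deleteAlarm()           - unschedule a pending alarm
export class AlarmApiExample {
  constructor(state) {
    this.state = state;
  }

  async fetch() {
    const before = await this.state.storage.getAlarm();
    if (before !== null) {
      // An alarm is already pending; `before` is its scheduled epoch time in ms.
      console.log(`alarm already set for ${before}`);
    }
    await this.state.storage.setAlarm(Date.now() + 50);
    return new Response('scheduled');
  }

  async alarm() {
    // Inside the handler the alarm that just fired is no longer pending, so
    // getAlarm() resolves to null unless another alarm has been queued.
    const pending = await this.state.storage.getAlarm();
    console.log(pending === null ? 'no further alarm' : `next alarm at ${pending}`);
  }
}
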
225 changes: 115 additions & 110 deletions src/workerd/api/global-scope.c++
@@ -385,112 +385,73 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
auto& context = IoContext::current();
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
auto maybeDeferredDelete = persistent.armAlarmHandler(scheduledTime);

KJ_IF_SOME(deferredDelete, maybeDeferredDelete) {
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
if (handler.alarm == kj::none) {
KJ_SWITCH_ONEOF(persistent.armAlarmHandler(scheduledTime)) {
KJ_CASE_ONEOF(armResult, ActorCacheInterface::RunAlarmHandler) {
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
if (handler.alarm == kj::none) {

lock.logWarningOnce("Attempted to run a scheduled alarm without a handler, "
"did you remember to export an alarm() function?");
return WorkerInterface::AlarmResult{
.retry = false, .outcome = EventOutcome::SCRIPT_NOT_FOUND};
}
lock.logWarningOnce("Attempted to run a scheduled alarm without a handler, "
"did you remember to export an alarm() function?");
return WorkerInterface::AlarmResult{
.retry = false, .outcome = EventOutcome::SCRIPT_NOT_FOUND};
}

auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

return context
.run([exportedHandler, &context, timeout, retryCount, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)](
Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise
// completes we want to cancel the alarm handler. If the alarm handler promise completes first
// timeout will be canceled.
auto timeoutPromise = context.afterLimitTimeout(timeout).then(
[&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();

LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// Report alarm handler failure and log it.
auto e = KJ_EXCEPTION(OVERLOADED,
"broken.dropped; worker_do_not_log; jsg.Error: Alarm exceeded its allowed execution time");
context.getMetrics().reportFailure(e);

// We don't want the handler to keep running after timeout.
context.abort(kj::mv(e));
// We want timed out alarms to be treated as user errors. As such, we'll mark them as
// retriable, and we'll count the retries against the alarm retries limit. This will ensure
// that the handler will attempt to run for a number of times before giving up and deleting
// the alarm.
return WorkerInterface::AlarmResult{
.retry = true, .retryCountsAgainstLimit = true, .outcome = EventOutcome::EXCEEDED_CPU};
});

auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

return context
.run([exportedHandler, &context, timeout, retryCount, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)](
Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise
// completes we want to cancel the alarm handler. If the alarm handler promise completes first
// timeout will be canceled.
auto timeoutPromise = context.afterLimitTimeout(timeout).then(
[&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
return alarm(lock, jsg::alloc<AlarmInvocationInfo>(retryCount))
.then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::OK};
}).exclusiveJoin(kj::mv(timeoutPromise));
})
.catch_([&context, deferredDelete = kj::mv(armResult.deferredDelete)](
kj::Exception&& e) mutable {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();

LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// Report alarm handler failure and log it.
auto e = KJ_EXCEPTION(OVERLOADED,
"broken.dropped; worker_do_not_log; jsg.Error: Alarm exceeded its allowed execution time");
context.getMetrics().reportFailure(e);

// We don't want the handler to keep running after timeout.
context.abort(kj::mv(e));
// We want timed out alarms to be treated as user errors. As such, we'll mark them as
// retriable, and we'll count the retries against the alarm retries limit. This will ensure
// that the handler will attempt to run for a number of times before giving up and deleting
// the alarm.
return WorkerInterface::AlarmResult{
.retry = true, .retryCountsAgainstLimit = true, .outcome = EventOutcome::EXCEEDED_CPU};
});

return alarm(lock, jsg::alloc<AlarmInvocationInfo>(retryCount))
.then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::OK};
}).exclusiveJoin(kj::mv(timeoutPromise));
})
.catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();
// This will include the error in inspector/tracers and log to syslog if internal.
context.logUncaughtExceptionAsync(UncaughtExceptionSource::ALARM_HANDLER, kj::mv(e));

context.getMetrics().reportFailure(e);

// This will include the error in inspector/tracers and log to syslog if internal.
context.logUncaughtExceptionAsync(UncaughtExceptionSource::ALARM_HANDLER, kj::mv(e));

EventOutcome outcome = EventOutcome::EXCEPTION;
KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}

kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(f, kj::Own<ActorIdFactory::ActorId>) {
actorId = f->toString();
EventOutcome outcome = EventOutcome::EXCEPTION;
KJ_IF_SOME(status, context.getLimitEnforcer().getLimitsExceeded()) {
outcome = status;
}
KJ_CASE_ONEOF(s, kj::String) {
actorId = kj::str(s);
}
}

// We only want to retry against limits if it's a user error. By default let's check if the
// output gate is broken.
auto shouldRetryCountsAgainstLimits = !context.isOutputGateBroken();

// We want to alert if we aren't going to count this alarm retry against limits
if (auto desc = e.getDescription(); !jsg::isTunneledException(desc) &&
!jsg::isDoNotLogException(desc) && context.isOutputGateBroken()) {
LOG_NOSENTRY(ERROR, "output lock broke during alarm execution", actorId, e);
} else if (context.isOutputGateBroken()) {
// We don't usually log these messages, but it's useful to know the real reason we failed
// to correctly investigate stuck alarms.
LOG_NOSENTRY(ERROR,
"output lock broke during alarm execution without an interesting error description",
actorId, e);
if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) {
// The handler failed because the user overloaded the object. It's their fault, we'll not
// retry forever.
shouldRetryCountsAgainstLimits = true;
}
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = outcome};
})
.then(
[&context](
WorkerInterface::AlarmResult result) -> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then(
[result]() { return kj::mv(result); }, [&context](kj::Exception&& e) {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(f, kj::Own<ActorIdFactory::ActorId>) {
@@ -500,21 +461,20 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
actorId = kj::str(s);
}
}
// We only want to retry against limits if it's a user error. By default let's assume it's our
// fault.
auto shouldRetryCountsAgainstLimits = false;
if (auto desc = e.getDescription();
!jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) {
if (isInterestingException(e)) {
LOG_EXCEPTION("alarmOutputLock"_kj, e);
} else {
LOG_NOSENTRY(ERROR, "output lock broke after executing alarm", actorId, e);
}
} else {

// We only want to retry against limits if it's a user error. By default let's check if the
// output gate is broken.
auto shouldRetryCountsAgainstLimits = !context.isOutputGateBroken();

// We want to alert if we aren't going to count this alarm retry against limits
if (auto desc = e.getDescription(); !jsg::isTunneledException(desc) &&
!jsg::isDoNotLogException(desc) && context.isOutputGateBroken()) {
LOG_NOSENTRY(ERROR, "output lock broke during alarm execution", actorId, e);
} else if (context.isOutputGateBroken()) {
// We don't usually log these messages, but it's useful to know the real reason we failed
// to correctly investigate stuck alarms.
LOG_NOSENTRY(ERROR,
"output lock broke after executing alarm without an interesting error description",
"output lock broke during alarm execution without an interesting error description",
actorId, e);
if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) {
// The handler failed because the user overloaded the object. It's their fault, we'll not
@@ -524,12 +484,57 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = EventOutcome::EXCEPTION};
.outcome = outcome};
})
.then([&context](WorkerInterface::AlarmResult result)
-> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then(
[result]() { return kj::mv(result); }, [&context](kj::Exception&& e) {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
kj::String actorId;
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(f, kj::Own<ActorIdFactory::ActorId>) {
actorId = f->toString();
}
KJ_CASE_ONEOF(s, kj::String) {
actorId = kj::str(s);
}
}
// We only want to retry against limits if it's a user error. By default let's assume it's our
// fault.
auto shouldRetryCountsAgainstLimits = false;
if (auto desc = e.getDescription();
!jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) {
if (isInterestingException(e)) {
LOG_EXCEPTION("alarmOutputLock"_kj, e);
} else {
LOG_NOSENTRY(ERROR, "output lock broke after executing alarm", actorId, e);
}
} else {
// We don't usually log these messages, but it's useful to know the real reason we failed
// to correctly investigate stuck alarms.
LOG_NOSENTRY(ERROR,
"output lock broke after executing alarm without an interesting error description",
actorId, e);
if (e.getDetail(jsg::EXCEPTION_IS_USER_ERROR) != kj::none) {
// The handler failed because the user overloaded the object. It's their fault, we'll not
// retry forever.
shouldRetryCountsAgainstLimits = true;
}
}
return WorkerInterface::AlarmResult{.retry = true,
.retryCountsAgainstLimit = shouldRetryCountsAgainstLimits,
.outcome = EventOutcome::EXCEPTION};
});
});
});
} else {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::CANCELED};
}
KJ_CASE_ONEOF(armResult, ActorCacheInterface::CancelAlarmHandler) {
return armResult.waitBeforeCancel.then([]() {
return WorkerInterface::AlarmResult{.retry = false, .outcome = EventOutcome::CANCELED};
});
}
}
KJ_UNREACHABLE;
}

jsg::Promise<void> ServiceWorkerGlobalScope::test(
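
In the handler invocation above, the runtime constructs an AlarmInvocationInfo from retryCount and passes it to the exported alarm() entrypoint; the surrounding promise chain then decides whether a failed or timed-out run is retried and whether that retry counts against the limit. Assuming the invocation-info object exposed to JavaScript carries retryCount and isRetry (an assumption based on the AlarmInvocationInfo seen here, not spelled out in this diff), a retry-aware handler might look like:

// Sketch of a retry-aware alarm handler; the alarmInfo parameter shape
// (retryCount, isRetry) is an assumption, not confirmed by this diff.
export class RetryAwareExample {
  constructor(state) {
    this.state = state;
  }

  async alarm(alarmInfo) {
    if (alarmInfo && alarmInfo.isRetry) {
      // A previous attempt threw or exceeded its wall-clock limit; the runtime
      // re-invoked the handler, and retryCount says how many attempts failed so far.
      console.log(`alarm retry #${alarmInfo.retryCount}`);
    }
    // Throwing here marks the run as failed. Per the logic above, the runtime may
    // retry it, and user-caused failures count against the alarm retry limit.
  }
}
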
10 changes: 10 additions & 0 deletions src/workerd/io/BUILD.bazel
@@ -320,6 +320,16 @@ kj_test(
     ],
 )
 
+kj_test(
+    src = "actor-sqlite-test.c++",
+    deps = [
+        ":actor",
+        ":io-gate",
+        "//src/workerd/util:test",
+        "//src/workerd/util:test-util",
+    ],
+)
+
 kj_test(
     src = "promise-wrapper-test.c++",
     deps = [":io"],