Skip to content

Commit

Permalink
Log debug information when queue events time out due to lingering wai…
Browse files Browse the repository at this point in the history
…tUntil tasks
  • Loading branch information
jbwcloudflare authored and a-robinson committed Feb 26, 2024
1 parent eed8ed9 commit 5f07bb7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
35 changes: 34 additions & 1 deletion src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,12 @@ jsg::Ref<QueueEvent> startQueueEvent(
jsg::alloc<QueueController>(event.addRef()),
jsg::JsValue(h.env.getHandle(js)).addRef(js),
h.getCtx());
event->waitUntil(kj::mv(promise));
event->waitUntil(promise.then(
[event=event.addRef()]() mutable { event->setCompletionStatus(QueueEvent::CompletedSuccessfully{}); },
[event=event.addRef()](kj::Exception&& e) mutable {
event->setCompletionStatus(QueueEvent::CompletedWithError{ kj::cp(e) });
return kj::mv(e);
}));
} else {
lock.logWarningOnce(
"Received a QueueEvent but we lack a handler for QueueEvents. "
Expand All @@ -468,6 +473,7 @@ jsg::Ref<QueueEvent> startQueueEvent(
JSG_FAIL_REQUIRE(Error, "No event listener registered for queue messages.");
}
globalEventTarget.dispatchEventImpl(lock, event.addRef());
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
}

return event.addRef();
Expand Down Expand Up @@ -523,6 +529,33 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
// reuse the scheduled worker logic and timeout for now.
auto completed = co_await incomingRequest->finishScheduled();

// Log some debug info if the request did not complete fully:
// In particular, detect whether or not the users queue() handler function completed
// and include info about other waitUntil tasks that may have caused the request to timeout.
if (!completed) {
kj::String status;
if (queueEventHolder->event.get() == nullptr) {
status = kj::str("Empty");
} else {
KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) {
KJ_CASE_ONEOF(i, QueueEvent::Incomplete) {
status = kj::str("Incomplete");
break;
}
KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) {
status = kj::str("Completed Succesfully");
break;
}
KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) {
status = kj::str("Completed with error:", e.error);
break;
}
}
}
auto tasks = incomingRequest->getContext().getWaitUntilTasks().trace();
KJ_LOG(WARNING, "NOSENTRY queue event timed out", status, tasks);
}

co_return WorkerInterface::CustomEvent::Result {
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
};
Expand Down
16 changes: 16 additions & 0 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,28 @@ class QueueEvent final: public ExtendableEvent {
tracker.trackFieldWithSize("IoPtr<QueueEventResult>", sizeof(IoPtr<QueueEventResult>));
}

struct Incomplete {};
struct CompletedSuccessfully {};
struct CompletedWithError {
kj::Exception error;
};
typedef kj::OneOf<Incomplete, CompletedSuccessfully, CompletedWithError> CompletionStatus;

void setCompletionStatus(CompletionStatus status) {
completionStatus = status;
}

CompletionStatus getCompletionStatus() const {
return completionStatus;
}

private:
// TODO(perf): Should we store these in a v8 array directly rather than this intermediate kj
// array to avoid one intermediate copy?
kj::Array<jsg::Ref<QueueMessage>> messages;
kj::String queueName;
IoPtr<QueueEventResult> result;
CompletionStatus completionStatus = Incomplete{};

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visitAll(messages);
Expand Down

0 comments on commit 5f07bb7

Please sign in to comment.