diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index 38067cd3d3f..b2bf5dfb335 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -453,7 +453,12 @@ jsg::Ref startQueueEvent( jsg::alloc(event.addRef()), jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx()); - event->waitUntil(kj::mv(promise)); + event->waitUntil(promise.then( + [event=event.addRef()]() mutable { event->setCompletionStatus(QueueEvent::CompletedSuccessfully{}); }, + [event=event.addRef()](kj::Exception&& e) mutable { + event->setCompletionStatus(QueueEvent::CompletedWithError{ kj::cp(e) }); + return kj::mv(e); + })); } else { lock.logWarningOnce( "Received a QueueEvent but we lack a handler for QueueEvents. " @@ -468,6 +473,7 @@ jsg::Ref startQueueEvent( JSG_FAIL_REQUIRE(Error, "No event listener registered for queue messages."); } globalEventTarget.dispatchEventImpl(lock, event.addRef()); + event->setCompletionStatus(QueueEvent::CompletedSuccessfully{}); } return event.addRef(); @@ -523,6 +529,33 @@ kj::Promise QueueCustomEventImpl::run( // reuse the scheduled worker logic and timeout for now. auto completed = co_await incomingRequest->finishScheduled(); + // Log some debug info if the request did not complete fully: + // In particular, detect whether or not the users queue() handler function completed + // and include info about other waitUntil tasks that may have caused the request to timeout. + if (!completed) { + kj::String status; + if (queueEventHolder->event.get() == nullptr) { + status = kj::str("Empty"); + } else { + KJ_SWITCH_ONEOF(queueEventHolder->event->getCompletionStatus()) { + KJ_CASE_ONEOF(i, QueueEvent::Incomplete) { + status = kj::str("Incomplete"); + break; + } + KJ_CASE_ONEOF(s, QueueEvent::CompletedSuccessfully) { + status = kj::str("Completed Succesfully"); + break; + } + KJ_CASE_ONEOF(e, QueueEvent::CompletedWithError) { + status = kj::str("Completed with error:", e.error); + break; + } + } + } + auto tasks = incomingRequest->getContext().getWaitUntilTasks().trace(); + KJ_LOG(WARNING, "NOSENTRY queue event timed out", status, tasks); + } + co_return WorkerInterface::CustomEvent::Result { .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU, }; diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index c62fc581db7..8016b17eb40 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -196,12 +196,28 @@ class QueueEvent final: public ExtendableEvent { tracker.trackFieldWithSize("IoPtr", sizeof(IoPtr)); } + struct Incomplete {}; + struct CompletedSuccessfully {}; + struct CompletedWithError { + kj::Exception error; + }; + typedef kj::OneOf CompletionStatus; + + void setCompletionStatus(CompletionStatus status) { + completionStatus = status; + } + + CompletionStatus getCompletionStatus() const { + return completionStatus; + } + private: // TODO(perf): Should we store these in a v8 array directly rather than this intermediate kj // array to avoid one intermediate copy? kj::Array> messages; kj::String queueName; IoPtr result; + CompletionStatus completionStatus = Incomplete{}; void visitForGc(jsg::GcVisitor& visitor) { visitor.visitAll(messages);