From fa0e7bf74b406a241888c87f8327af917a6d7d0d Mon Sep 17 00:00:00 2001 From: Sergio Esteves Date: Wed, 21 Feb 2024 11:15:41 +0000 Subject: [PATCH] MQ-498: Add support for delivery delay in Queues This patch updates the queues api so that a delay value can be specified when sending messages to a queue. The delay is sent as an header in the request and also as the message payload in case of a batch. --- src/workerd/api/queue.c++ | 48 +++++++++++++++++++++++++++++++-------- src/workerd/api/queue.h | 33 ++++++++++++++++++++------- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index b2bf5dfb335..9aa4c9b5e9c 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -15,6 +15,13 @@ namespace workerd::api { namespace { + +// Header for the message format. +static constexpr kj::StringPtr HDR_MSG_FORMAT = "X-Msg-Fmt"_kj; + +// Header for the message delivery delay. +static constexpr kj::StringPtr HDR_MSG_DELAY = "X-Msg-Delay-Secs"_kj; + kj::StringPtr validateContentType(kj::StringPtr contentType) { auto lowerCase = toLower(contentType); if (lowerCase == IncomingQueueMessage::ContentType::TEXT) { @@ -127,9 +134,10 @@ Serialized serialize(jsg::Lock& js, } } -struct SerializedWithContentType { +struct SerializedWithOptions { Serialized body; kj::Maybe contentType; + kj::Maybe delaySeconds; }; jsg::JsValue deserialize(jsg::Lock& js, @@ -179,19 +187,23 @@ kj::Promise WorkerQueue::send(jsg::Lock& js, JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined"); + auto headers = kj::HttpHeaders(context.getHeaderTable()); + headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString()); + kj::Maybe contentType; KJ_IF_SOME(opts, options) { KJ_IF_SOME(type, opts.contentType) { - contentType = validateContentType(type); + auto validatedType = validateContentType(type); + headers.add(HDR_MSG_FORMAT, validatedType); + contentType = validatedType; + } + KJ_IF_SOME(secs, opts.delaySeconds) { + headers.add(HDR_MSG_DELAY, kj::str(secs)); } } - auto headers = kj::HttpHeaders(context.getHeaderTable()); - headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString()); - Serialized serialized; KJ_IF_SOME(type, contentType) { - headers.add("X-Msg-Fmt", type); serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY); } else { // TODO(cleanup) send message format header (v8) by default @@ -223,7 +235,8 @@ kj::Promise WorkerQueue::send(jsg::Lock& js, .attach(context.registerPendingEvent()); }; -kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence batch) { +kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence batch, + jsg::Optional options) { auto& context = IoContext::current(); JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message"); @@ -231,13 +244,17 @@ kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence(messageCount); + auto builder = kj::heapArrayBuilder(messageCount); for (auto& message: batch) { auto body = message.body.getHandle(js); JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined"); - SerializedWithContentType item; + SerializedWithOptions item; + KJ_IF_SOME(secs, message.delaySeconds) { + item.delaySeconds = secs; + } + KJ_IF_SOME(contentType, message.contentType) { item.contentType = validateContentType(contentType); item.body = serialize(js, body, contentType, @@ -255,7 +272,7 @@ kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence bodyBuilder(estimatedSize); bodyBuilder.addAll("{\"messages\":["_kj); for (size_t i = 0; i < messageCount; ++i) { @@ -272,6 +289,11 @@ kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence contentType; - JSG_STRUCT(contentType); - JSG_STRUCT_TS_OVERRIDE(QueueSendOptions { - contentType?: QueueContentType; - }); + // The number of seconds to delay the delivery of the message being sent. + jsg::Optional delaySeconds; + + JSG_STRUCT(contentType, delaySeconds); + JSG_STRUCT_TS_OVERRIDE(QueueSendOptions { contentType?: QueueContentType; }); + // NOTE: Any new fields added here should also be added to MessageSendRequest below. + }; + + struct SendBatchOptions { + // The number of seconds to delay the delivery of the message being sent. + jsg::Optional delaySeconds; + + JSG_STRUCT(delaySeconds); + JSG_STRUCT_TS_OVERRIDE(QueueSendBatchOptions { delaySeconds ?: number; }); // NOTE: Any new fields added here should also be added to MessageSendRequest below. }; @@ -45,17 +55,21 @@ class WorkerQueue: public jsg::Object { // contentType determines the serialization format of the message. jsg::Optional contentType; - JSG_STRUCT(body, contentType); + // The number of seconds to delay the delivery of the message being sent. + jsg::Optional delaySeconds; + + JSG_STRUCT(body, contentType, delaySeconds); JSG_STRUCT_TS_OVERRIDE(MessageSendRequest { body: Body; - contentType?: QueueContentType; + contentType?: QueueContentType; }); // NOTE: Any new fields added to SendOptions must also be added here. }; kj::Promise send(jsg::Lock& js, jsg::JsValue body, jsg::Optional options); - kj::Promise sendBatch(jsg::Lock& js, jsg::Sequence batch); + kj::Promise sendBatch(jsg::Lock& js, jsg::Sequence batch, + jsg::Optional options); JSG_RESOURCE_TYPE(WorkerQueue) { JSG_METHOD(send); @@ -64,7 +78,9 @@ class WorkerQueue: public jsg::Object { JSG_TS_ROOT(); JSG_TS_OVERRIDE(Queue { send(message: Body, options?: QueueSendOptions): Promise; - sendBatch(messages: Iterable>): Promise; + sendBatch(messages + : Iterable>, options ?: QueueSendBatchOptions) + : Promise; }); JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8"); } @@ -304,6 +320,7 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj #define EW_QUEUE_ISOLATE_TYPES \ api::WorkerQueue, \ api::WorkerQueue::SendOptions, \ + api::WorkerQueue::SendBatchOptions, \ api::WorkerQueue::MessageSendRequest, \ api::IncomingQueueMessage, \ api::QueueResponse, \