Skip to content

Commit

Permalink
MQ-498: Add support for delivery delay in Queues
Browse files Browse the repository at this point in the history
This patch updates the queues api so that a delay value can be specified
when sending messages to a queue. The delay is sent as an header in the
request and also as the message payload in case of a batch.
  • Loading branch information
sesteves committed Feb 26, 2024
1 parent 46d21a8 commit 075d543
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 18 deletions.
48 changes: 38 additions & 10 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
namespace workerd::api {

namespace {

// Header for the message format.
static constexpr kj::StringPtr HDR_MSG_FORMAT = "X-Msg-Fmt"_kj;

// Header for the message delivery delay.
static constexpr kj::StringPtr HDR_MSG_DELAY = "X-Msg-Delay-Secs"_kj;

kj::StringPtr validateContentType(kj::StringPtr contentType) {
auto lowerCase = toLower(contentType);
if (lowerCase == IncomingQueueMessage::ContentType::TEXT) {
Expand Down Expand Up @@ -127,9 +134,10 @@ Serialized serialize(jsg::Lock& js,
}
}

struct SerializedWithContentType {
struct SerializedWithOptions {
Serialized body;
kj::Maybe<kj::StringPtr> contentType;
kj::Maybe<int> delaySeconds;
};

jsg::JsValue deserialize(jsg::Lock& js,
Expand Down Expand Up @@ -179,19 +187,23 @@ kj::Promise<void> WorkerQueue::send(jsg::Lock& js,

JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());

kj::Maybe<kj::StringPtr> contentType;
KJ_IF_SOME(opts, options) {
KJ_IF_SOME(type, opts.contentType) {
contentType = validateContentType(type);
auto validatedType = validateContentType(type);
headers.add(HDR_MSG_FORMAT, validatedType);
contentType = validatedType;
}
KJ_IF_SOME(secs, opts.delaySeconds) {
headers.add(HDR_MSG_DELAY, kj::str(secs));
}
}

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());

Serialized serialized;
KJ_IF_SOME(type, contentType) {
headers.add("X-Msg-Fmt", type);
serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY);
} else {
// TODO(cleanup) send message format header (v8) by default
Expand Down Expand Up @@ -223,21 +235,26 @@ kj::Promise<void> WorkerQueue::send(jsg::Lock& js,
.attach(context.registerPendingEvent());
};

kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch) {
kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options) {
auto& context = IoContext::current();

JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message");

size_t totalSize = 0;
size_t largestMessage = 0;
auto messageCount = batch.size();
auto builder = kj::heapArrayBuilder<SerializedWithContentType>(messageCount);
auto builder = kj::heapArrayBuilder<SerializedWithOptions>(messageCount);
for (auto& message: batch) {
auto body = message.body.getHandle(js);
JSG_REQUIRE(!body.isUndefined(), TypeError,
"Message body cannot be undefined");

SerializedWithContentType item;
SerializedWithOptions item;
KJ_IF_SOME(secs, message.delaySeconds) {
item.delaySeconds = secs;
}

KJ_IF_SOME(contentType, message.contentType) {
item.contentType = validateContentType(contentType);
item.body = serialize(js, body, contentType,
Expand All @@ -255,7 +272,7 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen
// Construct the request body by concatenating the messages together into a JSON message.
// Done manually to minimize copies, although it'd be nice to make this safer.
// (totalSize + 2) / 3 * 4 is equivalent to ceil(totalSize / 3) * 4 for base64 encoding overhead.
auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 32 + 32;
auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 64 + 32;
kj::Vector<char> bodyBuilder(estimatedSize);
bodyBuilder.addAll("{\"messages\":["_kj);
for (size_t i = 0; i < messageCount; ++i) {
Expand All @@ -272,6 +289,11 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen
bodyBuilder.add('"');
}

KJ_IF_SOME(delaySecs, serializedBodies[i].delaySeconds) {
bodyBuilder.addAll(",\"delaySecs\": "_kj);
bodyBuilder.addAll(kj::str(delaySecs));
}

bodyBuilder.addAll("}"_kj);
if (i < messageCount - 1) {
bodyBuilder.add(',');
Expand All @@ -294,6 +316,12 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen
headers.add("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage));
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString());

KJ_IF_SOME(opts, options) {
KJ_IF_SOME(secs, opts.delaySeconds) {
headers.add(HDR_MSG_DELAY, kj::str(secs));
}
}

// The stage that we're sending a subrequest to provides a base URL that includes a scheme, the
// queue broker's domain, and the start of the URL path including the account ID and queue ID. All
// we have to do is provide the end of the path (which is "/batch") to send a message batch.
Expand Down
32 changes: 24 additions & 8 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,19 @@ class WorkerQueue: public jsg::Object {
// contentType determines the serialization format of the message.
jsg::Optional<kj::String> contentType;

JSG_STRUCT(contentType);
JSG_STRUCT_TS_OVERRIDE(QueueSendOptions {
contentType?: QueueContentType;
});
// The number of seconds to delay the delivery of the message being sent.
jsg::Optional<int> delaySeconds;

JSG_STRUCT(contentType, delaySeconds);
JSG_STRUCT_TS_OVERRIDE(QueueSendOptions { contentType?: QueueContentType; });
// NOTE: Any new fields added here should also be added to MessageSendRequest below.
};

struct SendBatchOptions {
// The number of seconds to delay the delivery of the message being sent.
jsg::Optional<int> delaySeconds;

JSG_STRUCT(delaySeconds);
// NOTE: Any new fields added here should also be added to MessageSendRequest below.
};

Expand All @@ -45,17 +54,21 @@ class WorkerQueue: public jsg::Object {
// contentType determines the serialization format of the message.
jsg::Optional<kj::String> contentType;

JSG_STRUCT(body, contentType);
// The number of seconds to delay the delivery of the message being sent.
jsg::Optional<int> delaySeconds;

JSG_STRUCT(body, contentType, delaySeconds);
JSG_STRUCT_TS_OVERRIDE(MessageSendRequest<Body = unknown> {
body: Body;
contentType?: QueueContentType;
contentType?: QueueContentType;
});
// NOTE: Any new fields added to SendOptions must also be added here.
};

kj::Promise<void> send(jsg::Lock& js, jsg::JsValue body, jsg::Optional<SendOptions> options);

kj::Promise<void> sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch);
kj::Promise<void> sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options);

JSG_RESOURCE_TYPE(WorkerQueue) {
JSG_METHOD(send);
Expand All @@ -64,7 +77,9 @@ class WorkerQueue: public jsg::Object {
JSG_TS_ROOT();
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages: Iterable<MessageSendRequest<Body>>): Promise<void>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions)
: Promise<void>;
});
JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8");
}
Expand Down Expand Up @@ -288,6 +303,7 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj
#define EW_QUEUE_ISOLATE_TYPES \
api::WorkerQueue, \
api::WorkerQueue::SendOptions, \
api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue::MessageSendRequest, \
api::IncomingQueueMessage, \
api::QueueResponse, \
Expand Down

0 comments on commit 075d543

Please sign in to comment.