Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQ-498: Add support for delivery delay in Queues #1704

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
namespace workerd::api {

namespace {

// Header for the message format.
static constexpr kj::StringPtr HDR_MSG_FORMAT = "X-Msg-Fmt"_kj;

// Header for the message delivery delay.
static constexpr kj::StringPtr HDR_MSG_DELAY = "X-Msg-Delay-Secs"_kj;

kj::StringPtr validateContentType(kj::StringPtr contentType) {
auto lowerCase = toLower(contentType);
if (lowerCase == IncomingQueueMessage::ContentType::TEXT) {
Expand Down Expand Up @@ -127,9 +134,10 @@ Serialized serialize(jsg::Lock& js,
}
}

struct SerializedWithContentType {
struct SerializedWithOptions {
Serialized body;
kj::Maybe<kj::StringPtr> contentType;
kj::Maybe<int> delaySeconds;
};

jsg::JsValue deserialize(jsg::Lock& js,
Expand Down Expand Up @@ -179,19 +187,23 @@ kj::Promise<void> WorkerQueue::send(jsg::Lock& js,

JSG_REQUIRE(!body.isUndefined(), TypeError, "Message body cannot be undefined");

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());

kj::Maybe<kj::StringPtr> contentType;
KJ_IF_SOME(opts, options) {
KJ_IF_SOME(type, opts.contentType) {
contentType = validateContentType(type);
auto validatedType = validateContentType(type);
headers.add(HDR_MSG_FORMAT, validatedType);
contentType = validatedType;
}
KJ_IF_SOME(secs, opts.delaySeconds) {
headers.add(HDR_MSG_DELAY, kj::str(secs));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since delaySeconds is defined as an int, what happens in a negative value is provided?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed on doing that validation on the server side. I had it as a uint before and the error that was thrown was not super descriptive and user-friendly, that's why I changed it to an int.

}
}

auto headers = kj::HttpHeaders(context.getHeaderTable());
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::OCTET_STREAM.toString());

Serialized serialized;
KJ_IF_SOME(type, contentType) {
headers.add("X-Msg-Fmt", type);
serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY);
} else {
// TODO(cleanup) send message format header (v8) by default
Expand Down Expand Up @@ -223,21 +235,26 @@ kj::Promise<void> WorkerQueue::send(jsg::Lock& js,
.attach(context.registerPendingEvent());
};

kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch) {
kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options) {
auto& context = IoContext::current();

JSG_REQUIRE(batch.size() > 0, TypeError, "sendBatch() requires at least one message");

size_t totalSize = 0;
size_t largestMessage = 0;
auto messageCount = batch.size();
auto builder = kj::heapArrayBuilder<SerializedWithContentType>(messageCount);
auto builder = kj::heapArrayBuilder<SerializedWithOptions>(messageCount);
for (auto& message: batch) {
auto body = message.body.getHandle(js);
JSG_REQUIRE(!body.isUndefined(), TypeError,
"Message body cannot be undefined");

SerializedWithContentType item;
SerializedWithOptions item;
KJ_IF_SOME(secs, message.delaySeconds) {
item.delaySeconds = secs;
}

KJ_IF_SOME(contentType, message.contentType) {
item.contentType = validateContentType(contentType);
item.body = serialize(js, body, contentType,
Expand All @@ -255,7 +272,7 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen
// Construct the request body by concatenating the messages together into a JSON message.
// Done manually to minimize copies, although it'd be nice to make this safer.
// (totalSize + 2) / 3 * 4 is equivalent to ceil(totalSize / 3) * 4 for base64 encoding overhead.
auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 32 + 32;
auto estimatedSize = (totalSize + 2) / 3 * 4 + messageCount * 64 + 32;
kj::Vector<char> bodyBuilder(estimatedSize);
bodyBuilder.addAll("{\"messages\":["_kj);
for (size_t i = 0; i < messageCount; ++i) {
Expand All @@ -272,6 +289,11 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen
bodyBuilder.add('"');
}

KJ_IF_SOME(delaySecs, serializedBodies[i].delaySeconds) {
bodyBuilder.addAll(",\"delaySecs\": "_kj);
sesteves marked this conversation as resolved.
Show resolved Hide resolved
bodyBuilder.addAll(kj::str(delaySecs));
}

bodyBuilder.addAll("}"_kj);
if (i < messageCount - 1) {
bodyBuilder.add(',');
Expand All @@ -294,6 +316,12 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen
headers.add("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage));
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString());

KJ_IF_SOME(opts, options) {
KJ_IF_SOME(secs, opts.delaySeconds) {
headers.add(HDR_MSG_DELAY, kj::str(secs));
}
}

// The stage that we're sending a subrequest to provides a base URL that includes a scheme, the
// queue broker's domain, and the start of the URL path including the account ID and queue ID. All
// we have to do is provide the end of the path (which is "/batch") to send a message batch.
Expand Down
33 changes: 25 additions & 8 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,20 @@ class WorkerQueue: public jsg::Object {
// contentType determines the serialization format of the message.
jsg::Optional<kj::String> contentType;

JSG_STRUCT(contentType);
JSG_STRUCT_TS_OVERRIDE(QueueSendOptions {
contentType?: QueueContentType;
});
// The number of seconds to delay the delivery of the message being sent.
jsg::Optional<int> delaySeconds;

JSG_STRUCT(contentType, delaySeconds);
JSG_STRUCT_TS_OVERRIDE(QueueSendOptions { contentType?: QueueContentType; });
// NOTE: Any new fields added here should also be added to MessageSendRequest below.
};

struct SendBatchOptions {
// The number of seconds to delay the delivery of the message being sent.
jsg::Optional<int> delaySeconds;

JSG_STRUCT(delaySeconds);
JSG_STRUCT_TS_OVERRIDE(QueueSendBatchOptions { delaySeconds ?: number; });
// NOTE: Any new fields added here should also be added to MessageSendRequest below.
};

Expand All @@ -45,17 +55,21 @@ class WorkerQueue: public jsg::Object {
// contentType determines the serialization format of the message.
jsg::Optional<kj::String> contentType;

JSG_STRUCT(body, contentType);
// The number of seconds to delay the delivery of the message being sent.
jsg::Optional<int> delaySeconds;

JSG_STRUCT(body, contentType, delaySeconds);
JSG_STRUCT_TS_OVERRIDE(MessageSendRequest<Body = unknown> {
body: Body;
contentType?: QueueContentType;
contentType?: QueueContentType;
});
// NOTE: Any new fields added to SendOptions must also be added here.
};

kj::Promise<void> send(jsg::Lock& js, jsg::JsValue body, jsg::Optional<SendOptions> options);

kj::Promise<void> sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch);
kj::Promise<void> sendBatch(jsg::Lock& js, jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options);

JSG_RESOURCE_TYPE(WorkerQueue) {
JSG_METHOD(send);
Expand All @@ -64,7 +78,9 @@ class WorkerQueue: public jsg::Object {
JSG_TS_ROOT();
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages: Iterable<MessageSendRequest<Body>>): Promise<void>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going on with the whitespace in here? Have you managed to get make format working?

Copy link
Contributor Author

@sesteves sesteves Feb 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no make format in this project. In the edgeworker repo, make format uses clang-format with a custom code style; when I apply it to this file, I get the same result (same line breaking and space before ?).

: Promise<void>;
});
JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8");
}
Expand Down Expand Up @@ -288,6 +304,7 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj
#define EW_QUEUE_ISOLATE_TYPES \
api::WorkerQueue, \
api::WorkerQueue::SendOptions, \
api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue::MessageSendRequest, \
api::IncomingQueueMessage, \
api::QueueResponse, \
Expand Down
Loading