-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQ-498: Add support for delivery delay in Queues #1704
MQ-498: Add support for delivery delay in Queues #1704
Conversation
CLA Assistant Lite bot All contributors have signed the CLA ✍️ ✅ |
bdc678b
to
887498e
Compare
recheck |
I have read the CLA Document and I hereby sign the CLA |
recheck |
@a-robinson and I will take a look |
src/workerd/api/queue.h
Outdated
@@ -32,30 +32,48 @@ class WorkerQueue: public jsg::Object { | |||
// contentType determines the serialization format of the message. | |||
jsg::Optional<kj::String> contentType; | |||
|
|||
JSG_STRUCT(contentType); | |||
// The number of seconds to delay the delivery of the message being sent. | |||
jsg::Optional<uint> delaySecs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jasnell for thoughts on field naming and units -- is there something better to call this field other than delaySecs
, assuming the implementation is only actually precise to the nearest second?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fine. I would probably just cut it down to delay
and document it as being seconds. Or, avoid the abbreviation and make it delaySeconds
src/workerd/api/queue.h
Outdated
JSG_STRUCT_TS_OVERRIDE(QueueSendOptions { | ||
contentType?: QueueContentType; | ||
delaySecs ?: number; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't imagine the override is needed here given that we aren't change the type at all. contentType
only needed an override due to wanting to advertise that it only accepts the QueueContentType
type
But you should be able to confirm by building the types locally using the instructions in https://github.com/cloudflare/workerd/tree/main/types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you manage to get this building to see how the resulting types look? My machine unfortunately complains about a dependency issue when I try, otherwise I'd do it for you.
I'm fine skipping it for now and just validating them after merge if needed, so long as you do come back and make sure the published types look ok afterwards.
@@ -64,7 +82,9 @@ class WorkerQueue: public jsg::Object { | |||
JSG_TS_ROOT(); | |||
JSG_TS_OVERRIDE(Queue<Body = unknown> { | |||
send(message: Body, options?: QueueSendOptions): Promise<void>; | |||
sendBatch(messages: Iterable<MessageSendRequest<Body>>): Promise<void>; | |||
sendBatch(messages | |||
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's going on with the whitespace in here? Have you managed to get make format
working?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no make format
in this project. In the edgeworker
repo, make format
uses clang-format
with a custom code style; when I apply it to this file, I get the same result (same line breaking and space before ?
).
src/workerd/api/queue.c++
Outdated
serialized = serialize(js, body, type, SerializeArrayBufferBehavior::DEEP_COPY); | ||
} else { | ||
} | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always keep else
on the same line as the prior closing brace, so I'd suggest getting your auto-formatter (make format
) working properly :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-format
is moving the else to the next line (I guess this is b/c the if is in a macro), I think my auto-formatter is working properly (I even tried formatting this with the edgeworker
style file), I might be missing something here. Anyway, I updated this line manually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, that's weird. I don't know much about our clang-format setup since it arose after I did most of my runtime work, but here's what I know:
make format
in edgeworker doesn't appear to touch files in workerd- When I change a line in edgeworker to move the
else
to the following line like this, runningmake format
moves it back to the same line - The style guide we follow is pretty clear about the intended format here and all the code I can recall reading in the runtime code follows that style
I don't know much about our clang-format config for workerd, though. Maybe @jasnell has a pointer here, but no worries if you don't.
src/workerd/api/queue.h
Outdated
struct MessageSendRequest { | ||
jsg::JsRef<jsg::JsValue> body; | ||
|
||
// contentType determines the serialization format of the message. | ||
jsg::Optional<kj::String> contentType; | ||
|
||
JSG_STRUCT(body, contentType); | ||
// The number of seconds to delay the delivery of the message being sent. | ||
jsg::Optional<uint> delaySecs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we even want to support setting the delay on each message in a batch individually vs just on an entire batch? I thought the discussion on the PRD was that we were only going to support it on the entire batch. I'll tag you in the thread.
src/workerd/api/queue.c++
Outdated
@@ -294,6 +329,12 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js, jsg::Sequence<MessageSen | |||
headers.add("CF-Queue-Largest-Msg"_kj, kj::str(largestMessage)); | |||
headers.set(kj::HttpHeaderId::CONTENT_TYPE, MimeType::JSON.toString()); | |||
|
|||
KJ_IF_SOME(opts, options) { | |||
KJ_IF_SOME(secs, opts.delaySecs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Are we ever validating that this is beneath the max delay?
- Pending the result of the question around whether we want to support this option both on individual messages within a batch and within a batch, what happens if the user specifies a delay for the entire batch and for messages within the batch? Is that meant to be allowed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
I wasn't sure about these validations on the client-side, since (1) we already have them on the broker side and (2) it will be more complicated to change the delay boundaries in the future if necessary. I added the validation here anyway.
-
The current implementation on the broker side allows for that - message delay will override batch delay and, in this case, messages won't be stored "contiguously" but under different delivery timestamp prefixes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I'm fine with just doing the validation in the queue service rather than here. Feel free to rip it out of here. It just didn't look right to do some validation here but to miss some cases.
887498e
to
ea0db56
Compare
nice job, LGTM |
075d543
to
2d985cb
Compare
This patch updates the queues api so that a delay value can be specified when sending messages to a queue. The delay is sent as an header in the request and also as the message payload in case of a batch.
2d985cb
to
eb25886
Compare
contentType = validatedType; | ||
} | ||
KJ_IF_SOME(secs, opts.delaySeconds) { | ||
headers.add(HDR_MSG_DELAY, kj::str(secs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since delaySeconds
is defined as an int
, what happens in a negative value is provided?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We agreed on doing that validation on the server side. I had it as a uint
before and the error that was thrown was not super descriptive and user-friendly, that's why I changed it to an int
.
This patch updates the queues api so that a delay value can be specified when sending messages to a queue. The delay is sent as an header in the request and also as the message payload in case of a batch.