Skip to content

Commit

Permalink
MQ-576: Add attempts to QueueMessage
Browse files Browse the repository at this point in the history
This patch updates the queue message capnp schema so that the number of
attempts is included within each message, allowing queue consumers to
implement backoff logic based on that. Note this is a breaking change
and we are making sure that all clients are updated accordingly before
this change gets released.
  • Loading branch information
sesteves committed Mar 21, 2024
1 parent 46edd48 commit 5a4beaa
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2287,12 +2287,14 @@ jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
.id=kj::mv(msg.id),
.timestamp=msg.timestamp,
.body=serializer.release().data,
.attempts=msg.attempts
});
} else KJ_IF_SOME(b, msg.serializedBody) {
encodedMessages.add(IncomingQueueMessage{
.id=kj::mv(msg.id),
.timestamp=msg.timestamp,
.body=kj::mv(b),
.attempts=msg.attempts
});
} else {
JSG_FAIL_REQUIRE(TypeError, "Expected one of body or serializedBody for each message");
Expand Down
4 changes: 3 additions & 1 deletion src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,13 @@ class Fetcher: public JsRpcClientProvider {
kj::Date timestamp;
jsg::Optional<jsg::Value> body;
jsg::Optional<kj::Array<kj::byte>> serializedBody;
uint16_t attempts;

JSG_STRUCT(id, timestamp, body, serializedBody);
JSG_STRUCT(id, timestamp, body, serializedBody, attempts);
JSG_STRUCT_TS_OVERRIDE(type ServiceBindingQueueMessage<Body = unknown> = {
id: string;
timestamp: Date;
attempts: number;
} & (
| { body: Body }
| { serializedBody: ArrayBuffer | ArrayBufferView }
Expand Down
19 changes: 12 additions & 7 deletions src/workerd/api/queue-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,25 @@ export default {

assert.strictEqual(batch.messages[0].id, "#0");
assert.strictEqual(batch.messages[0].body, "ghi");
assert.strictEqual(batch.messages[0].attempts, 1);

assert.strictEqual(batch.messages[1].id, "#1");
assert.deepStrictEqual(batch.messages[1].body, new Uint8Array([7, 8, 9]));
assert.strictEqual(batch.messages[1].attempts, 2);

assert.strictEqual(batch.messages[2].id, "#2");
assert.deepStrictEqual(batch.messages[2].body, { c: { d: 10 } });
assert.strictEqual(batch.messages[2].attempts, 3);
batch.messages[2].retry();

assert.strictEqual(batch.messages[3].id, "#3");
assert.deepStrictEqual(batch.messages[3].body, batch.messages[3].timestamp);
assert.strictEqual(batch.messages[3].attempts, 4);
batch.messages[3].retry({ delaySeconds: 2 });

assert.strictEqual(batch.messages[4].id, "#4");
assert.deepStrictEqual(batch.messages[4].body, new Map([["key", "value"]]));
assert.strictEqual(batch.messages[4].attempts, 5);

batch.ackAll();
},
Expand All @@ -98,11 +103,11 @@ export default {

const timestamp = new Date();
const response = await env.SERVICE.queue("test-queue", [
{ id: "#0", timestamp, body: "ghi" },
{ id: "#1", timestamp, body: new Uint8Array([7, 8, 9]) },
{ id: "#2", timestamp, body: { c: { d: 10 } } },
{ id: "#3", timestamp, body: timestamp },
{ id: "#4", timestamp, serializedBody },
{ id: "#0", timestamp, body: "ghi", attempts: 1 },
{ id: "#1", timestamp, body: new Uint8Array([7, 8, 9]), attempts: 2 },
{ id: "#2", timestamp, body: { c: { d: 10 } }, attempts: 3 },
{ id: "#3", timestamp, body: timestamp, attempts: 4 },
{ id: "#4", timestamp, serializedBody, attempts: 5 },
]);
assert.strictEqual(response.outcome, "ok");
assert(!response.retryBatch.retry);
Expand All @@ -111,13 +116,13 @@ export default {
assert.deepStrictEqual(response.explicitAcks, []);

await assert.rejects(env.SERVICE.queue("test-queue", [
{ id: "#0", timestamp }
{ id: "#0", timestamp, attempts: 1 }
]), {
name: "TypeError",
message: "Expected one of body or serializedBody for each message"
});
await assert.rejects(env.SERVICE.queue("test-queue", [
{ id: "#0", timestamp, body: "", serializedBody }
{ id: "#0", timestamp, body: "", serializedBody, attempts: 1 }
]), {
name: "TypeError",
message: "Expected one of body or serializedBody for each message"
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ QueueMessage::QueueMessage(jsg::Lock& js,
: id(kj::str(message.getId())),
timestamp(message.getTimestampNs() * kj::NANOSECONDS + kj::UNIX_EPOCH),
body(deserialize(js, message).addRef(js)),
attempts(message.getAttempts()),
result(result) {}
// Note that we must make deep copies of all data here since the incoming Reader may be
// deallocated while JS's GC wrappers still exist.
Expand All @@ -368,6 +369,7 @@ QueueMessage::QueueMessage(
: id(kj::mv(message.id)),
timestamp(message.timestamp),
body(deserialize(js, kj::mv(message.body), message.contentType).addRef(js)),
attempts(message.attempts),
result(result) {}

jsg::JsValue QueueMessage::getBody(jsg::Lock& js) {
Expand Down Expand Up @@ -626,6 +628,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
KJ_IF_SOME(contentType, p.messages[i].contentType) {
messages[i].setContentType(contentType);
}
messages[i].setAttempts(p.messages[i].attempts);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ struct IncomingQueueMessage {
kj::Date timestamp;
kj::Array<kj::byte> body;
kj::Maybe<kj::String> contentType;
JSG_STRUCT(id, timestamp, body, contentType);
uint16_t attempts;
JSG_STRUCT(id, timestamp, body, contentType, attempts);

struct ContentType {
static constexpr kj::StringPtr TEXT = "text"_kj;
Expand Down Expand Up @@ -158,6 +159,7 @@ class QueueMessage final: public jsg::Object {
kj::StringPtr getId() { return id; }
kj::Date getTimestamp() { return timestamp; }
jsg::JsValue getBody(jsg::Lock& js);
uint16_t getAttempts() { return attempts; };

void retry(jsg::Optional<QueueRetryOptions> options);
void ack();
Expand All @@ -168,6 +170,7 @@ class QueueMessage final: public jsg::Object {
JSG_READONLY_INSTANCE_PROPERTY(id, getId);
JSG_READONLY_INSTANCE_PROPERTY(timestamp, getTimestamp);
JSG_READONLY_INSTANCE_PROPERTY(body, getBody);
JSG_READONLY_INSTANCE_PROPERTY(attempts, getAttempts);
JSG_METHOD(retry);
JSG_METHOD(ack);

Expand All @@ -186,6 +189,7 @@ class QueueMessage final: public jsg::Object {
kj::String id;
kj::Date timestamp;
jsg::JsRef<jsg::JsValue> body;
uint16_t attempts;
IoPtr<QueueEventResult> result;

void visitForGc(jsg::GcVisitor& visitor) {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ struct QueueMessage @0x944adb18c0352295 {
timestampNs @1 :Int64;
data @2 :Data;
contentType @3 :Text;
attempts @4 :UInt16;
}

struct QueueRetryBatch {
Expand Down
8 changes: 5 additions & 3 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1422,8 +1422,8 @@ KJ_TEST("Server: call queue handler on service binding") {
`export default {
` async fetch(request, env) {
` let result = await env.service2.queue("queueName1", [
` {id: "1", timestamp: 12345, body: "my message"},
` {id: "msg2", timestamp: 23456, body: 22},
` {id: "1", timestamp: 12345, body: "my message", attempts: 1},
` {id: "msg2", timestamp: 23456, body: 22, attempts: 2},
` ]);
` return new Response(`queue outcome: ${result.outcome}, ackAll: ${result.ackAll}`);
` }
Expand All @@ -1449,9 +1449,11 @@ KJ_TEST("Server: call queue handler on service binding") {
` event.messages[0].id == "1" &&
` event.messages[0].timestamp.getTime() == 12345 &&
` event.messages[0].body == "my message" &&
` event.messages[0].attempts == 1 &&
` event.messages[1].id == "msg2" &&
` event.messages[1].timestamp.getTime() == 23456 &&
` event.messages[1].body == 22) {
` event.messages[1].body == 22 &&
` event.messages[1].attempts == 2) {
` event.ackAll();
` return;
` }
Expand Down

0 comments on commit 5a4beaa

Please sign in to comment.