Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "MQ-576: Add attempts to QueueMessage" #1926

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2287,14 +2287,12 @@ jsg::Promise<Fetcher::QueueResult> Fetcher::queue(
.id=kj::mv(msg.id),
.timestamp=msg.timestamp,
.body=serializer.release().data,
.attempts=msg.attempts
});
} else KJ_IF_SOME(b, msg.serializedBody) {
encodedMessages.add(IncomingQueueMessage{
.id=kj::mv(msg.id),
.timestamp=msg.timestamp,
.body=kj::mv(b),
.attempts=msg.attempts
});
} else {
JSG_FAIL_REQUIRE(TypeError, "Expected one of body or serializedBody for each message");
Expand Down
4 changes: 1 addition & 3 deletions src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,11 @@ class Fetcher: public JsRpcClientProvider {
kj::Date timestamp;
jsg::Optional<jsg::Value> body;
jsg::Optional<kj::Array<kj::byte>> serializedBody;
uint16_t attempts;

JSG_STRUCT(id, timestamp, body, serializedBody, attempts);
JSG_STRUCT(id, timestamp, body, serializedBody);
JSG_STRUCT_TS_OVERRIDE(type ServiceBindingQueueMessage<Body = unknown> = {
id: string;
timestamp: Date;
attempts: number;
} & (
| { body: Body }
| { serializedBody: ArrayBuffer | ArrayBufferView }
Expand Down
19 changes: 7 additions & 12 deletions src/workerd/api/queue-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,20 @@ export default {

assert.strictEqual(batch.messages[0].id, "#0");
assert.strictEqual(batch.messages[0].body, "ghi");
assert.strictEqual(batch.messages[0].attempts, 1);

assert.strictEqual(batch.messages[1].id, "#1");
assert.deepStrictEqual(batch.messages[1].body, new Uint8Array([7, 8, 9]));
assert.strictEqual(batch.messages[1].attempts, 2);

assert.strictEqual(batch.messages[2].id, "#2");
assert.deepStrictEqual(batch.messages[2].body, { c: { d: 10 } });
assert.strictEqual(batch.messages[2].attempts, 3);
batch.messages[2].retry();

assert.strictEqual(batch.messages[3].id, "#3");
assert.deepStrictEqual(batch.messages[3].body, batch.messages[3].timestamp);
assert.strictEqual(batch.messages[3].attempts, 4);
batch.messages[3].retry({ delaySeconds: 2 });

assert.strictEqual(batch.messages[4].id, "#4");
assert.deepStrictEqual(batch.messages[4].body, new Map([["key", "value"]]));
assert.strictEqual(batch.messages[4].attempts, 5);

batch.ackAll();
},
Expand All @@ -103,11 +98,11 @@ export default {

const timestamp = new Date();
const response = await env.SERVICE.queue("test-queue", [
{ id: "#0", timestamp, body: "ghi", attempts: 1 },
{ id: "#1", timestamp, body: new Uint8Array([7, 8, 9]), attempts: 2 },
{ id: "#2", timestamp, body: { c: { d: 10 } }, attempts: 3 },
{ id: "#3", timestamp, body: timestamp, attempts: 4 },
{ id: "#4", timestamp, serializedBody, attempts: 5 },
{ id: "#0", timestamp, body: "ghi" },
{ id: "#1", timestamp, body: new Uint8Array([7, 8, 9]) },
{ id: "#2", timestamp, body: { c: { d: 10 } } },
{ id: "#3", timestamp, body: timestamp },
{ id: "#4", timestamp, serializedBody },
]);
assert.strictEqual(response.outcome, "ok");
assert(!response.retryBatch.retry);
Expand All @@ -116,13 +111,13 @@ export default {
assert.deepStrictEqual(response.explicitAcks, []);

await assert.rejects(env.SERVICE.queue("test-queue", [
{ id: "#0", timestamp, attempts: 1 }
{ id: "#0", timestamp }
]), {
name: "TypeError",
message: "Expected one of body or serializedBody for each message"
});
await assert.rejects(env.SERVICE.queue("test-queue", [
{ id: "#0", timestamp, body: "", serializedBody, attempts: 1 }
{ id: "#0", timestamp, body: "", serializedBody }
]), {
name: "TypeError",
message: "Expected one of body or serializedBody for each message"
Expand Down
3 changes: 0 additions & 3 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ QueueMessage::QueueMessage(jsg::Lock& js,
: id(kj::str(message.getId())),
timestamp(message.getTimestampNs() * kj::NANOSECONDS + kj::UNIX_EPOCH),
body(deserialize(js, message).addRef(js)),
attempts(message.getAttempts()),
result(result) {}
// Note that we must make deep copies of all data here since the incoming Reader may be
// deallocated while JS's GC wrappers still exist.
Expand All @@ -369,7 +368,6 @@ QueueMessage::QueueMessage(
: id(kj::mv(message.id)),
timestamp(message.timestamp),
body(deserialize(js, kj::mv(message.body), message.contentType).addRef(js)),
attempts(message.attempts),
result(result) {}

jsg::JsValue QueueMessage::getBody(jsg::Lock& js) {
Expand Down Expand Up @@ -629,7 +627,6 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
KJ_IF_SOME(contentType, p.messages[i].contentType) {
messages[i].setContentType(contentType);
}
messages[i].setAttempts(p.messages[i].attempts);
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ struct IncomingQueueMessage {
kj::Date timestamp;
kj::Array<kj::byte> body;
kj::Maybe<kj::String> contentType;
uint16_t attempts;
JSG_STRUCT(id, timestamp, body, contentType, attempts);
JSG_STRUCT(id, timestamp, body, contentType);

struct ContentType {
static constexpr kj::StringPtr TEXT = "text"_kj;
Expand Down Expand Up @@ -159,7 +158,6 @@ class QueueMessage final: public jsg::Object {
kj::StringPtr getId() { return id; }
kj::Date getTimestamp() { return timestamp; }
jsg::JsValue getBody(jsg::Lock& js);
uint16_t getAttempts() { return attempts; };

void retry(jsg::Optional<QueueRetryOptions> options);
void ack();
Expand All @@ -170,7 +168,6 @@ class QueueMessage final: public jsg::Object {
JSG_READONLY_INSTANCE_PROPERTY(id, getId);
JSG_READONLY_INSTANCE_PROPERTY(timestamp, getTimestamp);
JSG_READONLY_INSTANCE_PROPERTY(body, getBody);
JSG_READONLY_INSTANCE_PROPERTY(attempts, getAttempts);
JSG_METHOD(retry);
JSG_METHOD(ack);

Expand All @@ -189,7 +186,6 @@ class QueueMessage final: public jsg::Object {
kj::String id;
kj::Date timestamp;
jsg::JsRef<jsg::JsValue> body;
uint16_t attempts;
IoPtr<QueueEventResult> result;

void visitForGc(jsg::GcVisitor& visitor) {
Expand Down
1 change: 0 additions & 1 deletion src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ struct QueueMessage @0x944adb18c0352295 {
timestampNs @1 :Int64;
data @2 :Data;
contentType @3 :Text;
attempts @4 :UInt16;
}

struct QueueRetryBatch {
Expand Down
8 changes: 3 additions & 5 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1422,8 +1422,8 @@ KJ_TEST("Server: call queue handler on service binding") {
`export default {
` async fetch(request, env) {
` let result = await env.service2.queue("queueName1", [
` {id: "1", timestamp: 12345, body: "my message", attempts: 1},
` {id: "msg2", timestamp: 23456, body: 22, attempts: 2},
` {id: "1", timestamp: 12345, body: "my message"},
` {id: "msg2", timestamp: 23456, body: 22},
` ]);
` return new Response(`queue outcome: ${result.outcome}, ackAll: ${result.ackAll}`);
` }
Expand All @@ -1449,11 +1449,9 @@ KJ_TEST("Server: call queue handler on service binding") {
` event.messages[0].id == "1" &&
` event.messages[0].timestamp.getTime() == 12345 &&
` event.messages[0].body == "my message" &&
` event.messages[0].attempts == 1 &&
` event.messages[1].id == "msg2" &&
` event.messages[1].timestamp.getTime() == 23456 &&
` event.messages[1].body == 22 &&
` event.messages[1].attempts == 2) {
` event.messages[1].body == 22) {
` event.ackAll();
` return;
` }
Expand Down
Loading