From eed57ddcb3ee1b306578b8fc7bad7b2bdd9d7bbf Mon Sep 17 00:00:00 2001 From: Garrett Gu Date: Sat, 30 Mar 2024 21:15:40 -0500 Subject: [PATCH] Revert "MQ-576: Add attempts to QueueMessage" --- src/workerd/api/http.c++ | 2 -- src/workerd/api/http.h | 4 +--- src/workerd/api/queue-test.js | 19 +++++++------------ src/workerd/api/queue.c++ | 3 --- src/workerd/api/queue.h | 6 +----- src/workerd/io/worker-interface.capnp | 1 - src/workerd/server/server-test.c++ | 8 +++----- 7 files changed, 12 insertions(+), 31 deletions(-) diff --git a/src/workerd/api/http.c++ b/src/workerd/api/http.c++ index 6df4f74b83c..85f5ab48c2c 100644 --- a/src/workerd/api/http.c++ +++ b/src/workerd/api/http.c++ @@ -2287,14 +2287,12 @@ jsg::Promise Fetcher::queue( .id=kj::mv(msg.id), .timestamp=msg.timestamp, .body=serializer.release().data, - .attempts=msg.attempts }); } else KJ_IF_SOME(b, msg.serializedBody) { encodedMessages.add(IncomingQueueMessage{ .id=kj::mv(msg.id), .timestamp=msg.timestamp, .body=kj::mv(b), - .attempts=msg.attempts }); } else { JSG_FAIL_REQUIRE(TypeError, "Expected one of body or serializedBody for each message"); diff --git a/src/workerd/api/http.h b/src/workerd/api/http.h index 51f36d76b6f..f05bc397ded 100644 --- a/src/workerd/api/http.h +++ b/src/workerd/api/http.h @@ -525,13 +525,11 @@ class Fetcher: public JsRpcClientProvider { kj::Date timestamp; jsg::Optional body; jsg::Optional> serializedBody; - uint16_t attempts; - JSG_STRUCT(id, timestamp, body, serializedBody, attempts); + JSG_STRUCT(id, timestamp, body, serializedBody); JSG_STRUCT_TS_OVERRIDE(type ServiceBindingQueueMessage = { id: string; timestamp: Date; - attempts: number; } & ( | { body: Body } | { serializedBody: ArrayBuffer | ArrayBufferView } diff --git a/src/workerd/api/queue-test.js b/src/workerd/api/queue-test.js index 55f81aedca3..74d865f2729 100644 --- a/src/workerd/api/queue-test.js +++ b/src/workerd/api/queue-test.js @@ -65,25 +65,20 @@ export default { assert.strictEqual(batch.messages[0].id, "#0"); assert.strictEqual(batch.messages[0].body, "ghi"); - assert.strictEqual(batch.messages[0].attempts, 1); assert.strictEqual(batch.messages[1].id, "#1"); assert.deepStrictEqual(batch.messages[1].body, new Uint8Array([7, 8, 9])); - assert.strictEqual(batch.messages[1].attempts, 2); assert.strictEqual(batch.messages[2].id, "#2"); assert.deepStrictEqual(batch.messages[2].body, { c: { d: 10 } }); - assert.strictEqual(batch.messages[2].attempts, 3); batch.messages[2].retry(); assert.strictEqual(batch.messages[3].id, "#3"); assert.deepStrictEqual(batch.messages[3].body, batch.messages[3].timestamp); - assert.strictEqual(batch.messages[3].attempts, 4); batch.messages[3].retry({ delaySeconds: 2 }); assert.strictEqual(batch.messages[4].id, "#4"); assert.deepStrictEqual(batch.messages[4].body, new Map([["key", "value"]])); - assert.strictEqual(batch.messages[4].attempts, 5); batch.ackAll(); }, @@ -103,11 +98,11 @@ export default { const timestamp = new Date(); const response = await env.SERVICE.queue("test-queue", [ - { id: "#0", timestamp, body: "ghi", attempts: 1 }, - { id: "#1", timestamp, body: new Uint8Array([7, 8, 9]), attempts: 2 }, - { id: "#2", timestamp, body: { c: { d: 10 } }, attempts: 3 }, - { id: "#3", timestamp, body: timestamp, attempts: 4 }, - { id: "#4", timestamp, serializedBody, attempts: 5 }, + { id: "#0", timestamp, body: "ghi" }, + { id: "#1", timestamp, body: new Uint8Array([7, 8, 9]) }, + { id: "#2", timestamp, body: { c: { d: 10 } } }, + { id: "#3", timestamp, body: timestamp }, + { id: "#4", timestamp, serializedBody }, ]); assert.strictEqual(response.outcome, "ok"); assert(!response.retryBatch.retry); @@ -116,13 +111,13 @@ export default { assert.deepStrictEqual(response.explicitAcks, []); await assert.rejects(env.SERVICE.queue("test-queue", [ - { id: "#0", timestamp, attempts: 1 } + { id: "#0", timestamp } ]), { name: "TypeError", message: "Expected one of body or serializedBody for each message" }); await assert.rejects(env.SERVICE.queue("test-queue", [ - { id: "#0", timestamp, body: "", serializedBody, attempts: 1 } + { id: "#0", timestamp, body: "", serializedBody } ]), { name: "TypeError", message: "Expected one of body or serializedBody for each message" diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index 040af12cfc9..e69a6b5696e 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -359,7 +359,6 @@ QueueMessage::QueueMessage(jsg::Lock& js, : id(kj::str(message.getId())), timestamp(message.getTimestampNs() * kj::NANOSECONDS + kj::UNIX_EPOCH), body(deserialize(js, message).addRef(js)), - attempts(message.getAttempts()), result(result) {} // Note that we must make deep copies of all data here since the incoming Reader may be // deallocated while JS's GC wrappers still exist. @@ -369,7 +368,6 @@ QueueMessage::QueueMessage( : id(kj::mv(message.id)), timestamp(message.timestamp), body(deserialize(js, kj::mv(message.body), message.contentType).addRef(js)), - attempts(message.attempts), result(result) {} jsg::JsValue QueueMessage::getBody(jsg::Lock& js) { @@ -629,7 +627,6 @@ kj::Promise QueueCustomEventImpl::sendRpc( KJ_IF_SOME(contentType, p.messages[i].contentType) { messages[i].setContentType(contentType); } - messages[i].setAttempts(p.messages[i].attempts); } } } diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index 44cf5e7157e..19b76c005f2 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -98,8 +98,7 @@ struct IncomingQueueMessage { kj::Date timestamp; kj::Array body; kj::Maybe contentType; - uint16_t attempts; - JSG_STRUCT(id, timestamp, body, contentType, attempts); + JSG_STRUCT(id, timestamp, body, contentType); struct ContentType { static constexpr kj::StringPtr TEXT = "text"_kj; @@ -159,7 +158,6 @@ class QueueMessage final: public jsg::Object { kj::StringPtr getId() { return id; } kj::Date getTimestamp() { return timestamp; } jsg::JsValue getBody(jsg::Lock& js); - uint16_t getAttempts() { return attempts; }; void retry(jsg::Optional options); void ack(); @@ -170,7 +168,6 @@ class QueueMessage final: public jsg::Object { JSG_READONLY_INSTANCE_PROPERTY(id, getId); JSG_READONLY_INSTANCE_PROPERTY(timestamp, getTimestamp); JSG_READONLY_INSTANCE_PROPERTY(body, getBody); - JSG_READONLY_INSTANCE_PROPERTY(attempts, getAttempts); JSG_METHOD(retry); JSG_METHOD(ack); @@ -189,7 +186,6 @@ class QueueMessage final: public jsg::Object { kj::String id; kj::Date timestamp; jsg::JsRef body; - uint16_t attempts; IoPtr result; void visitForGc(jsg::GcVisitor& visitor) { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 002be27d577..a3ab7ee675c 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -154,7 +154,6 @@ struct QueueMessage @0x944adb18c0352295 { timestampNs @1 :Int64; data @2 :Data; contentType @3 :Text; - attempts @4 :UInt16; } struct QueueRetryBatch { diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index 8539d21004f..5dbf8834d0d 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -1422,8 +1422,8 @@ KJ_TEST("Server: call queue handler on service binding") { `export default { ` async fetch(request, env) { ` let result = await env.service2.queue("queueName1", [ - ` {id: "1", timestamp: 12345, body: "my message", attempts: 1}, - ` {id: "msg2", timestamp: 23456, body: 22, attempts: 2}, + ` {id: "1", timestamp: 12345, body: "my message"}, + ` {id: "msg2", timestamp: 23456, body: 22}, ` ]); ` return new Response(`queue outcome: ${result.outcome}, ackAll: ${result.ackAll}`); ` } @@ -1449,11 +1449,9 @@ KJ_TEST("Server: call queue handler on service binding") { ` event.messages[0].id == "1" && ` event.messages[0].timestamp.getTime() == 12345 && ` event.messages[0].body == "my message" && - ` event.messages[0].attempts == 1 && ` event.messages[1].id == "msg2" && ` event.messages[1].timestamp.getTime() == 23456 && - ` event.messages[1].body == 22 && - ` event.messages[1].attempts == 2) { + ` event.messages[1].body == 22) { ` event.ackAll(); ` return; ` }