From eab0a39606a093c93b4dc46b0fff27220e047856 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 11:10:17 -0800 Subject: [PATCH 1/8] Publish with retry settings --- samples/system-test/topics.test.js | 20 ++++++++++ samples/topics.js | 64 ++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/samples/system-test/topics.test.js b/samples/system-test/topics.test.js index 3691bbb2a..994c14092 100644 --- a/samples/system-test/topics.test.js +++ b/samples/system-test/topics.test.js @@ -30,6 +30,7 @@ const topicNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameOne = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameTwo = `nodejs-docs-samples-test-${uuid.v4()}`; const subscriptionNameThree = `nodejs-docs-samples-test-${uuid.v4()}`; +const subscriptionNameFour = `nodejs-docs-samples-test-${uuid.v4()}`; const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; const expectedMessage = {data: `Hello, world!`}; const cmd = `node topics.js`; @@ -196,6 +197,25 @@ it(`should publish with specific batch settings`, async () => { assert.strictEqual(publishTime - startTime > expectedWait, true); }); +it(`should publish with retry settings`, async () => { + const expectedWait = 1000; + const [subscription] = await pubsub + .topic(topicNameOne) + .subscription(subscriptionNameFour) + .get({autoCreate: true}); + const startTime = Date.now(); + await tools.runAsync( + `${cmd} publish-retry ${projectId} ${topicName} "${ + expectedMessage.data + }"`, + cwd + ); + const receivedMessage = await _pullOneMessage(subscription); + const publishTime = Data.parse(receivedMessage.publishTime); + assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data); + assert.strictEqual(publishTime - startTime > expectedWait, true); +}) + it(`should set the IAM policy for a topic`, async () => { await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd); const results = await pubsub.topic(topicNameOne).iam.getPolicy(); diff --git a/samples/topics.js b/samples/topics.js index 5932bc94d..97ea4de91 100755 --- a/samples/topics.js +++ b/samples/topics.js @@ -175,6 +175,61 @@ async function publishBatchedMessages( // [END pubsub_publisher_batch_settings] } +async function publishWithRetrySettings(projectId, topicName, data) { + // [START pubsub_publisher_retry_settings] + // Imports the Google Cloud client library + const PubSub = require('@google-cloud/pubsub'); + + // Creates a publisher client + const client = new PubSub.v1.PublisherClient({ + // optional auth parameters + }); + + /** + * TODO(developer): Uncomment the following lines to run the sample. + */ + // const projectId = 'my-project-id' + // const topicName = 'my-topic'; + // const data = JSON.stringify({ foo: 'bar' }); + + const formattedTopic = client.topicPath(projectId, topicName); + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const messagesElement = { + data: dataBuffer, + }; + const messages = [messagesElement]; + // Build the request + const request = { + topic: formattedTopic, + messages: messages, + }; + + // Retry settings control how the publisher handles retryable failures + // Default values are shown + const retrySettings = { + retry_codes: { + "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"], + "non_idempotent": [] + }, + backoffSettings: { + "initial_retry_delay_millis": 100, + "retry_delay_multiplier": 1.2, + "max_retry_delay_millis": 1000, + "initial_rpc_timeout_millis": 2000, + "rpc_timeout_multiplier": 1.5, + "max_rpc_timeout_millis": 30000, + "total_timeout_millis": 45000 + }, + } + + const [response] = await client + .publish(request, {retry: retrySettings}); + console.log(`Message ${response.messageIds} published.`); + + // [END pubsub_publisher_retry_settings] +} + let publishCounterValue = 1; function getPublishCounterValue() { @@ -361,6 +416,14 @@ const cli = require(`yargs`) ); } ) + .command( + `publish-retry `, + `Publishes messages to a topic with retry settings.`, + {}, + opts => { + publishWithRetrySettings(opts.projectId, opts.topicName, opts.message); + } + ) .command( `publish-ordered `, `Publishes an ordered message to a topic.`, @@ -400,6 +463,7 @@ const cli = require(`yargs`) .example(`node $0 publish-attributes my-topic "Hello, world!"`) .example(`node $0 publish-ordered my-topic "Hello, world!"`) .example(`node $0 publish-batch my-topic "Hello, world!" -w 1000`) + .example(`node $0 publish-retry my-project my-topic "Hello, world!"`) .example(`node $0 get-policy greetings`) .example(`node $0 set-policy greetings`) .example(`node $0 test-permissions greetings`) From 47f3fb61d30681f1caa419cdd6fe25d4052374be Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 11:23:56 -0800 Subject: [PATCH 2/8] Updated README --- samples/README.md | 21 +++++++++++---------- samples/topics.js | 2 +- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/samples/README.md b/samples/README.md index 8ec1a1846..f5c695dce 100644 --- a/samples/README.md +++ b/samples/README.md @@ -89,16 +89,17 @@ __Usage:__ `node topics.js --help` topics.js Commands: - topics.js list Lists all topics in the current project. - topics.js create Creates a new topic. - topics.js delete Deletes a topic. - topics.js publish Publishes a message to a topic. - topics.js publish-attributes Publishes a message with custom attributes to a Topic - topics.js publish-batch Publishes messages to a topic using custom batching settings. - topics.js publish-ordered Publishes an ordered message to a topic. - topics.js get-policy Gets the IAM policy for a topic. - topics.js set-policy Sets the IAM policy for a topic. - topics.js test-permissions Tests the permissions for a topic. + topics.js list Lists all topics in the current project. + topics.js create Creates a new topic. + topics.js delete Deletes a topic. + topics.js publish Publishes a message to a topic. + topics.js publish-attributes Publishes a message with custom attributes to a Topic + topics.js publish-batch Publishes messages to a topic using custom batching settings. + topics.js publish-retry Publishes a message to a topic with retry settings. + topics.js publish-ordered Publishes an ordered message to a topic. + topics.js get-policy Gets the IAM policy for a topic. + topics.js set-policy Sets the IAM policy for a topic. + topics.js test-permissions Tests the permissions for a topic. Options: --version Show version number [boolean] diff --git a/samples/topics.js b/samples/topics.js index 97ea4de91..72678ef73 100755 --- a/samples/topics.js +++ b/samples/topics.js @@ -418,7 +418,7 @@ const cli = require(`yargs`) ) .command( `publish-retry `, - `Publishes messages to a topic with retry settings.`, + `Publishes a message to a topic with retry settings.`, {}, opts => { publishWithRetrySettings(opts.projectId, opts.topicName, opts.message); From f047608d11e11e37b17185adaac328cc6ddcf7ee Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 11:36:35 -0800 Subject: [PATCH 3/8] nit: reformatting --- samples/system-test/topics.test.js | 10 ++++++---- samples/topics.js | 23 +++++++++++------------ 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/samples/system-test/topics.test.js b/samples/system-test/topics.test.js index 994c14092..eecea1dc3 100644 --- a/samples/system-test/topics.test.js +++ b/samples/system-test/topics.test.js @@ -55,6 +55,9 @@ after(async () => { try { await pubsub.subscription(subscriptionNameThree).delete(); } catch (err) {} // ignore error + try { + await pubsub.subscription(subscriptionNameFour).delete(); + } catch (err) {} // ignore error try { await pubsub.topic(topicNameTwo).delete(); } catch (err) {} // ignore error @@ -205,16 +208,15 @@ it(`should publish with retry settings`, async () => { .get({autoCreate: true}); const startTime = Date.now(); await tools.runAsync( - `${cmd} publish-retry ${projectId} ${topicName} "${ - expectedMessage.data + `${cmd} publish-retry ${projectId} ${topicNameOne} "${expectedMessage.data }"`, cwd ); const receivedMessage = await _pullOneMessage(subscription); - const publishTime = Data.parse(receivedMessage.publishTime); + const publishTime = Date.parse(receivedMessage.publishTime); assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data); assert.strictEqual(publishTime - startTime > expectedWait, true); -}) +}); it(`should set the IAM policy for a topic`, async () => { await tools.runAsync(`${cmd} set-policy ${topicNameOne}`, cwd); diff --git a/samples/topics.js b/samples/topics.js index 72678ef73..209f1446a 100755 --- a/samples/topics.js +++ b/samples/topics.js @@ -209,22 +209,21 @@ async function publishWithRetrySettings(projectId, topicName, data) { // Default values are shown const retrySettings = { retry_codes: { - "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"], - "non_idempotent": [] + idempotent: ['UNAVAILABLE', 'DEADLINE_EXCEEDED'], + non_idempotent: [] }, backoffSettings: { - "initial_retry_delay_millis": 100, - "retry_delay_multiplier": 1.2, - "max_retry_delay_millis": 1000, - "initial_rpc_timeout_millis": 2000, - "rpc_timeout_multiplier": 1.5, - "max_rpc_timeout_millis": 30000, - "total_timeout_millis": 45000 + initial_retry_delay_millis: 100, + retry_delay_multiplier: 1.2, + max_retry_delay_millis: 1000, + initial_rpc_timeout_millis: 2000, + rpc_timeout_multiplier: 1.5, + max_rpc_timeout_millis: 30000, + total_timeout_millis: 45000 }, - } + }; - const [response] = await client - .publish(request, {retry: retrySettings}); + const [response] = await client.publish(request, {retry: retrySettings}); console.log(`Message ${response.messageIds} published.`); // [END pubsub_publisher_retry_settings] From 5bc6dcecd855496663837f4835ac92cde256b1b2 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 11:44:28 -0800 Subject: [PATCH 4/8] nit: reformatting --- samples/system-test/topics.test.js | 3 ++- samples/topics.js | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/samples/system-test/topics.test.js b/samples/system-test/topics.test.js index eecea1dc3..eb34f419f 100644 --- a/samples/system-test/topics.test.js +++ b/samples/system-test/topics.test.js @@ -208,7 +208,8 @@ it(`should publish with retry settings`, async () => { .get({autoCreate: true}); const startTime = Date.now(); await tools.runAsync( - `${cmd} publish-retry ${projectId} ${topicNameOne} "${expectedMessage.data + `${cmd} publish-retry ${projectId} ${topicNameOne} "${ + expectedMessage.data }"`, cwd ); diff --git a/samples/topics.js b/samples/topics.js index 209f1446a..9dd5e6147 100755 --- a/samples/topics.js +++ b/samples/topics.js @@ -210,7 +210,7 @@ async function publishWithRetrySettings(projectId, topicName, data) { const retrySettings = { retry_codes: { idempotent: ['UNAVAILABLE', 'DEADLINE_EXCEEDED'], - non_idempotent: [] + non_idempotent: [], }, backoffSettings: { initial_retry_delay_millis: 100, @@ -219,7 +219,7 @@ async function publishWithRetrySettings(projectId, topicName, data) { initial_rpc_timeout_millis: 2000, rpc_timeout_multiplier: 1.5, max_rpc_timeout_millis: 30000, - total_timeout_millis: 45000 + total_timeout_millis: 45000, }, }; From 8926152bb79eafeb622cd0817bfb8cceda407acb Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 11:57:08 -0800 Subject: [PATCH 5/8] nit: test no need to assert time --- samples/system-test/topics.test.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/samples/system-test/topics.test.js b/samples/system-test/topics.test.js index eb34f419f..1b4a6d38f 100644 --- a/samples/system-test/topics.test.js +++ b/samples/system-test/topics.test.js @@ -206,7 +206,6 @@ it(`should publish with retry settings`, async () => { .topic(topicNameOne) .subscription(subscriptionNameFour) .get({autoCreate: true}); - const startTime = Date.now(); await tools.runAsync( `${cmd} publish-retry ${projectId} ${topicNameOne} "${ expectedMessage.data @@ -214,9 +213,7 @@ it(`should publish with retry settings`, async () => { cwd ); const receivedMessage = await _pullOneMessage(subscription); - const publishTime = Date.parse(receivedMessage.publishTime); assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data); - assert.strictEqual(publishTime - startTime > expectedWait, true); }); it(`should set the IAM policy for a topic`, async () => { From d8460326d52903bb99ef34f3bc4611aa37f9fee0 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 12:02:34 -0800 Subject: [PATCH 6/8] Removed unused var --- samples/system-test/topics.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/system-test/topics.test.js b/samples/system-test/topics.test.js index 1b4a6d38f..8649659f9 100644 --- a/samples/system-test/topics.test.js +++ b/samples/system-test/topics.test.js @@ -201,7 +201,6 @@ it(`should publish with specific batch settings`, async () => { }); it(`should publish with retry settings`, async () => { - const expectedWait = 1000; const [subscription] = await pubsub .topic(topicNameOne) .subscription(subscriptionNameFour) From 4d30ed14b8f14b2add4f293a3ec7bbe2d7872350 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 14:48:31 -0800 Subject: [PATCH 7/8] nit: camelCase --- samples/topics.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/topics.js b/samples/topics.js index 9dd5e6147..4635d192f 100755 --- a/samples/topics.js +++ b/samples/topics.js @@ -208,7 +208,7 @@ async function publishWithRetrySettings(projectId, topicName, data) { // Retry settings control how the publisher handles retryable failures // Default values are shown const retrySettings = { - retry_codes: { + retryCodes: { idempotent: ['UNAVAILABLE', 'DEADLINE_EXCEEDED'], non_idempotent: [], }, From 0a8b9c4988eeaee276620aed8557c1bce715c6d2 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 21 Nov 2018 15:25:06 -0800 Subject: [PATCH 8/8] Fix import --- samples/topics.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/samples/topics.js b/samples/topics.js index 4635d192f..b32643d79 100755 --- a/samples/topics.js +++ b/samples/topics.js @@ -178,10 +178,10 @@ async function publishBatchedMessages( async function publishWithRetrySettings(projectId, topicName, data) { // [START pubsub_publisher_retry_settings] // Imports the Google Cloud client library - const PubSub = require('@google-cloud/pubsub'); + const {v1} = require('@google-cloud/pubsub'); // Creates a publisher client - const client = new PubSub.v1.PublisherClient({ + const client = new v1.PublisherClient({ // optional auth parameters }); @@ -213,13 +213,13 @@ async function publishWithRetrySettings(projectId, topicName, data) { non_idempotent: [], }, backoffSettings: { - initial_retry_delay_millis: 100, - retry_delay_multiplier: 1.2, - max_retry_delay_millis: 1000, - initial_rpc_timeout_millis: 2000, - rpc_timeout_multiplier: 1.5, - max_rpc_timeout_millis: 30000, - total_timeout_millis: 45000, + initialRetryDelayMillis: 100, + retryDelayMultiplier: 1.2, + maxRetryDelayMillis: 1000, + initialRpcTimeoutMillis: 2000, + rpcTimeoutMultiplier: 1.5, + maxRpcTimeoutMillis: 30000, + totalTimeoutMillis: 45000, }, };