Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Pub/Sub samples #464

Merged
merged 4 commits into from
Aug 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Commands:
create-push <topicName> <subscriptionName> Creates a new push subscription.
delete <subscriptionName> Deletes a subscription.
get <subscriptionName> Gets the metadata for a subscription.
pull <subscriptionName> Pulls messages for a subscription.
listen <subscriptionName> Listens to messages for a subscription.
get-policy <subscriptionName> Gets the IAM policy for a subscription.
set-policy <subscriptionName> Sets the IAM policy for a subscription.
test-permissions <subscriptionName> Tests the permissions for a subscription.
Expand Down
6 changes: 3 additions & 3 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
"test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js"
},
"dependencies": {
"@google-cloud/pubsub": "0.13.2",
"@google-cloud/pubsub": "0.14.0",
"yargs": "8.0.2"
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "1.4.17",
"ava": "0.21.0",
"ava": "0.22.0",
"proxyquire": "1.8.0",
"sinon": "3.2.0"
"sinon": "3.2.1"
},
"cloud-repo-tools": {
"requiresKeyFile": true,
Expand Down
139 changes: 77 additions & 62 deletions pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function createSubscription (topicName, subscriptionName) {
const topic = pubsub.topic(topicName);

// Creates a new subscription, e.g. "my-new-subscription"
return topic.subscribe(subscriptionName)
return topic.createSubscription(subscriptionName)
.then((results) => {
const subscription = results[0];

Expand Down Expand Up @@ -101,7 +101,7 @@ function createPushSubscription (topicName, subscriptionName) {
};

// Creates a new push subscription, e.g. "my-new-subscription"
return topic.subscribe(subscriptionName, options)
return topic.createSubscription(subscriptionName, options)
.then((results) => {
const subscription = results[0];

Expand Down Expand Up @@ -151,32 +151,34 @@ function getSubscription (subscriptionName) {
}
// [END pubsub_get_subscription]

// [START pubsub_pull_messages]
function pullMessages (subscriptionName) {
// [START pubsub_listen_messages]
function listenForMessages (subscriptionName, timeout) {
// Instantiates a client
const pubsub = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsub.subscription(subscriptionName);

// Pulls messages. Set returnImmediately to false to block until messages are
// received.
return subscription.pull()
.then((results) => {
const messages = results[0];

console.log(`Received ${messages.length} messages.`);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = (message) => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

messages.forEach((message) => {
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
});
// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Acknowledges received messages. If you do not acknowledge, Pub/Sub will
// redeliver the message.
return subscription.ack(messages.map((message) => message.ackId));
});
// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_pull_messages]
// [END pubsub_listen_messages]

let subscribeCounterValue = 1;

Expand All @@ -188,54 +190,61 @@ function setSubscribeCounterValue (value) {
subscribeCounterValue = value;
}

// [START pubsub_pull_ordered_messages]
// [START pubsub_listen_ordered_messages]
const outstandingMessages = {};

function pullOrderedMessages (subscriptionName) {
function listenForOrderedMessages (subscriptionName, timeout) {
// Instantiates a client
const pubsub = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsub.subscription(subscriptionName);

// Pulls messages. Set returnImmediately to false to block until messages are
// received.
return subscription.pull()
.then((results) => {
const messages = results[0];

// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
messages.forEach((message) => {
outstandingMessages[message.attributes.counterId] = message;
});

const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
outstandingIds.sort();

outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[counterId];

if (counterId < counter) {
// The message has already been processed
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data, message.attributes);

setSubscribeCounterValue(counterId + 1);
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
}
});
// Create an event handler to handle messages
const messageHandler = function (message) {
// Buffer the message in an object (for later ordering)
outstandingMessages[message.attributes.counterId] = message;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Listen for new messages until timeout is hit
return new Promise((resolve) => {
subscription.on(`message`, messageHandler);
setTimeout(() => {
subscription.removeListener(`message`, messageHandler);
resolve();
}, timeout * 1000);
})
.then(() => {
// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
outstandingIds.sort();

outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[counterId];

if (counterId < counter) {
// The message has already been processed
message.ack();
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data.toString(), message.attributes);
setSubscribeCounterValue(counterId + 1);
message.ack();
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
}
});
});
}
// [END pubsub_pull_ordered_messages]
// [END pubsub_listen_ordered_messages]

// [START pubsub_get_subscription_policy]
function getSubscriptionPolicy (subscriptionName) {
Expand Down Expand Up @@ -318,7 +327,7 @@ function testSubscriptionPermissions (subscriptionName) {
}
// [END pubsub_test_subscription_permissions]

module.exports = { pullOrderedMessages };
module.exports = { listenForOrderedMessages };

const cli = require(`yargs`)
.demand(1)
Expand Down Expand Up @@ -359,10 +368,16 @@ const cli = require(`yargs`)
(opts) => getSubscription(opts.subscriptionName)
)
.command(
`pull <subscriptionName>`,
`Pulls messages for a subscription.`,
{},
(opts) => pullMessages(opts.subscriptionName)
`listen <subscriptionName>`,
`Listens to messages for a subscription.`,
{
timeout: {
alias: 't',
type: 'number',
default: 10
}
},
(opts) => listenForMessages(opts.subscriptionName, opts.timeout)
)
.command(
`get-policy <subscriptionName>`,
Expand Down
89 changes: 53 additions & 36 deletions pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,22 @@ test.beforeEach(tools.stubConsole);
test.afterEach.always(tools.restoreConsole);

test.serial(`should create a subscription`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} create ${topicNameOne} ${subscriptionNameOne}`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameOne} created.`);
const results = await pubsub.subscription(subscriptionNameOne).exists();
t.true(results[0]);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions();
assert.equal(subscriptions[0].name, fullSubscriptionNameOne);
}).start();
});

test.serial(`should create a push subscription`, async (t) => {
const output = await tools.runAsync(`${cmd} create-push ${topicNameOne} ${subscriptionNameTwo}`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameTwo} created.`);
const results = await pubsub.subscription(subscriptionNameTwo).exists();
t.true(results[0]);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions();
assert(subscriptions.some((s) => s.name === fullSubscriptionNameTwo));
}).start();
});

test.serial(`should get metadata for a subscription`, async (t) => {
Expand All @@ -86,52 +91,60 @@ test.serial(`should get metadata for a subscription`, async (t) => {
});

test.serial(`should list all subscriptions`, async (t) => {
await tools.tryTest(async () => {
t.plan(0);
await tools.tryTest(async (assert) => {
const output = await tools.runAsync(`${cmd} list`, cwd);
t.true(output.includes(`Subscriptions:`));
t.true(output.includes(fullSubscriptionNameOne));
t.true(output.includes(fullSubscriptionNameTwo));
assert(output.includes(`Subscriptions:`));
assert(output.includes(fullSubscriptionNameOne));
assert(output.includes(fullSubscriptionNameTwo));
}).start();
});

test.serial(`should list subscriptions for a topic`, async (t) => {
const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd);
t.true(output.includes(`Subscriptions for ${topicNameOne}:`));
t.true(output.includes(fullSubscriptionNameOne));
t.true(output.includes(fullSubscriptionNameTwo));
t.plan(0);
await tools.tryTest(async (assert) => {
const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd);
assert(output.includes(`Subscriptions for ${topicNameOne}:`));
assert(output.includes(fullSubscriptionNameOne));
assert(output.includes(fullSubscriptionNameTwo));
}).start();
});

test.serial(`should pull messages`, async (t) => {
test.serial(`should listen for messages`, async (t) => {
const expected = `Hello, world!`;
const results = await pubsub.topic(topicNameOne).publish(expected);
const messageIds = results[0];
const expectedOutput = `Received ${messageIds.length} messages.\n* ${messageIds[0]} "${expected}" {}`;
const output = await tools.runAsync(`${cmd} pull ${subscriptionNameOne}`, cwd);
t.is(output, expectedOutput);
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected));
const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd);
t.true(output.includes(`Received message ${messageIds[0]}:`));
});

test.serial(`should pull ordered messages`, async (t) => {
test.serial(`should listen for ordered messages`, async (t) => {
const timeout = 5;
const subscriptions = require('../subscriptions');
const expected = `Hello, world!`;
const expectedBuffer = Buffer.from(expected);
const publishedMessageIds = [];
await pubsub.topic(topicNameTwo).subscribe(subscriptionNameThree);
let results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '3' } }, { raw: true });
publishedMessageIds.push(results[0][0]);
await subscriptions.pullOrderedMessages(subscriptionNameThree);
const publisherTwo = pubsub.topic(topicNameTwo).publisher();

await pubsub.topic(topicNameTwo).createSubscription(subscriptionNameThree);
let [result] = await publisherTwo.publish(expectedBuffer, { counterId: '3' });
publishedMessageIds.push(result);
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
t.is(console.log.callCount, 0);
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true });
publishedMessageIds.push(results[0][0]);
await subscriptions.pullOrderedMessages(subscriptionNameThree);

[result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' });
publishedMessageIds.push(result);
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
t.is(console.log.callCount, 1);
t.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true });
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '2' } }, { raw: true });
publishedMessageIds.push(results[0][0]);
await tools.tryTest(async () => {
await subscriptions.pullOrderedMessages(subscriptionNameThree);
t.is(console.log.callCount, 3);
t.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
t.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);

[result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' });
[result] = await publisherTwo.publish(expectedBuffer, { counterId: '2' });
publishedMessageIds.push(result);
await tools.tryTest(async (assert) => {
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
assert.equal(console.log.callCount, 3);
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
});
});

Expand Down Expand Up @@ -163,8 +176,12 @@ test.serial(`should test permissions for a subscription`, async (t) => {
});

test.serial(`should delete a subscription`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} delete ${subscriptionNameOne}`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameOne} deleted.`);
const results = await pubsub.subscription(subscriptionNameOne).exists();
t.false(results[0], false);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.getSubscriptions();
assert.ok(subscriptions);
assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne));
}).start();
});
Loading