Skip to content

Commit

Permalink
Update Pub/Sub samples (#464)
Browse files Browse the repository at this point in the history
* Update Pub/Sub samples WIP

* Make tests a little nicer

* Update package.json
  • Loading branch information
Ace Nassri authored Aug 31, 2017
1 parent 4495be7 commit 6fca8cf
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 151 deletions.
2 changes: 1 addition & 1 deletion pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Commands:
create-push <topicName> <subscriptionName> Creates a new push subscription.
delete <subscriptionName> Deletes a subscription.
get <subscriptionName> Gets the metadata for a subscription.
pull <subscriptionName> Pulls messages for a subscription.
listen <subscriptionName> Listens to messages for a subscription.
get-policy <subscriptionName> Gets the IAM policy for a subscription.
set-policy <subscriptionName> Sets the IAM policy for a subscription.
test-permissions <subscriptionName> Tests the permissions for a subscription.
Expand Down
6 changes: 3 additions & 3 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
"test": "samples test run --cmd ava -- -T 30s --verbose system-test/*.test.js"
},
"dependencies": {
"@google-cloud/pubsub": "0.13.2",
"@google-cloud/pubsub": "0.14.0",
"yargs": "8.0.2"
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "1.4.17",
"ava": "0.21.0",
"ava": "0.22.0",
"proxyquire": "1.8.0",
"sinon": "3.2.0"
"sinon": "3.2.1"
},
"cloud-repo-tools": {
"requiresKeyFile": true,
Expand Down
139 changes: 77 additions & 62 deletions pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function createSubscription (topicName, subscriptionName) {
const topic = pubsub.topic(topicName);

// Creates a new subscription, e.g. "my-new-subscription"
return topic.subscribe(subscriptionName)
return topic.createSubscription(subscriptionName)
.then((results) => {
const subscription = results[0];

Expand Down Expand Up @@ -101,7 +101,7 @@ function createPushSubscription (topicName, subscriptionName) {
};

// Creates a new push subscription, e.g. "my-new-subscription"
return topic.subscribe(subscriptionName, options)
return topic.createSubscription(subscriptionName, options)
.then((results) => {
const subscription = results[0];

Expand Down Expand Up @@ -151,32 +151,34 @@ function getSubscription (subscriptionName) {
}
// [END pubsub_get_subscription]

// [START pubsub_pull_messages]
function pullMessages (subscriptionName) {
// [START pubsub_listen_messages]
function listenForMessages (subscriptionName, timeout) {
// Instantiates a client
const pubsub = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsub.subscription(subscriptionName);

// Pulls messages. Set returnImmediately to false to block until messages are
// received.
return subscription.pull()
.then((results) => {
const messages = results[0];

console.log(`Received ${messages.length} messages.`);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = (message) => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

messages.forEach((message) => {
console.log(`* %d %j %j`, message.id, message.data, message.attributes);
});
// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Acknowledges received messages. If you do not acknowledge, Pub/Sub will
// redeliver the message.
return subscription.ack(messages.map((message) => message.ackId));
});
// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_pull_messages]
// [END pubsub_listen_messages]

let subscribeCounterValue = 1;

Expand All @@ -188,54 +190,61 @@ function setSubscribeCounterValue (value) {
subscribeCounterValue = value;
}

// [START pubsub_pull_ordered_messages]
// [START pubsub_listen_ordered_messages]
const outstandingMessages = {};

function pullOrderedMessages (subscriptionName) {
function listenForOrderedMessages (subscriptionName, timeout) {
// Instantiates a client
const pubsub = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsub.subscription(subscriptionName);

// Pulls messages. Set returnImmediately to false to block until messages are
// received.
return subscription.pull()
.then((results) => {
const messages = results[0];

// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
messages.forEach((message) => {
outstandingMessages[message.attributes.counterId] = message;
});

const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
outstandingIds.sort();

outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[counterId];

if (counterId < counter) {
// The message has already been processed
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data, message.attributes);

setSubscribeCounterValue(counterId + 1);
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
}
});
// Create an event handler to handle messages
const messageHandler = function (message) {
// Buffer the message in an object (for later ordering)
outstandingMessages[message.attributes.counterId] = message;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Listen for new messages until timeout is hit
return new Promise((resolve) => {
subscription.on(`message`, messageHandler);
setTimeout(() => {
subscription.removeListener(`message`, messageHandler);
resolve();
}, timeout * 1000);
})
.then(() => {
// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
outstandingIds.sort();

outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[counterId];

if (counterId < counter) {
// The message has already been processed
message.ack();
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data.toString(), message.attributes);
setSubscribeCounterValue(counterId + 1);
message.ack();
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
}
});
});
}
// [END pubsub_pull_ordered_messages]
// [END pubsub_listen_ordered_messages]

// [START pubsub_get_subscription_policy]
function getSubscriptionPolicy (subscriptionName) {
Expand Down Expand Up @@ -318,7 +327,7 @@ function testSubscriptionPermissions (subscriptionName) {
}
// [END pubsub_test_subscription_permissions]

module.exports = { pullOrderedMessages };
module.exports = { listenForOrderedMessages };

const cli = require(`yargs`)
.demand(1)
Expand Down Expand Up @@ -359,10 +368,16 @@ const cli = require(`yargs`)
(opts) => getSubscription(opts.subscriptionName)
)
.command(
`pull <subscriptionName>`,
`Pulls messages for a subscription.`,
{},
(opts) => pullMessages(opts.subscriptionName)
`listen <subscriptionName>`,
`Listens to messages for a subscription.`,
{
timeout: {
alias: 't',
type: 'number',
default: 10
}
},
(opts) => listenForMessages(opts.subscriptionName, opts.timeout)
)
.command(
`get-policy <subscriptionName>`,
Expand Down
89 changes: 53 additions & 36 deletions pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,22 @@ test.beforeEach(tools.stubConsole);
test.afterEach.always(tools.restoreConsole);

test.serial(`should create a subscription`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} create ${topicNameOne} ${subscriptionNameOne}`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameOne} created.`);
const results = await pubsub.subscription(subscriptionNameOne).exists();
t.true(results[0]);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions();
assert.equal(subscriptions[0].name, fullSubscriptionNameOne);
}).start();
});

test.serial(`should create a push subscription`, async (t) => {
const output = await tools.runAsync(`${cmd} create-push ${topicNameOne} ${subscriptionNameTwo}`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameTwo} created.`);
const results = await pubsub.subscription(subscriptionNameTwo).exists();
t.true(results[0]);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions();
assert(subscriptions.some((s) => s.name === fullSubscriptionNameTwo));
}).start();
});

test.serial(`should get metadata for a subscription`, async (t) => {
Expand All @@ -86,52 +91,60 @@ test.serial(`should get metadata for a subscription`, async (t) => {
});

test.serial(`should list all subscriptions`, async (t) => {
await tools.tryTest(async () => {
t.plan(0);
await tools.tryTest(async (assert) => {
const output = await tools.runAsync(`${cmd} list`, cwd);
t.true(output.includes(`Subscriptions:`));
t.true(output.includes(fullSubscriptionNameOne));
t.true(output.includes(fullSubscriptionNameTwo));
assert(output.includes(`Subscriptions:`));
assert(output.includes(fullSubscriptionNameOne));
assert(output.includes(fullSubscriptionNameTwo));
}).start();
});

test.serial(`should list subscriptions for a topic`, async (t) => {
const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd);
t.true(output.includes(`Subscriptions for ${topicNameOne}:`));
t.true(output.includes(fullSubscriptionNameOne));
t.true(output.includes(fullSubscriptionNameTwo));
t.plan(0);
await tools.tryTest(async (assert) => {
const output = await tools.runAsync(`${cmd} list ${topicNameOne}`, cwd);
assert(output.includes(`Subscriptions for ${topicNameOne}:`));
assert(output.includes(fullSubscriptionNameOne));
assert(output.includes(fullSubscriptionNameTwo));
}).start();
});

test.serial(`should pull messages`, async (t) => {
test.serial(`should listen for messages`, async (t) => {
const expected = `Hello, world!`;
const results = await pubsub.topic(topicNameOne).publish(expected);
const messageIds = results[0];
const expectedOutput = `Received ${messageIds.length} messages.\n* ${messageIds[0]} "${expected}" {}`;
const output = await tools.runAsync(`${cmd} pull ${subscriptionNameOne}`, cwd);
t.is(output, expectedOutput);
const messageIds = await pubsub.topic(topicNameOne).publisher().publish(Buffer.from(expected));
const output = await tools.runAsync(`${cmd} listen ${subscriptionNameOne}`, cwd);
t.true(output.includes(`Received message ${messageIds[0]}:`));
});

test.serial(`should pull ordered messages`, async (t) => {
test.serial(`should listen for ordered messages`, async (t) => {
const timeout = 5;
const subscriptions = require('../subscriptions');
const expected = `Hello, world!`;
const expectedBuffer = Buffer.from(expected);
const publishedMessageIds = [];
await pubsub.topic(topicNameTwo).subscribe(subscriptionNameThree);
let results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '3' } }, { raw: true });
publishedMessageIds.push(results[0][0]);
await subscriptions.pullOrderedMessages(subscriptionNameThree);
const publisherTwo = pubsub.topic(topicNameTwo).publisher();

await pubsub.topic(topicNameTwo).createSubscription(subscriptionNameThree);
let [result] = await publisherTwo.publish(expectedBuffer, { counterId: '3' });
publishedMessageIds.push(result);
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
t.is(console.log.callCount, 0);
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true });
publishedMessageIds.push(results[0][0]);
await subscriptions.pullOrderedMessages(subscriptionNameThree);

[result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' });
publishedMessageIds.push(result);
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
t.is(console.log.callCount, 1);
t.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '1' } }, { raw: true });
results = await pubsub.topic(topicNameTwo).publish({ data: expected, attributes: { counterId: '2' } }, { raw: true });
publishedMessageIds.push(results[0][0]);
await tools.tryTest(async () => {
await subscriptions.pullOrderedMessages(subscriptionNameThree);
t.is(console.log.callCount, 3);
t.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
t.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);

[result] = await publisherTwo.publish(expectedBuffer, { counterId: '1' });
[result] = await publisherTwo.publish(expectedBuffer, { counterId: '2' });
publishedMessageIds.push(result);
await tools.tryTest(async (assert) => {
await subscriptions.listenForOrderedMessages(subscriptionNameThree, timeout);
assert.equal(console.log.callCount, 3);
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
});
});

Expand Down Expand Up @@ -163,8 +176,12 @@ test.serial(`should test permissions for a subscription`, async (t) => {
});

test.serial(`should delete a subscription`, async (t) => {
t.plan(1);
const output = await tools.runAsync(`${cmd} delete ${subscriptionNameOne}`, cwd);
t.is(output, `Subscription ${fullSubscriptionNameOne} deleted.`);
const results = await pubsub.subscription(subscriptionNameOne).exists();
t.false(results[0], false);
await tools.tryTest(async (assert) => {
const [subscriptions] = await pubsub.getSubscriptions();
assert.ok(subscriptions);
assert(subscriptions.every((s) => s.name !== fullSubscriptionNameOne));
}).start();
});
Loading

0 comments on commit 6fca8cf

Please sign in to comment.