Skip to content

Commit

Permalink
pubsub: support generated subscription names
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 16, 2016
1 parent 75992b3 commit ff6e4f7
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 73 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,7 @@ var topic = pubsubClient.topic('my-topic');
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
reuseExisting: true
};

topic.subscribe('subscription-name', options, function(err, subscription) {
topic.subscribe('subscription-name', function(err, subscription) {
// Register listeners to start pulling for messages.
function onError(err) {}
function onMessage(message) {}
Expand Down
6 changes: 1 addition & 5 deletions packages/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ var topic = pubsub.topic('my-topic');
topic.publish('New message!', function(err) {});

// Subscribe to the topic.
var options = {
reuseExisting: true
};

topic.subscribe('subscription-name', options, function(err, subscription) {
topic.subscribe('subscription-name', function(err, subscription) {
// Register listeners to start pulling for messages.
function onError(err) {}
function onMessage(message) {}
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@
"google-proto-files": "^0.8.0",
"is": "^3.0.1",
"modelo": "^4.2.0",
"node-uuid": "^1.4.3",
"propprop": "^0.3.0"
},
"devDependencies": {
"async": "^1.5.2",
"mocha": "^3.0.1",
"node-uuid": "^1.4.3",
"proxyquire": "^1.7.10"
},
"scripts": {
Expand Down
51 changes: 33 additions & 18 deletions packages/pubsub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,17 @@ PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');
/**
* Create a subscription to a topic.
*
* All generated subscription names share a common prefix, `generated-`.
*
* @resource [Subscriptions: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create}
*
* @throws {Error} If a Topic instance or topic name is not provided.
* @throws {Error} If a subName is not provided.
*
* @param {module:pubsub/topic|string} topic - The Topic to create a
* subscription to.
* @param {string} subName - The name of the subscription.
* @param {string=} subName - The name of the subscription. If a name is not
* provided, a random subscription name will be generated and created.
* @param {object=} options - See a
* [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions)
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
Expand All @@ -431,10 +434,6 @@ PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');
* simultaneously.
* @param {string} options.pushEndpoint - A URL to a custom endpoint that
* messages should be pushed to.
* @param {boolean} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
Expand All @@ -453,6 +452,14 @@ PubSub.prototype.getTopicsStream = common.paginator.streamify('getTopics');
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
*
* //-
* // Omit the name to have one generated automatically. All generated names
* // share a common prefix, `generated-`.
* //-
* pubsub.subscribe(topic, function(err, subscription, apiResponse) {
* // subscription.name = The generated name.
* });
*
* //-
* // Customize the subscription.
* //-
* pubsub.subscribe(topic, name, {
Expand All @@ -474,21 +481,27 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
throw new Error('A Topic is required for a new subscription.');
}

if (!is.string(subName)) {
throw new Error('A subscription name is required for a new subscription.');
if (is.string(topic)) {
topic = this.topic(topic);
}

if (!callback) {
if (is.object(subName)) {
options = subName;
subName = '';
}

if (is.fn(subName)) {
callback = subName;
subName = '';
}

if (is.fn(options)) {
callback = options;
options = {};
}

options = options || {};

if (is.string(topic)) {
topic = this.topic(topic);
}

var subscription = this.subscription(subName, options);

var protoOpts = {
Expand All @@ -513,11 +526,10 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
delete reqOpts.interval;
delete reqOpts.maxInProgress;
delete reqOpts.pushEndpoint;
delete reqOpts.reuseExisting;
delete reqOpts.timeout;

this.request(protoOpts, reqOpts, function(err, resp) {
if (err && !(err.code === 409 && options.reuseExisting)) {
if (err && err.code !== 409) {
callback(err, null, resp);
return;
}
Expand All @@ -531,9 +543,10 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
* requests. You will receive a {module:pubsub/subscription} object,
* which will allow you to interact with a subscription.
*
* @throws {Error} If a name is not provided.
* All generated names share a common prefix, `generated-`.
*
* @param {string} name - The name of the subscription.
* @param {string=} name - The name of the subscription. If a name is not
* provided, a random subscription name will be generated.
* @param {object=} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
Expand All @@ -560,12 +573,14 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
* });
*/
PubSub.prototype.subscription = function(name, options) {
if (!name) {
throw new Error('The name of a subscription is required.');
if (is.object(name)) {
options = name;
name = undefined;
}

options = options || {};
options.name = name;

return new Subscription(this, options);
};

Expand Down
14 changes: 13 additions & 1 deletion packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var events = require('events');
var is = require('is');
var modelo = require('modelo');
var prop = require('propprop');
var uuid = require('node-uuid');

/**
* @type {module:pubsub/iam}
Expand Down Expand Up @@ -140,7 +141,9 @@ var PUBSUB_API_TIMEOUT = 90000;
* subscription.removeListener('message', onMessage);
*/
function Subscription(pubsub, options) {
this.name = Subscription.formatName_(pubsub.projectId, options.name);
var name = options.name || Subscription.generateName_();

this.name = Subscription.formatName_(pubsub.projectId, name);

var methods = {
/**
Expand Down Expand Up @@ -374,6 +377,15 @@ Subscription.formatName_ = function(projectId, name) {
return 'projects/' + projectId + '/subscriptions/' + name;
};

/**
* Generate a random name to use for a name-less subscription.
*
* @private
*/
Subscription.generateName_ = function() {
return 'autogenerated-' + uuid.v4();
};

/**
* Acknowledge to the backend that the message was retrieved. You must provide
* either a single ackId or an array of ackIds.
Expand Down
17 changes: 12 additions & 5 deletions packages/pubsub/src/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,12 @@ Topic.prototype.publish = function(messages, options, callback) {
/**
* Create a subscription to this topic.
*
* All generated subscription names share a common prefix, `generated-`.
*
* @resource [Subscriptions: create API Documentation]{@link https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create}
*
* @param {string} subName - The name of the subscription.
* @param {string=} subName - The name of the subscription. If a name is not
* provided, a random subscription name will be generated and created.
* @param {object=} options - See a
* [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions)
* @param {number} options.ackDeadlineSeconds - The maximum time after
Expand All @@ -494,10 +497,6 @@ Topic.prototype.publish = function(messages, options, callback) {
* simultaneously.
* @param {string} options.pushEndpoint - A URL to a custom endpoint that
* messages should be pushed to.
* @param {boolean=} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
Expand All @@ -507,6 +506,14 @@ Topic.prototype.publish = function(messages, options, callback) {
* // Without specifying any options.
* topic.subscribe('newMessages', function(err, subscription, apiResponse) {});
*
* //-
* // Omit the name to have one generated automatically. All generated names
* // share a common prefix, `generated-`.
* //-
* topic.subscribe(function(err, subscription, apiResponse) {
* // subscription.name = The generated name.
* });
*
* // With options.
* topic.subscribe('newMessages', {
* ackDeadlineSeconds: 90,
Expand Down
9 changes: 8 additions & 1 deletion packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,15 @@ describe('pubsub', function() {
});
});

it('should create a subscription with a generated name', function(done) {
topic.subscribe(function(err, sub) {
assert.ifError(err);
sub.delete(done);
});
});

it('should re-use an existing subscription', function(done) {
pubsub.subscribe(topic, SUB_NAMES[0], { reuseExisting: true }, done);
pubsub.subscribe(topic, SUB_NAMES[0], done);
});

it('should error when using a non-existent subscription', function(done) {
Expand Down
63 changes: 32 additions & 31 deletions packages/pubsub/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,12 @@ describe('PubSub', function() {
}, /A Topic is required for a new subscription\./);
});

it('should throw if no sub name is provided', function() {
assert.throws(function() {
pubsub.subscribe('topic');
}, /A subscription name is required for a new subscription\./);
it('should not require a subscription name', function(done) {
pubsub.request = function(protoOpts, reqOpts, callback) {
callback(null, apiResponse);
};

pubsub.subscribe(TOPIC_NAME, done);
});

it('should not require configuration options', function(done) {
Expand Down Expand Up @@ -573,7 +575,6 @@ describe('PubSub', function() {
interval: 3,
maxInProgress: 5,
pushEndpoint: 'https://domain/push',
reuseExisting: false,
timeout: 30000
};

Expand All @@ -591,7 +592,6 @@ describe('PubSub', function() {
delete expectedBody.interval;
delete expectedBody.maxInProgress;
delete expectedBody.pushEndpoint;
delete expectedBody.reuseExisting;
delete expectedBody.timeout;

pubsub.topic = function() {
Expand Down Expand Up @@ -625,7 +625,7 @@ describe('PubSub', function() {
};
});

it('should re-use existing subscription if specified', function(done) {
it('should re-use existing subscription', function(done) {
var apiResponse = { code: 409 };

pubsub.subscription = function() {
Expand All @@ -636,18 +636,9 @@ describe('PubSub', function() {
callback({ code: 409 }, apiResponse);
};

// Don't re-use an existing subscription (error if one exists).
pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err, sub, resp) {
assert.equal(err.code, 409);
assert.strictEqual(resp, apiResponse);
});

// Re-use an existing subscription (ignore error if one exists).
var opts = { reuseExisting: true };
pubsub.subscribe(TOPIC_NAME, SUB_NAME, opts, function(err, sub) {
pubsub.subscribe(TOPIC_NAME, SUB_NAME, function(err, subscription) {
assert.ifError(err);
assert.deepEqual(sub, SUBSCRIPTION);

assert.strictEqual(subscription, SUBSCRIPTION);
done();
});
});
Expand Down Expand Up @@ -700,18 +691,20 @@ describe('PubSub', function() {
var SUB_NAME = 'new-sub-name';
var CONFIG = { autoAck: true, interval: 90 };

it('should throw if no name is provided', function() {
assert.throws(function() {
pubsub.subscription();
}, /The name of a subscription is required\./);
});

it('should return a Subscription object', function() {
SubscriptionOverride = function() {};
var subscription = pubsub.subscription(SUB_NAME, {});
assert(subscription instanceof SubscriptionOverride);
});

it('should pass specified name to the Subscription', function(done) {
SubscriptionOverride = function(pubsub, options) {
assert.equal(options.name, SUB_NAME);
done();
};
pubsub.subscription(SUB_NAME, {});
});

it('should honor settings', function(done) {
SubscriptionOverride = function(pubsub, options) {
assert.deepEqual(options, CONFIG);
Expand All @@ -720,18 +713,26 @@ describe('PubSub', function() {
pubsub.subscription(SUB_NAME, CONFIG);
});

it('should pass specified name to the Subscription', function(done) {
it('should not require a name', function(done) {
SubscriptionOverride = function(pubsub, options) {
assert.equal(options.name, SUB_NAME);
assert.deepEqual(options, {
name: undefined
});
done();
};
pubsub.subscription(SUB_NAME, {});

pubsub.subscription();
});

it('should not require options', function() {
assert.doesNotThrow(function() {
pubsub.subscription(SUB_NAME);
});
it('should not require options', function(done) {
SubscriptionOverride = function(pubsub, options) {
assert.deepEqual(options, {
name: SUB_NAME
});
done();
};

pubsub.subscription(SUB_NAME);
});
});

Expand Down
Loading

0 comments on commit ff6e4f7

Please sign in to comment.