diff --git a/samples/synchronousPull.js b/samples/synchronousPull.js index 4f917f070..2f7698af2 100644 --- a/samples/synchronousPull.js +++ b/samples/synchronousPull.js @@ -36,7 +36,7 @@ function main( * TODO(developer): Uncomment these variables before running the sample. */ // const projectId = 'YOUR_PROJECT_ID'; - // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME'; + // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; // Imports the Google Cloud client library. v1 is for the lower level // proto access. @@ -46,10 +46,11 @@ function main( const subClient = new v1.SubscriberClient(); async function synchronousPull() { - const formattedSubscription = subClient.subscriptionPath( - projectId, - subscriptionNameOrId - ); + // The low level API client requires a name only. + const formattedSubscription = + subscriptionNameOrId.indexOf('/') >= 0 + ? subscriptionNameOrId + : subClient.subscriptionPath(projectId, subscriptionNameOrId); // The maximum number of messages returned for this request. // Pub/Sub may return fewer than the number specified. diff --git a/samples/synchronousPullWithDeliveryAttempts.js b/samples/synchronousPullWithDeliveryAttempts.js index 8545c30df..25df09c00 100644 --- a/samples/synchronousPullWithDeliveryAttempts.js +++ b/samples/synchronousPullWithDeliveryAttempts.js @@ -46,10 +46,11 @@ function main( const subClient = new v1.SubscriberClient(); async function synchronousPullWithDeliveryAttempts() { - const formattedSubscription = subClient.subscriptionPath( - projectId, - subscriptionNameOrId - ); + // The low level API client requires a name only. + const formattedSubscription = + subscriptionNameOrId.indexOf('/') >= 0 + ? subscriptionNameOrId + : subClient.subscriptionPath(projectId, subscriptionNameOrId); // The maximum number of messages returned for this request. // Pub/Sub may return fewer than the number specified. diff --git a/samples/synchronousPullWithLeaseManagement.js b/samples/synchronousPullWithLeaseManagement.js index 0822a4f5d..5a254e765 100644 --- a/samples/synchronousPullWithLeaseManagement.js +++ b/samples/synchronousPullWithLeaseManagement.js @@ -46,10 +46,11 @@ function main( const subClient = new v1.SubscriberClient(); async function synchronousPullWithLeaseManagement() { - const formattedSubscription = subClient.subscriptionPath( - projectId, - subscriptionNameOrId - ); + // The low level API client requires a name only. + const formattedSubscription = + subscriptionNameOrId.indexOf('/') >= 0 + ? subscriptionNameOrId + : subClient.subscriptionPath(projectId, subscriptionNameOrId); // The maximum number of messages returned for this request. // Pub/Sub may return fewer than the number specified. diff --git a/samples/system-test/schema.test.ts b/samples/system-test/schema.test.ts index 8fb10fd17..760ea76ab 100644 --- a/samples/system-test/schema.test.ts +++ b/samples/system-test/schema.test.ts @@ -23,7 +23,7 @@ import { Topic, } from '@google-cloud/pubsub'; import {assert} from 'chai'; -import {describe, it, after} from 'mocha'; +import {describe, it, afterEach} from 'mocha'; import * as cp from 'child_process'; import * as uuid from 'uuid'; import * as path from 'path'; @@ -39,7 +39,7 @@ describe('schema', () => { const projectId = process.env.GCLOUD_PROJECT; const pubsub = new PubSub({projectId}); const runId = uuid.v4(); - console.log(`Topics runId: ${runId}`); + console.log(`Schema runId: ${runId}`); const topicIdStem = `schema-top-${runId}-`; const subscriptionIdStem = `schema-sub-${runId}-`; const schemaIdStem = `schema-${runId}-`; @@ -90,7 +90,7 @@ describe('schema', () => { ); } - after(async () => { + afterEach(async () => { await cleanAllSubs(); await cleanAllTopics(); await cleanAllSchemas(); diff --git a/samples/system-test/subscriptions.test.ts b/samples/system-test/subscriptions.test.ts index 290fa914c..46087059c 100644 --- a/samples/system-test/subscriptions.test.ts +++ b/samples/system-test/subscriptions.test.ts @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {PubSub} from '@google-cloud/pubsub'; +import { + CreateSubscriptionOptions, + PubSub, + Subscription, + Topic, +} from '@google-cloud/pubsub'; import {assert} from 'chai'; -import {describe, it, before, after} from 'mocha'; +import {describe, it, afterEach} from 'mocha'; import {execSync, commandFor} from './common'; import * as uuid from 'uuid'; @@ -22,135 +27,167 @@ describe('subscriptions', () => { const projectId = process.env.GCLOUD_PROJECT; const pubsub = new PubSub({projectId}); const runId = uuid.v4(); - const topicNameOne = `topic1-${runId}`; - const topicNameTwo = `topic2-${runId}`; - const topicNameThree = `topic3-${runId}`; - const subscriptionNameOne = `sub1-${runId}`; - const subscriptionNameTwo = `sub2-${runId}`; - const subscriptionNameFour = `sub4-${runId}`; - const subscriptionNameFive = `sub5-${runId}`; - const subscriptionNameSix = `sub6-${runId}`; - const subscriptionNameSeven = `sub7-${runId}`; - const subscriptionNameEight = `sub8-${runId}`; - const subscriptionNameDetach = `testdetachsubsxyz-${runId}`; - const subscriptionNameFiltered = `testfilteredsub-${runId}`; - const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`; - const fullSubscriptionNameOne = `projects/${projectId}/subscriptions/${subscriptionNameOne}`; - const fullSubscriptionNameTwo = `projects/${projectId}/subscriptions/${subscriptionNameTwo}`; - const fullSubscriptionNameFour = `projects/${projectId}/subscriptions/${subscriptionNameFour}`; - - before(async () => { - return Promise.all([ - pubsub.createTopic(topicNameOne), - pubsub.createTopic(topicNameTwo), - pubsub.createTopic(topicNameThree), - ]); - }); + const topicNameStem = `stest-topic-${runId}`; + const subNameStem = `stest-sub-${runId}`; + + let nextTopicId = 0; + async function createTopic(): Promise { + const id = `${topicNameStem}-${nextTopicId++}`; + return (await pubsub.createTopic(id))[0]; + } + + let nextSubId = 0; + async function createSub( + topic: Topic, + options?: CreateSubscriptionOptions + ): Promise { + const id = `${subNameStem}-${nextSubId++}`; + return (await topic.createSubscription(id, options))[0]; + } + + function reserveSub(): string { + const id = `${subNameStem}-${nextSubId++}`; + return id; + } + + function fullTopicName(topicId: string): string { + if (topicId.startsWith('projects/')) { + return topicId; + } else { + return `projects/${projectId}/topics/${topicId}`; + } + } - after(async () => { + function fullSubName(subId: string): string { + if (subId.startsWith('projects/')) { + return subId; + } else { + return `projects/${projectId}/subscriptions/${subId}`; + } + } + + // We want this to be run after each test, because otherwise interrupting the + // tests anywhere would litter a bunch of topics/subs. + afterEach(async () => { const [subscriptions] = await pubsub.getSubscriptions(); - const runSubs = subscriptions.filter(x => x.name.endsWith(runId)); + const runSubs = subscriptions.filter(x => x.name.indexOf(runId) >= 0); for (const sub of runSubs) { await sub.delete(); } const [topics] = await pubsub.getTopics(); - const runTops = topics.filter(x => x.name.endsWith(runId)); + const runTops = topics.filter(x => x.name.indexOf(runId) >= 0); for (const t of runTops) { await t.delete(); } }); it('should create a subscription', async () => { + const topic = await createTopic(); + const subName = reserveSub(); const output = execSync( - `${commandFor( - 'createSubscription' - )} ${topicNameOne} ${subscriptionNameOne}` + `${commandFor('createSubscription')} ${topic.name} ${subName}` ); - assert.include(output, `Subscription ${subscriptionNameOne} created.`); - const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions(); - assert.strictEqual(subscriptions[0].name, fullSubscriptionNameOne); + console.log('create', output); + assert.include(output, `Subscription ${subName} created.`); + const [subscriptions] = await pubsub.topic(topic.name).getSubscriptions(); + assert.strictEqual(subscriptions[0].name, fullSubName(subName)); }); it('should create a subscription with filtering', async () => { const filter = 'attributes.author="unknown"'; + const topic = await createTopic(); + const subName = reserveSub(); const output = execSync( - `${commandFor( - 'createSubscriptionWithFiltering' - )} ${topicNameOne} ${subscriptionNameFiltered} '${filter}'` + `${commandFor('createSubscriptionWithFiltering')} ${ + topic.name + } ${subName} '${filter}'` ); + console.log('create filtering', output); assert.include( output, - `Created subscription ${subscriptionNameFiltered} with filter ${filter}` + `Created subscription ${subName} with filter ${filter}` ); - const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions(); - assert.strictEqual(subscriptions[0].name, fullSubscriptionNameOne); + const [subscriptions] = await pubsub.topic(topic.name).getSubscriptions(); + assert.strictEqual(subscriptions[0].name, fullSubName(subName)); }); it('should create a push subscription', async () => { + const topic = await createTopic(); + const subName = reserveSub(); const output = execSync( - `${commandFor( - 'createPushSubscription' - )} ${topicNameOne} ${subscriptionNameTwo}` + `${commandFor('createPushSubscription')} ${topic.name} ${subName}` ); - assert.include(output, `Subscription ${subscriptionNameTwo} created.`); - const [subscriptions] = await pubsub.topic(topicNameOne).getSubscriptions(); - assert(subscriptions.some(s => s.name === fullSubscriptionNameTwo)); + assert.include(output, `Subscription ${subName} created.`); + const [subscriptions] = await pubsub.topic(topic.name).getSubscriptions(); + assert(subscriptions.some(s => s.name === fullSubName(subName))); }); it('should modify the config of an existing push subscription', async () => { + const topic = await createTopic(); + const sub = await createSub(topic); const output = execSync( - `${commandFor('modifyPushConfig')} ${topicNameTwo} ${subscriptionNameTwo}` + `${commandFor('modifyPushConfig')} ${topic.name} ${sub.name}` ); assert.include( output, - `Modified push config for subscription ${subscriptionNameTwo}.` + `Modified push config for subscription ${sub.name}.` ); }); it('should get metadata for a subscription', async () => { - const output = execSync( - `${commandFor('getSubscription')} ${subscriptionNameOne}` - ); + const topic = await createTopic(); + const sub = await createSub(topic); + const output = execSync(`${commandFor('getSubscription')} ${sub.name}`); const expected = - `Subscription: ${fullSubscriptionNameOne}` + - `\nTopic: ${fullTopicNameOne}` + + `Subscription: ${fullSubName(sub.name)}` + + `\nTopic: ${fullTopicName(topic.name)}` + '\nPush config: ' + '\nAck deadline: 10s'; assert.include(output, expected); }); it('should list all subscriptions', async () => { + const topic = await createTopic(); + const sub1 = await createSub(topic), + sub2 = await createSub(topic); const output = execSync(`${commandFor('listSubscriptions')}`); assert.match(output, /Subscriptions:/); - assert.match(output, new RegExp(fullSubscriptionNameOne)); - assert.match(output, new RegExp(fullSubscriptionNameTwo)); + assert.match(output, new RegExp(fullSubName(sub1.name))); + assert.match(output, new RegExp(fullSubName(sub2.name))); }); it('should list subscriptions for a topic', async () => { + const topic = await createTopic(); + const sub1 = await createSub(topic), + sub2 = await createSub(topic); const output = execSync( - `${commandFor('listTopicSubscriptions')} ${topicNameOne}` + `${commandFor('listTopicSubscriptions')} ${topic.name}` ); - assert.match(output, new RegExp(`Subscriptions for ${topicNameOne}:`)); - assert.match(output, new RegExp(fullSubscriptionNameOne)); - assert.match(output, new RegExp(fullSubscriptionNameTwo)); + assert.match(output, new RegExp(`Subscriptions for ${topic.name}:`)); + assert.match(output, new RegExp(fullSubName(sub1.name))); + assert.match(output, new RegExp(fullSubName(sub2.name))); }); it('should listen for messages', async () => { + const topic = await createTopic(); + const sub = await createSub(topic); const messageIds = await pubsub - .topic(topicNameOne) + .topic(topic.name) .publish(Buffer.from('Hello, world!')); const output = execSync( - `${commandFor('listenForMessages')} ${subscriptionNameOne} 10` + `${commandFor('listenForMessages')} ${sub.name} 10` ); assert.match(output, new RegExp(`Received message ${messageIds}:`)); }); it('should listen for messages with custom attributes', async () => { + const topic = await createTopic(); + const sub = await createSub(topic); const messageIds = await pubsub - .topic(topicNameOne) + .topic(topic.name) .publish(Buffer.from('Hello, world!'), {attr: 'value'}); const output = execSync( - `${commandFor('listenWithCustomAttributes')} ${subscriptionNameOne} 10` + `${commandFor('listenWithCustomAttributes')} ${sub.name} 10` ); assert.match( output, @@ -159,40 +196,44 @@ describe('subscriptions', () => { }); it('should listen for messages synchronously', async () => { - await pubsub.topic(topicNameOne).publish(Buffer.from('Hello, world!')); + const topic = await createTopic(); + const sub = await createSub(topic); + await pubsub.topic(topic.name).publish(Buffer.from('Hello, world!')); const output = execSync( - `${commandFor('synchronousPull')} ${projectId} ${subscriptionNameOne}` + `${commandFor('synchronousPull')} ${projectId} ${sub.name}` ); assert.match(output, /Hello/); assert.match(output, /Done./); }); it('should listen for messages synchronously with lease management', async () => { - await pubsub.topic(topicNameOne).publish(Buffer.from('Hello, world!')); + const topic = await createTopic(); + const sub = await createSub(topic); + await pubsub.topic(topic.name).publish(Buffer.from('Hello, world!')); const output = execSync( - `${commandFor( - 'synchronousPullWithLeaseManagement' - )} ${projectId} ${subscriptionNameOne}` + `${commandFor('synchronousPullWithLeaseManagement')} ${projectId} ${ + sub.name + }` ); assert.match(output, /Done./); }); it('should listen to messages with flow control', async () => { - const topicTwo = pubsub.topic(topicNameTwo); - await topicTwo.subscription(subscriptionNameFour).get({autoCreate: true}); + const topic = await createTopic(); + const sub = await createSub(topic); + const topicTwo = pubsub.topic(topic.name); + await topicTwo.subscription(sub.name).get({autoCreate: true}); await topicTwo.publish(Buffer.from('Hello, world!')); const output = execSync( - `${commandFor( - 'subscribeWithFlowControlSettings' - )} ${subscriptionNameFour} 5` + `${commandFor('subscribeWithFlowControlSettings')} ${sub.name} 5` ); assert.include( output, 'ready to receive messages at a controlled volume of 5 messages.' ); - const [subscriptions] = await pubsub.topic(topicNameTwo).getSubscriptions(); - assert(subscriptions.some(s => s.name === fullSubscriptionNameFour)); + const [subscriptions] = await pubsub.topic(topic.name).getSubscriptions(); + assert(subscriptions.some(s => s.name === fullSubName(sub.name))); }); it('should listen for error messages', () => { @@ -203,10 +244,10 @@ describe('subscriptions', () => { }); it('should set the IAM policy for a subscription', async () => { - execSync(`${commandFor('setSubscriptionPolicy')} ${subscriptionNameOne}`); - const results = await pubsub - .subscription(subscriptionNameOne) - .iam.getPolicy(); + const topic = await createTopic(); + const sub = await createSub(topic); + execSync(`${commandFor('setSubscriptionPolicy')} ${sub.name}`); + const results = await pubsub.subscription(sub.name).iam.getPolicy(); const policy = results[0]; assert.deepStrictEqual(policy.bindings, [ { @@ -223,11 +264,11 @@ describe('subscriptions', () => { }); it('should get the IAM policy for a subscription', async () => { - const results = await pubsub - .subscription(subscriptionNameOne) - .iam.getPolicy(); + const topic = await createTopic(); + const sub = await createSub(topic); + const results = await sub.iam.getPolicy(); const output = execSync( - `${commandFor('getSubscriptionPolicy')} ${subscriptionNameOne}` + `${commandFor('getSubscriptionPolicy')} ${sub.name}` ); assert.include( output, @@ -236,48 +277,52 @@ describe('subscriptions', () => { }); it('should test permissions for a subscription', async () => { + const topic = await createTopic(); + const sub = await createSub(topic); const output = execSync( - `${commandFor('testSubscriptionPermissions')} ${subscriptionNameOne}` + `${commandFor('testSubscriptionPermissions')} ${sub.name}` ); assert.match(output, /Tested permissions for subscription/); }); it('should delete a subscription', async () => { - const output = execSync( - `${commandFor('deleteSubscription')} ${subscriptionNameOne}` - ); - assert.include(output, `Subscription ${subscriptionNameOne} deleted.`); + const topic = await createTopic(); + const sub = await createSub(topic); + const output = execSync(`${commandFor('deleteSubscription')} ${sub.name}`); + assert.include(output, `Subscription ${sub.name} deleted.`); const [subscriptions] = await pubsub.getSubscriptions(); assert.ok(subscriptions); - assert(subscriptions.every(s => s.name !== fullSubscriptionNameOne)); + assert(subscriptions.every(s => s.name !== fullSubName(sub.name))); }); it('should detach a subscription', async () => { - await pubsub.createSubscription(topicNameOne, subscriptionNameDetach); - const output = execSync( - `${commandFor('detachSubscription')} ${subscriptionNameDetach}` - ); + const topic = await createTopic(); + const sub = await createSub(topic); + const output = execSync(`${commandFor('detachSubscription')} ${sub.name}`); assert.include(output, "'before' detached status: false"); assert.include(output, "'after' detached status: true"); const [subscriptionDetached] = await pubsub - .subscription(subscriptionNameDetach) + .subscription(sub.name) .detached(); assert(subscriptionDetached === true); }); it('should create a subscription with dead letter policy.', async () => { + const topic = await createTopic(), + topicDeadLetter = await createTopic(); + const subName = reserveSub(); const output = execSync( - `${commandFor( - 'createSubscriptionWithDeadLetterPolicy' - )} ${topicNameTwo} ${subscriptionNameFive} ${topicNameThree}` + `${commandFor('createSubscriptionWithDeadLetterPolicy')} ${ + topic.name + } ${subName} ${topicDeadLetter.name}` ); assert.include( output, - `Created subscription ${subscriptionNameFive} with dead letter topic ${topicNameThree}.` + `Created subscription ${subName} with dead letter topic ${topicDeadLetter.name}.` ); const [subscription] = await pubsub - .topic(topicNameTwo) - .subscription(subscriptionNameFive) + .topic(topic.name) + .subscription(subName) .get(); assert.strictEqual( subscription.metadata?.deadLetterPolicy?.maxDeliveryAttempts, @@ -286,44 +331,43 @@ describe('subscriptions', () => { }); it('should listen for messages synchronously with delivery attempts.', async () => { - await pubsub.topic(topicNameOne).createSubscription(subscriptionNameSix, { + const topic = await createTopic(), + topicDeadLetter = await createTopic(); + const sub = await createSub(topic, { deadLetterPolicy: { - deadLetterTopic: pubsub.topic(topicNameThree).name, + deadLetterTopic: topicDeadLetter.name, maxDeliveryAttempts: 10, }, }); - await pubsub.topic(topicNameOne).publish(Buffer.from('Hello, world!')); + await topic.publish(Buffer.from('Hello, world!')); const output = execSync( - `${commandFor( - 'synchronousPullWithDeliveryAttempts' - )} ${projectId} ${subscriptionNameSix}` + `${commandFor('synchronousPullWithDeliveryAttempts')} ${projectId} ${ + sub.name + }` ); assert.match(output, /Hello/); assert.match(output, /Delivery Attempt: 1/); }); it('should update a subscription with dead letter policy.', async () => { - const [presub] = await pubsub - .topic(topicNameOne) - .subscription(subscriptionNameSeven) - .get({autoCreate: true}); + const topic = await createTopic(), + topicDeadLetter = await createTopic(); + const presub = await createSub(topic); await presub.setMetadata({ deadLetterPolicy: { - deadLetterTopic: pubsub.topic(topicNameThree).name, + deadLetterTopic: topicDeadLetter.name, maxDeliveryAttempts: 10, }, }); execSync( - `${commandFor( - 'updateDeadLetterPolicy' - )} ${topicNameOne} ${subscriptionNameSeven}` + `${commandFor('updateDeadLetterPolicy')} ${topic.name} ${presub.name}` ); const [subscription] = await pubsub - .topic(topicNameOne) - .subscription(subscriptionNameSeven) + .topic(topic.name) + .subscription(presub.name) .get(); assert.equal( subscription.metadata?.deadLetterPolicy?.maxDeliveryAttempts, @@ -332,43 +376,40 @@ describe('subscriptions', () => { }); it('should remove dead letter policy.', async () => { - const [presub] = await pubsub - .topic(topicNameOne) - .subscription(subscriptionNameSeven) - .get({autoCreate: true}); + const topic = await createTopic(), + topicDeadLetter = await createTopic(); + const presub = await createSub(topic); await presub.setMetadata({ deadLetterPolicy: { - deadLetterTopic: pubsub.topic(topicNameThree).name, + deadLetterTopic: topicDeadLetter.name, maxDeliveryAttempts: 10, }, }); execSync( - `${commandFor( - 'removeDeadLetterPolicy' - )} ${topicNameOne} ${subscriptionNameSeven}` + `${commandFor('removeDeadLetterPolicy')} ${topic.name} ${presub.name}` ); const [subscription] = await pubsub - .topic(topicNameOne) - .subscription(subscriptionNameSeven) + .topic(topic.name) + .subscription(presub.name) .get(); assert.isNull(subscription.metadata?.deadLetterPolicy); }); it('should create a subscription with ordering enabled.', async () => { + const topic = await createTopic(); + const subName = reserveSub(); const output = execSync( - `${commandFor( - 'createSubscriptionWithOrdering' - )} ${topicNameTwo} ${subscriptionNameEight} ${topicNameThree}` + `${commandFor('createSubscriptionWithOrdering')} ${topic.name} ${subName}` ); assert.include( output, - `Created subscription ${subscriptionNameEight} with ordering enabled.` + `Created subscription ${subName} with ordering enabled.` ); const [subscription] = await pubsub - .topic(topicNameTwo) - .subscription(subscriptionNameEight) + .topic(topic.name) + .subscription(subName) .get(); assert.strictEqual(subscription.metadata?.enableMessageOrdering, true); });