Skip to content

Commit

Permalink
samples: add samples for AWS Kinesis ingestion (#1896)
Browse files Browse the repository at this point in the history
* samples: add samples for AWS Kinesis ingestion

* samples: add topic type update sample for kinesis ingestion

* tests: add system tests for kinesis samples

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* tests:remove incorrect test for new sample

* samples: simplify parameter names

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
feywind and gcf-owl-bot[bot] committed Mar 22, 2024
1 parent d13d395 commit 9589e9f
Show file tree
Hide file tree
Showing 7 changed files with 435 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Create Subscription with ordering enabled | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithOrdering.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithOrdering.js,samples/README.md) |
| Create Subscription With Retry Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createSubscriptionWithRetryPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createSubscriptionWithRetryPolicy.js,samples/README.md) |
| Create Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopic.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopic.js,samples/README.md) |
| Create Topic With Kinesis Ingestion | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithKinesisIngestion.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithKinesisIngestion.js,samples/README.md) |
| Create Topic With Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchema.js,samples/README.md) |
| Create Topic With Schema Revisions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithSchemaRevisions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithSchemaRevisions.js,samples/README.md) |
| Delete a previously created schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/deleteSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/deleteSchema.js,samples/README.md) |
Expand Down Expand Up @@ -184,6 +185,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Test Subscription Permissions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/testSubscriptionPermissions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/testSubscriptionPermissions.js,samples/README.md) |
| Test Topic Permissions | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/testTopicPermissions.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/testTopicPermissions.js,samples/README.md) |
| Update Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/updateDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/updateDeadLetterPolicy.js,samples/README.md) |
| Update Topic Ingestion Type | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/updateTopicIngestionType.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/updateTopicIngestionType.js,samples/README.md) |
| Update Topic Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/updateTopicSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/updateTopicSchema.js,samples/README.md) |
| Validate a schema definition | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/validateSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/validateSchema.js,samples/README.md) |

Expand Down
40 changes: 40 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ guides.
* [Create Subscription with ordering enabled](#create-subscription-with-ordering-enabled)
* [Create Subscription With Retry Policy](#create-subscription-with-retry-policy)
* [Create Topic](#create-topic)
* [Create Topic With Kinesis Ingestion](#create-topic-with-kinesis-ingestion)
* [Create Topic With Schema](#create-topic-with-schema)
* [Create Topic With Schema Revisions](#create-topic-with-schema-revisions)
* [Delete a previously created schema](#delete-a-previously-created-schema)
Expand Down Expand Up @@ -81,6 +82,7 @@ guides.
* [Test Subscription Permissions](#test-subscription-permissions)
* [Test Topic Permissions](#test-topic-permissions)
* [Update Dead Letter Policy](#update-dead-letter-policy)
* [Update Topic Ingestion Type](#update-topic-ingestion-type)
* [Update Topic Schema](#update-topic-schema)
* [Validate a schema definition](#validate-a-schema-definition)

Expand Down Expand Up @@ -365,6 +367,25 @@ __Usage:__



### Create Topic With Kinesis Ingestion

Creates a new topic, with Kinesis ingestion enabled.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createTopicWithKinesisIngestion.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createTopicWithKinesisIngestion.js,samples/README.md)

__Usage:__


`node createTopicWithKinesisIngestion.js <topic-name> <role-arn> <gcp-service-account> <stream-arn> <consumer-arn>`


-----




### Create Topic With Schema

Creates a new topic, with a schema definition.
Expand Down Expand Up @@ -1258,6 +1279,25 @@ __Usage:__



### Update Topic Ingestion Type

Update the ingestion type on a topic.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/updateTopicIngestionType.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/updateTopicIngestionType.js,samples/README.md)

__Usage:__


`node updateTopicIngestionType.js <topic-name-or-id> <stream-arn> <consumer-arn> <aws-role-arn> <gcp-service-account>`


-----




### Update Topic Schema

Update the schema on a topic.
Expand Down
91 changes: 91 additions & 0 deletions samples/createTopicWithKinesisIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Kinesis Ingestion
// description: Creates a new topic, with Kinesis ingestion enabled.
// usage: node createTopicWithKinesisIngestion.js <topic-name> <role-arn> <gcp-service-account> <stream-arn> <consumer-arn>

// [START pubsub_create_topic_with_kinesis_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
topicNameOrId,
awsRoleArn,
gcpServiceAccount,
streamArn,
consumerArn
) {
// Creates a new topic with a schema. Note that you might also
// pass Encodings.Json or Encodings.Binary here.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
awsKinesis: {
awsRoleArn,
gcpServiceAccount,
streamArn,
consumerArn,
},
},
});
console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}
// [END pubsub_create_topic_with_kinesis_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
roleArn = 'arn:aws:iam:...',
gcpServiceAccount = 'ingestion-account@...',
streamArn = 'arn:aws:kinesis:...',
consumerArn = 'arn:aws:kinesis:...'
) {
createTopicWithKinesisIngestion(
topicNameOrId,
roleArn,
gcpServiceAccount,
streamArn,
consumerArn
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
37 changes: 37 additions & 0 deletions samples/system-test/topics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,43 @@ describe('topics', () => {
assert.ok(exists, 'Topic was created');
});

const kinesisFakeArns = {
roleArn: 'arn:aws:iam::111111111111:role/fake-role-name',
gcpServiceAccount:
'[email protected]',
streamArn: 'arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name',
consumerArn:
'arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111',
};

it('should create a topic with kinesis ingestion', async () => {
const name = topicName('create-kinesis');

const output = execSync(
`${commandFor('createTopicWithKinesisIngestion')} ${name} ${
kinesisFakeArns.roleArn
} ${kinesisFakeArns.gcpServiceAccount} ${kinesisFakeArns.streamArn} ${
kinesisFakeArns.consumerArn
}`
);
assert.include(output, `Topic ${name} created with AWS Kinesis ingestion.`);
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should update a topic with kinesis integration', async () => {
const pair = await createPair('update-kinesis');
const output = execSync(
`${commandFor('updateTopicIngestionType')} ${pair.t.name} ${
kinesisFakeArns.roleArn
} ${kinesisFakeArns.gcpServiceAccount} ${kinesisFakeArns.streamArn} ${
kinesisFakeArns.consumerArn
}`
);
assert.include(output, 'Topic updated with Kinesis source successfully.');
});

it('should list topics', async () => {
const pair = await createPair('list');
const output = execSync(`${commandFor('listAllTopics')}`);
Expand Down
87 changes: 87 additions & 0 deletions samples/typescript/createTopicWithKinesisIngestion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Kinesis Ingestion
// description: Creates a new topic, with Kinesis ingestion enabled.
// usage: node createTopicWithKinesisIngestion.js <topic-name> <role-arn> <gcp-service-account> <stream-arn> <consumer-arn>

// [START pubsub_create_topic_with_kinesis_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
topicNameOrId: string,
awsRoleArn: string,
gcpServiceAccount: string,
streamArn: string,
consumerArn: string
) {
// Creates a new topic with a schema. Note that you might also
// pass Encodings.Json or Encodings.Binary here.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
awsKinesis: {
awsRoleArn,
gcpServiceAccount,
streamArn,
consumerArn,
},
},
});
console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}
// [END pubsub_create_topic_with_kinesis_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
roleArn = 'arn:aws:iam:...',
gcpServiceAccount = 'ingestion-account@...',
streamArn = 'arn:aws:kinesis:...',
consumerArn = 'arn:aws:kinesis:...'
) {
createTopicWithKinesisIngestion(
topicNameOrId,
roleArn,
gcpServiceAccount,
streamArn,
consumerArn
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
87 changes: 87 additions & 0 deletions samples/typescript/updateTopicIngestionType.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Update Topic Ingestion Type
// description: Update the ingestion type on a topic.
// usage: node updateTopicIngestionType.js <topic-name-or-id> <stream-arn> <consumer-arn> <aws-role-arn> <gcp-service-account>

// [START pubsub_update_topic_type]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
topicNameOrId: string,
awsRoleArn: string,
gcpServiceAccount: string,
streamArn: string,
consumerArn: string
) {
const metadata: TopicMetadata = {
ingestionDataSourceSettings: {
awsKinesis: {
awsRoleArn,
gcpServiceAccount,
streamArn,
consumerArn,
},
},
};

await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

console.log('Topic updated with Kinesis source successfully.');
}
// [END pubsub_update_topic_type]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
roleArn = 'arn:aws:iam:...',
gcpServiceAccount = 'ingestion-account@...',
streamArn = 'arn:aws:kinesis:...',
consumerArn = 'arn:aws:kinesis:...'
) {
updateTopicIngestionType(
topicNameOrId,
roleArn,
gcpServiceAccount,
streamArn,
consumerArn
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
Loading

0 comments on commit 9589e9f

Please sign in to comment.