Skip to content

Commit

Permalink
samples: split otel samples into listen/publish
Browse files Browse the repository at this point in the history
  • Loading branch information
feywind committed Jul 12, 2024
1 parent 479b085 commit 16aba6c
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 49 deletions.
134 changes: 134 additions & 0 deletions samples/listenWithOpenTelemetryTracing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2020-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to add OpenTelemetry tracing to the
* Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Subscribe with OpenTelemetry Tracing
// description: Demonstrates how to enable OpenTelemetry tracing in a subscriber.
// usage: node listenWithOpenTelemetryTracing.js <subscription-name-or-id>

const OTEL_TIMEOUT = 2;
const SUBSCRIBER_TIMEOUT = 10;

// [START pubsub_subscribe_otel_tracing]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub();

async function subscriptionListen(subscriptionNameOrId) {
const subscriber = pubSubClient.subscription(subscriptionNameOrId);

// Message handler for subscriber
const messageHandler = async message => {
console.log(`Message ${message.id} received.`);
message.ack();
};

// Error handler for subscriber
const errorHandler = async error => {
console.log('Received error:', error);
};

// Listens for new messages from the topic
subscriber.on('message', messageHandler);
subscriber.on('error', errorHandler);

// Ensures that all spans got flushed by the exporter. This function
// is in service to making sure that any buffered Pub/Sub messages
// and/or OpenTelemetry spans are properly flushed to the server
// side. In normal usage, you'd only need to do something like this
// on process shutdown.
async function shutdown() {
await subscriber.close();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

// Wait a bit for the subscription to receive messages, then shut down
// gracefully. This is for the sample only; normally you would not need
// this delay.
await new Promise(r =>
setTimeout(async () => {
subscriber.removeAllListeners();
await shutdown();
r();
}, SUBSCRIBER_TIMEOUT * 1000)
);
}
// [END pubsub_subscribe_otel_tracing]

function main(subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID') {
subscriptionListen(subscriptionNameOrId).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
110 changes: 110 additions & 0 deletions samples/publishWithOpenTelemetryTracing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2020-2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to add OpenTelemetry tracing to the
* Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Publish with OpenTelemetry Tracing
// description: Demonstrates how to enable OpenTelemetry tracing in a publisher.
// usage: node openTelemetryTracing.js <topic-name-or-id>

const OTEL_TIMEOUT = 2;

// [START pubsub_publish_otel_tracing]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!";

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publisher = pubSubClient.topic(topicNameOrId);
const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);

// The rest of the sample is in service to making sure that any
// buffered Pub/Sub messages and/or OpenTelemetry spans are properly
// flushed to the server side. In normal usage, you'd only need to do
// something like this on process shutdown.
await publisher.flush();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}
// [END pubsub_publish_otel_tracing]

function main(topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', data = 'Hello, world!') {
publishMessage(topicNameOrId, data).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
13 changes: 10 additions & 3 deletions samples/system-test/openTelemetryTracing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ describe('openTelemetry', () => {
);
});

it('should run the openTelemetryTracing sample', async () => {
it('should run the WithOpenTelemetryTracing samples', async () => {
const stdout = execSync(
`${commandFor('openTelemetryTracing')} ${topicName} ${subName}`
`${commandFor('publishWithOpenTelemetryTracing')} ${topicName}`
);
assert.match(stdout, /Message .* published./);
assert.match(stdout, /Message .* received/);
assert.match(stdout, /Cloud Trace batch writing traces/);
assert.match(stdout, /batchWriteSpans successfully/);
assert.notMatch(stdout, /Received error/);

const stdoutSub = execSync(
`${commandFor('listenWithOpenTelemetryTracing')} ${subName}`
);
assert.match(stdoutSub, /Message .* received/);
assert.match(stdoutSub, /Cloud Trace batch writing traces/);
assert.match(stdoutSub, /batchWriteSpans successfully/);
assert.notMatch(stdoutSub, /Received error/);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@
*/

// sample-metadata:
// title: OpenTelemetry Tracing
// description: Demonstrates how to enable OpenTelemetry tracing in
// a publisher or subscriber.
// usage: node openTelemetryTracing.js <topic-name-or-id> <subscription-name-or-id>
// title: Subscribe with OpenTelemetry Tracing
// description: Demonstrates how to enable OpenTelemetry tracing in a subscriber.
// usage: node listenWithOpenTelemetryTracing.js <subscription-name-or-id>

const OTEL_TIMEOUT = 2;
const SUBSCRIBER_TIMEOUT = 10;

// [START pubsub_publish_otel_tracing]
// [START pubsub_subscribe_otel_tracing]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';
// const data = 'Hello, world!";

// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';
Expand Down Expand Up @@ -66,7 +64,7 @@ const exporter = new TraceExporter();
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel example',
[SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
}),
});
const processor = new SimpleSpanProcessor(exporter);
Expand All @@ -76,62 +74,53 @@ provider.register();
// Creates a client; cache this for further use.
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId: string, data: string) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publisher = pubSubClient.topic(topicNameOrId);
const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
}

async function subscriptionListen(subscriptionNameOrId: string) {
const subscriber = pubSubClient.subscription(subscriptionNameOrId);

// Message handler for subscriber
const messageHandler = async (message: Message) => {
console.log(`Message ${message.id} received.`);
message.ack();

// Ensure that all spans got flushed by the exporter. Note that
// this isn't required under normal circumstances; we're doing it
// here to ensure spans are flushed before closing the subscriber.
console.log('Cleaning up OpenTelemetry exporter...');
await processor.forceFlush();
await subscriber.close();
};

// Error handler for subscriber
const errorHandler = async (error: Error) => {
console.log('Received error:', error);

console.log('Cleaning up OpenTelemetry exporter...');
await processor.forceFlush();
await subscriber.close();
};

// Listens for new messages from the topic
subscriber.on('message', messageHandler);
subscriber.on('error', errorHandler);

// Wait a bit for the subscription to receive messages.
// For the sample only.
setTimeout(() => {
subscriber.removeAllListeners();
}, SUBSCRIBER_TIMEOUT * 1000);
// Ensures that all spans got flushed by the exporter. This function
// is in service to making sure that any buffered Pub/Sub messages
// and/or OpenTelemetry spans are properly flushed to the server
// side. In normal usage, you'd only need to do something like this
// on process shutdown.
async function shutdown() {
await subscriber.close();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

// Wait a bit for the subscription to receive messages, then shut down
// gracefully. This is for the sample only; normally you would not need
// this delay.
await new Promise<void>(r =>
setTimeout(async () => {
subscriber.removeAllListeners();
await shutdown();
r();
}, SUBSCRIBER_TIMEOUT * 1000)
);
}
// [END pubsub_publish_otel_tracing]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
data = 'Hello, world!'
) {
publishMessage(topicNameOrId, data)
.then(() => subscriptionListen(subscriptionNameOrId))
.catch(err => {
console.error(err.message);
process.exitCode = 1;
});
// [END pubsub_subscribe_otel_tracing]

function main(subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID') {
subscriptionListen(subscriptionNameOrId).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
Loading

0 comments on commit 16aba6c

Please sign in to comment.