Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provide bidi links between RPC and message spans, and filter by isSampled; upgrade otel deps #9

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
"@google-cloud/precise-date": "^4.0.0",
"@google-cloud/projectify": "^4.0.0",
"@google-cloud/promisify": "^4.0.0",
"@opentelemetry/api": "~1.8.0",
"@opentelemetry/semantic-conventions": "~1.21.0",
"@opentelemetry/api": "~1.9.0",
"@opentelemetry/semantic-conventions": "~1.25.1",
"arrify": "^2.0.0",
"extend": "^3.0.2",
"google-auth-library": "^9.3.0",
Expand Down
63 changes: 35 additions & 28 deletions src/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,21 @@ export function getTopicInfo(fullName: string): AttributeParams {
};
}

// Determines if a trace is to be sampled. There doesn't appear to be a sanctioned
// way to do this currently (isRecording does something different).
//
// Based on this: https://github.com/open-telemetry/opentelemetry-js/issues/4193
function isSampled(span: Span) {
const FLAG_MASK_SAMPLED = 0x1;
const spanContext = span.spanContext();
const traceFlags = spanContext?.traceFlags;
const sampled = !!(
traceFlags && (traceFlags & FLAG_MASK_SAMPLED) === FLAG_MASK_SAMPLED
);

return sampled;
}

export class PubsubSpans {
static createAttributes(
params: AttributeParams,
Expand Down Expand Up @@ -420,7 +435,8 @@ export class PubsubSpans {
getTopicInfo(topicName)
);
const links: Link[] = messages
.map(m => ({context: m.parentSpan?.spanContext()}) as Link)
.filter(m => m.parentSpan && isSampled(m.parentSpan))
.map(m => ({context: m.parentSpan!.spanContext()}) as Link)
.filter(l => l.context);
const span: Span = getTracer().startSpan(
`${topicName} send`,
Expand All @@ -432,19 +448,14 @@ export class PubsubSpans {
ROOT_CONTEXT
);
span?.setAttribute('messaging.batch.message_count', messages.length);
messages.forEach(m => {
// Workaround until the JS API properly supports adding links later.
if (m.parentSpan) {
m.parentSpan.setAttribute(
'messaging.gcp_pubsub.publish.trace_id',
span.spanContext().traceId
);
m.parentSpan.setAttribute(
'messaging.gcp_pubsub.publish.span_id',
span.spanContext().spanId
);
}
});
if (span) {
// Also attempt to link from message spans back to the publish RPC span.
messages.forEach(m => {
if (m.parentSpan && isSampled(m.parentSpan)) {
m.parentSpan.addLink({context: span.spanContext()});
}
});
}

return span;
}
Expand All @@ -457,7 +468,8 @@ export class PubsubSpans {
getSubscriptionInfo(subName)
);
const links: Link[] = messageSpans
.map(m => ({context: m?.spanContext()}) as Link)
.filter(m => m && isSampled(m))
.map(m => ({context: m!.spanContext()}) as Link)
.filter(l => l.context);
const span: Span = getTracer().startSpan(
`${subName} response batch`,
Expand All @@ -469,19 +481,14 @@ export class PubsubSpans {
ROOT_CONTEXT
);
span?.setAttribute('messaging.batch.message_count', messageSpans.length);
messageSpans.forEach(m => {
// Workaround until the JS API properly supports adding links later.
if (m) {
m.setAttribute(
'messaging.gcp_pubsub.receive.trace_id',
span.spanContext().traceId
);
m.setAttribute(
'messaging.gcp_pubsub.receive.span_id',
span.spanContext().spanId
);
}
});
if (span) {
// Also attempt to link from message spans back to the publish RPC span.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this say subscribe span?

messageSpans.forEach(m => {
if (m && isSampled(m)) {
m.addLink({context: span.spanContext()});
}
});
}

return span;
}
Expand Down
29 changes: 29 additions & 0 deletions test/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,5 +336,34 @@ describe('OpenTelemetryTracer', () => {
assert.strictEqual(childReadSpan.kind, SpanKind.CONSUMER);
assert.ok(childReadSpan.parentSpanId);
});

it('creates publish RPC spans', () => {
const message: PubsubMessage = {};
const topicName = 'projects/test/topics/topicfoo';
const span = otel.PubsubSpans.createPublisherSpan(
message,
topicName
) as trace.Span;
message.parentSpan = span;
span.end();

const publishSpan = otel.PubsubSpans.createPublishRpcSpan(
[message],
topicName
);

publishSpan.end();
const spans = exporter.getFinishedSpans();
const publishReadSpan = spans.pop();
const childReadSpan = spans.pop();
assert.ok(publishReadSpan && childReadSpan);

assert.strictEqual(
publishReadSpan.attributes['messaging.batch.message_count'],
1
);
assert.strictEqual(publishReadSpan.links.length, 1);
assert.strictEqual(childReadSpan.links.length, 1);
});
});
});
Loading