Skip to content

Commit

Permalink
Merge pull request #9 from feywind/otel-4-sampler
Browse files Browse the repository at this point in the history
feat: provide bidi links between RPC and message spans, and filter by isSampled; upgrade otel deps
  • Loading branch information
feywind authored Jul 26, 2024
2 parents 95e131e + b66fa7e commit 2f2585b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 30 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
"@google-cloud/precise-date": "^4.0.0",
"@google-cloud/projectify": "^4.0.0",
"@google-cloud/promisify": "^4.0.0",
"@opentelemetry/api": "~1.8.0",
"@opentelemetry/semantic-conventions": "~1.21.0",
"@opentelemetry/api": "~1.9.0",
"@opentelemetry/semantic-conventions": "~1.25.1",
"arrify": "^2.0.0",
"extend": "^3.0.2",
"google-auth-library": "^9.3.0",
Expand Down
63 changes: 35 additions & 28 deletions src/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,21 @@ export function getTopicInfo(fullName: string): AttributeParams {
};
}

// Determines if a trace is to be sampled. There doesn't appear to be a sanctioned
// way to do this currently (isRecording does something different).
//
// Based on this: https://github.com/open-telemetry/opentelemetry-js/issues/4193
function isSampled(span: Span) {
const FLAG_MASK_SAMPLED = 0x1;
const spanContext = span.spanContext();
const traceFlags = spanContext?.traceFlags;
const sampled = !!(
traceFlags && (traceFlags & FLAG_MASK_SAMPLED) === FLAG_MASK_SAMPLED
);

return sampled;
}

export class PubsubSpans {
static createAttributes(
params: AttributeParams,
Expand Down Expand Up @@ -454,7 +469,8 @@ export class PubsubSpans {
getTopicInfo(topicName)
);
const links: Link[] = messages
.map(m => ({context: m.parentSpan?.spanContext()}) as Link)
.filter(m => m.parentSpan && isSampled(m.parentSpan))
.map(m => ({context: m.parentSpan!.spanContext()}) as Link)
.filter(l => l.context);
const span: Span = getTracer().startSpan(
`${topicName} send`,
Expand All @@ -466,19 +482,14 @@ export class PubsubSpans {
ROOT_CONTEXT
);
span?.setAttribute('messaging.batch.message_count', messages.length);
messages.forEach(m => {
// Workaround until the JS API properly supports adding links later.
if (m.parentSpan) {
m.parentSpan.setAttribute(
'messaging.gcp_pubsub.publish.trace_id',
span.spanContext().traceId
);
m.parentSpan.setAttribute(
'messaging.gcp_pubsub.publish.span_id',
span.spanContext().spanId
);
}
});
if (span) {
// Also attempt to link from message spans back to the publish RPC span.
messages.forEach(m => {
if (m.parentSpan && isSampled(m.parentSpan)) {
m.parentSpan.addLink({context: span.spanContext()});
}
});
}

return span;
}
Expand All @@ -495,7 +506,8 @@ export class PubsubSpans {
getSubscriptionInfo(subName)
);
const links: Link[] = messageSpans
.map(m => ({context: m?.spanContext()}) as Link)
.filter(m => m && isSampled(m))
.map(m => ({context: m!.spanContext()}) as Link)
.filter(l => l.context);
const span: Span = getTracer().startSpan(
`${subName} response batch`,
Expand All @@ -507,19 +519,14 @@ export class PubsubSpans {
ROOT_CONTEXT
);
span?.setAttribute('messaging.batch.message_count', messageSpans.length);
messageSpans.forEach(m => {
// Workaround until the JS API properly supports adding links later.
if (m) {
m.setAttribute(
'messaging.gcp_pubsub.receive.trace_id',
span.spanContext().traceId
);
m.setAttribute(
'messaging.gcp_pubsub.receive.span_id',
span.spanContext().spanId
);
}
});
if (span) {
// Also attempt to link from the subscribe span(s) back to the publish RPC span.
messageSpans.forEach(m => {
if (m && isSampled(m)) {
m.addLink({context: span.spanContext()});
}
});
}

return span;
}
Expand Down
29 changes: 29 additions & 0 deletions test/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,5 +344,34 @@ describe('OpenTelemetryTracer', () => {
assert.strictEqual(childReadSpan.kind, SpanKind.CONSUMER);
assert.ok(childReadSpan.parentSpanId);
});

it('creates publish RPC spans', () => {
const message: PubsubMessage = {};
const topicName = 'projects/test/topics/topicfoo';
const span = otel.PubsubSpans.createPublisherSpan(
message,
topicName
) as trace.Span;
message.parentSpan = span;
span.end();

const publishSpan = otel.PubsubSpans.createPublishRpcSpan(
[message],
topicName
);

publishSpan?.end();
const spans = exporter.getFinishedSpans();
const publishReadSpan = spans.pop();
const childReadSpan = spans.pop();
assert.ok(publishReadSpan && childReadSpan);

assert.strictEqual(
publishReadSpan.attributes['messaging.batch.message_count'],
1
);
assert.strictEqual(publishReadSpan.links.length, 1);
assert.strictEqual(childReadSpan.links.length, 1);
});
});
});

0 comments on commit 2f2585b

Please sign in to comment.