diff --git a/package.json b/package.json index 1fd403e52..966e857e7 100644 --- a/package.json +++ b/package.json @@ -51,8 +51,8 @@ "@google-cloud/precise-date": "^4.0.0", "@google-cloud/projectify": "^4.0.0", "@google-cloud/promisify": "^4.0.0", - "@opentelemetry/api": "~1.8.0", - "@opentelemetry/semantic-conventions": "~1.21.0", + "@opentelemetry/api": "~1.9.0", + "@opentelemetry/semantic-conventions": "~1.25.1", "arrify": "^2.0.0", "extend": "^3.0.2", "google-auth-library": "^9.3.0", diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts index bd25d911e..01a24d39f 100644 --- a/src/telemetry-tracing.ts +++ b/src/telemetry-tracing.ts @@ -288,6 +288,21 @@ export function getTopicInfo(fullName: string): AttributeParams { }; } +// Determines if a trace is to be sampled. There doesn't appear to be a sanctioned +// way to do this currently (isRecording does something different). +// +// Based on this: https://github.com/open-telemetry/opentelemetry-js/issues/4193 +function isSampled(span: Span) { + const FLAG_MASK_SAMPLED = 0x1; + const spanContext = span.spanContext(); + const traceFlags = spanContext?.traceFlags; + const sampled = !!( + traceFlags && (traceFlags & FLAG_MASK_SAMPLED) === FLAG_MASK_SAMPLED + ); + + return sampled; +} + export class PubsubSpans { static createAttributes( params: AttributeParams, @@ -454,7 +469,8 @@ export class PubsubSpans { getTopicInfo(topicName) ); const links: Link[] = messages - .map(m => ({context: m.parentSpan?.spanContext()}) as Link) + .filter(m => m.parentSpan && isSampled(m.parentSpan)) + .map(m => ({context: m.parentSpan!.spanContext()}) as Link) .filter(l => l.context); const span: Span = getTracer().startSpan( `${topicName} send`, @@ -466,19 +482,14 @@ export class PubsubSpans { ROOT_CONTEXT ); span?.setAttribute('messaging.batch.message_count', messages.length); - messages.forEach(m => { - // Workaround until the JS API properly supports adding links later. - if (m.parentSpan) { - m.parentSpan.setAttribute( - 'messaging.gcp_pubsub.publish.trace_id', - span.spanContext().traceId - ); - m.parentSpan.setAttribute( - 'messaging.gcp_pubsub.publish.span_id', - span.spanContext().spanId - ); - } - }); + if (span) { + // Also attempt to link from message spans back to the publish RPC span. + messages.forEach(m => { + if (m.parentSpan && isSampled(m.parentSpan)) { + m.parentSpan.addLink({context: span.spanContext()}); + } + }); + } return span; } @@ -495,7 +506,8 @@ export class PubsubSpans { getSubscriptionInfo(subName) ); const links: Link[] = messageSpans - .map(m => ({context: m?.spanContext()}) as Link) + .filter(m => m && isSampled(m)) + .map(m => ({context: m!.spanContext()}) as Link) .filter(l => l.context); const span: Span = getTracer().startSpan( `${subName} response batch`, @@ -507,19 +519,14 @@ export class PubsubSpans { ROOT_CONTEXT ); span?.setAttribute('messaging.batch.message_count', messageSpans.length); - messageSpans.forEach(m => { - // Workaround until the JS API properly supports adding links later. - if (m) { - m.setAttribute( - 'messaging.gcp_pubsub.receive.trace_id', - span.spanContext().traceId - ); - m.setAttribute( - 'messaging.gcp_pubsub.receive.span_id', - span.spanContext().spanId - ); - } - }); + if (span) { + // Also attempt to link from the subscribe span(s) back to the publish RPC span. + messageSpans.forEach(m => { + if (m && isSampled(m)) { + m.addLink({context: span.spanContext()}); + } + }); + } return span; } diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts index 0240fb612..7baa297d1 100644 --- a/test/telemetry-tracing.ts +++ b/test/telemetry-tracing.ts @@ -344,5 +344,34 @@ describe('OpenTelemetryTracer', () => { assert.strictEqual(childReadSpan.kind, SpanKind.CONSUMER); assert.ok(childReadSpan.parentSpanId); }); + + it('creates publish RPC spans', () => { + const message: PubsubMessage = {}; + const topicName = 'projects/test/topics/topicfoo'; + const span = otel.PubsubSpans.createPublisherSpan( + message, + topicName + ) as trace.Span; + message.parentSpan = span; + span.end(); + + const publishSpan = otel.PubsubSpans.createPublishRpcSpan( + [message], + topicName + ); + + publishSpan?.end(); + const spans = exporter.getFinishedSpans(); + const publishReadSpan = spans.pop(); + const childReadSpan = spans.pop(); + assert.ok(publishReadSpan && childReadSpan); + + assert.strictEqual( + publishReadSpan.attributes['messaging.batch.message_count'], + 1 + ); + assert.strictEqual(publishReadSpan.links.length, 1); + assert.strictEqual(childReadSpan.links.length, 1); + }); }); });