From 2df6310351539ecae70f126fa70a72d2f0b493c3 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Wed, 31 Jan 2024 16:41:16 +0100 Subject: [PATCH] refactor(instrumentation-grpc): clean up remnants of 'grpc' package instrumentation (#4420) * refactor(instrumentation-grpc): clean up remnants of 'grpc' package instrumentation * fix(changelog): add changelog entry --- experimental/CHANGELOG.md | 2 + .../src/{grpc-js => }/clientUtils.ts | 17 +- .../src/grpc-js/index.ts | 591 ------------------ .../src/grpc-js/types.ts | 74 --- .../src/index.ts | 2 +- .../src/instrumentation.ts | 582 +++++++++++++++-- .../src/internal-types.ts | 59 +- .../src/{grpc-js => }/serverUtils.ts | 10 +- .../test/grpc-js.test.ts | 2 +- 9 files changed, 623 insertions(+), 716 deletions(-) rename experimental/packages/opentelemetry-instrumentation-grpc/src/{grpc-js => }/clientUtils.ts (95%) delete mode 100644 experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts delete mode 100644 experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts rename experimental/packages/opentelemetry-instrumentation-grpc/src/{grpc-js => }/serverUtils.ts (96%) diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index bc332c0284..7b677d8fa9 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -18,6 +18,8 @@ All notable changes to experimental packages in this project will be documented ### :house: (Internal) +* refactor(instrumentation-grpc): clean up remnants of 'grpc' package instrumentation [#4420](https://github.com/open-telemetry/opentelemetry-js/pull/4420) @pichlermarc + ## 0.48.0 ### :boom: Breaking Change diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/clientUtils.ts similarity index 95% rename from experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts rename to experimental/packages/opentelemetry-instrumentation-grpc/src/clientUtils.ts index 9e26062d1c..f4c5470d16 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/clientUtils.ts @@ -18,19 +18,22 @@ import type { EventEmitter } from 'events'; import type { Span, SpanStatus } from '@opentelemetry/api'; import type { Client, Metadata, ServiceError } from '@grpc/grpc-js'; import type * as grpcJs from '@grpc/grpc-js'; -import type { GrpcJsInstrumentation } from './'; -import type { GrpcClientFunc, SendUnaryDataCallback } from './types'; -import type { metadataCaptureType } from '../internal-types'; +import type { GrpcInstrumentation } from './'; +import type { + GrpcClientFunc, + SendUnaryDataCallback, + metadataCaptureType, +} from './internal-types'; import { propagation, context } from '@opentelemetry/api'; import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; -import { AttributeNames } from '../enums/AttributeNames'; -import { GRPC_STATUS_CODE_OK } from '../status-code'; +import { AttributeNames } from './enums/AttributeNames'; +import { GRPC_STATUS_CODE_OK } from './status-code'; import { _grpcStatusCodeToSpanStatus, _grpcStatusCodeToOpenTelemetryStatusCode, _methodIsIgnored, -} from '../utils'; +} from './utils'; import { errorMonitor } from 'events'; /** @@ -38,7 +41,7 @@ import { errorMonitor } from 'events'; * with both possible casings e.g. "TestMethod" & "testMethod" */ export function getMethodsToWrap( - this: GrpcJsInstrumentation, + this: GrpcInstrumentation, client: typeof Client, methods: { [key: string]: { originalName?: string } } ): string[] { diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts deleted file mode 100644 index 36be79fd07..0000000000 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts +++ /dev/null @@ -1,591 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import type { EventEmitter } from 'events'; - -import type { - Server, - serialize as Serialize, - deserialize as Deserialize, - Metadata, - Client, - ServiceDefinition, - loadPackageDefinition, - GrpcObject, -} from '@grpc/grpc-js'; - -import type * as grpcJs from '@grpc/grpc-js'; - -import type { - ServerCallWithMeta, - SendUnaryDataCallback, - ServerRegisterFunction, - HandleCall, - MakeClientConstructorFunction, - PackageDefinition, - GrpcClientFunc, - ClientRequestFunction, -} from './types'; -import type { GrpcInstrumentationConfig } from '../types'; -import type { metadataCaptureType } from '../internal-types'; - -import { - context, - propagation, - ROOT_CONTEXT, - SpanOptions, - SpanKind, - trace, - Span, -} from '@opentelemetry/api'; -import { - InstrumentationNodeModuleDefinition, - isWrapped, - InstrumentationBase, -} from '@opentelemetry/instrumentation'; -import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; - -import { - shouldNotTraceServerCall, - handleServerFunction, - handleUntracedServerFunction, -} from './serverUtils'; -import { - getMethodsToWrap, - makeGrpcClientRemoteCall, - extractMetadataOrSpliceDefault, - setSpanContext, - patchedCallback, - patchResponseStreamEvents, - patchResponseMetadataEvent, - extractMetadataOrSplice, -} from './clientUtils'; -import { - _extractMethodAndService, - metadataCapture, - URI_REGEX, - _methodIsIgnored, -} from '../utils'; -import { AttributeValues } from '../enums/AttributeValues'; - -export class GrpcJsInstrumentation extends InstrumentationBase { - private _metadataCapture: metadataCaptureType; - - constructor( - name: string, - version: string, - config?: GrpcInstrumentationConfig - ) { - super(name, version, config); - this._metadataCapture = this._createMetadataCapture(); - } - - init() { - return [ - new InstrumentationNodeModuleDefinition( - '@grpc/grpc-js', - ['1.*'], - (moduleExports, version) => { - this._diag.debug(`Applying patch for @grpc/grpc-js@${version}`); - if (isWrapped(moduleExports.Server.prototype.register)) { - this._unwrap(moduleExports.Server.prototype, 'register'); - } - // Patch Server methods - this._wrap( - moduleExports.Server.prototype, - 'register', - this._patchServer() - ); - // Patch Client methods - if (isWrapped(moduleExports.makeGenericClientConstructor)) { - this._unwrap(moduleExports, 'makeGenericClientConstructor'); - } - this._wrap( - moduleExports, - 'makeGenericClientConstructor', - this._patchClient(moduleExports) - ); - if (isWrapped(moduleExports.makeClientConstructor)) { - this._unwrap(moduleExports, 'makeClientConstructor'); - } - this._wrap( - moduleExports, - 'makeClientConstructor', - this._patchClient(moduleExports) - ); - if (isWrapped(moduleExports.loadPackageDefinition)) { - this._unwrap(moduleExports, 'loadPackageDefinition'); - } - this._wrap( - moduleExports, - 'loadPackageDefinition', - this._patchLoadPackageDefinition(moduleExports) - ); - if (isWrapped(moduleExports.Client.prototype)) { - this._unwrap(moduleExports.Client.prototype, 'makeUnaryRequest'); - this._unwrap( - moduleExports.Client.prototype, - 'makeClientStreamRequest' - ); - this._unwrap( - moduleExports.Client.prototype, - 'makeServerStreamRequest' - ); - this._unwrap( - moduleExports.Client.prototype, - 'makeBidiStreamRequest' - ); - } - this._wrap( - moduleExports.Client.prototype, - 'makeUnaryRequest', - this._patchClientRequestMethod(moduleExports, false) as any - ); - this._wrap( - moduleExports.Client.prototype, - 'makeClientStreamRequest', - this._patchClientRequestMethod(moduleExports, false) as any - ); - this._wrap( - moduleExports.Client.prototype, - 'makeServerStreamRequest', - this._patchClientRequestMethod(moduleExports, true) as any - ); - this._wrap( - moduleExports.Client.prototype, - 'makeBidiStreamRequest', - this._patchClientRequestMethod(moduleExports, true) as any - ); - return moduleExports; - }, - (moduleExports, version) => { - if (moduleExports === undefined) return; - this._diag.debug(`Removing patch for @grpc/grpc-js@${version}`); - - this._unwrap(moduleExports.Server.prototype, 'register'); - this._unwrap(moduleExports, 'makeClientConstructor'); - this._unwrap(moduleExports, 'makeGenericClientConstructor'); - this._unwrap(moduleExports, 'loadPackageDefinition'); - this._unwrap(moduleExports.Client.prototype, 'makeUnaryRequest'); - this._unwrap( - moduleExports.Client.prototype, - 'makeClientStreamRequest' - ); - this._unwrap( - moduleExports.Client.prototype, - 'makeServerStreamRequest' - ); - this._unwrap(moduleExports.Client.prototype, 'makeBidiStreamRequest'); - } - ), - ]; - } - - override getConfig(): GrpcInstrumentationConfig { - return super.getConfig(); - } - - override setConfig(config?: GrpcInstrumentationConfig): void { - super.setConfig(config); - this._metadataCapture = this._createMetadataCapture(); - } - - /** - * Patch for grpc.Server.prototype.register(...) function. Provides auto-instrumentation for - * client_stream, server_stream, bidi, unary server handler calls. - */ - private _patchServer(): ( - originalRegister: ServerRegisterFunction - ) => ServerRegisterFunction { - const instrumentation = this; - return (originalRegister: ServerRegisterFunction) => { - const config = this.getConfig(); - instrumentation._diag.debug('patched gRPC server'); - return function register( - this: Server, - name: string, - handler: HandleCall, - serialize: Serialize, - deserialize: Deserialize, - type: string - ): boolean { - const originalRegisterResult = originalRegister.call( - this, - name, - handler, - serialize, - deserialize, - type - ); - const handlerSet = this['handlers'].get(name); - - instrumentation._wrap( - handlerSet, - 'func', - (originalFunc: HandleCall) => { - return function func( - this: typeof handlerSet, - call: ServerCallWithMeta, - callback: SendUnaryDataCallback - ) { - const self = this; - - if (shouldNotTraceServerCall(name, config.ignoreGrpcMethods)) { - return handleUntracedServerFunction( - type, - originalFunc, - call, - callback - ); - } - - const spanName = `grpc.${name.replace('/', '')}`; - const spanOptions: SpanOptions = { - kind: SpanKind.SERVER, - }; - - instrumentation._diag.debug( - `patch func: ${JSON.stringify(spanOptions)}` - ); - - context.with( - propagation.extract(ROOT_CONTEXT, call.metadata, { - get: (carrier, key) => carrier.get(key).map(String), - keys: carrier => Object.keys(carrier.getMap()), - }), - () => { - const { service, method } = _extractMethodAndService(name); - - const span = instrumentation.tracer - .startSpan(spanName, spanOptions) - .setAttributes({ - [SemanticAttributes.RPC_SYSTEM]: - AttributeValues.RPC_SYSTEM, - [SemanticAttributes.RPC_METHOD]: method, - [SemanticAttributes.RPC_SERVICE]: service, - }); - - instrumentation._metadataCapture.server.captureRequestMetadata( - span, - call.metadata - ); - - instrumentation._wrap( - call, - 'sendMetadata', - originalSendMetadata => (responseMetadata: Metadata) => { - instrumentation._metadataCapture.server.captureResponseMetadata( - span, - responseMetadata - ); - originalSendMetadata.call(call, responseMetadata); - } - ); - - context.with(trace.setSpan(context.active(), span), () => { - handleServerFunction.call( - self, - span, - type, - originalFunc, - call, - callback - ); - }); - } - ); - }; - } - ); - return originalRegisterResult; - } as typeof Server.prototype.register; - }; - } - - /** - * Patch for grpc.Client.make*Request(...) functions. - * Provides auto-instrumentation for client requests when using a Client without - * makeGenericClientConstructor/makeClientConstructor - */ - private _patchClientRequestMethod( - grpcLib: typeof grpcJs, - hasResponseStream: boolean - ): ( - original: ClientRequestFunction - ) => ClientRequestFunction { - const instrumentation = this; - return (original: ClientRequestFunction) => { - instrumentation._diag.debug( - 'patched makeClientStreamRequest on grpc client' - ); - - return function makeClientStreamRequest(this: grpcJs.Client) { - // method must always be at first position - const method = arguments[0]; - const { name, service, methodAttributeValue } = - instrumentation._splitMethodString(method); - - // Do not attempt to trace/inject context if method is ignored - if ( - method != null && - _methodIsIgnored( - methodAttributeValue, - instrumentation.getConfig().ignoreGrpcMethods - ) - ) { - return original.apply(this, [...arguments]); - } - - const modifiedArgs = [...arguments]; - const metadata = extractMetadataOrSplice(grpcLib, modifiedArgs, 4); - - const span = instrumentation.createClientSpan( - name, - methodAttributeValue, - service, - metadata - ); - instrumentation.extractNetMetadata(this, span); - - // Callback is only present when there is no responseStream - if (!hasResponseStream) { - // Replace the callback with the patched one if it is there. - // If the callback arg is not a function on the last position then the client will throw - // and never call the callback -> so there's nothing to patch - const lastArgIndex = modifiedArgs.length - 1; - const callback = modifiedArgs[lastArgIndex]; - if (typeof callback === 'function') { - modifiedArgs[lastArgIndex] = patchedCallback(span, callback); - } - } - - return context.with(trace.setSpan(context.active(), span), () => { - setSpanContext(metadata); - - const call = original.apply(this, [...modifiedArgs]); - patchResponseMetadataEvent( - span, - call, - instrumentation._metadataCapture - ); - - // Subscribe to response stream events when there's a response stream. - if (hasResponseStream) { - patchResponseStreamEvents(span, call); - } - - return call; - }); - }; - }; - } - - /** - * Entry point for applying client patches to `grpc.makeClientConstructor(...)` equivalents - * @param this GrpcJsPlugin - */ - private _patchClient( - grpcClient: typeof grpcJs - ): ( - original: MakeClientConstructorFunction - ) => MakeClientConstructorFunction { - const instrumentation = this; - return (original: MakeClientConstructorFunction) => { - instrumentation._diag.debug('patching client'); - return function makeClientConstructor( - this: typeof Client, - methods: ServiceDefinition, - serviceName: string, - options?: object - ) { - const client = original.call(this, methods, serviceName, options); - instrumentation._massWrap( - client.prototype, - getMethodsToWrap.call(instrumentation, client, methods), - instrumentation._getPatchedClientMethods(grpcClient) - ); - return client; - }; - }; - } - - /** - * Entry point for client patching for grpc.loadPackageDefinition(...) - * @param this - GrpcJsPlugin - */ - private _patchLoadPackageDefinition(grpcClient: typeof grpcJs) { - const instrumentation = this; - instrumentation._diag.debug('patching loadPackageDefinition'); - return (original: typeof loadPackageDefinition) => { - return function patchedLoadPackageDefinition( - this: null, - packageDef: PackageDefinition - ) { - const result: GrpcObject = original.call( - this, - packageDef - ) as GrpcObject; - instrumentation._patchLoadedPackage(grpcClient, result); - return result; - } as typeof loadPackageDefinition; - }; - } - - /** - * Parse initial client call properties and start a span to trace its execution - */ - private _getPatchedClientMethods( - grpcClient: typeof grpcJs - ): (original: GrpcClientFunc) => () => EventEmitter { - const instrumentation = this; - return (original: GrpcClientFunc) => { - instrumentation._diag.debug('patch all client methods'); - function clientMethodTrace(this: Client) { - const name = `grpc.${original.path.replace('/', '')}`; - const args = [...arguments]; - const metadata = extractMetadataOrSpliceDefault.call( - instrumentation, - grpcClient, - original, - args - ); - const { service, method } = _extractMethodAndService(original.path); - - const span = instrumentation.tracer - .startSpan(name, { kind: SpanKind.CLIENT }) - .setAttributes({ - [SemanticAttributes.RPC_SYSTEM]: 'grpc', - [SemanticAttributes.RPC_METHOD]: method, - [SemanticAttributes.RPC_SERVICE]: service, - }); - instrumentation.extractNetMetadata(this, span); - - instrumentation._metadataCapture.client.captureRequestMetadata( - span, - metadata - ); - - return context.with(trace.setSpan(context.active(), span), () => - makeGrpcClientRemoteCall( - instrumentation._metadataCapture, - original, - args, - metadata, - this - )(span) - ); - } - Object.assign(clientMethodTrace, original); - return clientMethodTrace; - }; - } - - private _splitMethodString(method: string) { - if (method == null) { - return { name: '', service: '', methodAttributeValue: '' }; - } - const name = `grpc.${method.replace('/', '')}`; - const { service, method: methodAttributeValue } = - _extractMethodAndService(method); - return { name, service, methodAttributeValue }; - } - - private createClientSpan( - name: string, - methodAttributeValue: string, - service: string, - metadata?: grpcJs.Metadata - ) { - const span = this.tracer - .startSpan(name, { kind: SpanKind.CLIENT }) - .setAttributes({ - [SemanticAttributes.RPC_SYSTEM]: 'grpc', - [SemanticAttributes.RPC_METHOD]: methodAttributeValue, - [SemanticAttributes.RPC_SERVICE]: service, - }); - - if (metadata != null) { - this._metadataCapture.client.captureRequestMetadata(span, metadata); - } - return span; - } - - private extractNetMetadata(client: grpcJs.Client, span: Span) { - // set net.peer.* from target (e.g., "dns:otel-productcatalogservice:8080") as a hint to APMs - const parsedUri = URI_REGEX.exec(client.getChannel().getTarget()); - if (parsedUri != null && parsedUri.groups != null) { - span.setAttribute( - SemanticAttributes.NET_PEER_NAME, - parsedUri.groups['name'] - ); - span.setAttribute( - SemanticAttributes.NET_PEER_PORT, - parseInt(parsedUri.groups['port']) - ); - } - } - - /** - * Utility function to patch *all* functions loaded through a proto file. - * Recursively searches for Client classes and patches all methods, reversing the - * parsing done by grpc.loadPackageDefinition - * https://github.com/grpc/grpc-node/blob/1d14203c382509c3f36132bd0244c99792cb6601/packages/grpc-js/src/make-client.ts#L200-L217 - */ - private _patchLoadedPackage( - grpcClient: typeof grpcJs, - result: GrpcObject - ): void { - Object.values(result).forEach(service => { - if (typeof service === 'function') { - this._massWrap( - service.prototype, - getMethodsToWrap.call(this, service, service.service), - this._getPatchedClientMethods.call(this, grpcClient) - ); - } else if (typeof service.format !== 'string') { - // GrpcObject - this._patchLoadedPackage.call(this, grpcClient, service as GrpcObject); - } - }); - } - - private _createMetadataCapture(): metadataCaptureType { - const config = this.getConfig(); - - return { - client: { - captureRequestMetadata: metadataCapture( - 'request', - config.metadataToSpanAttributes?.client?.requestMetadata ?? [] - ), - captureResponseMetadata: metadataCapture( - 'response', - config.metadataToSpanAttributes?.client?.responseMetadata ?? [] - ), - }, - server: { - captureRequestMetadata: metadataCapture( - 'request', - config.metadataToSpanAttributes?.server?.requestMetadata ?? [] - ), - captureResponseMetadata: metadataCapture( - 'response', - config.metadataToSpanAttributes?.server?.responseMetadata ?? [] - ), - }, - }; - } -} diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts deleted file mode 100644 index 06ca7c0b61..0000000000 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import type { EventEmitter } from 'events'; -import type { CALL_SPAN_ENDED } from './serverUtils'; -import type { - requestCallback, - ServerUnaryCall, - ServerReadableStream, - ServerWritableStream, - ServerDuplexStream, - Metadata, - Server, - makeGenericClientConstructor, -} from '@grpc/grpc-js'; - -/** - * Server Unary callback type - */ -export type SendUnaryDataCallback = requestCallback; - -/** - * Intersection type of all grpc server call types - */ -export type ServerCall = - | ServerUnaryCall - | ServerReadableStream - | ServerWritableStream - | ServerDuplexStream; - -/** - * {@link ServerCall} ServerCall extended with misc. missing utility types - */ -export type ServerCallWithMeta = ServerCall & { - metadata: Metadata; -}; - -/** - * EventEmitter with span ended symbol indicator - */ -export type GrpcEmitter = EventEmitter & { [CALL_SPAN_ENDED]?: boolean }; - -/** - * Grpc client callback function extended with missing utility types - */ -export type GrpcClientFunc = ((...args: unknown[]) => GrpcEmitter) & { - path: string; - requestStream: boolean; - responseStream: boolean; -}; - -export type ServerRegisterFunction = typeof Server.prototype.register; - -export type ClientRequestFunction = ( - ...args: unknown[] -) => ReturnType; - -export type MakeClientConstructorFunction = typeof makeGenericClientConstructor; - -export type { HandleCall } from '@grpc/grpc-js/build/src/server-call'; -export type { PackageDefinition } from '@grpc/grpc-js/build/src/make-client'; diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/index.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/index.ts index 5e1bb947d1..3e02b5fc9c 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/index.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/index.ts @@ -14,5 +14,5 @@ * limitations under the License. */ -export * from './instrumentation'; +export { GrpcInstrumentation } from './instrumentation'; export type { GrpcInstrumentationConfig } from './types'; diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts index bb6e095cf7..3a04c24e1f 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts @@ -14,32 +14,181 @@ * limitations under the License. */ -import type { GrpcInstrumentationConfig } from './types'; -import type { MeterProvider, TracerProvider } from '@opentelemetry/api'; +import type { EventEmitter } from 'events'; -import { VERSION } from './version'; -import { GrpcJsInstrumentation } from './grpc-js'; +import type { + Server, + serialize as Serialize, + deserialize as Deserialize, + Metadata, + Client, + ServiceDefinition, + loadPackageDefinition, + GrpcObject, +} from '@grpc/grpc-js'; -/** The metadata key under which span context is stored as a binary value. */ -export const GRPC_TRACE_KEY = 'grpc-trace-bin'; +import type * as grpcJs from '@grpc/grpc-js'; -export class GrpcInstrumentation { - private _grpcJsInstrumentation: GrpcJsInstrumentation; +import type { + ServerCallWithMeta, + SendUnaryDataCallback, + ServerRegisterFunction, + HandleCall, + MakeClientConstructorFunction, + PackageDefinition, + GrpcClientFunc, + ClientRequestFunction, + metadataCaptureType, +} from './internal-types'; +import type { GrpcInstrumentationConfig } from './types'; - public readonly instrumentationName: string = - '@opentelemetry/instrumentation-grpc'; - public readonly instrumentationVersion: string = VERSION; +import { + context, + propagation, + ROOT_CONTEXT, + SpanOptions, + SpanKind, + trace, + Span, +} from '@opentelemetry/api'; +import { + InstrumentationNodeModuleDefinition, + isWrapped, + InstrumentationBase, +} from '@opentelemetry/instrumentation'; +import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; + +import { + shouldNotTraceServerCall, + handleServerFunction, + handleUntracedServerFunction, +} from './serverUtils'; +import { + getMethodsToWrap, + makeGrpcClientRemoteCall, + extractMetadataOrSpliceDefault, + setSpanContext, + patchedCallback, + patchResponseStreamEvents, + patchResponseMetadataEvent, + extractMetadataOrSplice, +} from './clientUtils'; +import { + _extractMethodAndService, + metadataCapture, + URI_REGEX, + _methodIsIgnored, +} from './utils'; +import { AttributeValues } from './enums/AttributeValues'; +import { VERSION } from './version'; + +export class GrpcInstrumentation extends InstrumentationBase { + private _metadataCapture: metadataCaptureType; constructor(config?: GrpcInstrumentationConfig) { - this._grpcJsInstrumentation = new GrpcJsInstrumentation( - this.instrumentationName, - this.instrumentationVersion, - config - ); + super('@opentelemetry/instrumentation-grpc', VERSION, config); + this._metadataCapture = this._createMetadataCapture(); } - public setConfig(config?: GrpcInstrumentationConfig) { - this._grpcJsInstrumentation.setConfig(config); + init() { + return [ + new InstrumentationNodeModuleDefinition( + '@grpc/grpc-js', + ['1.*'], + (moduleExports, version) => { + this._diag.debug(`Applying patch for @grpc/grpc-js@${version}`); + if (isWrapped(moduleExports.Server.prototype.register)) { + this._unwrap(moduleExports.Server.prototype, 'register'); + } + // Patch Server methods + this._wrap( + moduleExports.Server.prototype, + 'register', + this._patchServer() + ); + // Patch Client methods + if (isWrapped(moduleExports.makeGenericClientConstructor)) { + this._unwrap(moduleExports, 'makeGenericClientConstructor'); + } + this._wrap( + moduleExports, + 'makeGenericClientConstructor', + this._patchClient(moduleExports) + ); + if (isWrapped(moduleExports.makeClientConstructor)) { + this._unwrap(moduleExports, 'makeClientConstructor'); + } + this._wrap( + moduleExports, + 'makeClientConstructor', + this._patchClient(moduleExports) + ); + if (isWrapped(moduleExports.loadPackageDefinition)) { + this._unwrap(moduleExports, 'loadPackageDefinition'); + } + this._wrap( + moduleExports, + 'loadPackageDefinition', + this._patchLoadPackageDefinition(moduleExports) + ); + if (isWrapped(moduleExports.Client.prototype)) { + this._unwrap(moduleExports.Client.prototype, 'makeUnaryRequest'); + this._unwrap( + moduleExports.Client.prototype, + 'makeClientStreamRequest' + ); + this._unwrap( + moduleExports.Client.prototype, + 'makeServerStreamRequest' + ); + this._unwrap( + moduleExports.Client.prototype, + 'makeBidiStreamRequest' + ); + } + this._wrap( + moduleExports.Client.prototype, + 'makeUnaryRequest', + this._patchClientRequestMethod(moduleExports, false) as any + ); + this._wrap( + moduleExports.Client.prototype, + 'makeClientStreamRequest', + this._patchClientRequestMethod(moduleExports, false) as any + ); + this._wrap( + moduleExports.Client.prototype, + 'makeServerStreamRequest', + this._patchClientRequestMethod(moduleExports, true) as any + ); + this._wrap( + moduleExports.Client.prototype, + 'makeBidiStreamRequest', + this._patchClientRequestMethod(moduleExports, true) as any + ); + return moduleExports; + }, + (moduleExports, version) => { + if (moduleExports === undefined) return; + this._diag.debug(`Removing patch for @grpc/grpc-js@${version}`); + + this._unwrap(moduleExports.Server.prototype, 'register'); + this._unwrap(moduleExports, 'makeClientConstructor'); + this._unwrap(moduleExports, 'makeGenericClientConstructor'); + this._unwrap(moduleExports, 'loadPackageDefinition'); + this._unwrap(moduleExports.Client.prototype, 'makeUnaryRequest'); + this._unwrap( + moduleExports.Client.prototype, + 'makeClientStreamRequest' + ); + this._unwrap( + moduleExports.Client.prototype, + 'makeServerStreamRequest' + ); + this._unwrap(moduleExports.Client.prototype, 'makeBidiStreamRequest'); + } + ), + ]; } /** @@ -47,37 +196,398 @@ export class GrpcInstrumentation { * Public reference to the protected BaseInstrumentation `_config` instance to be used by this * plugin's external helper functions */ - public getConfig(): GrpcInstrumentationConfig { - // grpcNative and grpcJs have their own config copy which should be identical so just pick one - return this._grpcJsInstrumentation.getConfig(); + override getConfig(): GrpcInstrumentationConfig { + return super.getConfig(); } - init() { - // sub instrumentations will already be init when constructing them - return; + override setConfig(config?: GrpcInstrumentationConfig): void { + super.setConfig(config); + this._metadataCapture = this._createMetadataCapture(); + } + + /** + * Patch for grpc.Server.prototype.register(...) function. Provides auto-instrumentation for + * client_stream, server_stream, bidi, unary server handler calls. + */ + private _patchServer(): ( + originalRegister: ServerRegisterFunction + ) => ServerRegisterFunction { + const instrumentation = this; + return (originalRegister: ServerRegisterFunction) => { + const config = this.getConfig(); + instrumentation._diag.debug('patched gRPC server'); + return function register( + this: Server, + name: string, + handler: HandleCall, + serialize: Serialize, + deserialize: Deserialize, + type: string + ): boolean { + const originalRegisterResult = originalRegister.call( + this, + name, + handler, + serialize, + deserialize, + type + ); + const handlerSet = this['handlers'].get(name); + + instrumentation._wrap( + handlerSet, + 'func', + (originalFunc: HandleCall) => { + return function func( + this: typeof handlerSet, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback + ) { + const self = this; + + if (shouldNotTraceServerCall(name, config.ignoreGrpcMethods)) { + return handleUntracedServerFunction( + type, + originalFunc, + call, + callback + ); + } + + const spanName = `grpc.${name.replace('/', '')}`; + const spanOptions: SpanOptions = { + kind: SpanKind.SERVER, + }; + + instrumentation._diag.debug( + `patch func: ${JSON.stringify(spanOptions)}` + ); + + context.with( + propagation.extract(ROOT_CONTEXT, call.metadata, { + get: (carrier, key) => carrier.get(key).map(String), + keys: carrier => Object.keys(carrier.getMap()), + }), + () => { + const { service, method } = _extractMethodAndService(name); + + const span = instrumentation.tracer + .startSpan(spanName, spanOptions) + .setAttributes({ + [SemanticAttributes.RPC_SYSTEM]: + AttributeValues.RPC_SYSTEM, + [SemanticAttributes.RPC_METHOD]: method, + [SemanticAttributes.RPC_SERVICE]: service, + }); + + instrumentation._metadataCapture.server.captureRequestMetadata( + span, + call.metadata + ); + + instrumentation._wrap( + call, + 'sendMetadata', + originalSendMetadata => (responseMetadata: Metadata) => { + instrumentation._metadataCapture.server.captureResponseMetadata( + span, + responseMetadata + ); + originalSendMetadata.call(call, responseMetadata); + } + ); + + context.with(trace.setSpan(context.active(), span), () => { + handleServerFunction.call( + self, + span, + type, + originalFunc, + call, + callback + ); + }); + } + ); + }; + } + ); + return originalRegisterResult; + } as typeof Server.prototype.register; + }; + } + + /** + * Patch for grpc.Client.make*Request(...) functions. + * Provides auto-instrumentation for client requests when using a Client without + * makeGenericClientConstructor/makeClientConstructor + */ + private _patchClientRequestMethod( + grpcLib: typeof grpcJs, + hasResponseStream: boolean + ): ( + original: ClientRequestFunction + ) => ClientRequestFunction { + const instrumentation = this; + return (original: ClientRequestFunction) => { + instrumentation._diag.debug( + 'patched makeClientStreamRequest on grpc client' + ); + + return function makeClientStreamRequest(this: grpcJs.Client) { + // method must always be at first position + const method = arguments[0]; + const { name, service, methodAttributeValue } = + instrumentation._splitMethodString(method); + + // Do not attempt to trace/inject context if method is ignored + if ( + method != null && + _methodIsIgnored( + methodAttributeValue, + instrumentation.getConfig().ignoreGrpcMethods + ) + ) { + return original.apply(this, [...arguments]); + } + + const modifiedArgs = [...arguments]; + const metadata = extractMetadataOrSplice(grpcLib, modifiedArgs, 4); + + const span = instrumentation.createClientSpan( + name, + methodAttributeValue, + service, + metadata + ); + instrumentation.extractNetMetadata(this, span); + + // Callback is only present when there is no responseStream + if (!hasResponseStream) { + // Replace the callback with the patched one if it is there. + // If the callback arg is not a function on the last position then the client will throw + // and never call the callback -> so there's nothing to patch + const lastArgIndex = modifiedArgs.length - 1; + const callback = modifiedArgs[lastArgIndex]; + if (typeof callback === 'function') { + modifiedArgs[lastArgIndex] = patchedCallback(span, callback); + } + } + + return context.with(trace.setSpan(context.active(), span), () => { + setSpanContext(metadata); + + const call = original.apply(this, [...modifiedArgs]); + patchResponseMetadataEvent( + span, + call, + instrumentation._metadataCapture + ); + + // Subscribe to response stream events when there's a response stream. + if (hasResponseStream) { + patchResponseStreamEvents(span, call); + } + + return call; + }); + }; + }; } - enable() { - this._grpcJsInstrumentation.enable(); + /** + * Entry point for applying client patches to `grpc.makeClientConstructor(...)` equivalents + * @param this GrpcJsPlugin + */ + private _patchClient( + grpcClient: typeof grpcJs + ): ( + original: MakeClientConstructorFunction + ) => MakeClientConstructorFunction { + const instrumentation = this; + return (original: MakeClientConstructorFunction) => { + instrumentation._diag.debug('patching client'); + return function makeClientConstructor( + this: typeof Client, + methods: ServiceDefinition, + serviceName: string, + options?: object + ) { + const client = original.call(this, methods, serviceName, options); + instrumentation._massWrap( + client.prototype, + getMethodsToWrap.call(instrumentation, client, methods), + instrumentation._getPatchedClientMethods(grpcClient) + ); + return client; + }; + }; } - disable() { - this._grpcJsInstrumentation.disable(); + /** + * Entry point for client patching for grpc.loadPackageDefinition(...) + * @param this - GrpcJsPlugin + */ + private _patchLoadPackageDefinition(grpcClient: typeof grpcJs) { + const instrumentation = this; + instrumentation._diag.debug('patching loadPackageDefinition'); + return (original: typeof loadPackageDefinition) => { + return function patchedLoadPackageDefinition( + this: null, + packageDef: PackageDefinition + ) { + const result: GrpcObject = original.call( + this, + packageDef + ) as GrpcObject; + instrumentation._patchLoadedPackage(grpcClient, result); + return result; + } as typeof loadPackageDefinition; + }; } /** - * Sets MeterProvider to this plugin - * @param meterProvider + * Parse initial client call properties and start a span to trace its execution */ - public setMeterProvider(meterProvider: MeterProvider) { - this._grpcJsInstrumentation.setMeterProvider(meterProvider); + private _getPatchedClientMethods( + grpcClient: typeof grpcJs + ): (original: GrpcClientFunc) => () => EventEmitter { + const instrumentation = this; + return (original: GrpcClientFunc) => { + instrumentation._diag.debug('patch all client methods'); + function clientMethodTrace(this: Client) { + const name = `grpc.${original.path.replace('/', '')}`; + const args = [...arguments]; + const metadata = extractMetadataOrSpliceDefault.call( + instrumentation, + grpcClient, + original, + args + ); + const { service, method } = _extractMethodAndService(original.path); + + const span = instrumentation.tracer + .startSpan(name, { kind: SpanKind.CLIENT }) + .setAttributes({ + [SemanticAttributes.RPC_SYSTEM]: 'grpc', + [SemanticAttributes.RPC_METHOD]: method, + [SemanticAttributes.RPC_SERVICE]: service, + }); + instrumentation.extractNetMetadata(this, span); + + instrumentation._metadataCapture.client.captureRequestMetadata( + span, + metadata + ); + + return context.with(trace.setSpan(context.active(), span), () => + makeGrpcClientRemoteCall( + instrumentation._metadataCapture, + original, + args, + metadata, + this + )(span) + ); + } + Object.assign(clientMethodTrace, original); + return clientMethodTrace; + }; + } + + private _splitMethodString(method: string) { + if (method == null) { + return { name: '', service: '', methodAttributeValue: '' }; + } + const name = `grpc.${method.replace('/', '')}`; + const { service, method: methodAttributeValue } = + _extractMethodAndService(method); + return { name, service, methodAttributeValue }; + } + + private createClientSpan( + name: string, + methodAttributeValue: string, + service: string, + metadata?: grpcJs.Metadata + ) { + const span = this.tracer + .startSpan(name, { kind: SpanKind.CLIENT }) + .setAttributes({ + [SemanticAttributes.RPC_SYSTEM]: 'grpc', + [SemanticAttributes.RPC_METHOD]: methodAttributeValue, + [SemanticAttributes.RPC_SERVICE]: service, + }); + + if (metadata != null) { + this._metadataCapture.client.captureRequestMetadata(span, metadata); + } + return span; + } + + private extractNetMetadata(client: grpcJs.Client, span: Span) { + // set net.peer.* from target (e.g., "dns:otel-productcatalogservice:8080") as a hint to APMs + const parsedUri = URI_REGEX.exec(client.getChannel().getTarget()); + if (parsedUri != null && parsedUri.groups != null) { + span.setAttribute( + SemanticAttributes.NET_PEER_NAME, + parsedUri.groups['name'] + ); + span.setAttribute( + SemanticAttributes.NET_PEER_PORT, + parseInt(parsedUri.groups['port']) + ); + } } /** - * Sets TraceProvider to this plugin - * @param tracerProvider + * Utility function to patch *all* functions loaded through a proto file. + * Recursively searches for Client classes and patches all methods, reversing the + * parsing done by grpc.loadPackageDefinition + * https://github.com/grpc/grpc-node/blob/1d14203c382509c3f36132bd0244c99792cb6601/packages/grpc-js/src/make-client.ts#L200-L217 */ - public setTracerProvider(tracerProvider: TracerProvider) { - this._grpcJsInstrumentation.setTracerProvider(tracerProvider); + private _patchLoadedPackage( + grpcClient: typeof grpcJs, + result: GrpcObject + ): void { + Object.values(result).forEach(service => { + if (typeof service === 'function') { + this._massWrap( + service.prototype, + getMethodsToWrap.call(this, service, service.service), + this._getPatchedClientMethods.call(this, grpcClient) + ); + } else if (typeof service.format !== 'string') { + // GrpcObject + this._patchLoadedPackage.call(this, grpcClient, service as GrpcObject); + } + }); + } + + private _createMetadataCapture(): metadataCaptureType { + const config = this.getConfig(); + + return { + client: { + captureRequestMetadata: metadataCapture( + 'request', + config.metadataToSpanAttributes?.client?.requestMetadata ?? [] + ), + captureResponseMetadata: metadataCapture( + 'response', + config.metadataToSpanAttributes?.client?.responseMetadata ?? [] + ), + }, + server: { + captureRequestMetadata: metadataCapture( + 'request', + config.metadataToSpanAttributes?.server?.requestMetadata ?? [] + ), + captureResponseMetadata: metadataCapture( + 'response', + config.metadataToSpanAttributes?.server?.responseMetadata ?? [] + ), + }, + }; } } diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/internal-types.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/internal-types.ts index 50d337b636..9e87e57a7c 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/internal-types.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/internal-types.ts @@ -15,7 +15,18 @@ */ import type { Span } from '@opentelemetry/api'; -import type { Metadata } from '@grpc/grpc-js'; +import type { EventEmitter } from 'events'; +import type { CALL_SPAN_ENDED } from './serverUtils'; +import type { + requestCallback, + ServerUnaryCall, + ServerReadableStream, + ServerWritableStream, + ServerDuplexStream, + Server, + Metadata, + makeGenericClientConstructor, +} from '@grpc/grpc-js'; export type metadataCaptureType = { client: { @@ -27,3 +38,49 @@ export type metadataCaptureType = { captureResponseMetadata: (span: Span, metadata: Metadata) => void; }; }; + +/** + * Server Unary callback type + */ +export type SendUnaryDataCallback = requestCallback; + +/** + * Intersection type of all grpc server call types + */ +export type ServerCall = + | ServerUnaryCall + | ServerReadableStream + | ServerWritableStream + | ServerDuplexStream; + +/** + * {@link ServerCall} ServerCall extended with misc. missing utility types + */ +export type ServerCallWithMeta = ServerCall & { + metadata: Metadata; +}; + +/** + * EventEmitter with span ended symbol indicator + */ +export type GrpcEmitter = EventEmitter & { [CALL_SPAN_ENDED]?: boolean }; + +/** + * Grpc client callback function extended with missing utility types + */ +export type GrpcClientFunc = ((...args: unknown[]) => GrpcEmitter) & { + path: string; + requestStream: boolean; + responseStream: boolean; +}; + +export type ServerRegisterFunction = typeof Server.prototype.register; + +export type ClientRequestFunction = ( + ...args: unknown[] +) => ReturnType; + +export type MakeClientConstructorFunction = typeof makeGenericClientConstructor; + +export type { HandleCall } from '@grpc/grpc-js/build/src/server-call'; +export type { PackageDefinition } from '@grpc/grpc-js/build/src/make-client'; diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/serverUtils.ts b/experimental/packages/opentelemetry-instrumentation-grpc/src/serverUtils.ts similarity index 96% rename from experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/serverUtils.ts rename to experimental/packages/opentelemetry-instrumentation-grpc/src/serverUtils.ts index ad07828e68..142c65f3ce 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/src/grpc-js/serverUtils.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/src/serverUtils.ts @@ -34,8 +34,8 @@ import type { SendUnaryDataCallback, GrpcEmitter, HandleCall, -} from './types'; -import type { IgnoreMatcher } from '../types'; +} from './internal-types'; +import type { IgnoreMatcher } from './types'; import { context, SpanStatusCode } from '@opentelemetry/api'; import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; @@ -43,9 +43,9 @@ import { SemanticAttributes } from '@opentelemetry/semantic-conventions'; import { _grpcStatusCodeToOpenTelemetryStatusCode, _methodIsIgnored, -} from '../utils'; -import { AttributeNames } from '../enums/AttributeNames'; -import { GRPC_STATUS_CODE_OK } from '../status-code'; +} from './utils'; +import { AttributeNames } from './enums/AttributeNames'; +import { GRPC_STATUS_CODE_OK } from './status-code'; export const CALL_SPAN_ENDED = Symbol('opentelemetry call span ended'); diff --git a/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts b/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts index b7e2210298..dc2918d874 100644 --- a/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts +++ b/experimental/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts @@ -15,7 +15,7 @@ */ import { runTests } from './helper'; -import { GrpcInstrumentation } from '../src/instrumentation'; +import { GrpcInstrumentation } from '../src'; const instrumentation = new GrpcInstrumentation(); instrumentation.enable();