From 9378ab5a15e4fe481fdc03627138e5def9638eb5 Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Fri, 15 Dec 2023 10:33:21 -0800 Subject: [PATCH] [processor/servicegraph] update own telemetry to use otel (#29917) This updates the servicegraph processor to emit telemetry using OpenTelemetry instead of OpenCensus. Related #29867 --------- Signed-off-by: Alex Boten --- .../codeboten_rm-census-servicegraph.yaml | 27 +++++ processor/servicegraphprocessor/factory.go | 10 +- processor/servicegraphprocessor/go.mod | 8 +- processor/servicegraphprocessor/metrics.go | 45 -------- processor/servicegraphprocessor/processor.go | 40 +++++-- .../servicegraphprocessor/processor_test.go | 101 ++++++++++++++++-- 6 files changed, 162 insertions(+), 69 deletions(-) create mode 100755 .chloggen/codeboten_rm-census-servicegraph.yaml delete mode 100644 processor/servicegraphprocessor/metrics.go diff --git a/.chloggen/codeboten_rm-census-servicegraph.yaml b/.chloggen/codeboten_rm-census-servicegraph.yaml new file mode 100755 index 000000000000..bfdd7a64540f --- /dev/null +++ b/.chloggen/codeboten_rm-census-servicegraph.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: servicegraphprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: update own telemetry to use otel + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29917] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/servicegraphprocessor/factory.go b/processor/servicegraphprocessor/factory.go index 219e3853321b..fb046d4baf9a 100644 --- a/processor/servicegraphprocessor/factory.go +++ b/processor/servicegraphprocessor/factory.go @@ -7,7 +7,6 @@ import ( "context" "time" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" @@ -51,9 +50,6 @@ func init() { // NewFactory creates a factory for the servicegraph processor. func NewFactory() processor.Factory { - // TODO: Handle this err - _ = view.Register(serviceGraphProcessorViews()...) - return processor.NewFactory( typeStr, createDefaultConfig, @@ -63,8 +59,6 @@ func NewFactory() processor.Factory { // NewConnectorFactoryFunc creates a function that returns a factory for the servicegraph connector. var NewConnectorFactoryFunc = func(cfgType component.Type, tracesToMetricsStability component.StabilityLevel) connector.Factory { - // TODO: Handle this err - _ = view.Register(serviceGraphProcessorViews()...) 
return connector.NewFactory( cfgType, createDefaultConfig, @@ -84,13 +78,13 @@ func createDefaultConfig() component.Config { } func createTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) { - p := newProcessor(params.Logger, cfg) + p := newProcessor(params.TelemetrySettings, cfg) p.tracesConsumer = nextConsumer return p, nil } func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) { - c := newProcessor(params.Logger, cfg) + c := newProcessor(params.TelemetrySettings, cfg) c.metricsConsumer = nextConsumer return c, nil } diff --git a/processor/servicegraphprocessor/go.mod b/processor/servicegraphprocessor/go.mod index 434f8b18a9be..b2a07e9c3c8e 100644 --- a/processor/servicegraphprocessor/go.mod +++ b/processor/servicegraphprocessor/go.mod @@ -5,9 +5,9 @@ go 1.20 require ( github.com/stretchr/testify v1.8.4 - go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.91.0 go.opentelemetry.io/collector/config/configgrpc v0.91.0 + go.opentelemetry.io/collector/config/configtelemetry v0.91.0 go.opentelemetry.io/collector/connector v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 @@ -17,6 +17,8 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0 go.opentelemetry.io/collector/processor v0.91.0 go.opentelemetry.io/collector/semconv v0.91.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/sdk/metric v1.21.0 go.uber.org/zap v1.26.0 ) @@ -67,12 +69,12 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect + go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.91.0 // indirect go.opentelemetry.io/collector/config/configauth v0.91.0 // indirect go.opentelemetry.io/collector/config/configcompression v0.91.0 // indirect go.opentelemetry.io/collector/config/confignet v0.91.0 // indirect go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/configtls v0.91.0 // indirect go.opentelemetry.io/collector/config/internal v0.91.0 // indirect go.opentelemetry.io/collector/confmap v0.91.0 // indirect @@ -93,9 +95,7 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect go.opentelemetry.io/otel/trace v1.21.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/processor/servicegraphprocessor/metrics.go b/processor/servicegraphprocessor/metrics.go deleted file mode 100644 index 73d23e496ee4..000000000000 --- a/processor/servicegraphprocessor/metrics.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package servicegraphprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor" - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - 
"go.opentelemetry.io/collector/processor/processorhelper" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/metadata" -) - -var ( - statDroppedSpans = stats.Int64("dropped_spans", "Number of spans dropped when trying to add edges", stats.UnitDimensionless) - statTotalEdges = stats.Int64("total_edges", "Total number of unique edges", stats.UnitDimensionless) - statExpiredEdges = stats.Int64("expired_edges", "Number of edges that expired before finding its matching span", stats.UnitDimensionless) -) - -func serviceGraphProcessorViews() []*view.View { - droppedSpansView := &view.View{ - Name: processorhelper.BuildCustomMetricName(metadata.Type, statDroppedSpans.Name()), - Description: statDroppedSpans.Description(), - Measure: statDroppedSpans, - Aggregation: view.Count(), - } - totalEdgesView := &view.View{ - Name: processorhelper.BuildCustomMetricName(metadata.Type, statTotalEdges.Name()), - Description: statTotalEdges.Description(), - Measure: statTotalEdges, - Aggregation: view.Count(), - } - expiredEdgesView := &view.View{ - Name: processorhelper.BuildCustomMetricName(metadata.Type, statExpiredEdges.Name()), - Description: statExpiredEdges.Description(), - Measure: statExpiredEdges, - Aggregation: view.Count(), - } - - return []*view.View{ - droppedSpansView, - totalEdgesView, - expiredEdgesView, - } -} diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index 866b49009647..98d67cc3aa44 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "go.opencensus.io/stats" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -20,9 +19,12 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" semconv "go.opentelemetry.io/collector/semconv/v1.13.0" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store" ) @@ -76,10 +78,14 @@ type serviceGraphProcessor struct { metricMutex sync.RWMutex keyToMetric map[string]metricSeries + statDroppedSpans metric.Int64Counter + statTotalEdges metric.Int64Counter + statExpiredEdges metric.Int64Counter + shutdownCh chan any } -func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProcessor { +func newProcessor(set component.TelemetrySettings, config component.Config) *serviceGraphProcessor { pConfig := config.(*Config) bounds := defaultLatencyHistogramBuckets @@ -102,9 +108,28 @@ func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProc pConfig.VirtualNodePeerAttributes = defaultPeerAttributes } + scopeName := "processor/servicegraphprocessor" + meter := set.MeterProvider.Meter(scopeName) + + droppedSpan, _ := meter.Int64Counter( + processorhelper.BuildCustomMetricName(metadata.Type, "dropped_spans"), + metric.WithDescription("Number of spans dropped when trying to add edges"), + metric.WithUnit("1"), + ) + totalEdges, _ := meter.Int64Counter( + processorhelper.BuildCustomMetricName(metadata.Type, "total_edges"), + metric.WithDescription("Total number of unique edges"), + 
metric.WithUnit("1"), + ) + expiredEdges, _ := meter.Int64Counter( + processorhelper.BuildCustomMetricName(metadata.Type, "expired_edges"), + metric.WithDescription("Number of edges that expired before finding its matching span"), + metric.WithUnit("1"), + ) + return &serviceGraphProcessor{ config: pConfig, - logger: logger, + logger: set.Logger, startTime: time.Now(), reqTotal: make(map[string]int64), reqFailedTotal: make(map[string]int64), @@ -117,6 +142,9 @@ func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProc reqDurationBounds: bounds, keyToMetric: make(map[string]metricSeries), shutdownCh: make(chan any), + statDroppedSpans: droppedSpan, + statTotalEdges: totalEdges, + statExpiredEdges: expiredEdges, } } @@ -299,7 +327,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace. if errors.Is(err, store.ErrTooManyItems) { totalDroppedSpans++ - stats.Record(ctx, statDroppedSpans.M(1)) + p.statDroppedSpans.Add(ctx, 1) continue } @@ -309,7 +337,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace. } if isNew { - stats.Record(ctx, statTotalEdges.M(1)) + p.statTotalEdges.Add(ctx, 1) } } } @@ -354,7 +382,7 @@ func (p *serviceGraphProcessor) onExpire(e *store.Edge) { zap.Stringer("trace_id", e.TraceID), ) - stats.Record(context.Background(), statExpiredEdges.M(1)) + p.statExpiredEdges.Add(context.Background(), 1) if virtualNodeFeatureGate.IsEnabled() { e.ConnectionType = store.VirtualNode diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go index ab3e3f3051dc..8057519080fb 100644 --- a/processor/servicegraphprocessor/processor_test.go +++ b/processor/servicegraphprocessor/processor_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -27,6 +28,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor/processortest" semconv "go.opentelemetry.io/collector/semconv/v1.13.0" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.uber.org/zap/zaptest" ) @@ -99,7 +103,9 @@ func TestProcessorShutdown(t *testing.T) { // Test next := new(consumertest.TracesSink) - p := newProcessor(zaptest.NewLogger(t), cfg) + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + p := newProcessor(set, cfg) p.tracesConsumer = next err := p.Shutdown(context.Background()) @@ -114,7 +120,9 @@ func TestConnectorShutdown(t *testing.T) { // Test next := new(consumertest.MetricsSink) - p := newProcessor(zaptest.NewLogger(t), cfg) + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + p := newProcessor(set, cfg) p.metricsConsumer = next err := p.Shutdown(context.Background()) @@ -216,7 +224,9 @@ func TestProcessorConsume(t *testing.T) { } // Prepare - p := newProcessor(zaptest.NewLogger(t), tc.cfg) + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + p := newProcessor(set, tc.cfg) p.tracesConsumer = consumertest.NewNop() metricsExporter := newMockMetricsExporter() @@ -259,7 +269,9 @@ func 
TestConnectorConsume(t *testing.T) { Store: StoreConfig{MaxItems: 10}, } - conn := newProcessor(zaptest.NewLogger(t), cfg) + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + conn := newProcessor(set, cfg) conn.metricsConsumer = newMockMetricsExporter() assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost())) @@ -281,7 +293,9 @@ func TestConnectorConsume(t *testing.T) { func TestProcessor_MetricsFlushInterval(t *testing.T) { // Prepare - p := newProcessor(zaptest.NewLogger(t), &Config{ + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + p := newProcessor(set, &Config{ MetricsExporter: "mock", Dimensions: []string{"some-attribute", "non-existing-attribute"}, Store: StoreConfig{ @@ -599,7 +613,9 @@ func TestStaleSeriesCleanup(t *testing.T) { mockMetricsExporter := newMockMetricsExporter() - p := newProcessor(zaptest.NewLogger(t), cfg) + set := componenttest.NewNopTelemetrySettings() + set.Logger = zaptest.NewLogger(t) + p := newProcessor(set, cfg) p.tracesConsumer = consumertest.NewNop() mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{ @@ -629,3 +645,76 @@ func TestStaleSeriesCleanup(t *testing.T) { // Shutdown the processor assert.NoError(t, p.Shutdown(context.Background())) } + +func setupTelemetry(reader *sdkmetric.ManualReader) component.TelemetrySettings { + settings := componenttest.NewNopTelemetrySettings() + settings.MetricsLevel = configtelemetry.LevelNormal + + settings.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + return settings +} + +func TestValidateOwnTelemetry(t *testing.T) { + cfg := &Config{ + MetricsExporter: "mock", + Dimensions: []string{"some-attribute", "non-existing-attribute"}, + Store: StoreConfig{ + MaxItems: 10, + TTL: time.Second, + }, + } + + mockMetricsExporter := newMockMetricsExporter() + + reader := sdkmetric.NewManualReader() + set := setupTelemetry(reader) + p := newProcessor(set, cfg) + p.tracesConsumer = consumertest.NewNop() + + mHost := newMockHost(map[component.DataType]map[component.ID]component.Component{ + component.DataTypeMetrics: { + component.NewID("mock"): mockMetricsExporter, + }, + }) + + assert.NoError(t, p.Start(context.Background(), mHost)) + + // ConsumeTraces + td := buildSampleTrace(t, "first") + assert.NoError(t, p.ConsumeTraces(context.Background(), td)) + + // Make series stale and force a cache cleanup + for key, metric := range p.keyToMetric { + metric.lastUpdated = 0 + p.keyToMetric[key] = metric + } + p.cleanCache() + assert.Equal(t, 0, len(p.keyToMetric)) + + // ConsumeTraces with a trace with different attribute value + td = buildSampleTrace(t, "second") + assert.NoError(t, p.ConsumeTraces(context.Background(), td)) + + // Shutdown the processor + assert.NoError(t, p.Shutdown(context.Background())) + + rm := metricdata.ResourceMetrics{} + assert.NoError(t, reader.Collect(context.Background(), &rm)) + require.Len(t, rm.ScopeMetrics, 1) + sm := rm.ScopeMetrics[0] + require.Len(t, sm.Metrics, 1) + got := sm.Metrics[0] + want := metricdata.Metrics{ + Name: "processor/servicegraph/total_edges", + Description: "Total number of unique edges", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 2}, + }, + }, + } + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) +}
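
For anyone applying the same migration to another component, the sketch below distills the pattern this patch uses in processor.go: instruments are created once from the MeterProvider carried by component.TelemetrySettings (instead of registering OpenCensus views in NewFactory), and call sites increment the counters with Add instead of stats.Record. This is a minimal, hedged example: the ownTelemetry struct, the metric names, and the wiring in main are illustrative only and are not part of this PR; the patch builds its real metric names with processorhelper.BuildCustomMetricName(metadata.Type, ...).

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.uber.org/zap"
)

// ownTelemetry is an illustrative holder for a component's self-observability
// counters, mirroring the statDroppedSpans/statTotalEdges/statExpiredEdges
// fields the patch adds to serviceGraphProcessor.
type ownTelemetry struct {
	droppedSpans metric.Int64Counter
	totalEdges   metric.Int64Counter
}

// newOwnTelemetry creates counters from the MeterProvider in
// component.TelemetrySettings, the same source the patch switches to.
// The counter names here are made up for the example.
func newOwnTelemetry(set component.TelemetrySettings) (*ownTelemetry, error) {
	meter := set.MeterProvider.Meter("processor/servicegraphprocessor")

	dropped, err := meter.Int64Counter(
		"example_dropped_spans",
		metric.WithDescription("Number of spans dropped when trying to add edges"),
		metric.WithUnit("1"),
	)
	if err != nil {
		return nil, err
	}
	total, err := meter.Int64Counter(
		"example_total_edges",
		metric.WithDescription("Total number of unique edges"),
		metric.WithUnit("1"),
	)
	if err != nil {
		return nil, err
	}
	return &ownTelemetry{droppedSpans: dropped, totalEdges: total}, nil
}

func main() {
	// A ManualReader lets this example observe what was recorded without an exporter.
	reader := sdkmetric.NewManualReader()
	set := component.TelemetrySettings{
		Logger:        zap.NewNop(),
		MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)),
	}

	tel, err := newOwnTelemetry(set)
	if err != nil {
		log.Fatal(err)
	}

	// Where the old code called stats.Record(ctx, statTotalEdges.M(1)),
	// the OTel counter is simply incremented.
	ctx := context.Background()
	tel.totalEdges.Add(ctx, 1)
	tel.droppedSpans.Add(ctx, 1)

	// Pull the recorded data points back out and print the metric names.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		log.Fatal(err)
	}
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			fmt.Println(m.Name)
		}
	}
}
```

In the actual component the settings are not built by hand: they arrive via processor.CreateSettings / connector.CreateSettings, so the collector's own service::telemetry configuration decides how and where these counters are exported, with no extra code in the component.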
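
The new TestValidateOwnTelemetry verifies the emitted counters with the SDK's ManualReader plus metricdatatest. The standalone sketch below shows that verification pattern in isolation, under stated assumptions: the package name, counter name, and expected value are invented for illustration and do not appear in the patch.

```go
package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

// TestCounterRoundTrip is an illustrative test, not part of this patch: it shows
// the ManualReader + metricdatatest flow that TestValidateOwnTelemetry uses to
// assert on the processor's self-observability metrics.
func TestCounterRoundTrip(t *testing.T) {
	// Wire a MeterProvider to a ManualReader so the test controls collection.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := provider.Meter("processor/servicegraphprocessor")

	// Hypothetical counter; the real test checks processor/servicegraph/total_edges.
	edges, err := meter.Int64Counter("example_total_edges")
	require.NoError(t, err)

	edges.Add(context.Background(), 2)

	// Collect what the SDK recorded and compare it to the expected data point,
	// ignoring timestamps exactly as the patch's TestValidateOwnTelemetry does.
	var rm metricdata.ResourceMetrics
	require.NoError(t, reader.Collect(context.Background(), &rm))
	require.Len(t, rm.ScopeMetrics, 1)
	require.Len(t, rm.ScopeMetrics[0].Metrics, 1)

	want := metricdata.Metrics{
		Name: "example_total_edges",
		Data: metricdata.Sum[int64]{
			Temporality: metricdata.CumulativeTemporality,
			IsMonotonic: true,
			DataPoints:  []metricdata.DataPoint[int64]{{Value: 2}},
		},
	}
	metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
}
```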