From 77c8aa69dc48972bfa55737fc0f6ad8f1a69493f Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 19 Feb 2021 17:28:03 +0800 Subject: [PATCH] Add support for consuming OTLP/gRPC metrics (#4722) (#4826) * Add support for consuming OTLP/gRPC metrics Add support for consuming basic OTLP/gRPC metrics, like we support in the OpenTelemetry Collector exporter. This does not cover distributions or summaries; these will be added once the Elasticsearch enhancements we are dependent on are available. * processor/otel: add comment about [Cc]onsumerStats # Conflicts: # changelogs/head.asciidoc --- beater/otlp/grpc.go | 53 +++- beater/otlp/grpc_test.go | 81 +++++- processor/otel/consumer.go | 23 ++ processor/otel/metrics.go | 235 ++++++++++++++++++ processor/otel/metrics_test.go | 206 +++++++++++++++ .../TestOTLPGRPCMetrics.approved.json | 36 +++ ....json => TestOTLPGRPCTraces.approved.json} | 0 systemtest/otlp_test.go | 74 +++++- 8 files changed, 697 insertions(+), 11 deletions(-) create mode 100644 processor/otel/metrics.go create mode 100644 processor/otel/metrics_test.go create mode 100644 systemtest/approvals/TestOTLPGRPCMetrics.approved.json rename systemtest/approvals/{TestOTLPGRPC.approved.json => TestOTLPGRPCTraces.approved.json} (100%) diff --git a/beater/otlp/grpc.go b/beater/otlp/grpc.go index 17963ca596f..ecfa839cf3b 100644 --- a/beater/otlp/grpc.go +++ b/beater/otlp/grpc.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/receiver/otlpreceiver/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/trace" "google.golang.org/grpc" @@ -38,8 +39,10 @@ var ( request.IDRequestCount, request.IDResponseCount, request.IDResponseErrorsCount, request.IDResponseValidCount, } - gRPCConsumerRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.consumer") - gRPCConsumerMonitoringMap = request.MonitoringMapForRegistry(gRPCConsumerRegistry, monitoringKeys) + gRPCMetricsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.metrics") + gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys) + gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces") + gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys) ) // RegisterGRPCServices registers OTLP consumer services with the given gRPC server. @@ -48,11 +51,24 @@ func RegisterGRPCServices(grpcServer *grpc.Server, reporter publish.Reporter, lo consumer: &otel.Consumer{Reporter: reporter}, logger: logger, } - // TODO(axw) add support for metrics to processer/otel.Consumer, and register a metrics receiver here. + + // TODO(axw) rather than registering and unregistering monitoring callbacks + // each time a new consumer is created, we should register one callback and + // have it aggregate metrics from the dynamic consumers. + // + // For now, we take the easy way out: we only have one OTLP gRPC service + // running at any time, so just unregister/register a new one. + gRPCMetricsRegistry.Remove("consumer") + monitoring.NewFunc(gRPCMetricsRegistry, "consumer", consumer.collectMetricsMonitoring, monitoring.Report) + traceReceiver := trace.New("otlp", consumer) + metricsReceiver := metrics.New("otlp", consumer) if err := otlpreceiver.RegisterTraceReceiver(context.Background(), traceReceiver, grpcServer, nil); err != nil { return errors.Wrap(err, "failed to register OTLP trace receiver") } + if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), metricsReceiver, grpcServer, nil); err != nil { + return errors.Wrap(err, "failed to register OTLP metrics receiver") + } return nil } @@ -63,13 +79,36 @@ type monitoredConsumer struct { // ConsumeTraces consumes OpenTelemtry trace data. func (c *monitoredConsumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error { - gRPCConsumerMonitoringMap[request.IDRequestCount].Inc() - defer gRPCConsumerMonitoringMap[request.IDResponseCount].Inc() + gRPCTracesMonitoringMap[request.IDRequestCount].Inc() + defer gRPCTracesMonitoringMap[request.IDResponseCount].Inc() if err := c.consumer.ConsumeTraces(ctx, traces); err != nil { - gRPCConsumerMonitoringMap[request.IDResponseErrorsCount].Inc() + gRPCTracesMonitoringMap[request.IDResponseErrorsCount].Inc() c.logger.With(logp.Error(err)).Error("ConsumeTraces returned an error") return err } - gRPCConsumerMonitoringMap[request.IDResponseValidCount].Inc() + gRPCTracesMonitoringMap[request.IDResponseValidCount].Inc() return nil } + +// ConsumeMetrics consumes OpenTelemtry metrics data. +func (c *monitoredConsumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error { + gRPCMetricsMonitoringMap[request.IDRequestCount].Inc() + defer gRPCMetricsMonitoringMap[request.IDResponseCount].Inc() + if err := c.consumer.ConsumeMetrics(ctx, metrics); err != nil { + gRPCMetricsMonitoringMap[request.IDResponseErrorsCount].Inc() + c.logger.With(logp.Error(err)).Error("ConsumeMetrics returned an error") + return err + } + gRPCMetricsMonitoringMap[request.IDResponseValidCount].Inc() + return nil +} + +func (c *monitoredConsumer) collectMetricsMonitoring(_ monitoring.Mode, V monitoring.Visitor) { + V.OnRegistryStart() + V.OnRegistryFinished() + + stats := c.consumer.Stats() + monitoring.ReportNamespace(V, "consumer", func() { + monitoring.ReportInt(V, "unsupported_dropped", stats.UnsupportedMetricsDropped) + }) +} diff --git a/beater/otlp/grpc_test.go b/beater/otlp/grpc_test.go index 24566beebf7..49ffdb734ce 100644 --- a/beater/otlp/grpc_test.go +++ b/beater/otlp/grpc_test.go @@ -40,8 +40,10 @@ import ( ) var ( - exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest") - exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse") + exportMetricsServiceRequestType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest") + exportMetricsServiceResponseType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse") + exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest") + exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse") ) func TestConsumeTraces(t *testing.T) { @@ -93,7 +95,7 @@ func TestConsumeTraces(t *testing.T) { assert.Len(t, events, 2) actual := map[string]interface{}{} - monitoring.GetRegistry("apm-server.otlp.grpc.consumer").Do(monitoring.Full, func(key string, value interface{}) { + monitoring.GetRegistry("apm-server.otlp.grpc.traces").Do(monitoring.Full, func(key string, value interface{}) { actual[key] = value }) assert.Equal(t, map[string]interface{}{ @@ -104,6 +106,65 @@ func TestConsumeTraces(t *testing.T) { }, actual) } +func TestConsumeMetrics(t *testing.T) { + var reportError error + report := func(ctx context.Context, req publish.PendingReq) error { + return reportError + } + + // Send a minimal metric to verify that everything is connected properly. + // + // We intentionally do not check the published event contents; those are + // tested in processor/otel. + cannedRequest := jsonExportMetricsServiceRequest(`{ +"resource_metrics": [ + { + "instrumentation_library_metrics": [ + { + "metrics": [ + { + "name": "metric_name" + } + ] + } + ] + } +] +}`) + + conn := newServer(t, report) + err := conn.Invoke( + context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", + cannedRequest, newExportMetricsServiceResponse(), + ) + assert.NoError(t, err) + + reportError = errors.New("failed to publish events") + err = conn.Invoke( + context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", + cannedRequest, newExportMetricsServiceResponse(), + ) + assert.Error(t, err) + errStatus := status.Convert(err) + assert.Equal(t, "failed to publish events", errStatus.Message()) + + actual := map[string]interface{}{} + monitoring.GetRegistry("apm-server.otlp.grpc.metrics").Do(monitoring.Full, func(key string, value interface{}) { + actual[key] = value + }) + assert.Equal(t, map[string]interface{}{ + // In both of the requests we send above, + // the metrics do not have a type and so + // we treat them as unsupported metrics. + "consumer.unsupported_dropped": int64(2), + + "request.count": int64(2), + "response.count": int64(2), + "response.errors.count": int64(1), + "response.valid.count": int64(1), + }, actual) +} + func jsonExportTraceServiceRequest(j string) interface{} { request := reflect.New(exportTraceServiceRequestType.Elem()).Interface() decoder := json.NewDecoder(strings.NewReader(j)) @@ -118,6 +179,20 @@ func newExportTraceServiceResponse() interface{} { return reflect.New(exportTraceServiceResponseType.Elem()).Interface() } +func jsonExportMetricsServiceRequest(j string) interface{} { + request := reflect.New(exportMetricsServiceRequestType.Elem()).Interface() + decoder := json.NewDecoder(strings.NewReader(j)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(request); err != nil { + panic(err) + } + return request +} + +func newExportMetricsServiceResponse() interface{} { + return reflect.New(exportMetricsServiceResponseType.Elem()).Interface() +} + func newServer(t *testing.T, report publish.Reporter) *grpc.ClientConn { lis, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/processor/otel/consumer.go b/processor/otel/consumer.go index a32a8c73c30..9874d5f1b4a 100644 --- a/processor/otel/consumer.go +++ b/processor/otel/consumer.go @@ -41,6 +41,7 @@ import ( "net/url" "strconv" "strings" + "sync/atomic" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/conventions" @@ -65,9 +66,31 @@ const ( // Consumer transforms open-telemetry data to be compatible with elastic APM data type Consumer struct { + stats consumerStats + Reporter publish.Reporter } +// ConsumerStats holds a snapshot of statistics about data consumption. +type ConsumerStats struct { + // UnsupportedMetricsDropped records the number of unsupported metrics + // that have been dropped by the consumer. + UnsupportedMetricsDropped int64 +} + +// consumerStats holds the current statistics, which must be accessed and +// modified using atomic operations. +type consumerStats struct { + unsupportedMetricsDropped int64 +} + +// Stats returns a snapshot of the current statistics about data consumption. +func (c *Consumer) Stats() ConsumerStats { + return ConsumerStats{ + UnsupportedMetricsDropped: atomic.LoadInt64(&c.stats.unsupportedMetricsDropped), + } +} + // ConsumeTraces consumes OpenTelemetry trace data, // converting into Elastic APM events and reporting to the Elastic APM schema. func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error { diff --git a/processor/otel/metrics.go b/processor/otel/metrics.go new file mode 100644 index 00000000000..c778fe3b892 --- /dev/null +++ b/processor/otel/metrics.go @@ -0,0 +1,235 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Portions copied from OpenTelemetry Collector (contrib), from the +// elastic exporter. +// +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otel + +import ( + "context" + "sort" + "strings" + "sync/atomic" + "time" + + "go.opentelemetry.io/collector/consumer/pdata" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/publish" + "github.com/elastic/beats/v7/libbeat/common" +) + +// ConsumeMetrics consumes OpenTelemetry metrics data, converting into +// the Elastic APM metrics model and sending to the reporter. +func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error { + batch := c.convertMetrics(metrics) + return c.Reporter(ctx, publish.PendingReq{ + Transformables: batch.Transformables(), + Trace: true, + }) +} + +func (c *Consumer) convertMetrics(metrics pdata.Metrics) *model.Batch { + batch := model.Batch{} + resourceMetrics := metrics.ResourceMetrics() + for i := 0; i < resourceMetrics.Len(); i++ { + c.convertResourceMetrics(resourceMetrics.At(i), &batch) + } + return &batch +} + +func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, out *model.Batch) { + var metadata model.Metadata + translateResourceMetadata(resourceMetrics.Resource(), &metadata) + instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics() + for i := 0; i < instrumentationLibraryMetrics.Len(); i++ { + c.convertInstrumentationLibraryMetrics(instrumentationLibraryMetrics.At(i), metadata, out) + } +} + +func (c *Consumer) convertInstrumentationLibraryMetrics(in pdata.InstrumentationLibraryMetrics, metadata model.Metadata, out *model.Batch) { + var ms metricsets + otelMetrics := in.Metrics() + var unsupported int64 + for i := 0; i < otelMetrics.Len(); i++ { + if !c.addMetric(otelMetrics.At(i), &ms) { + unsupported++ + } + } + for _, m := range ms { + m.Metadata = metadata + out.Metricsets = append(out.Metricsets, m.Metricset) + } + if unsupported > 0 { + atomic.AddInt64(&c.stats.unsupportedMetricsDropped, unsupported) + } +} + +func (c *Consumer) addMetric(metric pdata.Metric, ms *metricsets) bool { + switch metric.DataType() { + case pdata.MetricDataTypeIntGauge: + dps := metric.IntGauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + ms.upsert( + asTime(dp.Timestamp()), dp.LabelsMap(), + model.Sample{ + Name: metric.Name(), + Value: float64(dp.Value()), + }, + ) + } + return true + case pdata.MetricDataTypeDoubleGauge: + dps := metric.DoubleGauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + ms.upsert( + asTime(dp.Timestamp()), dp.LabelsMap(), + model.Sample{ + Name: metric.Name(), + Value: float64(dp.Value()), + }, + ) + } + return true + case pdata.MetricDataTypeIntSum: + dps := metric.IntSum().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + ms.upsert( + asTime(dp.Timestamp()), dp.LabelsMap(), + model.Sample{ + Name: metric.Name(), + Value: float64(dp.Value()), + }, + ) + } + return true + case pdata.MetricDataTypeDoubleSum: + dps := metric.DoubleSum().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + ms.upsert( + asTime(dp.Timestamp()), dp.LabelsMap(), + model.Sample{ + Name: metric.Name(), + Value: float64(dp.Value()), + }, + ) + } + return true + case pdata.MetricDataTypeIntHistogram: + // TODO(axw) https://github.com/elastic/apm-server/issues/3195 + case pdata.MetricDataTypeDoubleHistogram: + // TODO(axw) https://github.com/elastic/apm-server/issues/3195 + case pdata.MetricDataTypeDoubleSummary: + // TODO(axw) https://github.com/elastic/apm-server/issues/3195 + // (Not quite the same issue, but the solution would also enable + // aggregate metrics, which would be appropriate for summaries.) + } + // Unsupported metric: report that it has been dropped. + return false +} + +type metricsets []metricset + +type metricset struct { + *model.Metricset + labels []stringMapItem // sorted by key +} + +type stringMapItem struct { + key string + value string +} + +// upsert searches for an existing metricset with the given timestamp and labels, +// and appends the sample to it. If there is no such existing metricset, a new one +// is created. +func (ms *metricsets) upsert(timestamp time.Time, labelMap pdata.StringMap, sample model.Sample) { + labelMap.Sort() + labels := make([]stringMapItem, 0, labelMap.Len()) + labelMap.ForEach(func(k, v string) { + labels = append(labels, stringMapItem{k, v}) + }) + var m *model.Metricset + i := ms.search(timestamp, labels) + if i < len(*ms) && compareMetricsets((*ms)[i], timestamp, labels) == 0 { + m = (*ms)[i].Metricset + } else { + m = &model.Metricset{Timestamp: timestamp} + if len(labels) > 0 { + m.Labels = make(common.MapStr, len(labels)) + for _, label := range labels { + m.Labels[label.key] = label.value + } + } + head := (*ms)[:i] + tail := append([]metricset{{Metricset: m, labels: labels}}, (*ms)[i:]...) + *ms = append(head, tail...) + } + m.Samples = append(m.Samples, sample) +} + +func (ms *metricsets) search(timestamp time.Time, labels []stringMapItem) int { + return sort.Search(len(*ms), func(i int) bool { + return compareMetricsets((*ms)[i], timestamp, labels) >= 0 + }) +} + +func compareMetricsets(ms metricset, timestamp time.Time, labels []stringMapItem) int { + if d := ms.Timestamp.Sub(timestamp); d < 0 { + return -1 + } else if d > 0 { + return 1 + } + if n := len(ms.labels) - len(labels); n < 0 { + return -1 + } else if n > 0 { + return 1 + } + for i, la := range ms.labels { + lb := labels[i] + if n := strings.Compare(la.key, lb.key); n != 0 { + return n + } + if n := strings.Compare(la.value, lb.value); n != 0 { + return n + } + } + return 0 +} + +func asTime(in pdata.TimestampUnixNano) time.Time { + return time.Unix(0, int64(in)).UTC() +} diff --git a/processor/otel/metrics_test.go b/processor/otel/metrics_test.go new file mode 100644 index 00000000000..907e634ed49 --- /dev/null +++ b/processor/otel/metrics_test.go @@ -0,0 +1,206 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Portions copied from OpenTelemetry Collector (contrib), from the +// elastic exporter. +// +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otel_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/processor/otel" + "github.com/elastic/apm-server/publish" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestConsumeMetrics(t *testing.T) { + metrics := pdata.NewMetrics() + resourceMetrics := pdata.NewResourceMetrics() + metrics.ResourceMetrics().Append(resourceMetrics) + instrumentationLibraryMetrics := pdata.NewInstrumentationLibraryMetrics() + resourceMetrics.InstrumentationLibraryMetrics().Append(instrumentationLibraryMetrics) + metricSlice := instrumentationLibraryMetrics.Metrics() + appendMetric := func(name string, dataType pdata.MetricDataType) pdata.Metric { + n := metricSlice.Len() + metricSlice.Resize(n + 1) + metric := metricSlice.At(n) + metric.SetName(name) + metric.SetDataType(dataType) + return metric + } + + timestamp0 := time.Unix(123, 0).UTC() + timestamp1 := time.Unix(456, 0).UTC() + + var expectDropped int64 + + metric := appendMetric("int_gauge_metric", pdata.MetricDataTypeIntGauge) + intGauge := metric.IntGauge() + intGauge.DataPoints().Resize(4) + intGauge.DataPoints().At(0).SetTimestamp(pdata.TimestampUnixNano(timestamp0.UnixNano())) + intGauge.DataPoints().At(0).SetValue(1) + intGauge.DataPoints().At(1).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + intGauge.DataPoints().At(1).SetValue(2) + intGauge.DataPoints().At(1).LabelsMap().InitFromMap(map[string]string{"k": "v"}) + intGauge.DataPoints().At(2).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + intGauge.DataPoints().At(2).SetValue(3) + intGauge.DataPoints().At(3).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + intGauge.DataPoints().At(3).SetValue(4) + intGauge.DataPoints().At(3).LabelsMap().InitFromMap(map[string]string{"k": "v2"}) + + metric = appendMetric("double_gauge_metric", pdata.MetricDataTypeDoubleGauge) + doubleGauge := metric.DoubleGauge() + doubleGauge.DataPoints().Resize(4) + doubleGauge.DataPoints().At(0).SetTimestamp(pdata.TimestampUnixNano(timestamp0.UnixNano())) + doubleGauge.DataPoints().At(0).SetValue(5) + doubleGauge.DataPoints().At(1).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + doubleGauge.DataPoints().At(1).SetValue(6) + doubleGauge.DataPoints().At(1).LabelsMap().InitFromMap(map[string]string{"k": "v"}) + doubleGauge.DataPoints().At(2).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + doubleGauge.DataPoints().At(2).SetValue(7) + doubleGauge.DataPoints().At(3).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + doubleGauge.DataPoints().At(3).SetValue(8) + doubleGauge.DataPoints().At(3).LabelsMap().InitFromMap(map[string]string{"k": "v2"}) + + metric = appendMetric("int_sum_metric", pdata.MetricDataTypeIntSum) + intSum := metric.IntSum() + intSum.DataPoints().Resize(3) + intSum.DataPoints().At(0).SetTimestamp(pdata.TimestampUnixNano(timestamp0.UnixNano())) + intSum.DataPoints().At(0).SetValue(9) + intSum.DataPoints().At(1).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + intSum.DataPoints().At(1).SetValue(10) + intSum.DataPoints().At(1).LabelsMap().InitFromMap(map[string]string{"k": "v"}) + intSum.DataPoints().At(2).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + intSum.DataPoints().At(2).SetValue(11) + intSum.DataPoints().At(2).LabelsMap().InitFromMap(map[string]string{"k2": "v"}) + + metric = appendMetric("double_sum_metric", pdata.MetricDataTypeDoubleSum) + doubleSum := metric.DoubleSum() + doubleSum.DataPoints().Resize(3) + doubleSum.DataPoints().At(0).SetTimestamp(pdata.TimestampUnixNano(timestamp0.UnixNano())) + doubleSum.DataPoints().At(0).SetValue(12) + doubleSum.DataPoints().At(1).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + doubleSum.DataPoints().At(1).SetValue(13) + doubleSum.DataPoints().At(1).LabelsMap().InitFromMap(map[string]string{"k": "v"}) + doubleSum.DataPoints().At(2).SetTimestamp(pdata.TimestampUnixNano(timestamp1.UnixNano())) + doubleSum.DataPoints().At(2).SetValue(14) + doubleSum.DataPoints().At(2).LabelsMap().InitFromMap(map[string]string{"k2": "v"}) + + // Histograms are currently not supported, and will be ignored. + metric = appendMetric("double_histogram_metric", pdata.MetricDataTypeDoubleHistogram) + metric.DoubleHistogram().DataPoints().Resize(1) + expectDropped++ + metric = appendMetric("int_histogram_metric", pdata.MetricDataTypeIntHistogram) + metric.IntHistogram().DataPoints().Resize(1) + expectDropped++ + + metadata := model.Metadata{ + Service: model.Service{ + Name: "unknown", + Language: model.Language{ + Name: "unknown", + }, + Agent: model.Agent{ + Name: "otlp", + Version: "unknown", + }, + }, + } + + metricsets, stats := transformMetrics(t, metrics) + assert.Equal(t, expectDropped, stats.UnsupportedMetricsDropped) + + assert.Equal(t, []*model.Metricset{{ + Metadata: metadata, + Timestamp: timestamp0, + Samples: []model.Sample{ + {Name: "int_gauge_metric", Value: 1}, + {Name: "double_gauge_metric", Value: 5}, + {Name: "int_sum_metric", Value: 9}, + {Name: "double_sum_metric", Value: 12}, + }, + }, { + Metadata: metadata, + Timestamp: timestamp1, + Samples: []model.Sample{ + {Name: "int_gauge_metric", Value: 3}, + {Name: "double_gauge_metric", Value: 7}, + }, + }, { + Metadata: metadata, + Timestamp: timestamp1, + Labels: common.MapStr{"k": "v"}, + Samples: []model.Sample{ + {Name: "int_gauge_metric", Value: 2}, + {Name: "double_gauge_metric", Value: 6}, + {Name: "int_sum_metric", Value: 10}, + {Name: "double_sum_metric", Value: 13}, + }, + }, { + Metadata: metadata, + Timestamp: timestamp1, + Labels: common.MapStr{"k": "v2"}, + Samples: []model.Sample{ + {Name: "int_gauge_metric", Value: 4}, + {Name: "double_gauge_metric", Value: 8}, + }, + }, { + Metadata: metadata, + Timestamp: timestamp1, + Labels: common.MapStr{"k2": "v"}, + Samples: []model.Sample{ + {Name: "int_sum_metric", Value: 11}, + {Name: "double_sum_metric", Value: 14}, + }, + }}, metricsets) +} + +func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]*model.Metricset, otel.ConsumerStats) { + var metricsets []*model.Metricset + reporter := func(ctx context.Context, req publish.PendingReq) error { + for _, tf := range req.Transformables { + metricsets = append(metricsets, tf.(*model.Metricset)) + } + return nil + } + consumer := &otel.Consumer{Reporter: reporter} + err := consumer.ConsumeMetrics(context.Background(), metrics) + require.NoError(t, err) + return metricsets, consumer.Stats() +} diff --git a/systemtest/approvals/TestOTLPGRPCMetrics.approved.json b/systemtest/approvals/TestOTLPGRPCMetrics.approved.json new file mode 100644 index 00000000000..c0c344e79f3 --- /dev/null +++ b/systemtest/approvals/TestOTLPGRPCMetrics.approved.json @@ -0,0 +1,36 @@ +{ + "events": [ + { + "@timestamp": "dynamic", + "agent": { + "name": "otlp", + "version": "unknown" + }, + "ecs": { + "version": "dynamic" + }, + "event": { + "ingested": "dynamic" + }, + "float64_counter": 1, + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + } + } + ] +} diff --git a/systemtest/approvals/TestOTLPGRPC.approved.json b/systemtest/approvals/TestOTLPGRPCTraces.approved.json similarity index 100% rename from systemtest/approvals/TestOTLPGRPC.approved.json rename to systemtest/approvals/TestOTLPGRPCTraces.approved.json diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index fe147f41ccf..74ccd2faff3 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -24,9 +24,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp" "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" + "go.opentelemetry.io/otel/metric" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" @@ -52,7 +57,7 @@ func init() { })) } -func TestOTLPGRPC(t *testing.T) { +func TestOTLPGRPCTraces(t *testing.T) { systemtest.CleanupElasticsearch(t) srv := apmservertest.NewServer(t) @@ -67,6 +72,32 @@ func TestOTLPGRPC(t *testing.T) { systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) } +func TestOTLPGRPCMetrics(t *testing.T) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + srv.Config.Monitoring = &apmservertest.MonitoringConfig{ + Enabled: true, + MetricsPeriod: time.Duration(time.Second), + StatePeriod: time.Duration(time.Second), + } + err := srv.Start() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = sendOTLPMetrics(ctx, srv) + require.NoError(t, err) + + result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ + estest.TermQuery{Field: "processor.event", Value: "metric"}, + }}) + systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp") + + // Make sure we report monitoring for the metrics consumer. Metric values are unit tested. + doc := getBeatsMonitoringStats(t, srv, nil) + assert.True(t, gjson.GetBytes(doc.RawSource, "beats_stats.metrics.apm-server.otlp.grpc.metrics.consumer").Exists()) +} + func TestOTLPGRPCAuth(t *testing.T) { systemtest.CleanupElasticsearch(t) srv := apmservertest.NewUnstartedServer(t) @@ -132,6 +163,47 @@ func sendOTLPTrace(ctx context.Context, srv *apmservertest.Server, options ...ot } } +func sendOTLPMetrics(ctx context.Context, srv *apmservertest.Server, options ...otlpgrpc.Option) error { + options = append(options, otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure()) + driver := otlpgrpc.NewDriver(options...) + exporter, err := otlp.NewExporter(context.Background(), driver) + if err != nil { + panic(err) + } + + controller := controller.New( + processor.New( + simple.NewWithHistogramDistribution([]float64{1, 100, 1000, 10000}), + exporter, + ), + controller.WithPusher(exporter), + controller.WithCollectPeriod(time.Minute), + ) + if err := controller.Start(context.Background()); err != nil { + return err + } + meterProvider := controller.MeterProvider() + meter := meterProvider.Meter("test-meter") + + float64Counter := metric.Must(meter).NewFloat64Counter("float64_counter") + float64Counter.Add(context.Background(), 1) + + // This will be dropped, as we do not support consuming histograms yet. + int64Recorder := metric.Must(meter).NewInt64ValueRecorder("int64_recorder") + int64Recorder.Record(context.Background(), 123) + + // Stopping the controller will collect and export metrics. + if err := controller.Stop(context.Background()); err != nil { + return err + } + select { + case err := <-otelErrors: + return err + default: + return nil + } +} + type idGeneratorFuncs struct { newIDs func(context context.Context) (trace.TraceID, trace.SpanID) newSpanID func(ctx context.Context, traceID trace.TraceID) trace.SpanID