diff --git a/go.mod b/go.mod index bcaf927b..ee3e45ed 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module github.com/elastic/apm-data -go 1.21 +go 1.21.1 require ( + github.com/elastic/opentelemetry-lib v0.0.0-20240520143123-3234f90c8fca github.com/google/go-cmp v0.6.0 github.com/jaegertracing/jaeger v1.56.0 github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 66ce49f4..0420b1a3 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6 github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= +github.com/elastic/opentelemetry-lib v0.0.0-20240520143123-3234f90c8fca h1:LM2sFnvnkQP9txkkdMr7kVQiKGtWNJl+yZECsEGacCA= +github.com/elastic/opentelemetry-lib v0.0.0-20240520143123-3234f90c8fca/go.mod h1:/kKvHbJLVo/NcKMPHI8/RZKL64fushmnRUzn+arQpjg= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/input/otlp/consumer.go b/input/otlp/consumer.go index 851ea407..b1c3e3e8 100644 --- a/input/otlp/consumer.go +++ b/input/otlp/consumer.go @@ -22,10 +22,17 @@ import ( "github.com/elastic/apm-data/input" "github.com/elastic/apm-data/model/modelpb" + "github.com/elastic/opentelemetry-lib/remappers/hostmetrics" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) +type remapper interface { + Remap(pmetric.ScopeMetrics, pmetric.MetricSlice, pcommon.Resource) +} + // ConsumerConfig holds configuration for Consumer. type ConsumerConfig struct { // Logger holds a logger for the consumer. If this is nil, then @@ -39,14 +46,19 @@ type ConsumerConfig struct { // Semaphore holds a semaphore on which Processor.HandleStream will acquire a // token before proceeding, to limit concurrency. Semaphore input.Semaphore + + // RemapOTelMetrics remaps certain OpenTelemetry metrics to elastic metrics. + // Note that both, OTel and Elastic, metrics would be published. + RemapOTelMetrics bool } // Consumer transforms OpenTelemetry data to the Elastic APM data model, // sending each payload as a batch to the configured BatchProcessor. type Consumer struct { - sem input.Semaphore - config ConsumerConfig - stats consumerStats + config ConsumerConfig + sem input.Semaphore + remappers []remapper + stats consumerStats } // NewConsumer returns a new Consumer with the given configuration. @@ -56,9 +68,16 @@ func NewConsumer(config ConsumerConfig) *Consumer { } else { config.Logger = config.Logger.Named("otel") } + var remappers []remapper + if config.RemapOTelMetrics { + remappers = []remapper{ + hostmetrics.NewRemapper(config.Logger), + } + } return &Consumer{ - config: config, - sem: config.Semaphore, + config: config, + sem: config.Semaphore, + remappers: remappers, } } diff --git a/input/otlp/metrics.go b/input/otlp/metrics.go index b6ea2da6..26e28bf2 100644 --- a/input/otlp/metrics.go +++ b/input/otlp/metrics.go @@ -75,7 +75,7 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric remainingMetrics := totalMetrics receiveTimestamp := time.Now() c.config.Logger.Debug("consuming metrics", zap.Stringer("metrics", metricsStringer(metrics))) - batch := c.convertMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics) + batch := c.handleMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics) if remainingMetrics > 0 { // Some metrics remained after conversion, meaning that they were dropped. atomic.AddInt64(&c.stats.unsupportedMetricsDropped, remainingMetrics) @@ -93,7 +93,7 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric }, nil } -func (c *Consumer) convertMetrics( +func (c *Consumer) handleMetrics( metrics pmetric.Metrics, receiveTimestamp time.Time, remainingDataPoints, remainingMetrics *int64, @@ -101,12 +101,12 @@ func (c *Consumer) convertMetrics( batch = &modelpb.Batch{} resourceMetrics := metrics.ResourceMetrics() for i := 0; i < resourceMetrics.Len(); i++ { - c.convertResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics) + c.handleResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics) } return } -func (c *Consumer) convertResourceMetrics( +func (c *Consumer) handleResourceMetrics( resourceMetrics pmetric.ResourceMetrics, receiveTimestamp time.Time, out *modelpb.Batch, @@ -125,23 +125,42 @@ func (c *Consumer) convertResourceMetrics( } scopeMetrics := resourceMetrics.ScopeMetrics() for i := 0; i < scopeMetrics.Len(); i++ { - c.convertScopeMetrics(scopeMetrics.At(i), baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics) + c.handleScopeMetrics(scopeMetrics.At(i), resource, baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics) } return } -func (c *Consumer) convertScopeMetrics( +func (c *Consumer) handleScopeMetrics( in pmetric.ScopeMetrics, + resource pcommon.Resource, baseEvent *modelpb.APMEvent, timeDelta time.Duration, out *modelpb.Batch, remainingDataPoints, remainingMetrics *int64, ) { ms := make(metricsets) + // Add the original otel metrics to the metricset. otelMetrics := in.Metrics() for i := 0; i < otelMetrics.Len(); i++ { c.addMetric(otelMetrics.At(i), ms, remainingDataPoints, remainingMetrics) } + // Handle remapping if any. Remapped metrics will be added to a new + // metric slice and then processed as any other metric in the scope. + // TODO (lahsivjar): Possible to approximate capacity of the slice? + if len(c.remappers) > 0 { + remappedMetrics := pmetric.NewMetricSlice() + for _, r := range c.remappers { + r.Remap(in, remappedMetrics, resource) + } + // Add each remapped metric to the metricset. Code assumes that + // a single datapoint is added for each metric by the library. + *remainingDataPoints += int64(remappedMetrics.Len()) + *remainingMetrics += int64(remappedMetrics.Len()) + for i := 0; i < remappedMetrics.Len(); i++ { + c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics) + } + } + // Process all the metrics added to the metricset. for key, ms := range ms { event := baseEvent.CloneVT() diff --git a/input/otlp/metrics_test.go b/input/otlp/metrics_test.go index a5114898..4d886c4b 100644 --- a/input/otlp/metrics_test.go +++ b/input/otlp/metrics_test.go @@ -851,6 +851,83 @@ func TestConsumeMetricsDataStream(t *testing.T) { } } +// The below test asserts that remapped metrics are correctly processed by the +// code but does not test the correctness of the remapping libarary or if all +// the metrics are remapped as it is within the scope of the remapping library. +func TestConsumeMetricsWithOTelRemapper(t *testing.T) { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + sm := rm.ScopeMetrics().AppendEmpty() + ms := sm.Metrics() + + // Configure scope for hostmetrics receiver + sm.Scope().SetName("otelcol/hostmetricsreceiver/load") + + // Add a datapoint for a metric produced by hostmetrics receiver + ts := time.Now().UTC() + metric := ms.AppendEmpty() + metric.SetName("system.cpu.load_average.1m") + dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + dp.SetDoubleValue(0.7) + + events, stats, results, err := transformMetrics(t, metrics) + assert.NoError(t, err) + + expected := []*modelpb.APMEvent{ + { + Service: &modelpb.Service{ + Name: "unknown", + Language: &modelpb.Language{Name: "unknown"}, + }, + Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"}, + Timestamp: modelpb.FromTime(ts), + Metricset: &modelpb.Metricset{ + Name: "app", + Samples: []*modelpb.MetricsetSample{ + { + Name: "system.cpu.load_average.1m", + Type: modelpb.MetricType_METRIC_TYPE_GAUGE, + Value: 0.7, + }, + }, + }, + }, + { + Service: &modelpb.Service{ + Name: "unknown", + Language: &modelpb.Language{Name: "unknown"}, + }, + Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"}, + Timestamp: modelpb.FromTime(ts), + Metricset: &modelpb.Metricset{ + Name: "app", + Samples: []*modelpb.MetricsetSample{ + { + Name: "system.load.1", + Type: modelpb.MetricType_METRIC_TYPE_GAUGE, + Value: 0.7, + }, + { + Name: "system.load.5", + Type: modelpb.MetricType_METRIC_TYPE_GAUGE, + }, + { + Name: "system.load.15", + Type: modelpb.MetricType_METRIC_TYPE_GAUGE, + }, + }, + }, + Labels: map[string]*modelpb.LabelValue{ + "event.provider": &modelpb.LabelValue{Value: "hostmetrics"}, + }, + }, + } + eventsMatch(t, expected, events) + assert.Equal(t, otlp.ConsumerStats{}, stats) + assert.Equal(t, otlp.ConsumeMetricsResult{}, results) +} + /* TODO func TestMetricsLogging(t *testing.T) { for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} { @@ -873,8 +950,9 @@ func transformMetrics(t *testing.T, metrics pmetric.Metrics) ([]*modelpb.APMEven recorder := batchRecorderBatchProcessor(&batches) consumer := otlp.NewConsumer(otlp.ConsumerConfig{ - Processor: recorder, - Semaphore: semaphore.NewWeighted(100), + Processor: recorder, + Semaphore: semaphore.NewWeighted(100), + RemapOTelMetrics: true, }) result, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics) require.Len(t, batches, 1) diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go index b7f14a59..8f41347e 100644 --- a/model/modelprocessor/datastream.go +++ b/model/modelprocessor/datastream.go @@ -141,10 +141,18 @@ func metricsetDataset(event *modelpb.APMEvent) string { // metrics that we don't already know about; otherwise they will end // up creating service-specific data streams. internal := true - for _, s := range event.Metricset.Samples { - if !IsInternalMetricName(s.Name) { - internal = false - break + + // set internal to false for metrics translated using OTel remappers. + if label, ok := event.Labels["event.provider"]; ok && label != nil { + internal = !(label.Value == "hostmetrics") + } + + if internal { + for _, s := range event.Metricset.Samples { + if !IsInternalMetricName(s.Name) { + internal = false + break + } } } if internal { diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index 572bc544..18be5578 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -196,6 +196,34 @@ func TestSetDataStream(t *testing.T) { }, }, output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Agent: &modelpb.Agent{Name: "otel"}, + Service: &modelpb.Service{Name: "service-name"}, + Metricset: &modelpb.Metricset{ + Samples: []*modelpb.MetricsetSample{ + {Name: "system.memory.total"}, + }, + }, + Labels: map[string]*modelpb.LabelValue{ + "event.provider": &modelpb.LabelValue{Value: "hostmetrics"}, // otel translated hostmetrics + }, + }, + output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Agent: &modelpb.Agent{Name: "otel"}, + Service: &modelpb.Service{Name: "service-name"}, + Metricset: &modelpb.Metricset{ + Samples: []*modelpb.MetricsetSample{ + {Name: "system.memory.total"}, + }, + }, + Labels: map[string]*modelpb.LabelValue{ + "event.provider": &modelpb.LabelValue{Value: "kernel"}, + }, + }, + output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, }} for _, test := range tests {