Remap otel hostmetrics to elastic metrics using opentelemetry-lib
lahsivjar committed May 29, 2024
1 parent f171078 commit 7996be4
Showing 7 changed files with 173 additions and 18 deletions.
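
Before the diff itself, a minimal sketch of how a caller would opt in to the new behaviour through the ConsumerConfig flag added by this commit. The wrapper function is illustrative, and it assumes the existing Processor field keeps its modelpb.BatchProcessor type; the semaphore value mirrors the package tests.

package example

import (
	"github.com/elastic/apm-data/input/otlp"
	"github.com/elastic/apm-data/model/modelpb"
	"golang.org/x/sync/semaphore"
)

// newRemappingConsumer builds an OTLP consumer with hostmetrics
// remapping enabled. Both the original OTel metrics and the remapped
// Elastic metrics are published.
func newRemappingConsumer(processor modelpb.BatchProcessor) *otlp.Consumer {
	return otlp.NewConsumer(otlp.ConsumerConfig{
		Processor:        processor,
		Semaphore:        semaphore.NewWeighted(100), // concurrency limit, as in the package tests
		RemapOTelMetrics: true,                       // flag added by this commit
	})
}
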
3 changes: 2 additions & 1 deletion go.mod
@@ -1,8 +1,9 @@
module github.com/elastic/apm-data

go 1.21
go 1.21.1

require (
github.com/elastic/opentelemetry-lib v0.0.0-20240520143123-3234f90c8fca
github.com/google/go-cmp v0.6.0
github.com/jaegertracing/jaeger v1.56.0
github.com/json-iterator/go v1.1.12
2 changes: 2 additions & 0 deletions go.sum
@@ -11,6 +11,8 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0=
github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss=
github.com/elastic/opentelemetry-lib v0.0.0-20240520143123-3234f90c8fca h1:LM2sFnvnkQP9txkkdMr7kVQiKGtWNJl+yZECsEGacCA=
github.com/elastic/opentelemetry-lib v0.0.0-20240520143123-3234f90c8fca/go.mod h1:/kKvHbJLVo/NcKMPHI8/RZKL64fushmnRUzn+arQpjg=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
29 changes: 24 additions & 5 deletions input/otlp/consumer.go
@@ -22,10 +22,17 @@ import (

"github.com/elastic/apm-data/input"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/opentelemetry-lib/remappers/hostmetrics"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type remapper interface {
Remap(pmetric.ScopeMetrics, pmetric.MetricSlice, pcommon.Resource)
}

// ConsumerConfig holds configuration for Consumer.
type ConsumerConfig struct {
// Logger holds a logger for the consumer. If this is nil, then
@@ -39,14 +39,19 @@ type ConsumerConfig struct {
// Semaphore holds a semaphore on which Processor.HandleStream will acquire a
// token before proceeding, to limit concurrency.
Semaphore input.Semaphore

// RemapOTelMetrics remaps certain OpenTelemetry metrics to Elastic metrics.
// Note that both the OTel and the Elastic metrics will be published.
RemapOTelMetrics bool
}

// Consumer transforms OpenTelemetry data to the Elastic APM data model,
// sending each payload as a batch to the configured BatchProcessor.
type Consumer struct {
sem input.Semaphore
config ConsumerConfig
stats consumerStats
config ConsumerConfig
sem input.Semaphore
remappers []remapper
stats consumerStats
}

// NewConsumer returns a new Consumer with the given configuration.
@@ -56,9 +68,16 @@ func NewConsumer(config ConsumerConfig) *Consumer {
} else {
config.Logger = config.Logger.Named("otel")
}
var remappers []remapper
if config.RemapOTelMetrics {
remappers = []remapper{
hostmetrics.NewRemapper(config.Logger),
}
}
return &Consumer{
config: config,
sem: config.Semaphore,
config: config,
sem: config.Semaphore,
remappers: remappers,
}
}

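
The unexported remapper interface introduced above is what the opentelemetry-lib hostmetrics remapper satisfies. A hypothetical sketch of another implementation, shown only to illustrate the required shape (noopRemapper is not part of this commit):

package otlp

import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// noopRemapper satisfies the remapper interface but translates nothing.
// A real implementation would inspect in.Metrics() and resource, and
// append translated metrics to out.
type noopRemapper struct{}

func (noopRemapper) Remap(in pmetric.ScopeMetrics, out pmetric.MetricSlice, resource pcommon.Resource) {}
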
31 changes: 25 additions & 6 deletions input/otlp/metrics.go
@@ -75,7 +75,7 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric
remainingMetrics := totalMetrics
receiveTimestamp := time.Now()
c.config.Logger.Debug("consuming metrics", zap.Stringer("metrics", metricsStringer(metrics)))
batch := c.convertMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
batch := c.handleMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
if remainingMetrics > 0 {
// Some metrics remained after conversion, meaning that they were dropped.
atomic.AddInt64(&c.stats.unsupportedMetricsDropped, remainingMetrics)
@@ -93,20 +93,20 @@ }, nil
}, nil
}

func (c *Consumer) convertMetrics(
func (c *Consumer) handleMetrics(
metrics pmetric.Metrics,
receiveTimestamp time.Time,
remainingDataPoints, remainingMetrics *int64,
) (batch *modelpb.Batch) {
batch = &modelpb.Batch{}
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
c.convertResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
c.handleResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
}
return
}

func (c *Consumer) convertResourceMetrics(
func (c *Consumer) handleResourceMetrics(
resourceMetrics pmetric.ResourceMetrics,
receiveTimestamp time.Time,
out *modelpb.Batch,
@@ -125,23 +125,42 @@ func (c *Consumer) convertResourceMetrics(
}
scopeMetrics := resourceMetrics.ScopeMetrics()
for i := 0; i < scopeMetrics.Len(); i++ {
c.convertScopeMetrics(scopeMetrics.At(i), baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
c.handleScopeMetrics(scopeMetrics.At(i), resource, baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
}
return
}

func (c *Consumer) convertScopeMetrics(
func (c *Consumer) handleScopeMetrics(
in pmetric.ScopeMetrics,
resource pcommon.Resource,
baseEvent *modelpb.APMEvent,
timeDelta time.Duration,
out *modelpb.Batch,
remainingDataPoints, remainingMetrics *int64,
) {
ms := make(metricsets)
// Add the original otel metrics to the metricset.
otelMetrics := in.Metrics()
for i := 0; i < otelMetrics.Len(); i++ {
c.addMetric(otelMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
}
// Handle remapping, if any remappers are configured. Remapped metrics
// are added to a new metric slice and then processed like any other
// metric in the scope.
// TODO (lahsivjar): Possible to approximate capacity of the slice?
if len(c.remappers) > 0 {
remappedMetrics := pmetric.NewMetricSlice()
for _, r := range c.remappers {
r.Remap(in, remappedMetrics, resource)
}
// Add each remapped metric to the metricset. Code assumes that
// a single datapoint is added for each metric by the library.
*remainingDataPoints += int64(remappedMetrics.Len())
*remainingMetrics += int64(remappedMetrics.Len())
for i := 0; i < remappedMetrics.Len(); i++ {
c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
}
}
// Process all the metrics added to the metricset.
for key, ms := range ms {
event := baseEvent.CloneVT()

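
To make the new code path easier to follow in isolation, here is a sketch of the remapping step for a single scope, mirroring the calls added to handleScopeMetrics above. The helper and its arguments are illustrative; scopeMetrics and resource stand for the values that handleResourceMetrics iterates over.

package otlp

import (
	"github.com/elastic/opentelemetry-lib/remappers/hostmetrics"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.uber.org/zap"
)

// remapScope runs the hostmetrics remapper over one scope and returns
// the remapped metrics. Each remapped metric is expected to carry a
// single data point, which is why handleScopeMetrics increments the
// remaining counters by remappedMetrics.Len().
func remapScope(scopeMetrics pmetric.ScopeMetrics, resource pcommon.Resource) pmetric.MetricSlice {
	r := hostmetrics.NewRemapper(zap.NewNop())
	remapped := pmetric.NewMetricSlice()
	r.Remap(scopeMetrics, remapped, resource)
	return remapped
}
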
82 changes: 80 additions & 2 deletions input/otlp/metrics_test.go
@@ -851,6 +851,83 @@ func TestConsumeMetricsDataStream(t *testing.T) {
}
}

// The test below asserts that remapped metrics are correctly processed by the
// consumer. It does not test the correctness of the remapping library, nor
// whether all metrics are remapped, as that is within the scope of the
// remapping library itself.
func TestConsumeMetricsWithOTelRemapper(t *testing.T) {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sm := rm.ScopeMetrics().AppendEmpty()
ms := sm.Metrics()

// Configure scope for hostmetrics receiver
sm.Scope().SetName("otelcol/hostmetricsreceiver/load")

// Add a datapoint for a metric produced by hostmetrics receiver
ts := time.Now().UTC()
metric := ms.AppendEmpty()
metric.SetName("system.cpu.load_average.1m")
dp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetDoubleValue(0.7)

events, stats, results, err := transformMetrics(t, metrics)
assert.NoError(t, err)

expected := []*modelpb.APMEvent{
{
Service: &modelpb.Service{
Name: "unknown",
Language: &modelpb.Language{Name: "unknown"},
},
Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"},
Timestamp: modelpb.FromTime(ts),
Metricset: &modelpb.Metricset{
Name: "app",
Samples: []*modelpb.MetricsetSample{
{
Name: "system.cpu.load_average.1m",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
Value: 0.7,
},
},
},
},
{
Service: &modelpb.Service{
Name: "unknown",
Language: &modelpb.Language{Name: "unknown"},
},
Agent: &modelpb.Agent{Name: "otlp", Version: "unknown"},
Timestamp: modelpb.FromTime(ts),
Metricset: &modelpb.Metricset{
Name: "app",
Samples: []*modelpb.MetricsetSample{
{
Name: "system.load.1",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
Value: 0.7,
},
{
Name: "system.load.5",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
},
{
Name: "system.load.15",
Type: modelpb.MetricType_METRIC_TYPE_GAUGE,
},
},
},
Labels: map[string]*modelpb.LabelValue{
"event.provider": &modelpb.LabelValue{Value: "hostmetrics"},
},
},
}
eventsMatch(t, expected, events)
assert.Equal(t, otlp.ConsumerStats{}, stats)
assert.Equal(t, otlp.ConsumeMetricsResult{}, results)
}

/* TODO
func TestMetricsLogging(t *testing.T) {
for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} {
@@ -873,8 +950,9 @@ func transformMetrics(t *testing.T, metrics pmetric.Metrics) ([]*modelpb.APMEven
recorder := batchRecorderBatchProcessor(&batches)

consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
RemapOTelMetrics: true,
})
result, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics)
require.Len(t, batches, 1)
16 changes: 12 additions & 4 deletions model/modelprocessor/datastream.go
@@ -141,10 +141,18 @@ func metricsetDataset(event *modelpb.APMEvent) string {
// metrics that we don't already know about; otherwise they will end
// up creating service-specific data streams.
internal := true
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break

// set internal to false for metrics translated using OTel remappers.
if label, ok := event.Labels["event.provider"]; ok && label != nil {
internal = !(label.Value == "hostmetrics")
}

if internal {
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break
}
}
}
if internal {
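
Restated as a standalone helper for clarity (illustrative only; no such function exists in the diff): a metricset event remapped from OTel hostmetrics carries the event.provider label and is routed to a service-specific dataset, while any other event stays internal only if all of its samples are known internal metric names.

package modelprocessor

import "github.com/elastic/apm-data/model/modelpb"

// isInternalMetricset mirrors the decision made in metricsetDataset above.
func isInternalMetricset(event *modelpb.APMEvent) bool {
	if label, ok := event.Labels["event.provider"]; ok && label != nil && label.Value == "hostmetrics" {
		// Translated hostmetrics go to service-specific datasets.
		return false
	}
	for _, s := range event.Metricset.Samples {
		if !IsInternalMetricName(s.Name) {
			return false
		}
	}
	return true
}
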
28 changes: 28 additions & 0 deletions model/modelprocessor/datastream_test.go
@@ -196,6 +196,34 @@ func TestSetDataStream(t *testing.T) {
},
},
output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"},
}, {
input: &modelpb.APMEvent{
Agent: &modelpb.Agent{Name: "otel"},
Service: &modelpb.Service{Name: "service-name"},
Metricset: &modelpb.Metricset{
Samples: []*modelpb.MetricsetSample{
{Name: "system.memory.total"},
},
},
Labels: map[string]*modelpb.LabelValue{
"event.provider": &modelpb.LabelValue{Value: "hostmetrics"}, // otel translated hostmetrics
},
},
output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"},
}, {
input: &modelpb.APMEvent{
Agent: &modelpb.Agent{Name: "otel"},
Service: &modelpb.Service{Name: "service-name"},
Metricset: &modelpb.Metricset{
Samples: []*modelpb.MetricsetSample{
{Name: "system.memory.total"},
},
},
Labels: map[string]*modelpb.LabelValue{
"event.provider": &modelpb.LabelValue{Value: "kernel"},
},
},
output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"},
}}

for _, test := range tests {