Skip to content

Commit

Permalink
Remap otel hostmetrics to elastic metrics using opentelemetry-lib
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed May 17, 2024
1 parent 2c1119f commit 0f968a9
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 16 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/elastic/apm-data

go 1.21
go 1.21.1

require (
github.com/elastic/opentelemetry-lib v0.0.0-20240517090306-161b0f72476b
github.com/google/go-cmp v0.6.0
github.com/jaegertracing/jaeger v1.56.0
github.com/json-iterator/go v1.1.12
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0=
github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss=
github.com/elastic/opentelemetry-lib v0.0.0-20240517090306-161b0f72476b h1://FNKIpIdBhiHx4/B5xxZsyFurGVF/6+z19etoiogZ0=
github.com/elastic/opentelemetry-lib v0.0.0-20240517090306-161b0f72476b/go.mod h1:/kKvHbJLVo/NcKMPHI8/RZKL64fushmnRUzn+arQpjg=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
29 changes: 24 additions & 5 deletions input/otlp/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ import (

"github.com/elastic/apm-data/input"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/opentelemetry-lib/remappers/hostmetrics"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

type remapper interface {
Remap(pmetric.ScopeMetrics, pmetric.MetricSlice, pcommon.Resource)
}

// ConsumerConfig holds configuration for Consumer.
type ConsumerConfig struct {
// Logger holds a logger for the consumer. If this is nil, then
Expand All @@ -39,14 +46,19 @@ type ConsumerConfig struct {
// Semaphore holds a semaphore on which Processor.HandleStream will acquire a
// token before proceeding, to limit concurrency.
Semaphore input.Semaphore

// RemapOTelMetrics remaps certain OpenTelemetry metrics to elastic metrics.
// Note that both, OTel and Elastic, metrics would be published.
RemapOTelMetrics bool
}

// Consumer transforms OpenTelemetry data to the Elastic APM data model,
// sending each payload as a batch to the configured BatchProcessor.
type Consumer struct {
sem input.Semaphore
config ConsumerConfig
stats consumerStats
config ConsumerConfig
sem input.Semaphore
remappers []remapper
stats consumerStats
}

// NewConsumer returns a new Consumer with the given configuration.
Expand All @@ -56,9 +68,16 @@ func NewConsumer(config ConsumerConfig) *Consumer {
} else {
config.Logger = config.Logger.Named("otel")
}
var remappers []remapper
if config.RemapOTelMetrics {
remappers = []remapper{
hostmetrics.NewRemapper(config.Logger),
}
}
return &Consumer{
config: config,
sem: config.Semaphore,
config: config,
sem: config.Semaphore,
remappers: remappers,
}
}

Expand Down
27 changes: 21 additions & 6 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric
remainingMetrics := totalMetrics
receiveTimestamp := time.Now()
c.config.Logger.Debug("consuming metrics", zap.Stringer("metrics", metricsStringer(metrics)))
batch := c.convertMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
batch := c.handleMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
if remainingMetrics > 0 {
// Some metrics remained after conversion, meaning that they were dropped.
atomic.AddInt64(&c.stats.unsupportedMetricsDropped, remainingMetrics)
Expand All @@ -93,20 +93,20 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric
}, nil
}

func (c *Consumer) convertMetrics(
func (c *Consumer) handleMetrics(
metrics pmetric.Metrics,
receiveTimestamp time.Time,
remainingDataPoints, remainingMetrics *int64,
) (batch *modelpb.Batch) {
batch = &modelpb.Batch{}
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
c.convertResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
c.handleResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
}
return
}

func (c *Consumer) convertResourceMetrics(
func (c *Consumer) handleResourceMetrics(
resourceMetrics pmetric.ResourceMetrics,
receiveTimestamp time.Time,
out *modelpb.Batch,
Expand All @@ -125,23 +125,38 @@ func (c *Consumer) convertResourceMetrics(
}
scopeMetrics := resourceMetrics.ScopeMetrics()
for i := 0; i < scopeMetrics.Len(); i++ {
c.convertScopeMetrics(scopeMetrics.At(i), baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
c.handleScopeMetrics(scopeMetrics.At(i), resource, baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
}
return
}

func (c *Consumer) convertScopeMetrics(
func (c *Consumer) handleScopeMetrics(
in pmetric.ScopeMetrics,
resource pcommon.Resource,
baseEvent *modelpb.APMEvent,
timeDelta time.Duration,
out *modelpb.Batch,
remainingDataPoints, remainingMetrics *int64,
) {
ms := make(metricsets)
// Add the original otel metrics to the metricset.
otelMetrics := in.Metrics()
for i := 0; i < otelMetrics.Len(); i++ {
c.addMetric(otelMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
}
// Handle remapping if any. Remapped metrics will be added to a new
// metric slice and then processed as any other metric in the scope.
// TODO (lahsivjar): Possible to approximate capacity of the slice?
if len(c.remappers) > 0 {
remappedMetrics := pmetric.NewMetricSlice()
for _, r := range c.remappers {
r.Remap(in, remappedMetrics, resource)
}
for i := 0; i < remappedMetrics.Len(); i++ {
c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
}
}
// Process all the metrics added to the metricset.
for key, ms := range ms {
event := baseEvent.CloneVT()

Expand Down
16 changes: 12 additions & 4 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,18 @@ func metricsetDataset(event *modelpb.APMEvent) string {
// metrics that we don't already know about; otherwise they will end
// up creating service-specific data streams.
internal := true
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break

// set internal to false for metrics translated using OTel remappers.
if label, ok := event.Labels["event.provider"]; ok && label != nil {
internal = !(label.Value == "hostmetrics")
}

if internal {
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break
}
}
}
if internal {
Expand Down

0 comments on commit 0f968a9

Please sign in to comment.