Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for consuming OTLP/gRPC metrics #4722

Merged
merged 4 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 46 additions & 7 deletions beater/otlp/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver/metrics"
"go.opentelemetry.io/collector/receiver/otlpreceiver/trace"
"google.golang.org/grpc"

Expand All @@ -38,8 +39,10 @@ var (
request.IDRequestCount, request.IDResponseCount, request.IDResponseErrorsCount, request.IDResponseValidCount,
}

gRPCConsumerRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.consumer")
gRPCConsumerMonitoringMap = request.MonitoringMapForRegistry(gRPCConsumerRegistry, monitoringKeys)
gRPCMetricsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.metrics")
gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys)
gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces")
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
)

// RegisterGRPCServices registers OTLP consumer services with the given gRPC server.
Expand All @@ -48,11 +51,24 @@ func RegisterGRPCServices(grpcServer *grpc.Server, reporter publish.Reporter, lo
consumer: &otel.Consumer{Reporter: reporter},
logger: logger,
}
// TODO(axw) add support for metrics to processer/otel.Consumer, and register a metrics receiver here.

// TODO(axw) rather than registering and unregistering monitoring callbacks
// each time a new consumer is created, we should register one callback and
// have it aggregate metrics from the dynamic consumers.
//
// For now, we take the easy way out: we only have one OTLP gRPC service
// running at any time, so just unregister/register a new one.
gRPCMetricsRegistry.Remove("consumer")
monitoring.NewFunc(gRPCMetricsRegistry, "consumer", consumer.collectMetricsMonitoring, monitoring.Report)

traceReceiver := trace.New("otlp", consumer)
metricsReceiver := metrics.New("otlp", consumer)
if err := otlpreceiver.RegisterTraceReceiver(context.Background(), traceReceiver, grpcServer, nil); err != nil {
return errors.Wrap(err, "failed to register OTLP trace receiver")
}
if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), metricsReceiver, grpcServer, nil); err != nil {
return errors.Wrap(err, "failed to register OTLP metrics receiver")
}
return nil
}

Expand All @@ -63,13 +79,36 @@ type monitoredConsumer struct {

// ConsumeTraces consumes OpenTelemtry trace data.
func (c *monitoredConsumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
gRPCConsumerMonitoringMap[request.IDRequestCount].Inc()
defer gRPCConsumerMonitoringMap[request.IDResponseCount].Inc()
gRPCTracesMonitoringMap[request.IDRequestCount].Inc()
defer gRPCTracesMonitoringMap[request.IDResponseCount].Inc()
if err := c.consumer.ConsumeTraces(ctx, traces); err != nil {
gRPCConsumerMonitoringMap[request.IDResponseErrorsCount].Inc()
gRPCTracesMonitoringMap[request.IDResponseErrorsCount].Inc()
c.logger.With(logp.Error(err)).Error("ConsumeTraces returned an error")
return err
}
gRPCConsumerMonitoringMap[request.IDResponseValidCount].Inc()
gRPCTracesMonitoringMap[request.IDResponseValidCount].Inc()
return nil
}

// ConsumeMetrics consumes OpenTelemtry metrics data.
func (c *monitoredConsumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error {
gRPCMetricsMonitoringMap[request.IDRequestCount].Inc()
defer gRPCMetricsMonitoringMap[request.IDResponseCount].Inc()
if err := c.consumer.ConsumeMetrics(ctx, metrics); err != nil {
gRPCMetricsMonitoringMap[request.IDResponseErrorsCount].Inc()
c.logger.With(logp.Error(err)).Error("ConsumeMetrics returned an error")
return err
}
gRPCMetricsMonitoringMap[request.IDResponseValidCount].Inc()
return nil
}

func (c *monitoredConsumer) collectMetricsMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
V.OnRegistryFinished()

stats := c.consumer.Stats()
monitoring.ReportNamespace(V, "consumer", func() {
monitoring.ReportInt(V, "unsupported_dropped", stats.UnsupportedMetricsDropped)
})
}
81 changes: 78 additions & 3 deletions beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import (
)

var (
exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest")
exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse")
exportMetricsServiceRequestType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest")
exportMetricsServiceResponseType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse")
exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest")
exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse")
)

func TestConsumeTraces(t *testing.T) {
Expand Down Expand Up @@ -93,7 +95,7 @@ func TestConsumeTraces(t *testing.T) {
assert.Len(t, events, 2)

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.consumer").Do(monitoring.Full, func(key string, value interface{}) {
monitoring.GetRegistry("apm-server.otlp.grpc.traces").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
Expand All @@ -104,6 +106,65 @@ func TestConsumeTraces(t *testing.T) {
}, actual)
}

func TestConsumeMetrics(t *testing.T) {
var reportError error
report := func(ctx context.Context, req publish.PendingReq) error {
return reportError
}

// Send a minimal metric to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
cannedRequest := jsonExportMetricsServiceRequest(`{
"resource_metrics": [
{
"instrumentation_library_metrics": [
{
"metrics": [
{
"name": "metric_name"
}
]
}
]
}
]
}`)

conn := newServer(t, report)
err := conn.Invoke(
context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
cannedRequest, newExportMetricsServiceResponse(),
)
assert.NoError(t, err)

reportError = errors.New("failed to publish events")
err = conn.Invoke(
context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
cannedRequest, newExportMetricsServiceResponse(),
)
assert.Error(t, err)
errStatus := status.Convert(err)
assert.Equal(t, "failed to publish events", errStatus.Message())

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.metrics").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
// In both of the requests we send above,
// the metrics do not have a type and so
// we treat them as unsupported metrics.
"consumer.unsupported_dropped": int64(2),

"request.count": int64(2),
"response.count": int64(2),
"response.errors.count": int64(1),
"response.valid.count": int64(1),
}, actual)
}

func jsonExportTraceServiceRequest(j string) interface{} {
request := reflect.New(exportTraceServiceRequestType.Elem()).Interface()
decoder := json.NewDecoder(strings.NewReader(j))
Expand All @@ -118,6 +179,20 @@ func newExportTraceServiceResponse() interface{} {
return reflect.New(exportTraceServiceResponseType.Elem()).Interface()
}

func jsonExportMetricsServiceRequest(j string) interface{} {
request := reflect.New(exportMetricsServiceRequestType.Elem()).Interface()
decoder := json.NewDecoder(strings.NewReader(j))
decoder.DisallowUnknownFields()
if err := decoder.Decode(request); err != nil {
panic(err)
}
return request
}

func newExportMetricsServiceResponse() interface{} {
return reflect.New(exportMetricsServiceResponseType.Elem()).Interface()
}

func newServer(t *testing.T, report publish.Reporter) *grpc.ClientConn {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits]
* Support for reloading config in Fleet mode, gracefully stopping the HTTP server and starting a new one {pull}4623[4623]
* Add a `_doc_count` field to transaction histogram docs {pull}4647[4647]
* Upgrade Go to 1.15.7 {pull}4663[4663]
* OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677]
* OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677] {pull}4722[4722]
* Data stream and ILM policy for tail-based sampling {pull}4707[4707]

[float]
Expand Down
23 changes: 23 additions & 0 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"net/url"
"strconv"
"strings"
"sync/atomic"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
Expand All @@ -65,9 +66,31 @@ const (

// Consumer transforms open-telemetry data to be compatible with elastic APM data
type Consumer struct {
stats consumerStats

Reporter publish.Reporter
}

// ConsumerStats holds a snapshot of statistics about data consumption.
type ConsumerStats struct {
// UnsupportedMetricsDropped records the number of unsupported metrics
// that have been dropped by the consumer.
UnsupportedMetricsDropped int64
}

// consumerStats holds the current statistics, which must be accessed and
// modified using atomic operations.
type consumerStats struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused about this, why both consumerStats and ConsumerStats are needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumerStats might change at any time, ConsumerStats is a snapshot. I'll add a comment.

unsupportedMetricsDropped int64
}

// Stats returns a snapshot of the current statistics about data consumption.
func (c *Consumer) Stats() ConsumerStats {
return ConsumerStats{
UnsupportedMetricsDropped: atomic.LoadInt64(&c.stats.unsupportedMetricsDropped),
}
}

// ConsumeTraces consumes OpenTelemetry trace data,
// converting into Elastic APM events and reporting to the Elastic APM schema.
func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
Expand Down
Loading