Skip to content

Commit

Permalink
Adjust code to opentelemetry-collector changes
Browse files Browse the repository at this point in the history
Mostly just adjusting to API changes, but there are
some functional changes related to Jaeger span events.
open-telemetry/opentelemetry-collector-contrib#10273
uncovered an issue where we were recording the "event" tag
as a label, when we should have been recording it as the
"message" field in some circumstances.
  • Loading branch information
axw committed Aug 1, 2022
1 parent b85cc21 commit c2a0163
Show file tree
Hide file tree
Showing 21 changed files with 722 additions and 735 deletions.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits]

[float]
==== Bug fixes
- Set `message` instead of `labels.event` for Jaeger span events {pull}8765[8765]

[float]
==== Intake API Changes
Expand Down
21 changes: 6 additions & 15 deletions internal/.otel_collector_mixin/receiver/otlpreceiver/mixin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"context"
"net/http"

"go.opentelemetry.io/collector/component/componenterror"

"go.opentelemetry.io/otel/metric"
apitrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand All @@ -29,7 +27,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace"
Expand Down Expand Up @@ -62,51 +62,42 @@ type HTTPHandlers struct {
// RegisterGRPCTraceReceiver registers the trace receiver with a gRPC server.
func RegisterGRPCTraceReceiver(ctx context.Context, consumer consumer.Traces, serverGRPC *grpc.Server) error {
receiver := trace.New(config.NewComponentID("otlp"), consumer, settings)
otlpgrpc.RegisterTracesServer(serverGRPC, receiver)
ptraceotlp.RegisterServer(serverGRPC, receiver)
return nil
}

// RegisterGRPCMetricsReceiver registers the metrics receiver with a gRPC server.
func RegisterGRPCMetricsReceiver(ctx context.Context, consumer consumer.Metrics, serverGRPC *grpc.Server) error {
receiver := metrics.New(config.NewComponentID("otlp"), consumer, settings)
otlpgrpc.RegisterMetricsServer(serverGRPC, receiver)
pmetricotlp.RegisterServer(serverGRPC, receiver)
return nil
}

// RegisterGRPCLogsReceiver registers the logs receiver with a gRPC server.
func RegisterGRPCLogsReceiver(ctx context.Context, consumer consumer.Logs, serverGRPC *grpc.Server) error {
receiver := logs.New(config.NewComponentID("otlp"), consumer, settings)
otlpgrpc.RegisterLogsServer(serverGRPC, receiver)
plogotlp.RegisterServer(serverGRPC, receiver)
return nil
}

// HTTP Receivers

func TracesHTTPHandler(ctx context.Context, consumer consumer.Traces) (http.HandlerFunc, error) {
receiver := trace.New(config.NewComponentID("otlp"), consumer, settings)
if consumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
return func(w http.ResponseWriter, r *http.Request) {
handleTraces(w, r, receiver, pbEncoder)
}, nil
}

func MetricsHTTPHandler(ctx context.Context, consumer consumer.Metrics) (http.HandlerFunc, error) {
receiver := metrics.New(config.NewComponentID("otlp"), consumer, settings)
if consumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
return func(w http.ResponseWriter, r *http.Request) {
handleMetrics(w, r, receiver, pbEncoder)
}, nil
}

func LogsHTTPHandler(ctx context.Context, consumer consumer.Logs) (http.HandlerFunc, error) {
receiver := logs.New(config.NewComponentID("otlp"), consumer, settings)
if consumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
return func(w http.ResponseWriter, r *http.Request) {
handleLogs(w, r, receiver, pbEncoder)
}, nil
Expand Down
5 changes: 4 additions & 1 deletion internal/beater/jaeger/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func (c *grpcCollector) PostSpans(ctx context.Context, r *api_v2.PostSpansReques
func (c *grpcCollector) postSpans(ctx context.Context, batch jaegermodel.Batch) error {
spanCount := int64(len(batch.Spans))
gRPCCollectorMonitoringMap.add(request.IDEventReceivedCount, spanCount)
traces := jaegertranslator.ProtoBatchToInternalTraces(batch)
traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{&batch})
if err != nil {
return err
}
return c.consumer.ConsumeTraces(ctx, traces)
}

Expand Down
17 changes: 9 additions & 8 deletions internal/beater/jaeger/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -97,17 +98,17 @@ func TestPostSpans(t *testing.T) {
}

func newPostSpansRequest(t *testing.T) *api_v2.PostSpansRequest {
traces := pdata.NewTraces()
traces := ptrace.NewTraces()
resourceSpans := traces.ResourceSpans().AppendEmpty()
spans := resourceSpans.InstrumentationLibrarySpans().AppendEmpty()
spans := resourceSpans.ScopeSpans().AppendEmpty()
span0 := spans.Spans().AppendEmpty()
span0.SetTraceID(pdata.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}))
span0.SetSpanID(pdata.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}))
span0.SetTraceID(pcommon.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}))
span0.SetSpanID(pcommon.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}))
span1 := spans.Spans().AppendEmpty()
span1.SetTraceID(pdata.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}))
span1.SetSpanID(pdata.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}))
span1.SetTraceID(pcommon.NewTraceID([16]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}))
span1.SetSpanID(pcommon.NewSpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}))

batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
batches, err := jaegertranslator.ProtoFromTraces(traces)
require.NoError(t, err)
require.Len(t, batches, 1)
return &api_v2.PostSpansRequest{Batch: *batches[0]}
Expand Down
38 changes: 19 additions & 19 deletions internal/beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
Expand All @@ -47,18 +51,17 @@ func TestConsumeTracesGRPC(t *testing.T) {
}

conn := newGRPCServer(t, batchProcessor)
client := otlpgrpc.NewTracesClient(conn)
client := ptraceotlp.NewClient(conn)

// Send a minimal trace to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
traces := pdata.NewTraces()
span := traces.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("operation_name")

tracesRequest := otlpgrpc.NewTracesRequest()
tracesRequest.SetTraces(traces)
tracesRequest := ptraceotlp.NewRequestFromTraces(traces)
_, err := client.Export(context.Background(), tracesRequest)
assert.NoError(t, err)
require.Len(t, batches, 1)
Expand Down Expand Up @@ -94,20 +97,19 @@ func TestConsumeMetricsGRPC(t *testing.T) {
}

conn := newGRPCServer(t, batchProcessor)
client := otlpgrpc.NewMetricsClient(conn)
client := pmetricotlp.NewClient(conn)

// Send a minimal metric to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
metrics := pdata.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty()
metrics := pmetric.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetName("metric_type")
metric.SetDataType(pdata.MetricDataTypeSummary)
metric.SetDataType(pmetric.MetricDataTypeSummary)
metric.Summary().DataPoints().AppendEmpty()

metricsRequest := otlpgrpc.NewMetricsRequest()
metricsRequest.SetMetrics(metrics)
metricsRequest := pmetricotlp.NewRequestFromMetrics(metrics)
_, err := client.Export(context.Background(), metricsRequest)
assert.NoError(t, err)

Expand Down Expand Up @@ -144,18 +146,16 @@ func TestConsumeLogsGRPC(t *testing.T) {
}

conn := newGRPCServer(t, batchProcessor)
client := otlpgrpc.NewLogsClient(conn)
client := plogotlp.NewClient(conn)

// Send a minimal log record to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
logs := pdata.NewLogs()
logRecord := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().LogRecords().AppendEmpty()
logRecord.SetName("log_name")
logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

logsRequest := otlpgrpc.NewLogsRequest()
logsRequest.SetLogs(logs)
logsRequest := plogotlp.NewRequestFromLogs(logs)
_, err := client.Export(context.Background(), logsRequest)
assert.NoError(t, err)
require.Len(t, batches, 1)
Expand Down
38 changes: 19 additions & 19 deletions internal/beater/otlp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"

"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/api"
Expand All @@ -54,13 +58,12 @@ func TestConsumeTracesHTTP(t *testing.T) {
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
traces := pdata.NewTraces()
span := traces.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("operation_name")

tracesRequest := otlpgrpc.NewTracesRequest()
tracesRequest.SetTraces(traces)
request, err := tracesRequest.Marshal()
tracesRequest := ptraceotlp.NewRequestFromTraces(traces)
request, err := tracesRequest.MarshalProto()
assert.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/v1/traces", addr), bytes.NewReader(request))
assert.NoError(t, err)
Expand Down Expand Up @@ -98,15 +101,14 @@ func TestConsumeMetricsHTTP(t *testing.T) {
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
metrics := pdata.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics().AppendEmpty()
metrics := pmetric.NewMetrics()
metric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
metric.SetName("metric_type")
metric.SetDataType(pdata.MetricDataTypeSummary)
metric.SetDataType(pmetric.MetricDataTypeSummary)
metric.Summary().DataPoints().AppendEmpty()

metricsRequest := otlpgrpc.NewMetricsRequest()
metricsRequest.SetMetrics(metrics)
request, err := metricsRequest.Marshal()
metricsRequest := pmetricotlp.NewRequestFromMetrics(metrics)
request, err := metricsRequest.MarshalProto()
assert.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/v1/metrics", addr), bytes.NewReader(request))
assert.NoError(t, err)
Expand Down Expand Up @@ -146,13 +148,11 @@ func TestConsumeLogsHTTP(t *testing.T) {
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
logs := pdata.NewLogs()
logRecord := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().LogRecords().AppendEmpty()
logRecord.SetName("log_name")
logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

logsRequest := otlpgrpc.NewLogsRequest()
logsRequest.SetLogs(logs)
request, err := logsRequest.Marshal()
logsRequest := plogotlp.NewRequestFromLogs(logs)
request, err := logsRequest.MarshalProto()
assert.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/v1/logs", addr), bytes.NewReader(request))
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit c2a0163

Please sign in to comment.