From 3454870c60dff02777be1d6fe4f659d446fd38c1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 18 Oct 2024 15:42:39 -0700 Subject: [PATCH] Replace probe.Event with ptrace.ScopeSpans Use the pdata module as the data-model for the project. --- instrumentation.go | 4 +- .../instrumentation/bpf/database/sql/probe.go | 65 ++-- .../bpf/database/sql/probe_test.go | 40 +-- .../segmentio/kafka-go/consumer/probe.go | 81 +++-- .../segmentio/kafka-go/consumer/probe_test.go | 42 +-- .../segmentio/kafka-go/producer/probe.go | 110 ++++--- .../segmentio/kafka-go/producer/probe_test.go | 68 ++-- .../bpf/go.opentelemetry.io/auto/sdk/probe.go | 213 +------------ .../otel/traceglobal/probe.go | 91 +++--- .../otel/traceglobal/probe_test.go | 43 +-- .../google.golang.org/grpc/client/probe.go | 97 +++--- .../google.golang.org/grpc/server/probe.go | 73 ++--- .../bpf/net/http/client/probe.go | 179 ++++++----- .../bpf/net/http/client/probe_test.go | 298 +++++++++++------- .../bpf/net/http/server/probe.go | 152 +++++---- .../bpf/net/http/server/probe_test.go | 204 ++++++------ internal/pkg/instrumentation/manager.go | 46 ++- internal/pkg/instrumentation/manager_test.go | 13 +- internal/pkg/instrumentation/probe/event.go | 40 --- internal/pkg/instrumentation/probe/probe.go | 23 +- internal/pkg/instrumentation/utils/attrs.go | 48 +++ internal/pkg/instrumentation/utils/kernel.go | 8 + internal/pkg/opentelemetry/controller.go | 264 ++++++++++++---- internal/pkg/opentelemetry/controller_test.go | 212 +++++++------ internal/pkg/opentelemetry/id_generator.go | 36 +-- 25 files changed, 1251 insertions(+), 1199 deletions(-) delete mode 100644 internal/pkg/instrumentation/probe/event.go create mode 100644 internal/pkg/instrumentation/utils/attrs.go diff --git a/instrumentation.go b/instrumentation.go index 0aeb1a800..d79ed9500 100644 --- a/instrumentation.go +++ b/instrumentation.go @@ -88,13 +88,13 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In return nil, err } - ctrl, err := opentelemetry.NewController(c.logger, c.tracerProvider(pa.BuildInfo), Version()) + ctrl, err := opentelemetry.NewController(c.logger, c.tracerProvider(pa.BuildInfo)) if err != nil { return nil, err } cp := convertConfigProvider(c.cp) - mngr, err := instrumentation.NewManager(c.logger, ctrl, c.globalImpl, c.loadIndicator, cp) + mngr, err := instrumentation.NewManager(c.logger, ctrl, c.globalImpl, c.loadIndicator, cp, Version()) if err != nil { return nil, err } diff --git a/internal/pkg/instrumentation/bpf/database/sql/probe.go b/internal/pkg/instrumentation/bpf/database/sql/probe.go index c70a2796b..68ea756cd 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/probe.go +++ b/internal/pkg/instrumentation/bpf/database/sql/probe.go @@ -8,7 +8,8 @@ import ( "os" "strconv" - "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sys/unix" @@ -29,7 +30,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, @@ -61,7 +62,7 @@ func New(logger *slog.Logger) probe.Probe { }, SpecFn: loadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -72,40 +73,32 @@ type event struct { Query [256]byte } -func convertEvent(e *event) []*probe.SpanEvent { - query := unix.ByteSliceToString(e.Query[:]) - - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName("DB") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } - return []*probe.SpanEvent{ - { - SpanName: "DB", - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - SpanContext: &sc, - Attributes: []attribute.KeyValue{ - semconv.DBQueryText(query), - }, - ParentSpanContext: pscPtr, - TracerSchema: semconv.SchemaURL, - }, + query := unix.ByteSliceToString(e.Query[:]) + span.Attributes().PutStr(string(semconv.DBQueryTextKey), query) + + return ss } } diff --git a/internal/pkg/instrumentation/bpf/database/sql/probe_test.go b/internal/pkg/instrumentation/bpf/database/sql/probe_test.go index d718de61d..f2b9586a6 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/probe_test.go +++ b/internal/pkg/instrumentation/bpf/database/sql/probe_test.go @@ -9,12 +9,12 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) @@ -28,7 +28,8 @@ func TestProbeConvertEvent(t *testing.T) { traceID := trace.TraceID{1} spanID := trace.SpanID{1} - got := convertEvent(&event{ + const ver = "v1" + got := processFn(pkg, ver, semconv.SchemaURL)(&event{ BaseSpanProperties: context.BaseSpanProperties{ StartTime: startOffset, EndTime: endOffset, @@ -38,20 +39,21 @@ func TestProbeConvertEvent(t *testing.T) { Query: [256]byte{0x53, 0x45, 0x4c, 0x45, 0x43, 0x54, 0x20, 0x2a, 0x20, 0x46, 0x52, 0x4f, 0x4d, 0x20, 0x66, 0x6f, 0x6f}, }) - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: spanID, - TraceFlags: trace.FlagsSampled, - }) - want := &probe.SpanEvent{ - SpanName: "DB", - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ - semconv.DBQueryText("SELECT * FROM foo"), - }, - TracerSchema: semconv.SchemaURL, - } - assert.Equal(t, want, got[0]) + want := func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(ver) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName("DB") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes(span.Attributes(), semconv.DBQueryText("SELECT * FROM foo")) + return ss + }() + assert.Equal(t, want, got) } diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go index fe2998fa7..241caff87 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go @@ -8,7 +8,8 @@ import ( "log/slog" "strconv" - "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sys/unix" @@ -27,7 +28,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindConsumer, InstrumentedPkg: pkg, @@ -75,7 +76,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: loadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -89,47 +90,43 @@ type event struct { Partition int64 } -func convertEvent(e *event) []*probe.SpanEvent { - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() - topic := unix.ByteSliceToString(e.Topic[:]) + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) - attributes := []attribute.KeyValue{ - semconv.MessagingSystemKafka, - semconv.MessagingOperationTypeReceive, - semconv.MessagingDestinationPartitionID(strconv.Itoa(int(e.Partition))), - semconv.MessagingDestinationName(topic), - semconv.MessagingKafkaMessageOffset(int(e.Offset)), - semconv.MessagingKafkaMessageKey(unix.ByteSliceToString(e.Key[:])), - semconv.MessagingKafkaConsumerGroup(unix.ByteSliceToString(e.ConsumerGroup[:])), - } - return []*probe.SpanEvent{ - { - SpanName: kafkaConsumerSpanName(topic), - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - SpanContext: &sc, - ParentSpanContext: pscPtr, - Attributes: attributes, - TracerSchema: semconv.SchemaURL, - }, + span := ss.Spans().AppendEmpty() + + topic := unix.ByteSliceToString(e.Topic[:]) + span.SetName(kafkaConsumerSpanName(topic)) + + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } + + utils.Attributes( + span.Attributes(), + semconv.MessagingSystemKafka, + semconv.MessagingOperationTypeReceive, + semconv.MessagingDestinationPartitionID(strconv.Itoa(int(e.Partition))), + semconv.MessagingDestinationName(topic), + semconv.MessagingKafkaMessageOffsetKey.Int64(e.Offset), + semconv.MessagingKafkaMessageKey(unix.ByteSliceToString(e.Key[:])), + semconv.MessagingKafkaConsumerGroup(unix.ByteSliceToString(e.ConsumerGroup[:])), + ) + + return ss } } diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe_test.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe_test.go index fa5d97eb9..2dce6316f 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe_test.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe_test.go @@ -9,12 +9,12 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) @@ -28,7 +28,8 @@ func TestProbeConvertEvent(t *testing.T) { traceID := trace.TraceID{1} spanID := trace.SpanID{1} - got := convertEvent(&event{ + const ver = "v1" + got := processFn(pkg, ver, semconv.SchemaURL)(&event{ BaseSpanProperties: context.BaseSpanProperties{ StartTime: startOffset, EndTime: endOffset, @@ -44,17 +45,22 @@ func TestProbeConvertEvent(t *testing.T) { Partition: 12, }) - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: spanID, - TraceFlags: trace.FlagsSampled, - }) - want := &probe.SpanEvent{ - SpanName: kafkaConsumerSpanName("topic1"), - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ + want := func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(ver) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName(kafkaConsumerSpanName("topic1")) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), semconv.MessagingSystemKafka, semconv.MessagingOperationTypeReceive, semconv.MessagingDestinationPartitionID("12"), @@ -62,8 +68,8 @@ func TestProbeConvertEvent(t *testing.T) { semconv.MessagingKafkaMessageOffset(42), semconv.MessagingKafkaMessageKey("key1"), semconv.MessagingKafkaConsumerGroup("test consumer group"), - }, - TracerSchema: semconv.SchemaURL, - } - assert.Equal(t, want, got[0]) + ) + return ss + }() + assert.Equal(t, want, got) } diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go index 528071bcf..587c0eb39 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go @@ -7,6 +7,8 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" @@ -26,7 +28,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindProducer, InstrumentedPkg: pkg, @@ -62,7 +64,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: loadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -85,70 +87,64 @@ type event struct { ValidMessages uint64 } -func convertEvent(e *event) []*probe.SpanEvent { - tsc := trace.SpanContextConfig{ - TraceID: e.Messages[0].SpanContext.TraceID, - TraceFlags: trace.FlagsSampled, - } - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } - - globalTopic := unix.ByteSliceToString(e.GlobalTopic[:]) +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() - commonAttrs := []attribute.KeyValue{semconv.MessagingSystemKafka, semconv.MessagingOperationTypePublish} - if len(globalTopic) > 0 { - commonAttrs = append(commonAttrs, semconv.MessagingDestinationName(globalTopic)) - } + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) - if e.ValidMessages > 0 { - commonAttrs = append(commonAttrs, semconv.MessagingBatchMessageCount(int(e.ValidMessages))) - } + globalTopic := unix.ByteSliceToString(e.GlobalTopic[:]) - var res []*probe.SpanEvent - var msgTopic string - for i := uint64(0); i < e.ValidMessages; i++ { - tsc.SpanID = e.Messages[i].SpanContext.SpanID - sc := trace.NewSpanContext(tsc) - key := unix.ByteSliceToString(e.Messages[i].Key[:]) + attrs := []attribute.KeyValue{semconv.MessagingSystemKafka, semconv.MessagingOperationTypePublish} + if len(globalTopic) > 0 { + attrs = append(attrs, semconv.MessagingDestinationName(globalTopic)) + } - msgAttrs := []attribute.KeyValue{} - if len(key) > 0 { - msgAttrs = append(msgAttrs, semconv.MessagingKafkaMessageKey(key)) + if e.ValidMessages > 0 { + attrs = append(attrs, semconv.MessagingBatchMessageCount(int(e.ValidMessages))) } - // Topic is either the global topic or the message specific topic - if len(globalTopic) == 0 { - msgTopic = unix.ByteSliceToString(e.Messages[i].Topic[:]) - } else { - msgTopic = globalTopic + traceID := pcommon.TraceID(e.Messages[0].SpanContext.TraceID) + + var msgTopic string + for i := uint64(0); i < e.ValidMessages; i++ { + key := unix.ByteSliceToString(e.Messages[i].Key[:]) + var msgAttrs []attribute.KeyValue + if len(key) > 0 { + msgAttrs = append(msgAttrs, semconv.MessagingKafkaMessageKey(key)) + } + + // Topic is either the global topic or the message specific topic + if len(globalTopic) == 0 { + msgTopic = unix.ByteSliceToString(e.Messages[i].Topic[:]) + } else { + msgTopic = globalTopic + } + + msgAttrs = append(msgAttrs, semconv.MessagingDestinationName(msgTopic)) + msgAttrs = append(msgAttrs, attrs...) + + span := ss.Spans().AppendEmpty() + span.SetName(kafkaProducerSpanName(msgTopic)) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(traceID) + span.SetSpanID(pcommon.SpanID(e.Messages[i].SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } + + utils.Attributes(span.Attributes(), msgAttrs...) } - msgAttrs = append(msgAttrs, semconv.MessagingDestinationName(msgTopic)) - msgAttrs = append(msgAttrs, commonAttrs...) - - res = append(res, &probe.SpanEvent{ - SpanName: kafkaProducerSpanName(msgTopic), - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - SpanContext: &sc, - Attributes: msgAttrs, - ParentSpanContext: pscPtr, - TracerSchema: semconv.SchemaURL, - }) + return ss } - - return res } func kafkaProducerSpanName(topic string) string { diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go index 66486e9b2..bf9ecd9c4 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go @@ -9,12 +9,12 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) @@ -27,7 +27,8 @@ func TestProbeConvertEvent(t *testing.T) { traceID := trace.TraceID{1} - got := convertEvent(&event{ + ver := "v1" + got := processFn(pkg, ver, semconv.SchemaURL)(&event{ StartTime: startOffset, EndTime: endOffset, Messages: [10]messageAttributes{ @@ -55,45 +56,46 @@ func TestProbeConvertEvent(t *testing.T) { ValidMessages: 2, }) - sc1 := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: trace.SpanID{1}, - TraceFlags: trace.FlagsSampled, - }) - sc2 := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: trace.SpanID{2}, - TraceFlags: trace.FlagsSampled, - }) - want1 := &probe.SpanEvent{ - SpanName: kafkaProducerSpanName("topic1"), - StartTime: start, - EndTime: end, - SpanContext: &sc1, - Attributes: []attribute.KeyValue{ + want := func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(ver) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName(kafkaProducerSpanName("topic1")) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID{1}) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), semconv.MessagingKafkaMessageKey("key1"), semconv.MessagingDestinationName("topic1"), semconv.MessagingSystemKafka, semconv.MessagingOperationTypePublish, semconv.MessagingBatchMessageCount(2), - }, - TracerSchema: semconv.SchemaURL, - } + ) - want2 := &probe.SpanEvent{ - SpanName: kafkaProducerSpanName("topic2"), - StartTime: start, - EndTime: end, - SpanContext: &sc2, - Attributes: []attribute.KeyValue{ + span = ss.Spans().AppendEmpty() + span.SetName(kafkaProducerSpanName("topic2")) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID{2}) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), semconv.MessagingKafkaMessageKey("key2"), semconv.MessagingDestinationName("topic2"), semconv.MessagingSystemKafka, semconv.MessagingOperationTypePublish, semconv.MessagingBatchMessageCount(2), - }, - TracerSchema: semconv.SchemaURL, - } - assert.Equal(t, want1, got[0]) - assert.Equal(t, want2, got[1]) + ) + + return ss + }() + assert.Equal(t, want, got) } diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go index 1e67861d7..833528198 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk/probe.go @@ -6,25 +6,21 @@ package sdk import ( "bytes" "encoding/binary" - "fmt" "log/slog" "github.com/cilium/ebpf/perf" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/structfield" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64,arm64 -cc clang -cflags $CFLAGS bpf ./bpf/probe.bpf.c // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: "go.opentelemetry.io/auto", @@ -75,7 +71,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: loadBpf, - ProcessFn: c.convertEvent, + ProcessFn: c.processFn, ProcessRecord: c.decodeEvent, } } @@ -110,212 +106,13 @@ func (c *converter) decodeEvent(record perf.Record) (event, error) { return e, nil } -func (c *converter) convertEvent(e *event) []*probe.SpanEvent { +func (c *converter) processFn(e *event) ptrace.ScopeSpans { var m ptrace.ProtoUnmarshaler traces, err := m.UnmarshalTraces(e.SpanData[:e.Size]) if err != nil { c.logger.Error("failed to unmarshal span data", "error", err) - return nil + return ptrace.NewScopeSpans() } - ss := traces.ResourceSpans().At(0).ScopeSpans().At(0) // TODO: validate len before lookup. - span := ss.Spans().At(0) // TODO: validate len before lookup. - - raw := span.TraceState().AsRaw() - ts, err := trace.ParseTraceState(raw) - if err != nil { - c.logger.Error("failed to parse tracestate", "error", err, "tracestate", raw) - } - - var pscPtr *trace.SpanContext - if psid := span.ParentSpanID(); psid != pcommon.NewSpanIDEmpty() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: trace.TraceID(span.TraceID()), - SpanID: trace.SpanID(psid), - TraceFlags: trace.TraceFlags(span.Flags()), - TraceState: ts, - }) - pscPtr = &psc - } - - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: trace.TraceID(span.TraceID()), - SpanID: trace.SpanID(span.SpanID()), - TraceFlags: trace.TraceFlags(span.Flags()), - TraceState: ts, - }) - span.ParentSpanID() - return []*probe.SpanEvent{{ - SpanName: span.Name(), - StartTime: span.StartTimestamp().AsTime(), - EndTime: span.EndTimestamp().AsTime(), - SpanContext: &sc, - ParentSpanContext: pscPtr, - TracerName: ss.Scope().Name(), - TracerVersion: ss.Scope().Version(), - TracerSchema: ss.SchemaUrl(), - Kind: spanKind(span.Kind()), - Attributes: attributes(span.Attributes()), - Events: events(span.Events()), - Links: c.links(span.Links()), - Status: status(span.Status()), - }} -} - -func spanKind(kind ptrace.SpanKind) trace.SpanKind { - switch kind { - case ptrace.SpanKindInternal: - return trace.SpanKindInternal - case ptrace.SpanKindServer: - return trace.SpanKindServer - case ptrace.SpanKindClient: - return trace.SpanKindClient - case ptrace.SpanKindProducer: - return trace.SpanKindProducer - case ptrace.SpanKindConsumer: - return trace.SpanKindConsumer - default: - return trace.SpanKindUnspecified - } -} - -func events(e ptrace.SpanEventSlice) map[string][]trace.EventOption { - out := make(map[string][]trace.EventOption) - for i := 0; i < e.Len(); i++ { - var opts []trace.EventOption - - event := e.At(i) - - ts := event.Timestamp().AsTime() - if !ts.IsZero() { - opts = append(opts, trace.WithTimestamp(ts)) - } - - attrs := attributes(event.Attributes()) - if len(attrs) > 0 { - opts = append(opts, trace.WithAttributes(attrs...)) - } - - out[event.Name()] = opts - } - return out -} - -func (c *converter) links(links ptrace.SpanLinkSlice) []trace.Link { - n := links.Len() - if n == 0 { - return nil - } - - out := make([]trace.Link, n) - for i := range out { - l := links.At(i) - - raw := l.TraceState().AsRaw() - ts, err := trace.ParseTraceState(raw) - if err != nil { - c.logger.Error("failed to parse link tracestate", "error", err, "tracestate", raw) - } - - out[i] = trace.Link{ - SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: trace.TraceID(l.TraceID()), - SpanID: trace.SpanID(l.SpanID()), - TraceFlags: trace.TraceFlags(l.Flags()), - TraceState: ts, - }), - Attributes: attributes(l.Attributes()), - } - } - return out -} - -func attributes(m pcommon.Map) []attribute.KeyValue { - out := make([]attribute.KeyValue, 0, m.Len()) - m.Range(func(key string, val pcommon.Value) bool { - out = append(out, attribute.KeyValue{ - Key: attribute.Key(key), - Value: attributeValue(val), - }) - return true - }) - return out -} - -func attributeValue(val pcommon.Value) (out attribute.Value) { - switch val.Type() { - case pcommon.ValueTypeEmpty: - case pcommon.ValueTypeStr: - out = attribute.StringValue(val.AsString()) - case pcommon.ValueTypeInt: - out = attribute.Int64Value(val.Int()) - case pcommon.ValueTypeDouble: - out = attribute.Float64Value(val.Double()) - case pcommon.ValueTypeBool: - out = attribute.BoolValue(val.Bool()) - case pcommon.ValueTypeSlice: - s := val.Slice() - if s.Len() == 0 { - // Undetectable slice type. - out = attribute.StringValue("") - return out - } - - // Validate homogeneity before allocating. - t := s.At(0).Type() - for i := 1; i < s.Len(); i++ { - if s.At(i).Type() != t { - out = attribute.StringValue("") - return out - } - } - - switch t { - case pcommon.ValueTypeBool: - v := make([]bool, s.Len()) - for i := 0; i < s.Len(); i++ { - v[i] = s.At(i).Bool() - } - out = attribute.BoolSliceValue(v) - case pcommon.ValueTypeStr: - v := make([]string, s.Len()) - for i := 0; i < s.Len(); i++ { - v[i] = s.At(i).Str() - } - out = attribute.StringSliceValue(v) - case pcommon.ValueTypeInt: - v := make([]int64, s.Len()) - for i := 0; i < s.Len(); i++ { - v[i] = s.At(i).Int() - } - out = attribute.Int64SliceValue(v) - case pcommon.ValueTypeDouble: - v := make([]float64, s.Len()) - for i := 0; i < s.Len(); i++ { - v[i] = s.At(i).Double() - } - out = attribute.Float64SliceValue(v) - default: - out = attribute.StringValue(fmt.Sprintf("", t.String())) - } - default: - out = attribute.StringValue(fmt.Sprintf("", val.AsRaw())) - } - return out -} - -func status(stat ptrace.Status) probe.Status { - var c codes.Code - switch stat.Code() { - case ptrace.StatusCodeUnset: - c = codes.Unset - case ptrace.StatusCodeOk: - c = codes.Ok - case ptrace.StatusCodeError: - c = codes.Error - } - return probe.Status{ - Code: c, - Description: stat.Message(), - } + return traces.ResourceSpans().At(0).ScopeSpans().At(0) // TODO: validate len before lookup. } diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go index 56a90c411..5961b3070 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go @@ -9,6 +9,9 @@ import ( "log/slog" "math" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/auto/internal/pkg/inject" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" @@ -33,7 +36,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, @@ -129,7 +132,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: loadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn, } } @@ -183,62 +186,60 @@ type event struct { TracerID tracerID } -func convertEvent(e *event) []*probe.SpanEvent { - spanName := unix.ByteSliceToString(e.SpanName[:]) - - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil +func processFn(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + scope := ss.Scope() + // TODO: ensure this is not empty. + scope.SetName(unix.ByteSliceToString(e.TracerID.Name[:])) + scope.SetVersion(unix.ByteSliceToString(e.TracerID.Version[:])) + ss.SetSchemaUrl(unix.ByteSliceToString(e.TracerID.SchemaURL[:])) + + span := ss.Spans().AppendEmpty() + span.SetName(unix.ByteSliceToString(e.SpanName[:])) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) } - return []*probe.SpanEvent{ - { - SpanName: spanName, - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - Attributes: convertAttributes(e.Attributes), - SpanContext: &sc, - ParentSpanContext: pscPtr, - Status: probe.Status{ - Code: codes.Code(e.Status.Code), - Description: string(unix.ByteSliceToString(e.Status.Description[:])), - }, - TracerName: unix.ByteSliceToString(e.TracerID.Name[:]), - TracerVersion: unix.ByteSliceToString(e.TracerID.Version[:]), - TracerSchema: unix.ByteSliceToString(e.TracerID.SchemaURL[:]), - }, + setAttributes(span.Attributes(), e.Attributes) + setStatus(span.Status(), e.Status) + + return ss +} + +func setStatus(dest ptrace.Status, stat status) { + switch codes.Code(stat.Code) { + case codes.Unset: + dest.SetCode(ptrace.StatusCodeUnset) + case codes.Ok: + dest.SetCode(ptrace.StatusCodeOk) + case codes.Error: + dest.SetCode(ptrace.StatusCodeError) } + dest.SetMessage(string(unix.ByteSliceToString(stat.Description[:]))) } -func convertAttributes(ab attributesBuffer) []attribute.KeyValue { - var res []attribute.KeyValue +func setAttributes(dest pcommon.Map, ab attributesBuffer) { for i := 0; i < int(ab.ValidAttrs); i++ { akv := ab.AttrsKv[i] key := unix.ByteSliceToString(akv.Key[:]) switch akv.Vtype { case uint8(attribute.BOOL): - res = append(res, attribute.Bool(key, akv.Value[0] != 0)) + dest.PutBool(key, akv.Value[0] != 0) case uint8(attribute.INT64): - res = append(res, attribute.Int64(key, int64(binary.LittleEndian.Uint64(akv.Value[:8])))) + v := int64(binary.LittleEndian.Uint64(akv.Value[:8])) + dest.PutInt(key, v) case uint8(attribute.FLOAT64): - res = append(res, attribute.Float64(key, math.Float64frombits(binary.LittleEndian.Uint64(akv.Value[:8])))) + v := math.Float64frombits(binary.LittleEndian.Uint64(akv.Value[:8])) + dest.PutDouble(key, v) case uint8(attribute.STRING): - res = append(res, attribute.String(key, unix.ByteSliceToString(akv.Value[:]))) + dest.PutStr(key, unix.ByteSliceToString(akv.Value[:])) } } - return res } diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe_test.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe_test.go index bf9c76778..5b2dbba76 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe_test.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe_test.go @@ -11,11 +11,12 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) @@ -32,7 +33,7 @@ func TestProbeConvertEvent(t *testing.T) { var floatBuf [128]byte binary.LittleEndian.PutUint64(floatBuf[:], math.Float64bits(math.Pi)) - got := convertEvent(&event{ + got := processFn(&event{ BaseSpanProperties: context.BaseSpanProperties{ StartTime: startOffset, EndTime: endOffset, @@ -101,26 +102,30 @@ func TestProbeConvertEvent(t *testing.T) { }, }) - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: spanID, - TraceFlags: trace.FlagsSampled, - }) - want := &probe.SpanEvent{ - SpanName: "Foo", - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ + want := func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + ss.Scope().SetName("user-tracer") + ss.Scope().SetVersion("v1") + ss.SetSchemaUrl("user-schema") + + span := ss.Spans().AppendEmpty() + span.SetName("Foo") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), attribute.Bool("bool_key", true), attribute.String("string_key1", "string value 1"), attribute.Float64("float_key", math.Pi), attribute.Int64("int_key", 42), attribute.String("string_key2", "string value 2"), - }, - TracerName: "user-tracer", - TracerVersion: "v1", - TracerSchema: "user-schema", - } - assert.Equal(t, want, got[0]) + ) + + return ss + }() + assert.Equal(t, want, got) } diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go index 90adee925..57ba16fe6 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go @@ -10,8 +10,9 @@ import ( "strings" "github.com/cilium/ebpf" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sys/unix" @@ -30,7 +31,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, @@ -86,7 +87,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: verifyAndLoadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -106,58 +107,50 @@ type event struct { StatusCode int32 } -// According to https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md -func convertEvent(e *event) []*probe.SpanEvent { - method := unix.ByteSliceToString(e.Method[:]) - target := unix.ByteSliceToString(e.Target[:]) - var attrs []attribute.KeyValue - - // remove port - if parts := strings.Split(target, ":"); len(parts) > 1 { - target = parts[0] - if remotePeerPortInt, err := strconv.Atoi(parts[1]); err == nil { - attrs = append(attrs, semconv.NetworkPeerPort(remotePeerPortInt)) +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) + + method := unix.ByteSliceToString(e.Method[:]) + target := unix.ByteSliceToString(e.Target[:]) + + attrs := []attribute.KeyValue{ + semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)), + semconv.RPCSystemKey.String("grpc"), + semconv.RPCServiceKey.String(method), + semconv.ServerAddress(target), + } + // remove port + if parts := strings.Split(target, ":"); len(parts) > 1 { + if remotePeerPortInt, err := strconv.Atoi(parts[1]); err == nil { + attrs = append(attrs, semconv.NetworkPeerPort(remotePeerPortInt)) + } } - } - attrs = append(attrs, semconv.RPCSystemKey.String("grpc"), - semconv.RPCServiceKey.String(method), - semconv.ServerAddress(target)) - - attrs = append(attrs, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode))) - - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } + span := ss.Spans().AppendEmpty() + span.SetName(method) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) - event := &probe.SpanEvent{ - SpanName: method, - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - Attributes: attrs, - SpanContext: &sc, - ParentSpanContext: pscPtr, - TracerSchema: semconv.SchemaURL, - } + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } - if e.StatusCode > 0 { - event.Status = probe.Status{Code: codes.Error} - } + utils.Attributes(span.Attributes(), attrs...) - return []*probe.SpanEvent{event} + if e.StatusCode > 0 { + span.Status().SetCode(ptrace.StatusCodeError) + } + + return ss + } } diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go index 107d0e83b..965740268 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go @@ -8,7 +8,8 @@ import ( "log/slog" "github.com/hashicorp/go-version" - "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sys/unix" @@ -29,7 +30,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindServer, InstrumentedPkg: pkg, @@ -74,7 +75,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: loadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -103,40 +104,36 @@ type event struct { Method [100]byte } -func convertEvent(e *event) []*probe.SpanEvent { - method := unix.ByteSliceToString(e.Method[:]) - - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } - - return []*probe.SpanEvent{ - { - SpanName: method, - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - Attributes: []attribute.KeyValue{ - semconv.RPCSystemKey.String("grpc"), - semconv.RPCServiceKey.String(method), - }, - ParentSpanContext: pscPtr, - SpanContext: &sc, - TracerSchema: semconv.SchemaURL, - }, +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) + + method := unix.ByteSliceToString(e.Method[:]) + + span := ss.Spans().AppendEmpty() + span.SetName(method) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } + + utils.Attributes( + span.Attributes(), + semconv.RPCSystemKey.String("grpc"), + semconv.RPCServiceKey.String(method), + ) + + return ss } } diff --git a/internal/pkg/instrumentation/bpf/net/http/client/probe.go b/internal/pkg/instrumentation/bpf/net/http/client/probe.go index 7fd04c55d..00dd5cef8 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/client/probe.go @@ -11,8 +11,9 @@ import ( "strings" "github.com/cilium/ebpf" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sys/unix" @@ -33,7 +34,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindClient, InstrumentedPkg: pkg, @@ -155,7 +156,7 @@ func New(logger *slog.Logger) probe.Probe { }, Uprobes: uprobes, SpecFn: verifyAndLoadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -186,105 +187,101 @@ type event struct { OmitHost uint8 } -func convertEvent(e *event) []*probe.SpanEvent { - method := unix.ByteSliceToString(e.Method[:]) - path := unix.ByteSliceToString(e.Path[:]) - scheme := unix.ByteSliceToString(e.Scheme[:]) - opaque := unix.ByteSliceToString(e.Opaque[:]) - host := unix.ByteSliceToString(e.Host[:]) - rawPath := unix.ByteSliceToString(e.RawPath[:]) - rawQuery := unix.ByteSliceToString(e.RawQuery[:]) - username := unix.ByteSliceToString(e.Username[:]) - fragment := unix.ByteSliceToString(e.Fragment[:]) - rawFragment := unix.ByteSliceToString(e.RawFragment[:]) - forceQuery := e.ForceQuery != 0 - omitHost := e.OmitHost != 0 - var user *url.Userinfo - if len(username) > 0 { - // check that username!="", otherwise url.User will instantiate - // an empty, non-nil *Userinfo object which url.String() will parse - // to just "@" in the final fullUrl - user = url.User(username) - } +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } + method := unix.ByteSliceToString(e.Method[:]) + path := unix.ByteSliceToString(e.Path[:]) + scheme := unix.ByteSliceToString(e.Scheme[:]) + opaque := unix.ByteSliceToString(e.Opaque[:]) + host := unix.ByteSliceToString(e.Host[:]) + rawPath := unix.ByteSliceToString(e.RawPath[:]) + rawQuery := unix.ByteSliceToString(e.RawQuery[:]) + username := unix.ByteSliceToString(e.Username[:]) + fragment := unix.ByteSliceToString(e.Fragment[:]) + rawFragment := unix.ByteSliceToString(e.RawFragment[:]) + forceQuery := e.ForceQuery != 0 + omitHost := e.OmitHost != 0 + var user *url.Userinfo + if len(username) > 0 { + // check that username!="", otherwise url.User will instantiate + // an empty, non-nil *Userinfo object which url.String() will parse + // to just "@" in the final fullUrl + user = url.User(username) + } - attrs := []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(method), - semconv.HTTPResponseStatusCodeKey.Int(int(e.StatusCode)), - } + attrs := []attribute.KeyValue{ + semconv.HTTPRequestMethodKey.String(method), + semconv.HTTPResponseStatusCodeKey.Int(int(e.StatusCode)), + } - if path != "" { - attrs = append(attrs, semconv.URLPath(path)) - } + if path != "" { + attrs = append(attrs, semconv.URLPath(path)) + } - urlObj := &url.URL{ - Path: path, - Scheme: scheme, - Opaque: opaque, - Host: host, - RawPath: rawPath, - User: user, - RawQuery: rawQuery, - Fragment: fragment, - RawFragment: rawFragment, - ForceQuery: forceQuery, - OmitHost: omitHost, - } + urlObj := &url.URL{ + Path: path, + Scheme: scheme, + Opaque: opaque, + Host: host, + RawPath: rawPath, + User: user, + RawQuery: rawQuery, + Fragment: fragment, + RawFragment: rawFragment, + ForceQuery: forceQuery, + OmitHost: omitHost, + } - fullURL := urlObj.String() - attrs = append(attrs, semconv.URLFull(fullURL)) + fullURL := urlObj.String() + attrs = append(attrs, semconv.URLFull(fullURL)) - // Server address and port - serverAddr, serverPort := http.ServerAddressPortAttributes(e.Host[:]) - if serverAddr.Valid() { - attrs = append(attrs, serverAddr) - } - if serverPort.Valid() { - attrs = append(attrs, serverPort) - } + // Server address and port + serverAddr, serverPort := http.ServerAddressPortAttributes(e.Host[:]) + if serverAddr.Valid() { + attrs = append(attrs, serverAddr) + } + if serverPort.Valid() { + attrs = append(attrs, serverPort) + } - proto := unix.ByteSliceToString(e.Proto[:]) - if proto != "" { - parts := strings.Split(proto, "/") - if len(parts) == 2 { - if parts[0] != "HTTP" { - attrs = append(attrs, semconv.NetworkProtocolName(parts[0])) + proto := unix.ByteSliceToString(e.Proto[:]) + if proto != "" { + parts := strings.Split(proto, "/") + if len(parts) == 2 { + if parts[0] != "HTTP" { + attrs = append(attrs, semconv.NetworkProtocolName(parts[0])) + } + attrs = append(attrs, semconv.NetworkProtocolVersion(parts[1])) } - attrs = append(attrs, semconv.NetworkProtocolVersion(parts[1])) } - } - spanEvent := &probe.SpanEvent{ - SpanName: method, - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - SpanContext: &sc, - Attributes: attrs, - ParentSpanContext: pscPtr, - TracerSchema: semconv.SchemaURL, - } + span := ss.Spans().AppendEmpty() + span.SetName(method) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + span.SetKind(ptrace.SpanKindClient) - if int(e.StatusCode) >= 400 && int(e.StatusCode) < 600 { - spanEvent.Status = probe.Status{Code: codes.Error} - } + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } + + utils.Attributes(span.Attributes(), attrs...) - return []*probe.SpanEvent{spanEvent} + if int(e.StatusCode) >= 400 && int(e.StatusCode) < 600 { + span.Status().SetCode(ptrace.StatusCodeError) + } + + return ss + } } diff --git a/internal/pkg/instrumentation/bpf/net/http/client/probe_test.go b/internal/pkg/instrumentation/bpf/net/http/client/probe_test.go index 9fd15627b..64a2d5ec0 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/probe_test.go +++ b/internal/pkg/instrumentation/bpf/net/http/client/probe_test.go @@ -8,17 +8,18 @@ import ( "time" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) func TestConvertEvent(t *testing.T) { + const ver = "ver" + startTime := time.Unix(0, time.Now().UnixNano()) // No wall clock. endTime := startTime.Add(1 * time.Second) @@ -69,18 +70,11 @@ func TestConvertEvent(t *testing.T) { assert.NoError(t, err) trId, err := trace.TraceIDFromHex("00f067aa0ba902b700f067aa0ba902b7") assert.NoError(t, err) - spanContext := trace.NewSpanContext( - trace.SpanContextConfig{ - SpanID: spId, - TraceID: trId, - TraceFlags: 1, - }, - ) testCases := []struct { name string event *event - expected []*probe.SpanEvent + expected ptrace.ScopeSpans }{ { name: "basic client event", @@ -97,23 +91,35 @@ func TestConvertEvent(t *testing.T) { SpanContext: context.EBPFSpanContext{TraceID: trId, SpanID: spId}, }, }, - expected: []*probe.SpanEvent{ - { - SpanName: methodString, - SpanContext: &spanContext, - StartTime: startTime, - EndTime: endTime, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(methodString), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.URLPath(pathString), - semconv.URLFull("http://google.com/home"), - semconv.ServerAddress(hostString), - semconv.NetworkProtocolVersion("1.1"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/" + pkg) + scope.SetVersion(ver) + + span := ss.Spans().AppendEmpty() + span.SetName(methodString) + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String(methodString), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.URLPath(pathString), + semconv.URLFull("http://google.com/home"), + semconv.ServerAddress(hostString), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, { name: "client event code 400", @@ -130,24 +136,36 @@ func TestConvertEvent(t *testing.T) { SpanContext: context.EBPFSpanContext{TraceID: trId, SpanID: spId}, }, }, - expected: []*probe.SpanEvent{ - { - SpanName: methodString, - SpanContext: &spanContext, - StartTime: startTime, - EndTime: endTime, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(methodString), - semconv.HTTPResponseStatusCodeKey.Int(400), - semconv.URLPath(pathString), - semconv.URLFull("http://google.com/home"), - semconv.ServerAddress(hostString), - semconv.NetworkProtocolVersion("1.1"), - }, - Status: probe.Status{Code: codes.Error}, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/" + pkg) + scope.SetVersion(ver) + + span := ss.Spans().AppendEmpty() + span.SetName(methodString) + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + span.Status().SetCode(ptrace.StatusCodeError) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String(methodString), + semconv.HTTPResponseStatusCodeKey.Int(400), + semconv.URLPath(pathString), + semconv.URLFull("http://google.com/home"), + semconv.ServerAddress(hostString), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, { name: "client event code 500", @@ -164,24 +182,36 @@ func TestConvertEvent(t *testing.T) { SpanContext: context.EBPFSpanContext{TraceID: trId, SpanID: spId}, }, }, - expected: []*probe.SpanEvent{ - { - SpanName: methodString, - SpanContext: &spanContext, - StartTime: startTime, - EndTime: endTime, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(methodString), - semconv.HTTPResponseStatusCodeKey.Int(500), - semconv.URLPath(pathString), - semconv.URLFull("http://google.com/home"), - semconv.ServerAddress(hostString), - semconv.NetworkProtocolVersion("1.1"), - }, - Status: probe.Status{Code: codes.Error}, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/" + pkg) + scope.SetVersion(ver) + + span := ss.Spans().AppendEmpty() + span.SetName(methodString) + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + span.Status().SetCode(ptrace.StatusCodeError) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String(methodString), + semconv.HTTPResponseStatusCodeKey.Int(500), + semconv.URLPath(pathString), + semconv.URLFull("http://google.com/home"), + semconv.ServerAddress(hostString), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, { name: "non-http protocol.name", @@ -198,24 +228,36 @@ func TestConvertEvent(t *testing.T) { SpanContext: context.EBPFSpanContext{TraceID: trId, SpanID: spId}, }, }, - expected: []*probe.SpanEvent{ - { - SpanName: methodString, - SpanContext: &spanContext, - StartTime: startTime, - EndTime: endTime, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(methodString), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.URLPath(pathString), - semconv.URLFull("foo://google.com/home"), - semconv.ServerAddress(hostString), - semconv.NetworkProtocolName("foo"), - semconv.NetworkProtocolVersion("2.2"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/" + pkg) + scope.SetVersion(ver) + + span := ss.Spans().AppendEmpty() + span.SetName(methodString) + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String(methodString), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.URLPath(pathString), + semconv.URLFull("foo://google.com/home"), + semconv.ServerAddress(hostString), + semconv.NetworkProtocolName("foo"), + semconv.NetworkProtocolVersion("2.2"), + ) + + return ss + }(), }, { name: "basic url parsing", @@ -235,23 +277,35 @@ func TestConvertEvent(t *testing.T) { SpanContext: context.EBPFSpanContext{TraceID: trId, SpanID: spId}, }, }, - expected: []*probe.SpanEvent{ - { - SpanName: methodString, - SpanContext: &spanContext, - StartTime: startTime, - EndTime: endTime, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(methodString), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.URLPath(pathString), - semconv.URLFull("http://user@google.com/home?query=true#fragment"), - semconv.ServerAddress(hostString), - semconv.NetworkProtocolVersion("1.1"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/" + pkg) + scope.SetVersion(ver) + + span := ss.Spans().AppendEmpty() + span.SetName(methodString) + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String(methodString), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.URLPath(pathString), + semconv.URLFull("http://user@google.com/home?query=true#fragment"), + semconv.ServerAddress(hostString), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, { // see https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/url/url.go;l=815 @@ -272,28 +326,40 @@ func TestConvertEvent(t *testing.T) { SpanContext: context.EBPFSpanContext{TraceID: trId, SpanID: spId}, }, }, - expected: []*probe.SpanEvent{ - { - SpanName: methodString, - SpanContext: &spanContext, - StartTime: startTime, - EndTime: endTime, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(methodString), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.URLPath(pathString), - semconv.URLFull("http:/home?"), - semconv.NetworkProtocolVersion("1.1"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/" + pkg) + scope.SetVersion(ver) + + span := ss.Spans().AppendEmpty() + span.SetName(methodString) + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String(methodString), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.URLPath(pathString), + semconv.URLFull("http:/home?"), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - out := convertEvent(tt.event) + out := processFn(pkg, ver, semconv.SchemaURL)(tt.event) assert.Equal(t, tt.expected, out) }) } diff --git a/internal/pkg/instrumentation/bpf/net/http/server/probe.go b/internal/pkg/instrumentation/bpf/net/http/server/probe.go index 2b305148d..2d08e96b0 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/server/probe.go @@ -8,8 +8,9 @@ import ( "strings" "github.com/hashicorp/go-version" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "golang.org/x/sys/unix" @@ -31,7 +32,7 @@ const ( ) // New returns a new [probe.Probe]. -func New(logger *slog.Logger) probe.Probe { +func New(logger *slog.Logger, version string) probe.Probe { id := probe.ID{ SpanKind: trace.SpanKindServer, InstrumentedPkg: pkg, @@ -109,7 +110,7 @@ func New(logger *slog.Logger) probe.Probe { }, }, SpecFn: loadBpf, - ProcessFn: convertEvent, + ProcessFn: processFn(pkg, version, semconv.SchemaURL), } } @@ -138,91 +139,86 @@ type event struct { Proto [8]byte } -func convertEvent(e *event) []*probe.SpanEvent { - path := unix.ByteSliceToString(e.Path[:]) - method := unix.ByteSliceToString(e.Method[:]) - patternPath := unix.ByteSliceToString(e.PathPattern[:]) +func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans { + scopeName := "go.opentelemetry.io/auto/" + pkg + return func(e *event) ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() - isValidPatternPath := true - patternPath, err := http.ParsePattern(patternPath) - if err != nil || patternPath == "" { - isValidPatternPath = false - } + scope := ss.Scope() + scope.SetName(scopeName) + scope.SetVersion(ver) + ss.SetSchemaUrl(schemaURL) - proto := unix.ByteSliceToString(e.Proto[:]) - - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.SpanContext.TraceID, - SpanID: e.SpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - }) - - var pscPtr *trace.SpanContext - if e.ParentSpanContext.TraceID.IsValid() { - psc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: e.ParentSpanContext.TraceID, - SpanID: e.ParentSpanContext.SpanID, - TraceFlags: trace.FlagsSampled, - Remote: true, - }) - pscPtr = &psc - } else { - pscPtr = nil - } + path := unix.ByteSliceToString(e.Path[:]) + method := unix.ByteSliceToString(e.Method[:]) + patternPath := unix.ByteSliceToString(e.PathPattern[:]) - attributes := []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String(method), - semconv.URLPath(path), - semconv.HTTPResponseStatusCodeKey.Int(int(e.StatusCode)), - } + isValidPatternPath := true + patternPath, err := http.ParsePattern(patternPath) + if err != nil || patternPath == "" { + isValidPatternPath = false + } - // Client address and port - peerAddr, peerPort := http.NetPeerAddressPortAttributes(e.RemoteAddr[:]) - if peerAddr.Valid() { - attributes = append(attributes, peerAddr) - } - if peerPort.Valid() { - attributes = append(attributes, peerPort) - } + proto := unix.ByteSliceToString(e.Proto[:]) - // Server address and port - serverAddr, serverPort := http.ServerAddressPortAttributes(e.Host[:]) - if serverAddr.Valid() { - attributes = append(attributes, serverAddr) - } - if serverPort.Valid() { - attributes = append(attributes, serverPort) - } + attrs := []attribute.KeyValue{ + semconv.HTTPRequestMethodKey.String(method), + semconv.URLPath(path), + semconv.HTTPResponseStatusCodeKey.Int(int(e.StatusCode)), + } + + // Client address and port + peerAddr, peerPort := http.NetPeerAddressPortAttributes(e.RemoteAddr[:]) + if peerAddr.Valid() { + attrs = append(attrs, peerAddr) + } + if peerPort.Valid() { + attrs = append(attrs, peerPort) + } - if proto != "" { - parts := strings.Split(proto, "/") - if len(parts) == 2 { - if parts[0] != "HTTP" { - attributes = append(attributes, semconv.NetworkProtocolName(parts[0])) + // Server address and port + serverAddr, serverPort := http.ServerAddressPortAttributes(e.Host[:]) + if serverAddr.Valid() { + attrs = append(attrs, serverAddr) + } + if serverPort.Valid() { + attrs = append(attrs, serverPort) + } + + if proto != "" { + parts := strings.Split(proto, "/") + if len(parts) == 2 { + if parts[0] != "HTTP" { + attrs = append(attrs, semconv.NetworkProtocolName(parts[0])) + } + attrs = append(attrs, semconv.NetworkProtocolVersion(parts[1])) } - attributes = append(attributes, semconv.NetworkProtocolVersion(parts[1])) } - } - spanName := method - if isPatternPathSupported && isValidPatternPath { - spanName = spanName + " " + patternPath - attributes = append(attributes, semconv.HTTPRouteKey.String(patternPath)) - } + spanName := method + if isPatternPathSupported && isValidPatternPath { + spanName = spanName + " " + patternPath + attrs = append(attrs, semconv.HTTPRouteKey.String(patternPath)) + } - spanEvent := &probe.SpanEvent{ - SpanName: spanName, - StartTime: utils.BootOffsetToTime(e.StartTime), - EndTime: utils.BootOffsetToTime(e.EndTime), - SpanContext: &sc, - ParentSpanContext: pscPtr, - Attributes: attributes, - TracerSchema: semconv.SchemaURL, - } + span := ss.Spans().AppendEmpty() + span.SetName(spanName) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) - if int(e.StatusCode) >= 500 && int(e.StatusCode) < 600 { - spanEvent.Status = probe.Status{Code: codes.Error} - } + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } + + utils.Attributes(span.Attributes(), attrs...) - return []*probe.SpanEvent{spanEvent} + if int(e.StatusCode) >= 500 && int(e.StatusCode) < 600 { + span.Status().SetCode(ptrace.StatusCodeError) + } + + return ss + } } diff --git a/internal/pkg/instrumentation/bpf/net/http/server/probe_test.go b/internal/pkg/instrumentation/bpf/net/http/server/probe_test.go index 812f50490..a7f8301a0 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/probe_test.go +++ b/internal/pkg/instrumentation/bpf/net/http/server/probe_test.go @@ -9,13 +9,12 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) @@ -28,16 +27,13 @@ func TestProbeConvertEvent(t *testing.T) { traceID := trace.TraceID{1} spanID := trace.SpanID{1} - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: traceID, - SpanID: spanID, - TraceFlags: trace.FlagsSampled, - }) + + const version = "v0.0.1" testCases := []struct { name string event *event - expected []*probe.SpanEvent + expected ptrace.ScopeSpans }{ { name: "basic server test", @@ -59,25 +55,33 @@ func TestProbeConvertEvent(t *testing.T) { // "HTTP/1.1" Proto: [8]byte{0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x31}, }, - expected: []*probe.SpanEvent{ - { - SpanName: "GET", - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLPath("/foo/bar"), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.NetworkPeerAddress("www.google.com"), - semconv.NetworkPeerPort(8080), - semconv.ServerAddress("localhost"), - semconv.ServerPort(8080), - semconv.NetworkProtocolVersion("1.1"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(version) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName("GET") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String("GET"), + semconv.URLPath("/foo/bar"), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.NetworkPeerAddress("www.google.com"), + semconv.NetworkPeerPort(8080), + semconv.ServerAddress("localhost"), + semconv.ServerPort(8080), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, { name: "proto name added when not HTTP", @@ -99,26 +103,34 @@ func TestProbeConvertEvent(t *testing.T) { // "FOO/2.2" Proto: [8]byte{0x46, 0x4f, 0x4f, 0x2f, 0x32, 0x2e, 0x32}, }, - expected: []*probe.SpanEvent{ - { - SpanName: "GET", - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLPath("/foo/bar"), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.NetworkPeerAddress("www.google.com"), - semconv.NetworkPeerPort(8080), - semconv.ServerAddress("localhost"), - semconv.ServerPort(8080), - semconv.NetworkProtocolName("FOO"), - semconv.NetworkProtocolVersion("2.2"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(version) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName("GET") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String("GET"), + semconv.URLPath("/foo/bar"), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.NetworkPeerAddress("www.google.com"), + semconv.NetworkPeerPort(8080), + semconv.ServerAddress("localhost"), + semconv.ServerPort(8080), + semconv.NetworkProtocolName("FOO"), + semconv.NetworkProtocolVersion("2.2"), + ) + + return ss + }(), }, { name: "server statuscode 400 doesn't set span.Status", @@ -140,25 +152,33 @@ func TestProbeConvertEvent(t *testing.T) { // "HTTP/1.1" Proto: [8]byte{0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x31}, }, - expected: []*probe.SpanEvent{ - { - SpanName: "GET", - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLPath("/foo/bar"), - semconv.HTTPResponseStatusCodeKey.Int(400), - semconv.NetworkPeerAddress("www.google.com"), - semconv.NetworkPeerPort(8080), - semconv.ServerAddress("localhost"), - semconv.ServerPort(8080), - semconv.NetworkProtocolVersion("1.1"), - }, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(version) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName("GET") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String("GET"), + semconv.URLPath("/foo/bar"), + semconv.HTTPResponseStatusCodeKey.Int(400), + semconv.NetworkPeerAddress("www.google.com"), + semconv.NetworkPeerPort(8080), + semconv.ServerAddress("localhost"), + semconv.ServerPort(8080), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, { name: "server statuscode 500 sets span.Status", @@ -180,32 +200,40 @@ func TestProbeConvertEvent(t *testing.T) { // "HTTP/1.1" Proto: [8]byte{0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x31}, }, - expected: []*probe.SpanEvent{ - { - SpanName: "GET", - StartTime: start, - EndTime: end, - SpanContext: &sc, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLPath("/foo/bar"), - semconv.HTTPResponseStatusCodeKey.Int(500), - semconv.NetworkPeerAddress("www.google.com"), - semconv.NetworkPeerPort(8080), - semconv.ServerAddress("localhost"), - semconv.ServerPort(8080), - semconv.NetworkProtocolVersion("1.1"), - }, - Status: probe.Status{Code: codes.Error}, - TracerSchema: semconv.SchemaURL, - }, - }, + expected: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg) + ss.Scope().SetVersion(version) + ss.SetSchemaUrl(semconv.SchemaURL) + + span := ss.Spans().AppendEmpty() + span.SetName("GET") + span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset)) + span.SetTraceID(pcommon.TraceID(traceID)) + span.SetSpanID(pcommon.SpanID(spanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + span.Status().SetCode(ptrace.StatusCodeError) + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String("GET"), + semconv.URLPath("/foo/bar"), + semconv.HTTPResponseStatusCodeKey.Int(500), + semconv.NetworkPeerAddress("www.google.com"), + semconv.NetworkPeerPort(8080), + semconv.ServerAddress("localhost"), + semconv.ServerPort(8080), + semconv.NetworkProtocolVersion("1.1"), + ) + + return ss + }(), }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - out := convertEvent(tt.event) + out := processFn(pkg, version, semconv.SchemaURL)(tt.event) assert.Equal(t, tt.expected, out) }) } diff --git a/internal/pkg/instrumentation/manager.go b/internal/pkg/instrumentation/manager.go index c34a1364d..c086e3581 100644 --- a/internal/pkg/instrumentation/manager.go +++ b/internal/pkg/instrumentation/manager.go @@ -13,6 +13,7 @@ import ( "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/rlimit" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/trace" dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql" @@ -48,6 +49,7 @@ const ( // Manager handles the management of [probe.Probe] instances. type Manager struct { logger *slog.Logger + version string probes map[probe.ID]probe.Probe otelController *opentelemetry.Controller globalImpl bool @@ -56,22 +58,23 @@ type Manager struct { exe *link.Executable td *process.TargetDetails runningProbesWG sync.WaitGroup - eventCh chan *probe.Event + telemetryCh chan ptrace.ScopeSpans currentConfig Config probeMu sync.Mutex state managerState } // NewManager returns a new [Manager]. -func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}, cp ConfigProvider) (*Manager, error) { +func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}, cp ConfigProvider, version string) (*Manager, error) { m := &Manager{ logger: logger, + version: version, probes: make(map[probe.ID]probe.Probe), otelController: otelController, globalImpl: globalImpl, loadedIndicator: loadIndicator, cp: cp, - eventCh: make(chan *probe.Event), + telemetryCh: make(chan ptrace.ScopeSpans), } err := m.registerProbes() @@ -224,7 +227,7 @@ func (m *Manager) runProbe(p probe.Probe) { m.runningProbesWG.Add(1) go func(ap probe.Probe) { defer m.runningProbesWG.Done() - ap.Run(m.eventCh) + ap.Run(m.telemetryCh) }(p) } @@ -287,13 +290,13 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error // Wait for all probes to stop before closing the chan they send on. m.runningProbesWG.Wait() - close(m.eventCh) + close(m.telemetryCh) m.state = managerStateStopped m.probeMu.Unlock() return errors.Join(err, ctx.Err()) - case e := <-m.eventCh: - m.otelController.Trace(e) + case data := <-m.telemetryCh: + m.otelController.Trace(data) } } } @@ -360,29 +363,22 @@ func (m *Manager) cleanup(target *process.TargetDetails) error { return errors.Join(err, bpffsCleanup(target)) } -//nolint:revive // ignoring linter complaint about control flag -func availableProbes(l *slog.Logger, withTraceGlobal bool) []probe.Probe { +func (m *Manager) registerProbes() error { insts := []probe.Probe{ - grpcClient.New(l), - grpcServer.New(l), - httpServer.New(l), - httpClient.New(l), - dbSql.New(l), - kafkaProducer.New(l), - kafkaConsumer.New(l), - autosdk.New(l), + grpcClient.New(m.logger, m.version), + grpcServer.New(m.logger, m.version), + httpServer.New(m.logger, m.version), + httpClient.New(m.logger, m.version), + dbSql.New(m.logger, m.version), + kafkaProducer.New(m.logger, m.version), + kafkaConsumer.New(m.logger, m.version), + autosdk.New(m.logger, m.version), } - if withTraceGlobal { - insts = append(insts, otelTraceGlobal.New(l)) + if m.globalImpl { + insts = append(insts, otelTraceGlobal.New(m.logger, m.version)) } - return insts -} - -func (m *Manager) registerProbes() error { - insts := availableProbes(m.logger, m.globalImpl) - for _, i := range insts { err := m.registerProbe(i) if err != nil { diff --git a/internal/pkg/instrumentation/manager_test.go b/internal/pkg/instrumentation/manager_test.go index 1bbb0a72e..619dbe10a 100644 --- a/internal/pkg/instrumentation/manager_test.go +++ b/internal/pkg/instrumentation/manager_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" @@ -185,7 +186,7 @@ func TestDependencyChecks(t *testing.T) { } func fakeManager(t *testing.T) *Manager { - m, err := NewManager(slog.Default(), nil, true, nil, NewNoopConfigProvider(nil)) + m, err := NewManager(slog.Default(), nil, true, nil, NewNoopConfigProvider(nil), "") assert.NoError(t, err) assert.NotNil(t, m) @@ -226,14 +227,14 @@ func TestRunStopping(t *testing.T) { p := newSlowProbe(probeStop) tp := new(shutdownTracerProvider) - ctrl, err := opentelemetry.NewController(slog.Default(), tp, "") + ctrl, err := opentelemetry.NewController(slog.Default(), tp) require.NoError(t, err) m := &Manager{ otelController: ctrl, logger: slog.Default(), probes: map[probe.ID]probe.Probe{{}: p}, - eventCh: make(chan *probe.Event), + telemetryCh: make(chan ptrace.ScopeSpans), cp: NewNoopConfigProvider(nil), } @@ -286,7 +287,7 @@ func (p slowProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Conf return nil } -func (p slowProbe) Run(c chan<- *probe.Event) { +func (p slowProbe) Run(c chan<- ptrace.ScopeSpans) { } func (p slowProbe) Close() error { @@ -306,7 +307,7 @@ func (p *noopProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Con return nil } -func (p *noopProbe) Run(c chan<- *probe.Event) { +func (p *noopProbe) Run(c chan<- ptrace.ScopeSpans) { p.running = true } @@ -368,7 +369,7 @@ func TestConfigProvider(t *testing.T) { netHTTPServerProbeID: &noopProbe{}, somePackageProducerProbeID: &noopProbe{}, }, - eventCh: make(chan *probe.Event), + telemetryCh: make(chan ptrace.ScopeSpans), cp: newDummyProvider(Config{ InstrumentationLibraryConfigs: map[LibraryID]Library{ netHTTPClientLibID: {TracesEnabled: &falseVal}, diff --git a/internal/pkg/instrumentation/probe/event.go b/internal/pkg/instrumentation/probe/event.go deleted file mode 100644 index f2283d237..000000000 --- a/internal/pkg/instrumentation/probe/event.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package probe - -import ( - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -// Event is a telemetry event that happens within an instrumented package. -type Event struct { - Package string - Kind trace.SpanKind - SpanEvents []*SpanEvent -} - -type Status struct { - Code codes.Code - Description string -} - -type SpanEvent struct { - SpanName string - Kind trace.SpanKind - Attributes []attribute.KeyValue - StartTime time.Time - EndTime time.Time - SpanContext *trace.SpanContext - ParentSpanContext *trace.SpanContext - Status Status - TracerName string - TracerVersion string - TracerSchema string - Events map[string][]trace.EventOption - Links []trace.Link -} diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go index 52b78252a..7a327c765 100644 --- a/internal/pkg/instrumentation/probe/probe.go +++ b/internal/pkg/instrumentation/probe/probe.go @@ -18,6 +18,8 @@ import ( "github.com/cilium/ebpf/perf" "github.com/hashicorp/go-version" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/auto/internal/pkg/inject" "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpffs" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling" @@ -40,7 +42,7 @@ type Probe interface { Load(*link.Executable, *process.TargetDetails, *sampling.Config) error // Run runs the events processing loop. - Run(eventsChan chan<- *Event) + Run(tracesChan chan<- ptrace.ScopeSpans) // Close stops the Probe. Close() error @@ -68,7 +70,7 @@ type Base[BPFObj any, BPFEvent any] struct { // probe. SpecFn func() (*ebpf.CollectionSpec, error) // ProcessFn processes probe events into a uniform Event type. - ProcessFn func(*BPFEvent) []*SpanEvent + ProcessFn func(*BPFEvent) ptrace.ScopeSpans // ProcessRecord is an optional processing function for the probe. If nil, // all records will be read directly into a new BPFEvent using the // encoding/binary package. @@ -213,7 +215,7 @@ func (i *Base[BPFObj, BPFEvent]) buildEBPFCollection(td *process.TargetDetails, } // Run runs the events processing loop. -func (i *Base[BPFObj, BPFEvent]) Run(dest chan<- *Event) { +func (i *Base[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) { for { record, err := i.reader.Read() if err != nil { @@ -229,21 +231,16 @@ func (i *Base[BPFObj, BPFEvent]) Run(dest chan<- *Event) { continue } - se, err := i.processRecord(record) + data, err := i.processRecord(record) if err != nil { i.Logger.Error("failed to process perf record", "error", err, "pkg", i.ID.InstrumentedPkg) + continue } - e := &Event{ - Package: i.ID.InstrumentedPkg, - Kind: i.ID.SpanKind, - SpanEvents: se, - } - - dest <- e + dest <- data } } -func (i *Base[BPFObj, BPFEvent]) processRecord(record perf.Record) ([]*SpanEvent, error) { +func (i *Base[BPFObj, BPFEvent]) processRecord(record perf.Record) (ptrace.ScopeSpans, error) { var ( event BPFEvent err error @@ -257,7 +254,7 @@ func (i *Base[BPFObj, BPFEvent]) processRecord(record perf.Record) ([]*SpanEvent } if err != nil { - return nil, err + return ptrace.NewScopeSpans(), err } return i.ProcessFn(&event), nil diff --git a/internal/pkg/instrumentation/utils/attrs.go b/internal/pkg/instrumentation/utils/attrs.go new file mode 100644 index 000000000..757644a01 --- /dev/null +++ b/internal/pkg/instrumentation/utils/attrs.go @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package utils + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/otel/attribute" +) + +func Attributes(dest pcommon.Map, attrs ...attribute.KeyValue) { + for _, attr := range attrs { + setAttr(dest, attr) + } +} + +func setAttr(dest pcommon.Map, attr attribute.KeyValue) { + switch attr.Value.Type() { + case attribute.BOOL: + dest.PutBool(string(attr.Key), attr.Value.AsBool()) + case attribute.INT64: + dest.PutInt(string(attr.Key), attr.Value.AsInt64()) + case attribute.FLOAT64: + dest.PutDouble(string(attr.Key), attr.Value.AsFloat64()) + case attribute.STRING: + dest.PutStr(string(attr.Key), attr.Value.AsString()) + case attribute.BOOLSLICE: + s := dest.PutEmptySlice(string(attr.Key)) + for _, v := range attr.Value.AsBoolSlice() { + s.AppendEmpty().SetBool(v) + } + case attribute.INT64SLICE: + s := dest.PutEmptySlice(string(attr.Key)) + for _, v := range attr.Value.AsInt64Slice() { + s.AppendEmpty().SetInt(v) + } + case attribute.FLOAT64SLICE: + s := dest.PutEmptySlice(string(attr.Key)) + for _, v := range attr.Value.AsFloat64Slice() { + s.AppendEmpty().SetDouble(v) + } + case attribute.STRINGSLICE: + s := dest.PutEmptySlice(string(attr.Key)) + for _, v := range attr.Value.AsStringSlice() { + s.AppendEmpty().SetStr(v) + } + } +} diff --git a/internal/pkg/instrumentation/utils/kernel.go b/internal/pkg/instrumentation/utils/kernel.go index 021235fca..0e51026d9 100644 --- a/internal/pkg/instrumentation/utils/kernel.go +++ b/internal/pkg/instrumentation/utils/kernel.go @@ -6,6 +6,8 @@ package utils import ( "math" "time" + + "go.opentelemetry.io/collector/pdata/pcommon" ) var bootTimeOffset = func() int64 { @@ -16,6 +18,12 @@ var bootTimeOffset = func() int64 { return o }() +// BootOffsetToTimestamp returns the [pcommon.Timestamp] that is nsec number of +// nanoseconds after the estimated boot time of the system. +func BootOffsetToTimestamp(nsec uint64) pcommon.Timestamp { + return pcommon.NewTimestampFromTime(BootOffsetToTime(nsec)) +} + // BootOffsetToTime returns the timestamp that is nsec number of nanoseconds // after the estimated boot time of the system. func BootOffsetToTime(nsec uint64) time.Time { diff --git a/internal/pkg/opentelemetry/controller.go b/internal/pkg/opentelemetry/controller.go index a7a6dcdfa..42c0d9e0e 100644 --- a/internal/pkg/opentelemetry/controller.go +++ b/internal/pkg/opentelemetry/controller.go @@ -5,94 +5,96 @@ package opentelemetry import ( "context" + "fmt" "log/slog" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" ) // Controller handles OpenTelemetry telemetry generation for events. type Controller struct { logger *slog.Logger - version string tracerProvider trace.TracerProvider tracersMap map[tracerID]trace.Tracer } type tracerID struct{ name, version, schema string } -func (c *Controller) getTracer(pkg, tracerName, version, schema string) trace.Tracer { - // Default Tracer ID, if the user does not provide one. - tID := tracerID{name: pkg, version: c.version} - if tracerName != "" { - tID = tracerID{name: tracerName, version: version, schema: schema} - } - +func (c *Controller) getTracer(name, version, schema string) trace.Tracer { + tID := tracerID{name: name, version: version, schema: schema} t, exists := c.tracersMap[tID] - if exists { - return t - } - - var newTracer trace.Tracer - if tracerName != "" { - // If the user has provided a tracer, use it. - newTracer = c.tracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(version), trace.WithSchemaURL(schema)) - } else { - newTracer = c.tracerProvider.Tracer( - "go.opentelemetry.io/auto/"+pkg, - trace.WithInstrumentationVersion(c.version), - trace.WithSchemaURL(schema), - ) + if !exists { + t = c.tracerProvider.Tracer(name, trace.WithInstrumentationVersion(version), trace.WithSchemaURL(schema)) + c.tracersMap[tID] = t } - - c.tracersMap[tID] = newTracer - return newTracer + return t } // Trace creates a trace span for event. -func (c *Controller) Trace(event *probe.Event) { - for _, se := range event.SpanEvents { - c.logger.Debug("got event", "kind", event.Kind.String(), "pkg", event.Package, "attrs", se.Attributes, "traceID", se.SpanContext.TraceID().String(), "spanID", se.SpanContext.SpanID().String()) - ctx := context.Background() +func (c *Controller) Trace(ss ptrace.ScopeSpans) { + var ( + startOpts []trace.SpanStartOption + eventOpts []trace.EventOption + endOpts []trace.SpanEndOption + kvs []attribute.KeyValue + ) - if se.SpanContext == nil { - c.logger.Debug("got event without context - dropping") - return - } + t := c.getTracer(ss.Scope().Name(), ss.Scope().Version(), ss.SchemaUrl()) + for k := 0; k < ss.Spans().Len(); k++ { + pSpan := ss.Spans().At(k) - // TODO: handle remote parent - if se.ParentSpanContext != nil { - ctx = trace.ContextWithSpanContext(ctx, *se.ParentSpanContext) + if pSpan.TraceID().IsEmpty() || pSpan.SpanID().IsEmpty() { + c.logger.Debug("dropping invalid span", "name", pSpan.Name()) + continue } + c.logger.Debug("handling span", "tracer", t, "span", pSpan) - kind := se.Kind - if kind == trace.SpanKindUnspecified { - kind = event.Kind + ctx := context.Background() + if !pSpan.ParentSpanID().IsEmpty() { + psc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID(pSpan.TraceID()), + SpanID: trace.SpanID(pSpan.ParentSpanID()), + }) + ctx = trace.ContextWithSpanContext(ctx, psc) } + ctx = ContextWithSpan(ctx, pSpan) + + kvs = appendAttrs(kvs, pSpan.Attributes()) + startOpts = append( + startOpts, + trace.WithAttributes(kvs...), + trace.WithSpanKind(spanKind(pSpan.Kind())), + trace.WithTimestamp(pSpan.StartTimestamp().AsTime()), + trace.WithLinks(c.links(pSpan.Links())...), + ) + _, span := t.Start(ctx, pSpan.Name(), startOpts...) + startOpts = startOpts[:0] + kvs = kvs[:0] - ctx = ContextWithEBPFEvent(ctx, *se) - c.logger.Debug("getting tracer", "name", se.TracerName, "version", se.TracerVersion, "schema", se.TracerSchema) - _, span := c.getTracer(event.Package, se.TracerName, se.TracerVersion, se.TracerSchema). - Start(ctx, se.SpanName, - trace.WithAttributes(se.Attributes...), - trace.WithSpanKind(kind), - trace.WithTimestamp(se.StartTime), - trace.WithLinks(se.Links...), - ) - for name, opts := range se.Events { - span.AddEvent(name, opts...) + for l := 0; l < pSpan.Events().Len(); l++ { + e := pSpan.Events().At(l) + eventOpts = appendEventOpts(eventOpts, e) + span.AddEvent(e.Name(), eventOpts...) + eventOpts = eventOpts[:0] } - span.SetStatus(se.Status.Code, se.Status.Description) - span.End(trace.WithTimestamp(se.EndTime)) + + c, msg := status(pSpan.Status()) + span.SetStatus(c, msg) + + endOpts = append(endOpts, trace.WithTimestamp(pSpan.EndTimestamp().AsTime())) + span.End(endOpts...) + endOpts = endOpts[:0] } } // NewController returns a new initialized [Controller]. -func NewController(logger *slog.Logger, tracerProvider trace.TracerProvider, ver string) (*Controller, error) { +func NewController(logger *slog.Logger, tracerProvider trace.TracerProvider) (*Controller, error) { return &Controller{ logger: logger, - version: ver, tracerProvider: tracerProvider, tracersMap: make(map[tracerID]trace.Tracer), }, nil @@ -110,3 +112,153 @@ func (c *Controller) Shutdown(ctx context.Context) error { } return nil } + +func appendAttrs(dest []attribute.KeyValue, m pcommon.Map) []attribute.KeyValue { + m.Range(func(k string, v pcommon.Value) bool { + dest = append(dest, attr(k, v)) + return true + }) + return dest +} + +func attr(k string, v pcommon.Value) attribute.KeyValue { + return attribute.KeyValue{ + Key: attribute.Key(k), + Value: val(v), + } +} + +func val(val pcommon.Value) (out attribute.Value) { + switch val.Type() { + case pcommon.ValueTypeEmpty: + case pcommon.ValueTypeStr: + out = attribute.StringValue(val.AsString()) + case pcommon.ValueTypeInt: + out = attribute.Int64Value(val.Int()) + case pcommon.ValueTypeDouble: + out = attribute.Float64Value(val.Double()) + case pcommon.ValueTypeBool: + out = attribute.BoolValue(val.Bool()) + case pcommon.ValueTypeSlice: + s := val.Slice() + if s.Len() == 0 { + // Undetectable slice type. + out = attribute.StringValue("") + return out + } + + // Validate homogeneity before allocating. + t := s.At(0).Type() + for i := 1; i < s.Len(); i++ { + if s.At(i).Type() != t { + out = attribute.StringValue("") + return out + } + } + + switch t { + case pcommon.ValueTypeBool: + v := make([]bool, s.Len()) + for i := 0; i < s.Len(); i++ { + v[i] = s.At(i).Bool() + } + out = attribute.BoolSliceValue(v) + case pcommon.ValueTypeStr: + v := make([]string, s.Len()) + for i := 0; i < s.Len(); i++ { + v[i] = s.At(i).Str() + } + out = attribute.StringSliceValue(v) + case pcommon.ValueTypeInt: + v := make([]int64, s.Len()) + for i := 0; i < s.Len(); i++ { + v[i] = s.At(i).Int() + } + out = attribute.Int64SliceValue(v) + case pcommon.ValueTypeDouble: + v := make([]float64, s.Len()) + for i := 0; i < s.Len(); i++ { + v[i] = s.At(i).Double() + } + out = attribute.Float64SliceValue(v) + default: + out = attribute.StringValue(fmt.Sprintf("", t.String())) + } + default: + out = attribute.StringValue(fmt.Sprintf("", val.AsRaw())) + } + return out +} + +func spanKind(kind ptrace.SpanKind) trace.SpanKind { + switch kind { + case ptrace.SpanKindInternal: + return trace.SpanKindInternal + case ptrace.SpanKindServer: + return trace.SpanKindServer + case ptrace.SpanKindClient: + return trace.SpanKindClient + case ptrace.SpanKindProducer: + return trace.SpanKindProducer + case ptrace.SpanKindConsumer: + return trace.SpanKindConsumer + default: + return trace.SpanKindUnspecified + } +} + +func appendEventOpts(dest []trace.EventOption, e ptrace.SpanEvent) []trace.EventOption { + ts := e.Timestamp().AsTime() + if !ts.IsZero() { + dest = append(dest, trace.WithTimestamp(ts)) + } + + var kvs []attribute.KeyValue + kvs = appendAttrs(kvs, e.Attributes()) + if len(kvs) > 0 { + dest = append(dest, trace.WithAttributes(kvs...)) + } + return dest +} + +func (c *Controller) links(links ptrace.SpanLinkSlice) []trace.Link { + n := links.Len() + if n == 0 { + return nil + } + + out := make([]trace.Link, n) + for i := range out { + l := links.At(i) + + raw := l.TraceState().AsRaw() + ts, err := trace.ParseTraceState(raw) + if err != nil { + c.logger.Error("failed to parse link tracestate", "error", err, "tracestate", raw) + } + + out[i] = trace.Link{ + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID(l.TraceID()), + SpanID: trace.SpanID(l.SpanID()), + TraceFlags: trace.TraceFlags(l.Flags()), + TraceState: ts, + }), + } + out[i].Attributes = appendAttrs(out[i].Attributes, l.Attributes()) + } + return out +} + +func status(stat ptrace.Status) (codes.Code, string) { + var c codes.Code + switch stat.Code() { + case ptrace.StatusCodeUnset: + c = codes.Unset + case ptrace.StatusCodeOk: + c = codes.Ok + case ptrace.StatusCodeError: + c = codes.Error + } + return c, stat.Message() +} diff --git a/internal/pkg/opentelemetry/controller_test.go b/internal/pkg/opentelemetry/controller_test.go index dd7d3880e..9a3a7bd3c 100644 --- a/internal/pkg/opentelemetry/controller_test.go +++ b/internal/pkg/opentelemetry/controller_test.go @@ -17,6 +17,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -26,7 +28,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" ) // copied from instrumentation.go. @@ -57,8 +59,8 @@ func instResource() *resource.Resource { } func TestTrace(t *testing.T) { - startTime := time.Now() - endTime := startTime.Add(1 * time.Second) + startTime := time.Unix(0, 0).UTC() + endTime := time.Unix(1, 0).UTC() exporter := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( @@ -71,41 +73,40 @@ func TestTrace(t *testing.T) { assert.NoError(t, err) }() - ctrl, err := NewController(slog.Default(), tp, "test") + ctrl, err := NewController(slog.Default(), tp) assert.NoError(t, err) spId, err := trace.SpanIDFromHex("00f067aa0ba902b7") assert.NoError(t, err) trId, err := trace.TraceIDFromHex("00f067aa0ba902b700f067aa0ba902b7") assert.NoError(t, err) - spanContext := trace.NewSpanContext( - trace.SpanContextConfig{ - SpanID: spId, - TraceID: trId, - TraceFlags: 1, - }, - ) testCases := []struct { name string - event *probe.Event + traces ptrace.ScopeSpans expected tracetest.SpanStubs }{ { name: "basic test span", - event: &probe.Event{ - Package: "foo/bar", - Kind: trace.SpanKindClient, - SpanEvents: []*probe.SpanEvent{ - { - SpanName: "testSpan", - StartTime: startTime, - EndTime: endTime, - SpanContext: &spanContext, - TracerSchema: semconv.SchemaURL, - }, - }, - }, + traces: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl(semconv.SchemaURL) + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/foo/bar") + scope.SetVersion("test") + + span := ss.Spans().AppendEmpty() + span.SetName("testSpan") + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + return ss + }(), expected: tracetest.SpanStubs{ { Name: "testSpan", @@ -128,25 +129,33 @@ func TestTrace(t *testing.T) { }, { name: "http/client", - event: &probe.Event{ - Package: "net/http", - Kind: trace.SpanKindClient, - SpanEvents: []*probe.SpanEvent{ - { - SpanName: "GET", - StartTime: startTime, - EndTime: endTime, - SpanContext: &spanContext, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLPath("/"), - semconv.HTTPResponseStatusCodeKey.Int(200), - semconv.ServerAddress("https://google.com"), - semconv.ServerPort(8080), - }, - }, - }, - }, + traces: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/net/http") + scope.SetVersion("test") + + span := ss.Spans().AppendEmpty() + span.SetName("GET") + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String("GET"), + semconv.URLPath("/"), + semconv.HTTPResponseStatusCodeKey.Int(200), + semconv.ServerAddress("https://google.com"), + semconv.ServerPort(8080), + ) + + return ss + }(), expected: tracetest.SpanStubs{ { Name: "GET", @@ -174,26 +183,34 @@ func TestTrace(t *testing.T) { }, { name: "http/client with status code", - event: &probe.Event{ - Package: "net/http", - Kind: trace.SpanKindClient, - SpanEvents: []*probe.SpanEvent{ - { - SpanName: "GET", - StartTime: startTime, - EndTime: endTime, - SpanContext: &spanContext, - Attributes: []attribute.KeyValue{ - semconv.HTTPRequestMethodKey.String("GET"), - semconv.URLPath("/"), - semconv.HTTPResponseStatusCodeKey.Int(500), - semconv.ServerAddress("https://google.com"), - semconv.ServerPort(8080), - }, - Status: probe.Status{Code: codes.Error}, - }, - }, - }, + traces: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + + scope := ss.Scope() + scope.SetName("go.opentelemetry.io/auto/net/http") + scope.SetVersion("test") + + span := ss.Spans().AppendEmpty() + span.SetName("GET") + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + span.Status().SetCode(ptrace.StatusCodeError) + + utils.Attributes( + span.Attributes(), + semconv.HTTPRequestMethodKey.String("GET"), + semconv.URLPath("/"), + semconv.HTTPResponseStatusCodeKey.Int(500), + semconv.ServerAddress("https://google.com"), + semconv.ServerPort(8080), + ) + + return ss + }(), expected: tracetest.SpanStubs{ { Name: "GET", @@ -222,27 +239,35 @@ func TestTrace(t *testing.T) { }, { name: "otelglobal", - event: &probe.Event{ - Kind: trace.SpanKindClient, - SpanEvents: []*probe.SpanEvent{ - { - SpanName: "very important span", - StartTime: startTime, - EndTime: endTime, - SpanContext: &spanContext, - Attributes: []attribute.KeyValue{ - attribute.Int64("int.value", 42), - attribute.String("string.value", "hello"), - attribute.Float64("float.value", 3.14), - attribute.Bool("bool.value", true), - }, - Status: probe.Status{Code: codes.Error, Description: "error description"}, - TracerName: "user-tracer", - TracerVersion: "v1", - TracerSchema: "user-schema", - }, - }, - }, + traces: func() ptrace.ScopeSpans { + ss := ptrace.NewScopeSpans() + ss.SetSchemaUrl("user-schema") + + scope := ss.Scope() + scope.SetName("user-tracer") + scope.SetVersion("v1") + + span := ss.Spans().AppendEmpty() + span.SetName("very important span") + span.SetTraceID(pcommon.TraceID(trId)) + span.SetSpanID(pcommon.SpanID(spId)) + span.SetFlags(1) + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + span.Status().SetCode(ptrace.StatusCodeError) + span.Status().SetMessage("error description") + + utils.Attributes( + span.Attributes(), + attribute.Int64("int.value", 42), + attribute.String("string.value", "hello"), + attribute.Float64("float.value", 3.14), + attribute.Bool("bool.value", true), + ) + + return ss + }(), expected: tracetest.SpanStubs{ { Name: "very important span", @@ -275,7 +300,7 @@ func TestTrace(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { defer exporter.Reset() - ctrl.Trace(tt.event) + ctrl.Trace(tt.traces) tp.ForceFlush(context.Background()) spans := exporter.GetSpans() assert.Equal(t, len(tt.expected), len(spans)) @@ -301,20 +326,19 @@ func TestGetTracer(t *testing.T) { assert.NoError(t, err) }() - ctrl, err := NewController(slog.Default(), tp, "test") + ctrl, err := NewController(slog.Default(), tp) assert.NoError(t, err) - t1 := ctrl.getTracer("foo/bar", "test", "v1", "schema") + t1 := ctrl.getTracer("test", "v1", "schema") assert.Equal(t, t1, ctrl.tracersMap[tracerID{name: "test", version: "v1", schema: "schema"}]) - assert.Nil(t, ctrl.tracersMap[tracerID{name: "foo/bar", version: "v1", schema: "schema"}]) - t2 := ctrl.getTracer("net/http", "", "", "") - assert.Equal(t, t2, ctrl.tracersMap[tracerID{name: "net/http", version: ctrl.version, schema: ""}]) + t2 := ctrl.getTracer("net/http", "", "") + assert.Equal(t, t2, ctrl.tracersMap[tracerID{name: "net/http", version: "", schema: ""}]) - t3 := ctrl.getTracer("foo/bar", "test", "v1", "schema") + t3 := ctrl.getTracer("test", "v1", "schema") assert.Same(t, t1, t3) - t4 := ctrl.getTracer("net/http", "", "", "") + t4 := ctrl.getTracer("net/http", "", "") assert.Same(t, t2, t4) assert.Equal(t, len(ctrl.tracersMap), 2) } @@ -352,7 +376,7 @@ func TestShutdown(t *testing.T) { tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(batcher)) - ctrl, err := NewController(slog.Default(), tp, "test") + ctrl, err := NewController(slog.Default(), tp) require.NoError(t, err) ctx := context.Background() diff --git a/internal/pkg/opentelemetry/id_generator.go b/internal/pkg/opentelemetry/id_generator.go index d13199dd9..b6d9f13d2 100644 --- a/internal/pkg/opentelemetry/id_generator.go +++ b/internal/pkg/opentelemetry/id_generator.go @@ -6,9 +6,8 @@ package opentelemetry import ( "context" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/trace" - - "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" ) type EBPFSourceIDGenerator struct{} @@ -19,40 +18,31 @@ func NewEBPFSourceIDGenerator() *EBPFSourceIDGenerator { return &EBPFSourceIDGenerator{} } -// ContextWithEBPFEvent returns a copy of parent in which event is stored. -func ContextWithEBPFEvent(parent context.Context, event probe.SpanEvent) context.Context { - return context.WithValue(parent, eBPFEventKey{}, event) +// ContextWithSpan returns a copy of parent in which span is stored. +func ContextWithSpan(parent context.Context, span ptrace.Span) context.Context { + return context.WithValue(parent, eBPFEventKey{}, span) } -// EventFromContext returns the event within ctx if one exists. -func EventFromContext(ctx context.Context) *probe.SpanEvent { +// SpanFromContext returns the Span within ctx if one exists. +func SpanFromContext(ctx context.Context) ptrace.Span { val := ctx.Value(eBPFEventKey{}) if val == nil { - return nil - } - - event, ok := val.(probe.SpanEvent) - if !ok { - return nil + return ptrace.NewSpan() } - return &event + s, _ := val.(ptrace.Span) + return s } func (e *EBPFSourceIDGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) { - event := EventFromContext(ctx) - if event == nil || event.SpanContext == nil { + s := SpanFromContext(ctx) + if s.TraceID().IsEmpty() || s.SpanID().IsEmpty() { return trace.TraceID{}, trace.SpanID{} } - return event.SpanContext.TraceID(), event.SpanContext.SpanID() + return trace.TraceID(s.TraceID()), trace.SpanID(s.SpanID()) } func (e *EBPFSourceIDGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID { - event := EventFromContext(ctx) - if event == nil { - return trace.SpanID{} - } - - return event.SpanContext.SpanID() + return trace.SpanID(SpanFromContext(ctx).SpanID()) }