Skip to content

Commit

Permalink
Replace probe.Event with ptrace.ScopeSpans
Browse files Browse the repository at this point in the history
Use the pdata module as the data-model for the project.
  • Loading branch information
MrAlias committed Oct 21, 2024
1 parent 8b55ec1 commit 3454870
Show file tree
Hide file tree
Showing 25 changed files with 1,251 additions and 1,199 deletions.
4 changes: 2 additions & 2 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
return nil, err
}

ctrl, err := opentelemetry.NewController(c.logger, c.tracerProvider(pa.BuildInfo), Version())
ctrl, err := opentelemetry.NewController(c.logger, c.tracerProvider(pa.BuildInfo))
if err != nil {
return nil, err
}

cp := convertConfigProvider(c.cp)
mngr, err := instrumentation.NewManager(c.logger, ctrl, c.globalImpl, c.loadIndicator, cp)
mngr, err := instrumentation.NewManager(c.logger, ctrl, c.globalImpl, c.loadIndicator, cp, Version())
if err != nil {
return nil, err
}
Expand Down
65 changes: 29 additions & 36 deletions internal/pkg/instrumentation/bpf/database/sql/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"os"
"strconv"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sys/unix"
Expand All @@ -29,7 +30,7 @@ const (
)

// New returns a new [probe.Probe].
func New(logger *slog.Logger) probe.Probe {
func New(logger *slog.Logger, version string) probe.Probe {
id := probe.ID{
SpanKind: trace.SpanKindClient,
InstrumentedPkg: pkg,
Expand Down Expand Up @@ -61,7 +62,7 @@ func New(logger *slog.Logger) probe.Probe {
},

SpecFn: loadBpf,
ProcessFn: convertEvent,
ProcessFn: processFn(pkg, version, semconv.SchemaURL),
}
}

Expand All @@ -72,40 +73,32 @@ type event struct {
Query [256]byte
}

func convertEvent(e *event) []*probe.SpanEvent {
query := unix.ByteSliceToString(e.Query[:])

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.SpanContext.TraceID,
SpanID: e.SpanContext.SpanID,
TraceFlags: trace.FlagsSampled,
})

var pscPtr *trace.SpanContext
if e.ParentSpanContext.TraceID.IsValid() {
psc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.ParentSpanContext.TraceID,
SpanID: e.ParentSpanContext.SpanID,
TraceFlags: trace.FlagsSampled,
Remote: true,
})
pscPtr = &psc
} else {
pscPtr = nil
}
func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans {
scopeName := "go.opentelemetry.io/auto/" + pkg
return func(e *event) ptrace.ScopeSpans {
ss := ptrace.NewScopeSpans()

scope := ss.Scope()
scope.SetName(scopeName)
scope.SetVersion(ver)
ss.SetSchemaUrl(schemaURL)

span := ss.Spans().AppendEmpty()
span.SetName("DB")
span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime))
span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime))
span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID))
span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID))
span.SetFlags(uint32(trace.FlagsSampled))

if e.ParentSpanContext.SpanID.IsValid() {
span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID))
}

return []*probe.SpanEvent{
{
SpanName: "DB",
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
SpanContext: &sc,
Attributes: []attribute.KeyValue{
semconv.DBQueryText(query),
},
ParentSpanContext: pscPtr,
TracerSchema: semconv.SchemaURL,
},
query := unix.ByteSliceToString(e.Query[:])
span.Attributes().PutStr(string(semconv.DBQueryTextKey), query)

return ss
}
}

Expand Down
40 changes: 21 additions & 19 deletions internal/pkg/instrumentation/bpf/database/sql/probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
)

Expand All @@ -28,7 +28,8 @@ func TestProbeConvertEvent(t *testing.T) {
traceID := trace.TraceID{1}
spanID := trace.SpanID{1}

got := convertEvent(&event{
const ver = "v1"
got := processFn(pkg, ver, semconv.SchemaURL)(&event{
BaseSpanProperties: context.BaseSpanProperties{
StartTime: startOffset,
EndTime: endOffset,
Expand All @@ -38,20 +39,21 @@ func TestProbeConvertEvent(t *testing.T) {
Query: [256]byte{0x53, 0x45, 0x4c, 0x45, 0x43, 0x54, 0x20, 0x2a, 0x20, 0x46, 0x52, 0x4f, 0x4d, 0x20, 0x66, 0x6f, 0x6f},
})

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: trace.FlagsSampled,
})
want := &probe.SpanEvent{
SpanName: "DB",
StartTime: start,
EndTime: end,
SpanContext: &sc,
Attributes: []attribute.KeyValue{
semconv.DBQueryText("SELECT * FROM foo"),
},
TracerSchema: semconv.SchemaURL,
}
assert.Equal(t, want, got[0])
want := func() ptrace.ScopeSpans {
ss := ptrace.NewScopeSpans()
ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg)
ss.Scope().SetVersion(ver)
ss.SetSchemaUrl(semconv.SchemaURL)

span := ss.Spans().AppendEmpty()
span.SetName("DB")
span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset))
span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset))
span.SetTraceID(pcommon.TraceID(traceID))
span.SetSpanID(pcommon.SpanID(spanID))
span.SetFlags(uint32(trace.FlagsSampled))
utils.Attributes(span.Attributes(), semconv.DBQueryText("SELECT * FROM foo"))
return ss
}()
assert.Equal(t, want, got)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"log/slog"
"strconv"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sys/unix"
Expand All @@ -27,7 +28,7 @@ const (
)

// New returns a new [probe.Probe].
func New(logger *slog.Logger) probe.Probe {
func New(logger *slog.Logger, version string) probe.Probe {
id := probe.ID{
SpanKind: trace.SpanKindConsumer,
InstrumentedPkg: pkg,
Expand Down Expand Up @@ -75,7 +76,7 @@ func New(logger *slog.Logger) probe.Probe {
},
},
SpecFn: loadBpf,
ProcessFn: convertEvent,
ProcessFn: processFn(pkg, version, semconv.SchemaURL),
}
}

Expand All @@ -89,47 +90,43 @@ type event struct {
Partition int64
}

func convertEvent(e *event) []*probe.SpanEvent {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.SpanContext.TraceID,
SpanID: e.SpanContext.SpanID,
TraceFlags: trace.FlagsSampled,
})

var pscPtr *trace.SpanContext
if e.ParentSpanContext.TraceID.IsValid() {
psc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.ParentSpanContext.TraceID,
SpanID: e.ParentSpanContext.SpanID,
TraceFlags: trace.FlagsSampled,
Remote: true,
})
pscPtr = &psc
} else {
pscPtr = nil
}
func processFn(pkg, ver, schemaURL string) func(*event) ptrace.ScopeSpans {
scopeName := "go.opentelemetry.io/auto/" + pkg
return func(e *event) ptrace.ScopeSpans {
ss := ptrace.NewScopeSpans()

topic := unix.ByteSliceToString(e.Topic[:])
scope := ss.Scope()
scope.SetName(scopeName)
scope.SetVersion(ver)
ss.SetSchemaUrl(schemaURL)

attributes := []attribute.KeyValue{
semconv.MessagingSystemKafka,
semconv.MessagingOperationTypeReceive,
semconv.MessagingDestinationPartitionID(strconv.Itoa(int(e.Partition))),
semconv.MessagingDestinationName(topic),
semconv.MessagingKafkaMessageOffset(int(e.Offset)),
semconv.MessagingKafkaMessageKey(unix.ByteSliceToString(e.Key[:])),
semconv.MessagingKafkaConsumerGroup(unix.ByteSliceToString(e.ConsumerGroup[:])),
}
return []*probe.SpanEvent{
{
SpanName: kafkaConsumerSpanName(topic),
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
SpanContext: &sc,
ParentSpanContext: pscPtr,
Attributes: attributes,
TracerSchema: semconv.SchemaURL,
},
span := ss.Spans().AppendEmpty()

topic := unix.ByteSliceToString(e.Topic[:])
span.SetName(kafkaConsumerSpanName(topic))

span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime))
span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime))
span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID))
span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID))
span.SetFlags(uint32(trace.FlagsSampled))

if e.ParentSpanContext.SpanID.IsValid() {
span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID))
}

utils.Attributes(
span.Attributes(),
semconv.MessagingSystemKafka,
semconv.MessagingOperationTypeReceive,
semconv.MessagingDestinationPartitionID(strconv.Itoa(int(e.Partition))),
semconv.MessagingDestinationName(topic),
semconv.MessagingKafkaMessageOffsetKey.Int64(e.Offset),
semconv.MessagingKafkaMessageKey(unix.ByteSliceToString(e.Key[:])),
semconv.MessagingKafkaConsumerGroup(unix.ByteSliceToString(e.ConsumerGroup[:])),
)

return ss
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
)

Expand All @@ -28,7 +28,8 @@ func TestProbeConvertEvent(t *testing.T) {
traceID := trace.TraceID{1}
spanID := trace.SpanID{1}

got := convertEvent(&event{
const ver = "v1"
got := processFn(pkg, ver, semconv.SchemaURL)(&event{
BaseSpanProperties: context.BaseSpanProperties{
StartTime: startOffset,
EndTime: endOffset,
Expand All @@ -44,26 +45,31 @@ func TestProbeConvertEvent(t *testing.T) {
Partition: 12,
})

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: trace.FlagsSampled,
})
want := &probe.SpanEvent{
SpanName: kafkaConsumerSpanName("topic1"),
StartTime: start,
EndTime: end,
SpanContext: &sc,
Attributes: []attribute.KeyValue{
want := func() ptrace.ScopeSpans {
ss := ptrace.NewScopeSpans()

ss.Scope().SetName("go.opentelemetry.io/auto/" + pkg)
ss.Scope().SetVersion(ver)
ss.SetSchemaUrl(semconv.SchemaURL)

span := ss.Spans().AppendEmpty()
span.SetName(kafkaConsumerSpanName("topic1"))
span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset))
span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset))
span.SetTraceID(pcommon.TraceID(traceID))
span.SetSpanID(pcommon.SpanID(spanID))
span.SetFlags(uint32(trace.FlagsSampled))
utils.Attributes(
span.Attributes(),
semconv.MessagingSystemKafka,
semconv.MessagingOperationTypeReceive,
semconv.MessagingDestinationPartitionID("12"),
semconv.MessagingDestinationName("topic1"),
semconv.MessagingKafkaMessageOffset(42),
semconv.MessagingKafkaMessageKey("key1"),
semconv.MessagingKafkaConsumerGroup("test consumer group"),
},
TracerSchema: semconv.SchemaURL,
}
assert.Equal(t, want, got[0])
)
return ss
}()
assert.Equal(t, want, got)
}
Loading

0 comments on commit 3454870

Please sign in to comment.