Skip to content

Commit

Permalink
Populate Jaeger's Span.Process from Resource (#1673)
Browse files Browse the repository at this point in the history
* Jaeger exporter now populate Jaeger's Span Process from Resource

* Remove jaeger.WithProcess

* Fix tests

* Change the type of default service name into string

* Add tests

* Update CHANGELOG

* Use the API from `Set` to fetch service name in exporter

* Fix nits

* Add more test cases for jaegerBatchList function

* precommit

Co-authored-by: Anthony Mirabella <[email protected]>
  • Loading branch information
XSAM and Aneurysm9 committed Mar 16, 2021
1 parent 28eaaa9 commit 62cbf0f
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 90 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added `Marshaler` config option to `otlphttp` to enable otlp over json or protobufs. (#1586)
- A `ForceFlush` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` to flush all registered `SpanProcessor`s. (#1608)
- Added `WithDefaultSampler` and `WithSpanLimits` to tracer provider. (#1633)
- Jaeger exporter falls back to `resource.Default`'s `service.name` if the exported Span does not have one. (#1673)

### Changed

Expand All @@ -25,6 +26,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `trace.SpanContext` is now immutable and has no exported fields. (#1573)
- `trace.NewSpanContext()` can be used in conjunction with the `trace.SpanContextConfig` struct to initialize a new `SpanContext` where all values are known.
- Renamed the `LabelSet` method of `"go.opentelemetry.io/otel/sdk/resource".Resource` to `Set`. (#1692)
- Jaeger exporter populates Jaeger's Span Process from Resource. (#1673)

### Removed

Expand All @@ -33,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
These are now returned as a SpanProcessor interface from their respective constructors. (#1638)
- Removed setting status to `Error` while recording an error as a span event in `RecordError`. (#1663)
- Removed `WithConfig` from tracer provider to avoid overriding configuration. (#1633)
- Removed `jaeger.WithProcess`. (#1673)

### Fixed

Expand Down
1 change: 0 additions & 1 deletion bridge/opencensus/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
Expand Down
13 changes: 8 additions & 5 deletions example/jaeger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"log"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
Expand All @@ -32,14 +34,14 @@ func initTracer() func() {
// Create and install Jaeger export pipeline.
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithProcess(jaeger.Process{
ServiceName: "trace-demo",
Tags: []attribute.KeyValue{
jaeger.WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
Resource: resource.NewWithAttributes(
semconv.ServiceNameKey.String("trace-demo"),
attribute.String("exporter", "jaeger"),
attribute.Float64("float", 312.23),
},
),
}),
jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
log.Fatal(err)
Expand All @@ -63,6 +65,7 @@ func main() {
func bar(ctx context.Context) {
tr := otel.Tracer("component-bar")
_, span := tr.Start(ctx, "bar")
span.SetAttributes(attribute.Key("testset").String("value"))
defer span.End()

// Do bar...
Expand Down
8 changes: 4 additions & 4 deletions exporters/trace/jaeger/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ func TestNewRawExporterWithEnv(t *testing.T) {

assert.NoError(t, err)
assert.Equal(t, false, exp.o.Disabled)
assert.EqualValues(t, serviceName, exp.process.ServiceName)
assert.Len(t, exp.process.Tags, 1)
assert.EqualValues(t, serviceName, exp.o.Process.ServiceName)
assert.Len(t, exp.o.Process.Tags, 1)

require.IsType(t, &collectorUploader{}, exp.uploader)
uploader := exp.uploader.(*collectorUploader)
Expand Down Expand Up @@ -276,8 +276,8 @@ func TestNewRawExporterWithEnvImplicitly(t *testing.T) {
assert.NoError(t, err)
// NewRawExporter will ignore Disabled env
assert.Equal(t, true, exp.o.Disabled)
assert.EqualValues(t, serviceName, exp.process.ServiceName)
assert.Len(t, exp.process.Tags, 1)
assert.EqualValues(t, serviceName, exp.o.Process.ServiceName)
assert.Len(t, exp.o.Process.Tags, 1)

require.IsType(t, &collectorUploader{}, exp.uploader)
uploader := exp.uploader.(*collectorUploader)
Expand Down
154 changes: 109 additions & 45 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
"go.opentelemetry.io/otel/codes"
gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
)

const (
defaultServiceName = "OpenTelemetry"

keyInstrumentationLibraryName = "otel.library.name"
keyInstrumentationLibraryVersion = "otel.library.version"
)
Expand All @@ -57,13 +57,6 @@ type options struct {
Disabled bool
}

// WithProcess sets the process with the information about the exporting process.
func WithProcess(process Process) Option {
return func(o *options) {
o.Process = process
}
}

// WithBufferMaxCount defines the total number of traces that can be buffered in memory
func WithBufferMaxCount(bufferMaxCount int) Option {
return func(o *options) {
Expand Down Expand Up @@ -109,27 +102,24 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
opt(&o)
}

service := o.Process.ServiceName
if service == "" {
service = defaultServiceName
// Fetch default service.name from default resource for backup
var defaultServiceName string
defaultResource := resource.Default()
if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists {
defaultServiceName = value.AsString()
}
tags := make([]*gen.Tag, 0, len(o.Process.Tags))
for _, tag := range o.Process.Tags {
t := keyValueToTag(tag)
if t != nil {
tags = append(tags, t)
}
if defaultServiceName == "" {
return nil, fmt.Errorf("failed to get service name from default resource")
}

e := &Exporter{
uploader: uploader,
process: &gen.Process{
ServiceName: service,
Tags: tags,
},
o: o,
}
bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*gen.Span)); err != nil {
uploader: uploader,
o: o,
defaultServiceName: defaultServiceName,
resourceFromProcess: processToResource(o.Process),
}
bundler := bundler.NewBundler((*export.SpanSnapshot)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*export.SpanSnapshot)); err != nil {
otel.Handle(err)
}
})
Expand Down Expand Up @@ -205,13 +195,15 @@ type Process struct {
// Exporter is an implementation of an OTel SpanSyncer that uploads spans to
// Jaeger.
type Exporter struct {
process *gen.Process
bundler *bundler.Bundler
uploader batchUploader
o options

stoppedMu sync.RWMutex
stopped bool

defaultServiceName string
resourceFromProcess *resource.Resource
}

var _ export.SpanExporter = (*Exporter)(nil)
Expand All @@ -227,7 +219,7 @@ func (e *Exporter) ExportSpans(ctx context.Context, ss []*export.SpanSnapshot) e

for _, span := range ss {
// TODO(jbd): Handle oversized bundlers.
err := e.bundler.Add(spanSnapshotToThrift(span), 1)
err := e.bundler.Add(span, 1)
if err != nil {
return fmt.Errorf("failed to bundle %q: %w", span.Name, err)
}
Expand Down Expand Up @@ -275,17 +267,6 @@ func spanSnapshotToThrift(ss *export.SpanSnapshot) *gen.Span {
}
}

// TODO (jmacd): OTel has a broad "last value wins"
// semantic. Should resources be appended before span
// attributes, above, to allow span attributes to
// overwrite resource attributes?
if ss.Resource != nil {
for iter := ss.Resource.Iter(); iter.Next(); {
if tag := keyValueToTag(iter.Attribute()); tag != nil {
tags = append(tags, tag)
}
}
}
if il := ss.InstrumentationLibrary; il.Name != "" {
tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name))
if il.Version != "" {
Expand Down Expand Up @@ -429,11 +410,94 @@ func (e *Exporter) Flush() {
flush(e)
}

func (e *Exporter) upload(spans []*gen.Span) error {
batch := &gen.Batch{
Spans: spans,
Process: e.process,
func (e *Exporter) upload(spans []*export.SpanSnapshot) error {
batchList := jaegerBatchList(spans, e.defaultServiceName, e.resourceFromProcess)
for _, batch := range batchList {
err := e.uploader.upload(batch)
if err != nil {
return err
}
}

return nil
}

// jaegerBatchList transforms a slice of SpanSnapshot into a slice of jaeger
// Batch.
func jaegerBatchList(ssl []*export.SpanSnapshot, defaultServiceName string, resourceFromProcess *resource.Resource) []*gen.Batch {
if len(ssl) == 0 {
return nil
}

batchDict := make(map[attribute.Distinct]*gen.Batch)

for _, ss := range ssl {
if ss == nil {
continue
}

newResource := ss.Resource
if resourceFromProcess != nil {
// The value from process will overwrite the value from span's resources
newResource = resource.Merge(ss.Resource, resourceFromProcess)
}
resourceKey := newResource.Equivalent()
batch, bOK := batchDict[resourceKey]
if !bOK {
batch = &gen.Batch{
Process: process(newResource, defaultServiceName),
Spans: []*gen.Span{},
}
}
batch.Spans = append(batch.Spans, spanSnapshotToThrift(ss))
batchDict[resourceKey] = batch
}

// Transform the categorized map into a slice
batchList := make([]*gen.Batch, 0, len(batchDict))
for _, batch := range batchDict {
batchList = append(batchList, batch)
}
return batchList
}

// process transforms an OTel Resource into a jaeger Process.
func process(res *resource.Resource, defaultServiceName string) *gen.Process {
var process gen.Process

var serviceName attribute.KeyValue
if res != nil {
for iter := res.Iter(); iter.Next(); {
if iter.Attribute().Key == semconv.ServiceNameKey {
serviceName = iter.Attribute()
// Don't convert service.name into tag.
continue
}
if tag := keyValueToTag(iter.Attribute()); tag != nil {
process.Tags = append(process.Tags, tag)
}
}
}

return e.uploader.upload(batch)
// If no service.name is contained in a Span's Resource,
// that field MUST be populated from the default Resource.
if serviceName.Value.AsString() == "" {
serviceName = semconv.ServiceVersionKey.String(defaultServiceName)
}
process.ServiceName = serviceName.Value.AsString()

return &process
}

func processToResource(process Process) *resource.Resource {
var attrs []attribute.KeyValue
if process.ServiceName != "" {
attrs = append(attrs, semconv.ServiceNameKey.String(process.ServiceName))
}
attrs = append(attrs, process.Tags...)

if len(attrs) == 0 {
return nil
}
return resource.NewWithAttributes(attrs...)
}
Loading

0 comments on commit 62cbf0f

Please sign in to comment.