From 4bd976a4563f618a40d81a80d2ffe635ade21f4a Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 4 May 2021 21:01:15 +0530 Subject: [PATCH 1/6] Add span deduper to support zipkin client/server kind spans Signed-off-by: Annanay --- modules/frontend/deduper.go | 122 ++++++++++++++++++++++++++++++ modules/frontend/deduper_test.go | 88 +++++++++++++++++++++ modules/frontend/querysharding.go | 10 +++ 3 files changed, 220 insertions(+) create mode 100644 modules/frontend/deduper.go create mode 100644 modules/frontend/deduper_test.go diff --git a/modules/frontend/deduper.go b/modules/frontend/deduper.go new file mode 100644 index 00000000000..f387325d98f --- /dev/null +++ b/modules/frontend/deduper.go @@ -0,0 +1,122 @@ +package frontend + +import ( + "context" + "encoding/binary" + "fmt" + + "github.com/opentracing/opentracing-go" + + "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" +) + +const ( + warningTooManySpans = "cannot assign unique span ID, too many spans in the trace" +) + +var ( + maxSpanID uint64 = 0xffffffffffffffff +) + +// This is copied over from Jaeger and modified to work for OpenTelemetry Trace data structure +// https://github.com/jaegertracing/jaeger/blob/12bba8c9b91cf4a29d314934bc08f4a80e43c042/model/adjuster/span_id_deduper.go +type spanIDDeduper struct { + trace *tempopb.Trace + spansByID map[uint64][]*v1.Span + maxUsedID uint64 +} + + +func DedupeSpanIDs(ctx context.Context, trace *tempopb.Trace) (*tempopb.Trace, error) { + span, _ := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs") + defer span.Finish() + + deduper := &spanIDDeduper{trace: trace} + deduper.groupSpansByID() + deduper.dedupeSpanIDs() + return deduper.trace, nil +} + +// groupSpansByID groups spans with the same ID returning a map id -> []Span +func (d *spanIDDeduper) groupSpansByID() { + spansByID := make(map[uint64][]*v1.Span) + for _, batch := range d.trace.Batches { + for _, ils := range batch.InstrumentationLibrarySpans { + for _, span := range ils.Spans { + id := binary.BigEndian.Uint64(span.SpanId) + if spans, ok := spansByID[id]; ok { + // TODO maybe return an error if more than 2 spans found + spansByID[id] = append(spans, span) + } else { + spansByID[id] = []*v1.Span{span} + } + } + } + } + d.spansByID = spansByID +} + +func (d *spanIDDeduper) isSharedWithClientSpan(spanID uint64) bool { + for _, span := range d.spansByID[spanID] { + if span.GetKind() == v1.Span_SPAN_KIND_CLIENT { + return true + } + } + return false +} + +func (d *spanIDDeduper) dedupeSpanIDs() { + oldToNewSpanIDs := make(map[uint64]uint64) + for _, batch := range d.trace.Batches { + for _, ils := range batch.InstrumentationLibrarySpans { + for _, span := range ils.Spans { + id := binary.BigEndian.Uint64(span.SpanId) + // only replace span IDs for server-side spans that share the ID with something else + if span.GetKind() == v1.Span_SPAN_KIND_SERVER && d.isSharedWithClientSpan(id) { + newID, err := d.makeUniqueSpanID() + if err != nil { + // ignore this error condition where we have more than 2^64 unique span IDs + continue + } + oldToNewSpanIDs[id] = newID + span.ParentSpanId = span.SpanId // previously shared ID is the new parent + binary.BigEndian.PutUint64(span.SpanId, newID) + } + } + } + } + d.swapParentIDs(oldToNewSpanIDs) +} + +// swapParentIDs corrects ParentSpanID of all spans that are children of the server +// spans whose IDs we deduped. +func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) { + for _, batch := range d.trace.Batches { + for _, ils := range batch.InstrumentationLibrarySpans { + for _, span := range ils.Spans { + if len(span.GetParentSpanId()) > 0 { + parentSpanId := binary.BigEndian.Uint64(span.GetParentSpanId()) + if parentID, ok := oldToNewSpanIDs[parentSpanId]; ok { + if binary.BigEndian.Uint64(span.SpanId) != parentID { + binary.BigEndian.PutUint64(span.SpanId, parentID) + } + } + } + } + } + } +} + +// makeUniqueSpanID returns a new ID that is not used in the trace, +// or an error if such ID cannot be generated, which is unlikely, +// given that the whole space of span IDs is 2^64. +func (d *spanIDDeduper) makeUniqueSpanID() (uint64, error) { + for id := d.maxUsedID + 1; id < maxSpanID; id++ { + if _, ok := d.spansByID[id]; !ok { + d.maxUsedID = id + return id, nil + } + } + return 0, fmt.Errorf(warningTooManySpans) +} diff --git a/modules/frontend/deduper_test.go b/modules/frontend/deduper_test.go new file mode 100644 index 00000000000..c98a24138b0 --- /dev/null +++ b/modules/frontend/deduper_test.go @@ -0,0 +1,88 @@ +package frontend + +import ( + "testing" + "context" + + "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/stretchr/testify/assert" +) + +func TestDedupeSpanIDs(t *testing.T) { + firstSpanId := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + secondSpanId := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02} + thirdSpanId := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03} + tests := []struct { + name string + trace *tempopb.Trace + shouldChange bool + }{ + { + name: "no duplicates", + trace: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + SpanId: firstSpanId, + Kind: v1.Span_SPAN_KIND_SERVER, + }, + { + SpanId: secondSpanId, + }, + { + SpanId: thirdSpanId, + Kind: v1.Span_SPAN_KIND_CLIENT, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "duplicate span id", + trace: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + SpanId: firstSpanId, + Kind: v1.Span_SPAN_KIND_SERVER, + }, + { + SpanId: secondSpanId, + }, + { + SpanId: firstSpanId, + Kind: v1.Span_SPAN_KIND_CLIENT, + }, + }, + }, + }, + }, + }, + }, + shouldChange: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + deduped, err := DedupeSpanIDs(context.TODO(), tt.trace) + assert.NoError(t, err) + if tt.shouldChange { + assert.NotEqual(t, firstSpanId, deduped.Batches[0].InstrumentationLibrarySpans[0].Spans[2]) + } else { + assert.Equal(t, deduped.Batches[0].InstrumentationLibrarySpans[0].Spans[2].SpanId, thirdSpanId) + } + + }) + } + +} \ No newline at end of file diff --git a/modules/frontend/querysharding.go b/modules/frontend/querysharding.go index 3ac89e3c859..1cfb0c4fe36 100644 --- a/modules/frontend/querysharding.go +++ b/modules/frontend/querysharding.go @@ -194,6 +194,16 @@ func mergeResponses(ctx context.Context, marshallingFormat string, rrs []Request }, nil } + traceObject := &tempopb.Trace{} + err := proto.Unmarshal(combinedTrace, traceObject) + if err != nil { + return nil, err + } + traceObject, err = DedupeSpanIDs(ctx, traceObject) + if err != nil { + return nil, err + } + if errCode == http.StatusOK { if marshallingFormat == util.JSONTypeHeaderValue { // if request is for application/json, unmarshal into proto object and re-marshal into json bytes From 2183fcf0b4aa9e0832771b9f09df3fe1c3e935bf Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 5 May 2021 18:13:03 +0530 Subject: [PATCH 2/6] Add deduper as a middleware, fix tests, add CHANGELOG Signed-off-by: Annanay --- CHANGELOG.md | 1 + example/docker-compose/docker-compose.yaml | 1 + modules/frontend/deduper.go | 74 +++++++++++++++++++--- modules/frontend/deduper_test.go | 56 ++++++++-------- modules/frontend/frontend.go | 6 +- modules/frontend/querysharding.go | 10 --- 6 files changed, 101 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 963d07d624b..a340666f77f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [ENHANCEMENT] Improve WAL Replay by not rebuilding the WAL. [#668](https://github.com/grafana/tempo/pull/668) * [ENHANCEMENT] Add config option to disable write extension to the ingesters. [#677](https://github.com/grafana/tempo/pull/677) * [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679) +* [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687) ## v0.7.0 diff --git a/example/docker-compose/docker-compose.yaml b/example/docker-compose/docker-compose.yaml index 80d4456842d..81cafec3369 100644 --- a/example/docker-compose/docker-compose.yaml +++ b/example/docker-compose/docker-compose.yaml @@ -10,6 +10,7 @@ services: ports: - "14268" # jaeger ingest - "3100" # tempo + - "9411" # zipkin synthetic-load-generator: image: omnition/synthetic-load-generator:1.0.25 diff --git a/modules/frontend/deduper.go b/modules/frontend/deduper.go index f387325d98f..0aee61570f8 100644 --- a/modules/frontend/deduper.go +++ b/modules/frontend/deduper.go @@ -1,10 +1,14 @@ package frontend import ( - "context" + "bytes" "encoding/binary" "fmt" + "io/ioutil" + "net/http" + "github.com/go-kit/kit/log" + "github.com/golang/protobuf/proto" "github.com/opentracing/opentracing-go" "github.com/grafana/tempo/pkg/tempopb" @@ -19,23 +23,74 @@ var ( maxSpanID uint64 = 0xffffffffffffffff ) +func Deduper(logger log.Logger) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + return spanIDDeduper{ + next: next, + logger: logger, + } + }) +} + // This is copied over from Jaeger and modified to work for OpenTelemetry Trace data structure // https://github.com/jaegertracing/jaeger/blob/12bba8c9b91cf4a29d314934bc08f4a80e43c042/model/adjuster/span_id_deduper.go type spanIDDeduper struct { + next Handler + logger log.Logger trace *tempopb.Trace spansByID map[uint64][]*v1.Span maxUsedID uint64 } - -func DedupeSpanIDs(ctx context.Context, trace *tempopb.Trace) (*tempopb.Trace, error) { +// Do implements Handler +func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) { + ctx := req.Context() span, _ := opentracing.StartSpanFromContext(ctx, "frontend.DedupeSpanIDs") defer span.Finish() - deduper := &spanIDDeduper{trace: trace} - deduper.groupSpansByID() - deduper.dedupeSpanIDs() - return deduper.trace, nil + resp, err := s.next.Do(req) + if err != nil { + return nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + if err != nil { + return nil, err + } + + if resp.StatusCode == http.StatusOK { + traceObject := &tempopb.Trace{} + err = proto.Unmarshal(body, traceObject) + if err != nil { + return nil, err + } + + s.trace = traceObject + s.dedupe() + + traceBytes, err := proto.Marshal(s.trace) + if err != nil { + return nil, err + } + + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(traceBytes)), + Header: http.Header{}, + }, nil + } + + return &http.Response{ + StatusCode: resp.StatusCode, + Body: ioutil.NopCloser(bytes.NewReader(body)), + Header: http.Header{}, + }, nil +} + +func (s *spanIDDeduper) dedupe() { + s.groupSpansByID() + s.dedupeSpanIDs() } // groupSpansByID groups spans with the same ID returning a map id -> []Span @@ -80,7 +135,10 @@ func (d *spanIDDeduper) dedupeSpanIDs() { continue } oldToNewSpanIDs[id] = newID - span.ParentSpanId = span.SpanId // previously shared ID is the new parent + if len(span.ParentSpanId) == 0 { + span.ParentSpanId = make([]byte, 8) + } + binary.BigEndian.PutUint64(span.ParentSpanId, id) // previously shared ID is the new parent binary.BigEndian.PutUint64(span.SpanId, newID) } } diff --git a/modules/frontend/deduper_test.go b/modules/frontend/deduper_test.go index c98a24138b0..5335c39b2c6 100644 --- a/modules/frontend/deduper_test.go +++ b/modules/frontend/deduper_test.go @@ -2,24 +2,21 @@ package frontend import ( "testing" - "context" + + "github.com/stretchr/testify/assert" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" - "github.com/stretchr/testify/assert" ) func TestDedupeSpanIDs(t *testing.T) { - firstSpanId := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} - secondSpanId := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02} - thirdSpanId := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03} tests := []struct { name string - trace *tempopb.Trace - shouldChange bool + trace *tempopb.Trace + checkParent bool }{ { - name: "no duplicates", + name: "no duplicates", trace: &tempopb.Trace{ Batches: []*v1.ResourceSpans{ { @@ -27,15 +24,15 @@ func TestDedupeSpanIDs(t *testing.T) { { Spans: []*v1.Span{ { - SpanId: firstSpanId, - Kind: v1.Span_SPAN_KIND_SERVER, + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + Kind: v1.Span_SPAN_KIND_CLIENT, }, { - SpanId: secondSpanId, + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, }, { - SpanId: thirdSpanId, - Kind: v1.Span_SPAN_KIND_CLIENT, + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03}, + Kind: v1.Span_SPAN_KIND_SERVER, }, }, }, @@ -45,7 +42,7 @@ func TestDedupeSpanIDs(t *testing.T) { }, }, { - name: "duplicate span id", + name: "duplicate span id", trace: &tempopb.Trace{ Batches: []*v1.ResourceSpans{ { @@ -53,15 +50,15 @@ func TestDedupeSpanIDs(t *testing.T) { { Spans: []*v1.Span{ { - SpanId: firstSpanId, - Kind: v1.Span_SPAN_KIND_SERVER, + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + Kind: v1.Span_SPAN_KIND_CLIENT, }, { - SpanId: secondSpanId, + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, }, { - SpanId: firstSpanId, - Kind: v1.Span_SPAN_KIND_CLIENT, + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + Kind: v1.Span_SPAN_KIND_SERVER, }, }, }, @@ -69,20 +66,25 @@ func TestDedupeSpanIDs(t *testing.T) { }, }, }, - shouldChange: true, + checkParent: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - deduped, err := DedupeSpanIDs(context.TODO(), tt.trace) - assert.NoError(t, err) - if tt.shouldChange { - assert.NotEqual(t, firstSpanId, deduped.Batches[0].InstrumentationLibrarySpans[0].Spans[2]) - } else { - assert.Equal(t, deduped.Batches[0].InstrumentationLibrarySpans[0].Spans[2].SpanId, thirdSpanId) + s := &spanIDDeduper{ + trace: tt.trace, } + s.dedupe() + assert.Equal(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[0].SpanId) + assert.Equal(t, v1.Span_SPAN_KIND_CLIENT, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[0].Kind) + assert.Equal(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[1].SpanId) + assert.Equal(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[2].SpanId) + assert.Equal(t, v1.Span_SPAN_KIND_SERVER, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[2].Kind) + if tt.checkParent { + assert.EqualValues(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[2].ParentSpanId) + } }) } -} \ No newline at end of file +} diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 205e1685500..d861b6575d9 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -29,8 +29,10 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe }, []string{"tenant"}) return func(next http.RoundTripper) http.RoundTripper { - // Get the http request, add custom parameters to it, split it, and call downstream roundtripper - rt := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger)) + // We're constructing middleware in this statement. There are two at the moment - + // - the rightmost one (executed first) is ShardingWare which helps to shard queries by splitting the block ID space + // - the leftmost one (executed last) is Deduper which dedupe Span IDs for Zipkin support + rt := NewRoundTripper(next, Deduper(logger), ShardingWare(cfg.QueryShards, logger)) return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { start := time.Now() // tracing instrumentation diff --git a/modules/frontend/querysharding.go b/modules/frontend/querysharding.go index 1cfb0c4fe36..3ac89e3c859 100644 --- a/modules/frontend/querysharding.go +++ b/modules/frontend/querysharding.go @@ -194,16 +194,6 @@ func mergeResponses(ctx context.Context, marshallingFormat string, rrs []Request }, nil } - traceObject := &tempopb.Trace{} - err := proto.Unmarshal(combinedTrace, traceObject) - if err != nil { - return nil, err - } - traceObject, err = DedupeSpanIDs(ctx, traceObject) - if err != nil { - return nil, err - } - if errCode == http.StatusOK { if marshallingFormat == util.JSONTypeHeaderValue { // if request is for application/json, unmarshal into proto object and re-marshal into json bytes From 184c78a9302d6791510d675926100aec87a0d8dd Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 5 May 2021 19:22:39 +0530 Subject: [PATCH 3/6] Add benchmarks Signed-off-by: Annanay --- modules/frontend/deduper.go | 36 ++++++++++++++++---------------- modules/frontend/deduper_test.go | 27 ++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/modules/frontend/deduper.go b/modules/frontend/deduper.go index 0aee61570f8..1397195c6f9 100644 --- a/modules/frontend/deduper.go +++ b/modules/frontend/deduper.go @@ -94,9 +94,9 @@ func (s *spanIDDeduper) dedupe() { } // groupSpansByID groups spans with the same ID returning a map id -> []Span -func (d *spanIDDeduper) groupSpansByID() { +func (s *spanIDDeduper) groupSpansByID() { spansByID := make(map[uint64][]*v1.Span) - for _, batch := range d.trace.Batches { + for _, batch := range s.trace.Batches { for _, ils := range batch.InstrumentationLibrarySpans { for _, span := range ils.Spans { id := binary.BigEndian.Uint64(span.SpanId) @@ -109,11 +109,11 @@ func (d *spanIDDeduper) groupSpansByID() { } } } - d.spansByID = spansByID + s.spansByID = spansByID } -func (d *spanIDDeduper) isSharedWithClientSpan(spanID uint64) bool { - for _, span := range d.spansByID[spanID] { +func (s *spanIDDeduper) isSharedWithClientSpan(spanID uint64) bool { + for _, span := range s.spansByID[spanID] { if span.GetKind() == v1.Span_SPAN_KIND_CLIENT { return true } @@ -121,15 +121,15 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID uint64) bool { return false } -func (d *spanIDDeduper) dedupeSpanIDs() { +func (s *spanIDDeduper) dedupeSpanIDs() { oldToNewSpanIDs := make(map[uint64]uint64) - for _, batch := range d.trace.Batches { + for _, batch := range s.trace.Batches { for _, ils := range batch.InstrumentationLibrarySpans { for _, span := range ils.Spans { id := binary.BigEndian.Uint64(span.SpanId) // only replace span IDs for server-side spans that share the ID with something else - if span.GetKind() == v1.Span_SPAN_KIND_SERVER && d.isSharedWithClientSpan(id) { - newID, err := d.makeUniqueSpanID() + if span.GetKind() == v1.Span_SPAN_KIND_SERVER && s.isSharedWithClientSpan(id) { + newID, err := s.makeUniqueSpanID() if err != nil { // ignore this error condition where we have more than 2^64 unique span IDs continue @@ -144,18 +144,18 @@ func (d *spanIDDeduper) dedupeSpanIDs() { } } } - d.swapParentIDs(oldToNewSpanIDs) + s.swapParentIDs(oldToNewSpanIDs) } // swapParentIDs corrects ParentSpanID of all spans that are children of the server // spans whose IDs we deduped. -func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) { - for _, batch := range d.trace.Batches { +func (s *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) { + for _, batch := range s.trace.Batches { for _, ils := range batch.InstrumentationLibrarySpans { for _, span := range ils.Spans { if len(span.GetParentSpanId()) > 0 { - parentSpanId := binary.BigEndian.Uint64(span.GetParentSpanId()) - if parentID, ok := oldToNewSpanIDs[parentSpanId]; ok { + parentSpanID := binary.BigEndian.Uint64(span.GetParentSpanId()) + if parentID, ok := oldToNewSpanIDs[parentSpanID]; ok { if binary.BigEndian.Uint64(span.SpanId) != parentID { binary.BigEndian.PutUint64(span.SpanId, parentID) } @@ -169,10 +169,10 @@ func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) { // makeUniqueSpanID returns a new ID that is not used in the trace, // or an error if such ID cannot be generated, which is unlikely, // given that the whole space of span IDs is 2^64. -func (d *spanIDDeduper) makeUniqueSpanID() (uint64, error) { - for id := d.maxUsedID + 1; id < maxSpanID; id++ { - if _, ok := d.spansByID[id]; !ok { - d.maxUsedID = id +func (s *spanIDDeduper) makeUniqueSpanID() (uint64, error) { + for id := s.maxUsedID + 1; id < maxSpanID; id++ { + if _, ok := s.spansByID[id]; !ok { + s.maxUsedID = id return id, nil } } diff --git a/modules/frontend/deduper_test.go b/modules/frontend/deduper_test.go index 5335c39b2c6..014ef1e2ccf 100644 --- a/modules/frontend/deduper_test.go +++ b/modules/frontend/deduper_test.go @@ -1,6 +1,7 @@ package frontend import ( + "github.com/grafana/tempo/pkg/util/test" "testing" "github.com/stretchr/testify/assert" @@ -88,3 +89,29 @@ func TestDedupeSpanIDs(t *testing.T) { } } + +func BenchmarkDeduper100(b *testing.B) { + benchmarkDeduper(b, 100) +} + +func BenchmarkDeduper1000(b *testing.B) { + benchmarkDeduper(b, 1000) +} +func BenchmarkDeduper10000(b *testing.B) { + benchmarkDeduper(b, 10000) +} + +func BenchmarkDeduper100000(b *testing.B) { + benchmarkDeduper(b, 100000) +} + +func benchmarkDeduper(b *testing.B, traceSpanCount int) { + s := &spanIDDeduper{ + trace: test.MakeTraceWithSpanCount(1, traceSpanCount, []byte{0x00}), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.dedupe() + } +} \ No newline at end of file From a7befb5d8a0e52f61ae8451b4160f3494168f196 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 5 May 2021 19:29:35 +0530 Subject: [PATCH 4/6] make fmt Signed-off-by: Annanay --- modules/frontend/deduper_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/frontend/deduper_test.go b/modules/frontend/deduper_test.go index 014ef1e2ccf..9f183a014fa 100644 --- a/modules/frontend/deduper_test.go +++ b/modules/frontend/deduper_test.go @@ -1,13 +1,13 @@ package frontend import ( - "github.com/grafana/tempo/pkg/util/test" "testing" "github.com/stretchr/testify/assert" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/grafana/tempo/pkg/util/test" ) func TestDedupeSpanIDs(t *testing.T) { @@ -114,4 +114,4 @@ func benchmarkDeduper(b *testing.B, traceSpanCount int) { for i := 0; i < b.N; i++ { s.dedupe() } -} \ No newline at end of file +} From 2fe1dfc4bfbce31133429d88d97fe94664c9174b Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 6 May 2021 14:01:36 +0530 Subject: [PATCH 5/6] Move the marshalling format check to precede any middlewares Signed-off-by: Annanay --- modules/frontend/deduper.go | 18 ++++------ modules/frontend/frontend.go | 44 ++++++++++++++++++++++- modules/frontend/querysharding.go | 41 ++++------------------ modules/frontend/querysharding_test.go | 48 +++----------------------- 4 files changed, 61 insertions(+), 90 deletions(-) diff --git a/modules/frontend/deduper.go b/modules/frontend/deduper.go index 1397195c6f9..08f4cd70b31 100644 --- a/modules/frontend/deduper.go +++ b/modules/frontend/deduper.go @@ -53,13 +53,13 @@ func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) { return nil, err } - body, err := ioutil.ReadAll(resp.Body) - defer resp.Body.Close() - if err != nil { - return nil, err - } - if resp.StatusCode == http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + if err != nil { + return nil, err + } + traceObject := &tempopb.Trace{} err = proto.Unmarshal(body, traceObject) if err != nil { @@ -81,11 +81,7 @@ func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) { }, nil } - return &http.Response{ - StatusCode: resp.StatusCode, - Body: ioutil.NopCloser(bytes.NewReader(body)), - Header: http.Header{}, - }, nil + return resp, nil } func (s *spanIDDeduper) dedupe() { diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index d861b6575d9..b3f57d517b6 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -1,6 +1,7 @@ package frontend import ( + "bytes" "io/ioutil" "net/http" "strings" @@ -9,13 +10,17 @@ import ( "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "github.com/opentracing/opentracing-go" ot_log "github.com/opentracing/opentracing-go/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" ) @@ -36,7 +41,7 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { start := time.Now() // tracing instrumentation - span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.ShardingTripper") + span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.Tripperware") defer span.Finish() orgID, _ := user.ExtractOrgID(r.Context()) @@ -54,9 +59,46 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe } span.LogFields(ot_log.String("msg", "validated traceID")) + // check marshalling format + marshallingFormat := util.JSONTypeHeaderValue + if r.Header.Get(util.AcceptHeaderKey) == util.ProtobufTypeHeaderValue { + marshallingFormat = util.ProtobufTypeHeaderValue + } + + // for context propagation with traceID set r = r.WithContext(ctx) + + // Enforce all communication internal to Tempo to be in protobuf bytes + r.Header.Set(util.AcceptHeaderKey, util.ProtobufTypeHeaderValue) + resp, err := rt.RoundTrip(r) + if resp.StatusCode == http.StatusOK { + if marshallingFormat == util.JSONTypeHeaderValue { + // if request is for application/json, unmarshal into proto object and re-marshal into json bytes + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, errors.Wrap(err, "error reading response body at query frontend") + } + traceObject := &tempopb.Trace{} + err = proto.Unmarshal(body, traceObject) + if err != nil { + return nil, err + } + + var jsonTrace bytes.Buffer + marshaller := &jsonpb.Marshaler{} + err = marshaller.Marshal(&jsonTrace, traceObject) + if err != nil { + return nil, err + } + resp.Body = ioutil.NopCloser(bytes.NewReader(jsonTrace.Bytes())) + } + } + + span.SetTag("response marshalling format", marshallingFormat) + traceID, _ := middleware.ExtractTraceID(ctx) statusCode := 500 var contentLength int64 = 0 diff --git a/modules/frontend/querysharding.go b/modules/frontend/querysharding.go index df156a53e94..3a1c70ceced 100644 --- a/modules/frontend/querysharding.go +++ b/modules/frontend/querysharding.go @@ -11,16 +11,12 @@ import ( "strings" "github.com/go-kit/kit/log" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/weaveworks/common/user" "github.com/grafana/tempo/modules/querier" "github.com/grafana/tempo/pkg/model" - "github.com/grafana/tempo/pkg/tempopb" - "github.com/grafana/tempo/pkg/util" ) const ( @@ -51,17 +47,15 @@ type shardQuery struct { // Do implements Handler func (s shardQuery) Do(r *http.Request) (*http.Response, error) { - userID, err := user.ExtractOrgID(r.Context()) + ctx := r.Context() + span, _ := opentracing.StartSpanFromContext(ctx, "frontend.ShardQuery") + defer span.Finish() + + userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err } - // check marshalling format - marshallingFormat := util.JSONTypeHeaderValue - if r.Header.Get(util.AcceptHeaderKey) == util.ProtobufTypeHeaderValue { - marshallingFormat = util.ProtobufTypeHeaderValue - } - reqs := make([]*http.Request, s.queryShards) for i := 0; i < s.queryShards; i++ { reqs[i] = r.Clone(r.Context()) @@ -77,9 +71,6 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { reqs[i].Header.Set(user.OrgIDHeaderName, userID) - // Enforce frontend <> querier communication to be in protobuf bytes - reqs[i].Header.Set(util.AcceptHeaderKey, util.ProtobufTypeHeaderValue) - // adding to RequestURI only because weaveworks/common uses the RequestURI field to // translate from http.Request to httpgrpc.Request // https://github.com/weaveworks/common/blob/47e357f4e1badb7da17ad74bae63e228bdd76e8f/httpgrpc/server/server.go#L48 @@ -91,7 +82,7 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { return nil, err } - return mergeResponses(r.Context(), marshallingFormat, rrs) + return mergeResponses(ctx, rrs) } // createBlockBoundaries splits the range of blockIDs into queryShards parts @@ -153,7 +144,7 @@ func doRequests(reqs []*http.Request, downstream Handler) ([]RequestResponse, er return resps, firstErr } -func mergeResponses(ctx context.Context, marshallingFormat string, rrs []RequestResponse) (*http.Response, error) { +func mergeResponses(ctx context.Context, rrs []RequestResponse) (*http.Response, error) { // tracing instrumentation span, _ := opentracing.StartSpanFromContext(ctx, "frontend.mergeResponses") defer span.Finish() @@ -196,24 +187,6 @@ func mergeResponses(ctx context.Context, marshallingFormat string, rrs []Request } if errCode == http.StatusOK { - if marshallingFormat == util.JSONTypeHeaderValue { - // if request is for application/json, unmarshal into proto object and re-marshal into json bytes - traceObject := &tempopb.Trace{} - err := proto.Unmarshal(combinedTrace, traceObject) - if err != nil { - return nil, err - } - - var jsonTrace bytes.Buffer - marshaller := &jsonpb.Marshaler{} - err = marshaller.Marshal(&jsonTrace, traceObject) - if err != nil { - return nil, err - } - combinedTrace = jsonTrace.Bytes() - } - - span.SetTag("response marshalling format", marshallingFormat) return &http.Response{ StatusCode: http.StatusOK, Body: ioutil.NopCloser(bytes.NewReader(combinedTrace)), diff --git a/modules/frontend/querysharding_test.go b/modules/frontend/querysharding_test.go index 87d4665ad29..736f8fd0acd 100644 --- a/modules/frontend/querysharding_test.go +++ b/modules/frontend/querysharding_test.go @@ -7,13 +7,10 @@ import ( "net/http" "testing" - "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/grafana/tempo/pkg/model" - "github.com/grafana/tempo/pkg/tempopb" - "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" ) @@ -67,20 +64,10 @@ func TestMergeResponses(t *testing.T) { combinedTrace, _, err := model.CombineTraceBytes(b1, b2, model.TracePBEncoding, model.TracePBEncoding) assert.NoError(t, err) - traceObject := &tempopb.Trace{} - err = proto.Unmarshal(combinedTrace, traceObject) - assert.NoError(t, err) - - var combinedTraceJSON bytes.Buffer - marshaller := &jsonpb.Marshaler{} - err = marshaller.Marshal(&combinedTraceJSON, traceObject) - assert.NoError(t, err) - tests := []struct { - name string - requestResponse []RequestResponse - marshallingFormat string - expected *http.Response + name string + requestResponse []RequestResponse + expected *http.Response }{ { name: "combine status ok responses", @@ -173,37 +160,10 @@ func TestMergeResponses(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), }, }, - { - name: "accept application/json", - requestResponse: []RequestResponse{ - { - Response: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader(b1)), - }, - }, - { - Response: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader(b2)), - }, - }, - }, - marshallingFormat: util.JSONTypeHeaderValue, - expected: &http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader(combinedTraceJSON.Bytes())), - ContentLength: int64(len(combinedTraceJSON.Bytes())), - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - marshallingFormat := util.ProtobufTypeHeaderValue - if len(tt.marshallingFormat) > 0 { - marshallingFormat = tt.marshallingFormat - } - merged, err := mergeResponses(context.Background(), marshallingFormat, tt.requestResponse) + merged, err := mergeResponses(context.Background(), tt.requestResponse) assert.NoError(t, err) assert.Equal(t, tt.expected.StatusCode, merged.StatusCode) expectedBody, err := ioutil.ReadAll(tt.expected.Body) From 840b728ad4a568ac30630c63286eae26c1eccdcd Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 10 May 2021 20:36:30 +0530 Subject: [PATCH 6/6] Address comments Signed-off-by: Annanay --- modules/frontend/deduper.go | 3 ++ modules/frontend/deduper_test.go | 60 ++++++++++++++++++++++++++------ modules/frontend/frontend.go | 40 ++++++++++----------- 3 files changed, 71 insertions(+), 32 deletions(-) diff --git a/modules/frontend/deduper.go b/modules/frontend/deduper.go index 08f4cd70b31..3732be460de 100644 --- a/modules/frontend/deduper.go +++ b/modules/frontend/deduper.go @@ -146,6 +146,9 @@ func (s *spanIDDeduper) dedupeSpanIDs() { // swapParentIDs corrects ParentSpanID of all spans that are children of the server // spans whose IDs we deduped. func (s *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) { + if len(oldToNewSpanIDs) == 0 { + return + } for _, batch := range s.trace.Batches { for _, ils := range batch.InstrumentationLibrarySpans { for _, span := range ils.Spans { diff --git a/modules/frontend/deduper_test.go b/modules/frontend/deduper_test.go index 9f183a014fa..b7d34beb59e 100644 --- a/modules/frontend/deduper_test.go +++ b/modules/frontend/deduper_test.go @@ -14,7 +14,7 @@ func TestDedupeSpanIDs(t *testing.T) { tests := []struct { name string trace *tempopb.Trace - checkParent bool + expectedRes *tempopb.Trace }{ { name: "no duplicates", @@ -41,6 +41,29 @@ func TestDedupeSpanIDs(t *testing.T) { }, }, }, + expectedRes: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + Kind: v1.Span_SPAN_KIND_CLIENT, + }, + { + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, + }, + { + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03}, + Kind: v1.Span_SPAN_KIND_SERVER, + }, + }, + }, + }, + }, + }, + }, }, { name: "duplicate span id", @@ -67,7 +90,30 @@ func TestDedupeSpanIDs(t *testing.T) { }, }, }, - checkParent: true, + expectedRes: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{ + { + Spans: []*v1.Span{ + { + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + Kind: v1.Span_SPAN_KIND_CLIENT, + }, + { + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, + }, + { + SpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03}, + Kind: v1.Span_SPAN_KIND_SERVER, + ParentSpanId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + }, + }, + }, + }, + }, + }, + }, }, } for _, tt := range tests { @@ -76,15 +122,7 @@ func TestDedupeSpanIDs(t *testing.T) { trace: tt.trace, } s.dedupe() - assert.Equal(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[0].SpanId) - assert.Equal(t, v1.Span_SPAN_KIND_CLIENT, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[0].Kind) - assert.Equal(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[1].SpanId) - assert.Equal(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[2].SpanId) - assert.Equal(t, v1.Span_SPAN_KIND_SERVER, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[2].Kind) - - if tt.checkParent { - assert.EqualValues(t, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, s.trace.Batches[0].InstrumentationLibrarySpans[0].Spans[2].ParentSpanId) - } + assert.Equal(t, tt.expectedRes, s.trace) }) } diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index b3f57d517b6..6e6f1fb4d20 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -73,28 +73,26 @@ func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registe resp, err := rt.RoundTrip(r) - if resp.StatusCode == http.StatusOK { - if marshallingFormat == util.JSONTypeHeaderValue { - // if request is for application/json, unmarshal into proto object and re-marshal into json bytes - body, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - return nil, errors.Wrap(err, "error reading response body at query frontend") - } - traceObject := &tempopb.Trace{} - err = proto.Unmarshal(body, traceObject) - if err != nil { - return nil, err - } - - var jsonTrace bytes.Buffer - marshaller := &jsonpb.Marshaler{} - err = marshaller.Marshal(&jsonTrace, traceObject) - if err != nil { - return nil, err - } - resp.Body = ioutil.NopCloser(bytes.NewReader(jsonTrace.Bytes())) + if resp.StatusCode == http.StatusOK && marshallingFormat == util.JSONTypeHeaderValue { + // if request is for application/json, unmarshal into proto object and re-marshal into json bytes + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, errors.Wrap(err, "error reading response body at query frontend") } + traceObject := &tempopb.Trace{} + err = proto.Unmarshal(body, traceObject) + if err != nil { + return nil, err + } + + var jsonTrace bytes.Buffer + marshaller := &jsonpb.Marshaler{} + err = marshaller.Marshal(&jsonTrace, traceObject) + if err != nil { + return nil, err + } + resp.Body = ioutil.NopCloser(bytes.NewReader(jsonTrace.Bytes())) } span.SetTag("response marshalling format", marshallingFormat)