Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TraceQL Metrics: Diff streaming #3808

Merged
merged 10 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [ENHANCEMENT] Protect ingesters from panics by adding defer/recover to all read path methods. [#3790](https://github.com/grafana/tempo/pull/3790) (@joe-elliott)
* [ENHANCEMENT] Added a boolean flag to enable or disable dualstack mode on Storage block config for S3 [#3721](https://github.com/grafana/tempo/pull/3721) (@sid-jar, @mapno)
* [ENHANCEMENT] Add caching to query range queries [#3796](https://github.com/grafana/tempo/pull/3796) (@mapno)
* [ENHANCEMENT] Only stream diffs on metrics queries [#3808](https://github.com/grafana/tempo/pull/3808) (@joe-elliott)
* [ENHANCEMENT] Add data quality metric to measure traces without a root [#3812](https://github.com/grafana/tempo/pull/3812) (@mapno)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
* [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio)
Expand Down
4 changes: 2 additions & 2 deletions cmd/tempo-cli/cmd-query-metrics-query-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func (cmd *metricsQueryRangeCmd) Run(_ *globalOptions) error {
if err != nil {
return err
}
start := startDate.Unix()
start := startDate.UnixNano()

endDate, err := time.Parse(time.RFC3339, cmd.End)
if err != nil {
return err
}
end := endDate.Unix()
end := endDate.UnixNano()

req := &tempopb.QueryRangeRequest{
Query: cmd.TraceQL,
Expand Down
11 changes: 5 additions & 6 deletions modules/frontend/combiner/metrics_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil)

// NewQueryRange returns a query range combiner.
func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {
combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal)
func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, error) {
combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, trackDiffs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -59,9 +59,8 @@ func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {
sortResponse(resp)
return resp, nil
},
// todo: the diff method still returns the full response every time. find a way to diff
diff: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
resp := combiner.Response()
resp := combiner.Diff()
if resp == nil {
resp = &tempopb.QueryRangeResponse{}
}
Expand All @@ -71,8 +70,8 @@ func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {
}, nil
}

func NewTypedQueryRange(req *tempopb.QueryRangeRequest) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
c, err := NewQueryRange(req)
func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
c, err := NewQueryRange(req, trackDiffs)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
start := time.Now()

var finalResponse *tempopb.QueryRangeResponse
c, err := combiner.NewTypedQueryRange(req)
c, err := combiner.NewTypedQueryRange(req, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
logQueryRangeRequest(logger, tenant, queryRangeReq)

// build and use roundtripper
combiner, err := combiner.NewTypedQueryRange(queryRangeReq)
combiner, err := combiner.NewTypedQueryRange(queryRangeReq, false)
if err != nil {
level.Error(logger).Log("msg", "query range: query range combiner failed", "err", err)
return &http.Response{
Expand Down Expand Up @@ -154,7 +154,7 @@ func logQueryRangeRequest(logger log.Logger, tenantID string, req *tempopb.Query
"msg", "query range request",
"tenant", tenantID,
"query", req.Query,
"range_seconds", req.End-req.Start,
"range_nanos", req.End-req.Start,
"mode", req.QueryMode,
"step", req.Step)
}
2 changes: 1 addition & 1 deletion modules/querier/querier_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR
return nil, fmt.Errorf("error querying generators in Querier.MetricsQueryRange: %w", err)
}

c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum)
c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum, false)
if err != nil {
return nil, err
}
Expand Down
85 changes: 81 additions & 4 deletions pkg/traceql/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package traceql
import (
"sort"
"strings"
"time"

"github.com/grafana/tempo/pkg/tempopb"
)
Expand Down Expand Up @@ -141,22 +142,37 @@ func spansetID(ss *tempopb.SpanSet) string {
return id
}

type tsRange struct {
minTS, maxTS int64
}

type QueryRangeCombiner struct {
req *tempopb.QueryRangeRequest
eval *MetricsFrontendEvaluator
metrics *tempopb.SearchMetrics

// used to track which series were updated since the previous diff
// todo: it may not be worth it to track the diffs per series. it would be simpler (and possibly nearly as effective) to just calculate a global
// max/min for all series
seriesUpdated map[string]tsRange
}

func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode) (*QueryRangeCombiner, error) {
func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode, trackDiffs bool) (*QueryRangeCombiner, error) {
eval, err := NewEngine().CompileMetricsQueryRangeNonRaw(req, mode)
if err != nil {
return nil, err
}

var seriesUpdated map[string]tsRange
if trackDiffs {
seriesUpdated = map[string]tsRange{}
}

return &QueryRangeCombiner{
req: req,
eval: eval,
metrics: &tempopb.SearchMetrics{},
req: req,
eval: eval,
metrics: &tempopb.SearchMetrics{},
seriesUpdated: seriesUpdated,
}, nil
}

Expand All @@ -165,6 +181,9 @@ func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) {
return
}

// mark min/max for all series
q.markUpdatedRanges(resp)

// Here is where the job results are reentered into the pipeline
q.eval.ObserveSeries(resp.Series)

Expand All @@ -185,3 +204,61 @@ func (q *QueryRangeCombiner) Response() *tempopb.QueryRangeResponse {
Metrics: q.metrics,
}
}

func (q *QueryRangeCombiner) Diff() *tempopb.QueryRangeResponse {
if q.seriesUpdated == nil {
return q.Response()
}

seriesRangeFn := func(promLabels string) (uint64, uint64, bool) {
tsr, ok := q.seriesUpdated[promLabels]
return uint64(tsr.minTS), uint64(tsr.maxTS), ok
}

// filter out series that haven't change
resp := &tempopb.QueryRangeResponse{
Series: q.eval.Results().ToProtoDiff(q.req, seriesRangeFn),
Metrics: q.metrics,
}

// wipe out the diff for the next call
clear(q.seriesUpdated)

return resp
}

func (q *QueryRangeCombiner) markUpdatedRanges(resp *tempopb.QueryRangeResponse) {
if q.seriesUpdated == nil {
return
}

// mark all ranges that changed
for _, series := range resp.Series {
if len(series.Samples) == 0 {
continue
}

nanoMin := series.Samples[0].TimestampMs * int64(time.Millisecond)
nanoMax := series.Samples[len(series.Samples)-1].TimestampMs * int64(time.Millisecond)

tsr, ok := q.seriesUpdated[series.PromLabels]
if !ok {
q.seriesUpdated[series.PromLabels] = tsRange{minTS: nanoMin, maxTS: nanoMax}
continue
}

var updated bool
if nanoMin < tsr.minTS {
updated = true
tsr.minTS = nanoMin
}
if nanoMax > tsr.maxTS {
updated = true
tsr.maxTS = nanoMax
}

if updated {
q.seriesUpdated[series.PromLabels] = tsr
}
}
}
157 changes: 157 additions & 0 deletions pkg/traceql/combine_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package traceql

import (
"fmt"
"slices"
"strings"
"testing"
"time"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
Expand Down Expand Up @@ -263,3 +267,156 @@ func TestCombineResults(t *testing.T) {
})
}
}

// nolint:govet
func TestQueryRangeCombinerDiffs(t *testing.T) {
start := uint64(100 * time.Millisecond)
end := uint64(150 * time.Millisecond)
step := uint64(10 * time.Millisecond)

tcs := []struct {
resp, expectedResponse, expectedDiff *tempopb.QueryRangeResponse
}{
// push nothing get nothing
{
resp: &tempopb.QueryRangeResponse{},
expectedResponse: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{},
},
},
// push 3 data points, get them back
{
resp: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}),
},
},
expectedResponse: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}),
},
},
expectedDiff: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}),
},
},
},
// push 2 data points, check aggregation
{
resp: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{120, 1}, {130, 2}, {150, 3}}),
},
},
expectedResponse: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 4}, {130, 2}, {140, 0}, {150, 3}}),
},
},
},
// push different series
{
resp: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}),
},
},
expectedResponse: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 4}, {130, 2}, {140, 0}, {150, 3}}),
timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}),
},
},
// includes last 2 pushes
expectedDiff: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{120, 4}, {130, 2}, {140, 0}, {150, 3}}),
timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}),
},
},
},
// push different series by label value
{
resp: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "2", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}),
},
},
expectedResponse: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 4}, {130, 2}, {140, 0}, {150, 3}}),
timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}),
timeSeries("foo", "2", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}),
},
},
// includes last 2 pushes
expectedDiff: &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
timeSeries("foo", "2", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}),
},
},
},
}

req := &tempopb.QueryRangeRequest{
Start: start,
End: end,
Step: step,
Query: "{} | rate()", // simple aggregate
}
combiner, err := QueryRangeCombinerFor(req, AggregateModeFinal, true)
require.NoError(t, err)

for i, tc := range tcs {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
combiner.Combine(tc.resp)

resp := combiner.Response()
resp.Metrics = nil // we want to ignore metrics for this test, just nil them out
metricsEqual(t, tc.expectedResponse, resp)

if tc.expectedDiff != nil {
// call diff and get expected
diff := combiner.Diff()
diff.Metrics = nil
metricsEqual(t, tc.expectedDiff, diff)

// call diff again and get nothing!
diff = combiner.Diff()
diff.Metrics = nil
require.Equal(t, &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{},
}, diff)
}
})
}
}

func metricsEqual(t *testing.T, a, b *tempopb.QueryRangeResponse) {
t.Helper()

slices.SortFunc(a.Series, func(a, b *tempopb.TimeSeries) int {
return strings.Compare(a.PromLabels, b.PromLabels)
})
slices.SortFunc(b.Series, func(a, b *tempopb.TimeSeries) int {
return strings.Compare(a.PromLabels, b.PromLabels)
})

require.Equal(t, a, b)
}

func timeSeries(name, val string, samples []tempopb.Sample) *tempopb.TimeSeries {
lbls := Labels{
{
Name: name,
Value: NewStaticString(val),
},
}

return &tempopb.TimeSeries{
Labels: []v1.KeyValue{{Key: name, Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: val}}}},
Samples: samples,
PromLabels: lbls.String(),
}
}
Loading
Loading