Update traceql metrics to use the trace-level timestamp columns #3353

Merged 5 commits on Feb 14, 2024
8 changes: 5 additions & 3 deletions modules/generator/processor/localblocks/processor.go
@@ -472,9 +472,11 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
 		go func(b common.BackendBlock) {
 			defer wg.Done()
 
+			m := b.BlockMeta()
+
 			span, ctx := opentracing.StartSpanFromContext(ctx, "Processor.QueryRange.Block", opentracing.Tags{
-				"block":     b.BlockMeta().BlockID,
-				"blockSize": b.BlockMeta().Size,
+				"block":     m.BlockID,
+				"blockSize": m.Size,
 			})
 			defer span.Finish()
 
@@ -483,7 +485,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
 				return b.Fetch(ctx, req, common.DefaultSearchOptions())
 			})
 
-			err := eval.Do(ctx, f)
+			err := eval.Do(ctx, f, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
 			if err != nil {
 				jobErr.Store(err)
 			}
2 changes: 1 addition & 1 deletion modules/querier/querier_query_range.go
@@ -102,7 +102,7 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
 	})
 
 	// TODO handle error
-	err := eval.Do(ctx, f)
+	err := eval.Do(ctx, f, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
 	if err != nil {
 		jobErr.Store(err)
 	}
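Both call sites above follow the same pattern: convert the block meta's `StartTime`/`EndTime` (`time.Time` values) to unix nanoseconds and hand them to `Do`. A minimal standalone sketch of that conversion, using a hypothetical `blockMeta` struct as a stand-in for the real block metadata type:

```go
package main

import (
	"fmt"
	"time"
)

// blockMeta is a hypothetical stand-in for the block metadata used above;
// only the two timestamp fields matter for this sketch.
type blockMeta struct {
	StartTime, EndTime time.Time
}

func main() {
	m := blockMeta{
		StartTime: time.Date(2024, 2, 14, 10, 0, 0, 0, time.UTC),
		EndTime:   time.Date(2024, 2, 14, 10, 30, 0, 0, time.UTC),
	}

	// These are the third and fourth arguments to eval.Do in the diffs above.
	start := uint64(m.StartTime.UnixNano())
	end := uint64(m.EndTime.UnixNano())
	fmt.Println(start, end)

	// Callers with no known data range pass 0, 0 instead, which disables the
	// per-fetcher timestamp optimization (see ExecuteMetricsQueryRange below).
}
```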
69 changes: 56 additions & 13 deletions pkg/traceql/engine_metrics.go
@@ -324,7 +324,7 @@ func (e *Engine) ExecuteMetricsQueryRange(ctx context.Context, req *tempopb.Quer
 		return nil, err
 	}
 
-	err = eval.Do(ctx, fetcher)
+	err = eval.Do(ctx, fetcher, 0, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -393,11 +393,9 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, dedupe
 
 	// Timestamp filtering
 	// (1) Include any overlapping trace
-	// TODO - It can be faster to skip the trace-level timestamp check
+	// It can be faster to skip the trace-level timestamp check
 	// when all or most of the traces overlap the window.
-	// Maybe it can be dynamic.
-	// storageReq.StartTimeUnixNanos = req.Start
-	// storageReq.EndTimeUnixNanos = req.End // Should this be exclusive?
+	// So this is done dynamically on a per-fetcher basis in Do()
 	// (2) Only include spans that started in this time frame.
 	// This is checked outside the fetch layer in the evaluator. Timestamp
 	// is only checked on the spans that are the final results.
Expand Down Expand Up @@ -470,13 +468,51 @@ type MetricsEvalulator struct {
deduper *SpanDeduper2
storageReq *FetchSpansRequest
metricsPipeline metricsFirstStageElement
count int
deduped int
spansTotal uint64
spansDeduped uint64
bytes uint64
mtx sync.Mutex
}

func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher) error {
fetch, err := f.Fetch(ctx, *e.storageReq)
func timeRangeOverlap(reqStart, reqEnd, dataStart, dataEnd uint64) float64 {
st := max(reqStart, dataStart)
end := min(reqEnd, dataEnd)

if end <= st {
return 0
}

return float64(end-st) / float64(dataEnd-dataStart)
}

// Do metrics on the given source of data and merge the results into the working set. Optionally, if provided,
// uses the known time range of the data for last-minute optimizations. Time range is unix nanos
func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher, fetcherStart, fetcherEnd uint64) error {
// Make a copy of the request so we can modify it.
storageReq := *e.storageReq

if fetcherStart > 0 && fetcherEnd > 0 {
// Dynamically decide whether to use the trace-level timestamp columns
// for filtering.
overlap := timeRangeOverlap(e.start, e.end, fetcherStart, fetcherEnd)

if overlap == 0.0 {
// This shouldn't happen but might as well check.
// No overlap == nothing to do
return nil
}

// Our heuristic is if the overlap between the given fetcher (i.e. block)
// and the request is less than 20%, use them. Above 20%, the cost of loading
// them doesn't outweight the benefits. 20% was measured in local benchmarking.
// TODO - Make configurable or a query hint?
if overlap < 0.2 {
storageReq.StartTimeUnixNanos = e.start
storageReq.EndTimeUnixNanos = e.end // Should this be exclusive?
}
}

fetch, err := f.Fetch(ctx, storageReq)
if errors.Is(err, util.ErrUnsupported) {
return nil
}
@@ -510,23 +546,30 @@ func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher) error {
 			}
 
 			if e.dedupeSpans && e.deduper.Skip(ss.TraceID, s.StartTimeUnixNanos()) {
-				e.deduped++
+				e.spansDeduped++
 				continue
 			}
 
-			e.count++
+			e.spansTotal++
 			e.metricsPipeline.observe(s)
 
 		}
+		e.mtx.Unlock()
 		ss.Release()
 	}
 
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	e.bytes += fetch.Bytes()
+
 	return nil
 }
 
-func (e *MetricsEvalulator) SpanCount() {
-	fmt.Println(e.count, e.deduped)
+func (e *MetricsEvalulator) Metrics() (uint64, uint64, uint64) {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+
+	return e.bytes, e.spansTotal, e.spansDeduped
 }
 
 func (e *MetricsEvalulator) Results() (SeriesSet, error) {
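To see the 20% heuristic in action, here is a self-contained sketch (Go 1.21+ for the built-in `min`/`max`) that copies `timeRangeOverlap` from this diff and applies the same threshold `Do` uses; the sample numbers are invented:

```go
package main

import "fmt"

// timeRangeOverlap, copied from the diff above: the fraction of the data's
// range [dataStart, dataEnd] that the request [reqStart, reqEnd] covers.
func timeRangeOverlap(reqStart, reqEnd, dataStart, dataEnd uint64) float64 {
	st := max(reqStart, dataStart)
	end := min(reqEnd, dataEnd)
	if end <= st {
		return 0
	}
	return float64(end-st) / float64(dataEnd-dataStart)
}

func main() {
	// Request covers the first half of a block spanning [0, 10]:
	// overlap = 0.5 >= 0.2, so the trace-level timestamp columns
	// would not be pushed down into the fetch request.
	fmt.Println(timeRangeOverlap(0, 5, 0, 10)) // 0.5

	// Request clips only the last tenth of the block:
	// overlap = 0.1 < 0.2, so the columns would be used for filtering.
	fmt.Println(timeRangeOverlap(9, 20, 0, 10)) // 0.1
}
```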
19 changes: 19 additions & 0 deletions pkg/traceql/engine_metrics_test.go
@@ -93,6 +93,25 @@ func TestIntervalOf(t *testing.T) {
 	}
 }
 
+func TestTimeRangeOverlap(t *testing.T) {
+	tc := []struct {
+		reqStart, reqEnd, dataStart, dataEnd uint64
+		expected                             float64
+	}{
+		{1, 2, 3, 4, 0.0},   // No overlap
+		{0, 10, 0, 10, 1.0}, // Perfect overlap
+		{0, 10, 1, 2, 1.0},  // Request covers 100% of data
+		{3, 8, 0, 10, 0.5},  // 50% in the middle
+		{0, 10, 5, 15, 0.5}, // 50% of the start
+		{5, 15, 0, 10, 0.5}, // 50% of the end
+	}
+
+	for _, c := range tc {
+		actual := timeRangeOverlap(c.reqStart, c.reqEnd, c.dataStart, c.dataEnd)
+		require.Equal(t, c.expected, actual)
+	}
+}
+
 func TestCompileMetricsQueryRange(t *testing.T) {
 	tc := map[string]struct {
 		q string
51 changes: 36 additions & 15 deletions tempodb/encoding/vparquet3/block_traceql_test.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"math/rand"
 	"path"
+	"strconv"
 	"testing"
 	"time"
 
@@ -694,7 +695,8 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {
 		opts     = common.DefaultSearchOptions()
 		tenantID = "1"
 		blockID  = uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
-		path     = "/Users/marty/src/tmp/"
+		// blockID = uuid.MustParse("18364616-f80d-45a6-b2a3-cb63e203edff")
+		path = "/Users/marty/src/tmp/"
 	)
 
 	r, _, _, err := local.New(&local.Config{
@@ -717,20 +719,39 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {
 
 	for _, tc := range testCases {
 		b.Run(tc, func(b *testing.B) {
-			req := &tempopb.QueryRangeRequest{
-				Query: tc,
-				Step:  uint64(time.Minute),
-				Start: uint64(meta.StartTime.UnixNano()),
-				End:   uint64(meta.EndTime.UnixNano()),
-			}
-
-			eval, err := e.CompileMetricsQueryRange(req, false)
-			require.NoError(b, err)
-
-			b.ResetTimer()
-			for i := 0; i < b.N; i++ {
-				err := eval.Do(ctx, f)
-				require.NoError(b, err)
+			for _, minutes := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9} {
+				b.Run(strconv.Itoa(minutes), func(b *testing.B) {
+					st := meta.StartTime
+					end := st.Add(time.Duration(minutes) * time.Minute)
+
+					if end.After(meta.EndTime) {
+						b.SkipNow()
+						return
+					}
+
+					req := &tempopb.QueryRangeRequest{
+						Query:      tc,
+						Step:       uint64(time.Minute),
+						Start:      uint64(st.UnixNano()),
+						End:        uint64(end.UnixNano()),
+						ShardID:    30,
+						ShardCount: 65,
+					}
+
+					eval, err := e.CompileMetricsQueryRange(req, false)
+					require.NoError(b, err)
+
+					b.ResetTimer()
+					for i := 0; i < b.N; i++ {
+						err := eval.Do(ctx, f, uint64(block.meta.StartTime.UnixNano()), uint64(block.meta.EndTime.UnixNano()))
+						require.NoError(b, err)
+					}
+
+					bytes, spansTotal, _ := eval.Metrics()
+					b.ReportMetric(float64(bytes)/float64(b.N)/1024.0/1024.0, "MB_IO/op")
+					b.ReportMetric(float64(spansTotal)/float64(b.N), "spans/op")
+					b.ReportMetric(float64(spansTotal)/b.Elapsed().Seconds(), "spans/s")
+				})
 			}
 		})
 	}
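The custom per-operation numbers in the benchmark come from `testing.B.ReportMetric` and `b.Elapsed()` (the latter needs Go 1.20+). A minimal sketch of the same reporting pattern, with placeholder work standing in for `eval.Do`; the package and benchmark names here are invented:

```go
package tempodb_test

import (
	"testing"
	"time"
)

// BenchmarkReportMetricPattern demonstrates attaching custom units to
// benchmark output. The loop body is a placeholder for real evaluation.
func BenchmarkReportMetricPattern(b *testing.B) {
	var spansTotal uint64

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		time.Sleep(10 * time.Microsecond) // stand-in for eval.Do
		spansTotal += 100                 // stand-in for spans observed
	}

	// Custom metrics appear alongside ns/op in the benchmark output.
	b.ReportMetric(float64(spansTotal)/float64(b.N), "spans/op")
	b.ReportMetric(float64(spansTotal)/b.Elapsed().Seconds(), "spans/s")
}
```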