Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Traceql metrics] New (unsafe) query hints #3396

Merged
merged 16 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## main / unreleased

* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)
* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
* [BUGFIX] Return unfiltered results when a bad TraceQL query is provided in autocomplete. [#3426](https://github.com/grafana/tempo/pull/3426) (@mapno)
Expand Down
68 changes: 55 additions & 13 deletions modules/frontend/query_range_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,10 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

// Check sampling rate hint
samplingRate := 1.0
if ok, v := expr.Hints.GetFloat(traceql.HintSample); ok {
if v > 0 && v < 1.0 {
samplingRate = v
}
}
allowUnsafe := s.overrides.UnsafeQueryHints(tenantID)
samplingRate := s.samplingRate(expr, allowUnsafe)
targetBytesPerRequest := s.jobSize(expr, samplingRate, allowUnsafe)
interval := s.jobInterval(expr, allowUnsafe)

generatorReq = s.generatorRequest(*queryRangeReq, samplingRate)

Expand All @@ -130,7 +127,7 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
reqCh <- generatorReq
}

totalBlocks, totalBlockBytes := s.backendRequests(tenantID, queryRangeReq, now, samplingRate, reqCh, stopCh)
totalBlocks, totalBlockBytes := s.backendRequests(tenantID, queryRangeReq, now, samplingRate, targetBytesPerRequest, interval, reqCh, stopCh)

wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
jobErr := atomic.Error{}
Expand Down Expand Up @@ -281,7 +278,7 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
return metas
}

func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, now time.Time, samplingRate float64, reqCh chan *queryRangeJob, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes int) {
func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *queryRangeJob, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes int) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
Expand Down Expand Up @@ -317,16 +314,16 @@ func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.
}

go func() {
s.buildBackendRequests(tenantID, searchReq, start, end, samplingRate, reqCh, stopCh)
s.buildBackendRequests(tenantID, searchReq, start, end, samplingRate, targetBytesPerRequest, interval, reqCh, stopCh)
}()

return
}

func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, start, end uint64, samplingRate float64, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, start, end uint64, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
defer close(reqCh)

timeWindowSize := uint64(s.cfg.Interval.Nanoseconds())
timeWindowSize := uint64(interval.Nanoseconds())

for start < end {

Expand All @@ -347,7 +344,7 @@ func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tem
totalBlockSize += b.Size
}

shards := uint32(math.Ceil(float64(totalBlockSize) / float64(s.cfg.TargetBytesPerRequest)))
shards := uint32(math.Ceil(float64(totalBlockSize) / float64(targetBytesPerRequest)))

for i := uint32(1); i <= shards; i++ {
shardR := *searchReq
Expand Down Expand Up @@ -445,6 +442,51 @@ func (s *queryRangeSharder) maxDuration(tenantID string) time.Duration {
return s.cfg.MaxDuration
}

// samplingRate returns the effective sampling rate for the query. It honors
// the "sample" query hint when unsafe hints are enabled for the tenant, but
// only accepts values strictly between 0 and 1; any other value (or no hint)
// falls back to 1.0, i.e. no sampling.
func (s *queryRangeSharder) samplingRate(expr *traceql.RootExpr, allowUnsafe bool) float64 {
	rate, ok := expr.Hints.GetFloat(traceql.HintSample, allowUnsafe)
	if !ok || rate <= 0 || rate >= 1.0 {
		return 1.0
	}
	return rate
}

// jobSize returns the target number of bytes each backend sub-request should
// cover. A positive job-size query hint (when unsafe hints are enabled for
// the tenant) overrides the configured value. When sampling below 100%, the
// configured size is scaled up by 1/samplingRate — capped at 10x — because
// each sub-request does proportionally less work.
func (s *queryRangeSharder) jobSize(expr *traceql.RootExpr, samplingRate float64, allowUnsafe bool) int {
	// Hint wins outright when present and positive.
	if hinted, ok := expr.Hints.GetInt(traceql.HintJobSize, allowUnsafe); ok && hinted > 0 {
		return hinted
	}

	size := s.cfg.TargetBytesPerRequest
	if samplingRate >= 1.0 {
		return size
	}

	// Automatically scale job size when sampling less than 100%.
	// Keep the scale factor within reason (at most 10x).
	factor := math.Min(1.0/samplingRate, 10.0)
	return int(float64(size) * factor)
}

// jobInterval returns the time window each backend sub-request should span.
// A positive job-interval query hint (when unsafe hints are enabled for the
// tenant) overrides the configured interval.
func (s *queryRangeSharder) jobInterval(expr *traceql.RootExpr, allowUnsafe bool) time.Duration {
	hinted, ok := expr.Hints.GetDuration(traceql.HintJobInterval, allowUnsafe)
	if ok && hinted > 0 {
		return hinted
	}
	return s.cfg.Interval
}

func (s *queryRangeSharder) convertToPromFormat(resp *tempopb.QueryRangeResponse) PromResponse {
// Sort in increasing timestamp so that lines are drawn correctly
for _, series := range resp.Series {
Expand Down
1 change: 1 addition & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(userID string) []string
DedicatedColumns(userID string) backend.DedicatedColumns
MaxBytesPerTrace(userID string) int
UnsafeQueryHints(userID string) bool
}

var _ metricsGeneratorOverrides = (overrides.Interface)(nil)
5 changes: 5 additions & 0 deletions modules/generator/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type mockOverrides struct {
localBlocksCompleteBlockTimeout time.Duration
dedicatedColumns backend.DedicatedColumns
maxBytesPerTrace int
unsafeQueryHints bool
}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
Expand Down Expand Up @@ -138,3 +139,7 @@ func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
// MaxBytesPerTrace returns the mock's configured per-trace byte limit,
// ignoring the tenant argument.
func (m *mockOverrides) MaxBytesPerTrace(string) int {
	return m.maxBytesPerTrace
}

// UnsafeQueryHints returns whether the mock allows unsafe TraceQL query
// hints, ignoring the tenant argument.
func (m *mockOverrides) UnsafeQueryHints(string) bool {
	return m.unsafeQueryHints
}
18 changes: 16 additions & 2 deletions modules/generator/processor/localblocks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,19 @@ type Config struct {
MaxBlockBytes uint64 `yaml:"max_block_bytes"`
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
MaxLiveTraces uint64 `yaml:"max_live_traces"`
ConcurrentBlocks uint `yaml:"concurrent_blocks"`
FilterServerSpans bool `yaml:"filter_server_spans"`
Metrics MetricsConfig `yaml:",inline"`
}

// MetricsConfig holds the tuning options for local-blocks metrics queries.
type MetricsConfig struct {
	// ConcurrentBlocks is the number of blocks queried in parallel.
	ConcurrentBlocks uint `yaml:"concurrent_blocks"`
	// TimeOverlapCutoff is a tuning factor that controls whether the trace-level
	// timestamp columns are used in a metrics query. Loading these columns has a cost,
	// so in some cases it is faster to skip these columns entirely, reducing I/O but
	// increasing the number of spans evaluated and thrown away. The value is a ratio
	// between 0.0 and 1.0. If a block overlaps the time window by less than this value,
	// then we skip the columns. A value of 1.0 will always load the columns, and 0.0 never.
	TimeOverlapCutoff float64 `yaml:"time_overlap_cutoff,omitempty"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand All @@ -39,6 +50,9 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.MaxBlockDuration = 1 * time.Minute
cfg.MaxBlockBytes = 500_000_000
cfg.CompleteBlockTimeout = time.Hour
cfg.ConcurrentBlocks = 10
cfg.FilterServerSpans = true
cfg.Metrics = MetricsConfig{
ConcurrentBlocks: 10,
TimeOverlapCutoff: 0.2,
}
}
25 changes: 23 additions & 2 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const timeBuffer = 5 * time.Minute
type ProcessorOverrides interface {
DedicatedColumns(string) backend.DedicatedColumns
MaxBytesPerTrace(string) int
UnsafeQueryHints(string) bool
}

type Processor struct {
Expand Down Expand Up @@ -444,14 +445,34 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
for _, b := range p.completeBlocks {
blocks = append(blocks, b)
}
if len(blocks) == 0 {
return nil, nil
}

expr, err := traceql.Parse(req.Query)
if err != nil {
return nil, fmt.Errorf("compiling query: %w", err)
}

unsafe := p.overrides.UnsafeQueryHints(p.tenant)

timeOverlapCutoff := p.Cfg.Metrics.TimeOverlapCutoff
if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
timeOverlapCutoff = v
}

concurrency := p.Cfg.Metrics.ConcurrentBlocks
if v, ok := expr.Hints.GetInt(traceql.HintConcurrentBlocks, unsafe); ok && v > 0 && v < 100 {
concurrency = uint(v)
}

eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false)
eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}

var (
wg = boundedwaitgroup.New(p.Cfg.ConcurrentBlocks)
wg = boundedwaitgroup.New(concurrency)
jobErr = atomic.Error{}
)

Expand Down
9 changes: 8 additions & 1 deletion modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (m *mockOverrides) MaxBytesPerTrace(string) int {
return 0
}

// UnsafeQueryHints reports whether unsafe TraceQL query hints are allowed.
// The mock always disallows them, ignoring the tenant argument.
func (m *mockOverrides) UnsafeQueryHints(string) bool {
	return false
}

func TestProcessorDoesNotRace(t *testing.T) {
wal, err := wal.New(&wal.Config{
Filepath: t.TempDir(),
Expand All @@ -43,12 +47,15 @@ func TestProcessorDoesNotRace(t *testing.T) {
FlushCheckPeriod: 10 * time.Millisecond,
TraceIdlePeriod: time.Second,
CompleteBlockTimeout: time.Minute,
ConcurrentBlocks: 10,
Block: &common.BlockConfig{
BloomShardSizeBytes: 100_000,
BloomFP: 0.05,
Version: encoding.DefaultEncoding().Version(),
},
Metrics: MetricsConfig{
ConcurrentBlocks: 10,
TimeOverlapCutoff: 0.2,
},
}
overrides = &mockOverrides{}
)
Expand Down
2 changes: 2 additions & 0 deletions modules/overrides/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type ReadOverrides struct {

// QueryFrontend enforced overrides
MaxSearchDuration model.Duration `yaml:"max_search_duration,omitempty" json:"max_search_duration,omitempty"`

UnsafeQueryHints bool `yaml:"unsafe_query_hints,omitempty" json:"unsafe_query_hints,omitempty"`
}

type CompactionOverrides struct {
Expand Down
3 changes: 3 additions & 0 deletions modules/overrides/config_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (c *Overrides) toLegacy() LegacyOverrides {
MaxBytesPerTagValuesQuery: c.Read.MaxBytesPerTagValuesQuery,
MaxBlocksPerTagValuesQuery: c.Read.MaxBlocksPerTagValuesQuery,
MaxSearchDuration: c.Read.MaxSearchDuration,
UnsafeQueryHints: c.Read.UnsafeQueryHints,

MaxBytesPerTrace: c.Global.MaxBytesPerTrace,

Expand Down Expand Up @@ -117,6 +118,7 @@ type LegacyOverrides struct {

// QueryFrontend enforced limits
MaxSearchDuration model.Duration `yaml:"max_search_duration" json:"max_search_duration"`
UnsafeQueryHints bool `yaml:"unsafe_query_hints" json:"unsafe_query_hints"`

// MaxBytesPerTrace is enforced in the Ingester, Compactor, Querier (Search) and Serverless (Search). It
// is not used when doing a trace by id lookup.
Expand All @@ -139,6 +141,7 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
MaxBytesPerTagValuesQuery: l.MaxBytesPerTagValuesQuery,
MaxBlocksPerTagValuesQuery: l.MaxBlocksPerTagValuesQuery,
MaxSearchDuration: l.MaxSearchDuration,
UnsafeQueryHints: l.UnsafeQueryHints,
},
Compaction: CompactionOverrides{
BlockRetention: l.BlockRetention,
Expand Down
1 change: 1 addition & 0 deletions modules/overrides/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Interface interface {
BlockRetention(userID string) time.Duration
MaxSearchDuration(userID string) time.Duration
DedicatedColumns(userID string) backend.DedicatedColumns
UnsafeQueryHints(userID string) bool

// Management API
WriteStatusRuntimeConfig(w io.Writer, r *http.Request) error
Expand Down
4 changes: 4 additions & 0 deletions modules/overrides/runtime_config_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ func (o *runtimeConfigOverridesManager) MaxBlocksPerTagValuesQuery(userID string
return o.getOverridesForUser(userID).Read.MaxBlocksPerTagValuesQuery
}

// UnsafeQueryHints returns whether unsafe TraceQL query hints are enabled
// for this tenant.
func (o *runtimeConfigOverridesManager) UnsafeQueryHints(userID string) bool {
	return o.getOverridesForUser(userID).Read.UnsafeQueryHints
}

// MaxSearchDuration is the duration of the max search duration for this tenant.
func (o *runtimeConfigOverridesManager) MaxSearchDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).Read.MaxSearchDuration)
Expand Down
15 changes: 15 additions & 0 deletions modules/querier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type Config struct {
Search SearchConfig `yaml:"search"`
TraceByID TraceByIDConfig `yaml:"trace_by_id"`
Metrics MetricsConfig `yaml:"metrics"`

ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
MaxConcurrentQueries int `yaml:"max_concurrent_queries"`
Expand Down Expand Up @@ -43,6 +44,18 @@ type TraceByIDConfig struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
}

// MetricsConfig holds the querier-side tuning options for metrics queries.
type MetricsConfig struct {
	// ConcurrentBlocks is the number of blocks queried in parallel.
	ConcurrentBlocks int `yaml:"concurrent_blocks,omitempty"`

	// TimeOverlapCutoff is a tuning factor that controls whether the trace-level
	// timestamp columns are used in a metrics query. Loading these columns has a cost,
	// so in some cases it is faster to skip these columns entirely, reducing I/O but
	// increasing the number of spans evaluated and thrown away. The value is a ratio
	// between 0.0 and 1.0. If a block overlaps the time window by less than this value,
	// then we skip the columns. A value of 1.0 will always load the columns, and 0.0 never.
	TimeOverlapCutoff float64 `yaml:"time_overlap_cutoff,omitempty"`
}

// RegisterFlagsAndApplyDefaults register flags.
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.TraceByID.QueryTimeout = 10 * time.Second
Expand All @@ -53,6 +66,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Search.HedgeRequestsAt = 8 * time.Second
cfg.Search.HedgeRequestsUpTo = 2
cfg.Search.QueryTimeout = 30 * time.Second
cfg.Metrics.ConcurrentBlocks = 2
cfg.Metrics.TimeOverlapCutoff = 0.2
cfg.Worker = worker.Config{
MatchMaxConcurrency: true,
MaxConcurrentRequests: cfg.MaxConcurrentQueries,
Expand Down
21 changes: 19 additions & 2 deletions modules/querier/querier_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,33 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
return nil, nil
}

unsafe := q.limits.UnsafeQueryHints(tenantID)

// Optimization
// If there's only 1 block then dedupe not needed.
dedupe := len(withinTimeRange) > 1

eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, dedupe)
expr, err := traceql.Parse(req.Query)
if err != nil {
return nil, err
}

timeOverlapCutoff := q.cfg.Metrics.TimeOverlapCutoff
if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
timeOverlapCutoff = v
}

concurrency := q.cfg.Metrics.ConcurrentBlocks
if v, ok := expr.Hints.GetInt(traceql.HintConcurrentBlocks, unsafe); ok && v > 0 && v < 100 {
concurrency = v
}

eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, dedupe, timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}

wg := boundedwaitgroup.New(2)
wg := boundedwaitgroup.New(uint(concurrency))
jobErr := atomic.Error{}

for _, m := range withinTimeRange {
Expand Down
Loading
Loading