Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per tenant max search duration #1421

Merged
merged 10 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## main / unreleased

* [BUGFIX] metrics-generator: don't inject X-Scope-OrgID header for single-tenant setups [#1417](https://github.com/grafana/tempo/pull/1417) (@kvrhdn)
* [ENHANCEMENT] Added the ability to have a per tenant max search duration. [#1421](https://github.com/grafana/tempo/pull/1421) (@joe-elliott)

## v1.4.0 / 2022-04-28

Expand Down
4 changes: 2 additions & 2 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.frontend = v1

// create query frontend
queryFrontend, err := frontend.New(t.cfg.Frontend, cortexTripper, t.store, log.Logger, prometheus.DefaultRegisterer)
queryFrontend, err := frontend.New(t.cfg.Frontend, cortexTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func (t *App) setupModuleManager() error {
// Store: nil,
Overrides: {Server},
MemberlistKV: {Server},
QueryFrontend: {Store, Server},
QueryFrontend: {Store, Server, Overrides},
Ring: {Server, MemberlistKV},
MetricsGeneratorRing: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Expand Down
10 changes: 9 additions & 1 deletion docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,15 @@ overrides:
# This setting is useful if you wish to test how many active series a tenant will generate, without
# actually writing these metrics.
[metrics_generator_disable_collection: <bool> | default = false]


# Per-user block retention. If this value is set to 0 (default) then block_retention
# in the compactor config is used.
[block_retention: <duration> | default = 0s]

# Per-user max search duration. If this value is set to 0 (default) then max_duration
# in the frontend config is used.
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
[max_search_duration: <duration> | default = 0s]

# Tenant-specific overrides settings configuration file. The empty string (default
# value) disables using an overrides file.
[per_tenant_override_config: <string> | default = ""]
Expand Down
21 changes: 15 additions & 6 deletions docs/tempo/website/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ ingester_client:
remote_timeout: 5s
grpc_client_config:
max_recv_msg_size: 104857600
max_send_msg_size: 16777216
max_send_msg_size: 104857600
grpc_compression: snappy
rate_limit: 0
rate_limit_burst: 0
Expand All @@ -135,7 +135,7 @@ metrics_generator_client:
remote_timeout: 5s
grpc_client_config:
max_recv_msg_size: 104857600
max_send_msg_size: 16777216
max_send_msg_size: 104857600
grpc_compression: snappy
rate_limit: 0
rate_limit_burst: 0
Expand Down Expand Up @@ -316,9 +316,13 @@ ingester:
join_after: 0s
min_ready_duration: 15s
interface_names:
- eth0
- en0
final_sleep: 30s
- wlp2s0
- docker0
- br-f163873defd4
- br-f56e9de73d01
- br-16536cce4aa3
- br-3bc02eb7efdd
final_sleep: 0s
tokens_file_path: ""
availability_zone: ""
unregister_on_shutdown: true
Expand Down Expand Up @@ -384,6 +388,7 @@ metrics_generator:
- 3.2
- 6.4
- 12.8
dimensions: []
span_metrics:
histogram_buckets:
- 0.002
Expand Down Expand Up @@ -448,9 +453,9 @@ storage:
bucket_name: ""
chunk_buffer_size: 10485760
endpoint: ""
insecure: false
hedge_requests_at: 0s
hedge_requests_up_to: 2
insecure: false
object_cache_control: ""
object_metadata: {}
s3:
Expand Down Expand Up @@ -494,8 +499,12 @@ overrides:
metrics_generator_processors: null
metrics_generator_max_active_series: 0
metrics_generator_collection_interval: 0s
metrics_generator_disable_collection: false
metrics_generator_forwarder_queue_size: 0
metrics_generator_forwarder_workers: 0
block_retention: 0s
max_bytes_per_tag_values_query: 5000000
max_search_duration: 0s
max_bytes_per_trace: 5000000
per_tenant_override_config: ""
per_tenant_override_period: 10s
Expand Down
4 changes: 2 additions & 2 deletions example/tk/lib/dashboards/grafana.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ local mixins = import 'mixins.libsonnet';
deploy(frontend_url='http://query-frontend'):
grafana
+ grafana.withReplicas(1)
+ grafana.withImage('grafana/grafana:8.3.0-beta2')
+ grafana.withImage('grafana/grafana:8.5.2')
+ grafana.withRootUrl('http://grafana')
+ grafana.withTheme('dark')
+ grafana.withAnonymous()

+ grafana.withGrafanaIniConfig({
sections+: {
feature_toggles: {
enable: 'tempoSearch',
enable: 'tempoSearch tempoBackendSearch',
},
},
})
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Config struct {
}

type SearchConfig struct {
Sharder SearchSharderConfig `yaml:",inline"` // jpe - what did this look like before
Sharder SearchSharderConfig `yaml:",inline"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand Down
9 changes: 5 additions & 4 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
Expand All @@ -38,7 +39,7 @@ type QueryFrontend struct {
}

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.QueryShards < minQueryShards || cfg.QueryShards > maxQueryShards {
Expand Down Expand Up @@ -67,7 +68,7 @@ func New(cfg Config, next http.RoundTripper, store storage.Store, logger log.Log

// tracebyid middleware
traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, store, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, store, logger), retryWare)

traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
"op": traceByIDOp,
Expand Down Expand Up @@ -174,10 +175,10 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
}

// newSearchMiddleware creates a new frontend middleware to handle search and search tags requests.
func newSearchMiddleware(cfg Config, reader tempodb.Reader, logger log.Logger) Middleware {
func newSearchMiddleware(cfg Config, o *overrides.Overrides, reader tempodb.Reader, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
ingesterSearchRT := next
backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, cfg.Search.Sharder, logger))
backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, logger))

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// backend search queries require sharding so we pass through a special roundtripper
Expand Down
12 changes: 6 additions & 6 deletions modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
},
}, next, nil, log.NewNopLogger(), nil)
}, next, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

req := httptest.NewRequest("GET", "/", nil)
Expand All @@ -50,7 +50,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
},
}, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -61,7 +61,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
},
}, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -72,7 +72,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
},
}, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
assert.Nil(t, f)

Expand All @@ -83,7 +83,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
TargetBytesPerRequest: 0,
},
},
}, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
assert.Nil(t, f)

Expand All @@ -96,7 +96,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
QueryBackendAfter: time.Hour,
},
},
}, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)
}
44 changes: 30 additions & 14 deletions modules/frontend/searchsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -124,8 +125,9 @@ func (r *searchResponse) result() *tempopb.SearchResponse {
}

type searchSharder struct {
next http.RoundTripper
reader tempodb.Reader
next http.RoundTripper
reader tempodb.Reader
overrides *overrides.Overrides

cfg SearchSharderConfig
logger log.Logger
Expand All @@ -142,13 +144,14 @@ type SearchSharderConfig struct {
}

// newSearchSharder creates a sharding middleware for search
func newSearchSharder(reader tempodb.Reader, cfg SearchSharderConfig, logger log.Logger) Middleware {
func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return searchSharder{
next: next,
reader: reader,
logger: logger,
cfg: cfg,
next: next,
reader: reader,
overrides: o,
logger: logger,
cfg: cfg,
}
})
}
Expand All @@ -169,15 +172,9 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

// adjust limit based on config
searchReq.Limit = adjustLimit(searchReq.Limit, s.cfg.DefaultLimit, s.cfg.MaxLimit)

if s.cfg.MaxDuration != 0 && time.Duration(searchReq.End-searchReq.Start)*time.Second > s.cfg.MaxDuration {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(fmt.Sprintf("range specified by start and end exceeds %s. received start=%d end=%d", s.cfg.MaxDuration, searchReq.Start, searchReq.End))),
}, nil
}

ctx := r.Context()
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand All @@ -189,6 +186,15 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardSearch")
defer span.Finish()

// calculate and enforce max search duration
maxDuration := s.maxDuration(tenantID)
if maxDuration != 0 && time.Duration(searchReq.End-searchReq.Start)*time.Second > maxDuration {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(fmt.Sprintf("range specified by start and end exceeds %s. received start=%d end=%d", maxDuration, searchReq.Start, searchReq.End))),
}, nil
}

ingesterReq, err := s.ingesterRequest(ctx, tenantID, r, *searchReq)
if err != nil {
return nil, err
Expand Down Expand Up @@ -439,3 +445,13 @@ func adjustLimit(limit, defaultLimit, maxLimit uint32) uint32 {

return limit
}

// maxDuration returns the maximum search time range allowed for tenantID.
//
// A non-zero per-tenant override takes precedence; otherwise the value from
// the frontend search sharder config (cfg.MaxDuration) is returned. A result
// of 0 is treated by RoundTrip as "no limit enforced".
func (s *searchSharder) maxDuration(tenantID string) time.Duration {
	// The sharder can be constructed with nil overrides (the frontend tests
	// pass nil for this argument), so guard the lookup and fall back to the
	// static config instead of relying on MaxSearchDuration tolerating a nil
	// receiver.
	// NOTE(review): confirm whether Overrides methods are nil-safe; if they
	// are, this guard is merely defensive.
	if s.overrides != nil {
		if d := s.overrides.MaxSearchDuration(tenantID); d != 0 {
			return d
		}
	}

	return s.cfg.MaxDuration
}
Loading