diff --git a/CHANGELOG.md b/CHANGELOG.md index e1781770577..9b990e61d5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,13 +19,29 @@ * [ENHANCEMENT] Partially persist traces that exceed `max_bytes_per_trace` during compaction [#1317](https://github.com/grafana/tempo/pull/1317) (@joe-elliott) * [ENHANCEMENT] Make search respect per tenant `max_bytes_per_trace` and added `skippedTraces` to returned search metrics. [#1318](https://github.com/grafana/tempo/pull/1318) (@joe-elliott) * [ENHANCEMENT] Improve serverless consistency by forcing a GC before returning. [#1324](https://github.com/grafana/tempo/pull/1324) (@joe-elliott) -* [ENHANCEMENT] Add hedging to sharded search queries created by the frontend. [#1334](https://github.com/grafana/tempo/pull/1334) (@joe-elliott) +* [ENHANCEMENT] Add hedging to queries to external endpoints. [#1350](https://github.com/grafana/tempo/pull/1350) (@joe-elliott) New config options and defaults: ``` - query_frontend: + querier: + search: + external_hedge_requests_at: 5s + external_hedge_requests_up_to: 3 + ``` + ** BREAKING CHANGE ** + Querier options related to search have moved under a `search` block: + ``` + querier: + search_query_timeout: 30s + search_external_endpoints: [] + search_prefer_self: 2 + ``` + becomes + ``` + querier: search: - hedge_requests_at: 5s - hedge_requests_up_to: 3 + query_timeout: 30s + prefer_self: 2 + external_endpoints: [] ``` * [BUGFIX]: Enable compaction and retention in Tanka single-binary [#1352](https://github.com/grafana/tempo/issues/1352) * [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index ab9b2c9ef60..237ba522342 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -231,15 +231,6 @@ query_frontend: # (default: 1h) [query_ingesters_until: ] - - # If set to a non-zero value a second request will be issued at the provided duration. Recommended to - # be set to p99 of search requests to reduce long tail latency. - # (default: 5s) - [hedge_requests_at: ] - - # The maximum number of requests to execute when hedging. Requires hedge_requests_at to be set. - # (default: 3) - [hedge_requests_up_to: ] ``` ## Querier @@ -254,28 +245,38 @@ querier: # Timeout for trace lookup requests [query_timeout: | default = 10s] - # Timeout for search requests - [search_query_timeout: | default = 30s] - - # A list of external endpoints that the querier will use to offload backend search requests. They must - # take and return the same value as /api/search endpoint on the querier. This is intended to be - # used with serverless technologies for massive parrallelization of the search path. - # The default value of "" disables this feature. - [search_external_endpoints: | default = ] - - # If search_external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend - # you have configured. This setting allows the operator to have the querier prefer itself for a configurable - # number of subqueries. In the default case of 2 the querier will process up to 2 search requests subqueries before starting - # to reach out to search_external_endpoints. - # Setting this to 0 will disable this feature and the querier will proxy all search subqueries to search_external_endpoints. - [search_prefer_self: | default = 2 ] - # The query frontend turns both trace by id (/api/traces/) and search (/api/search?) requests # into subqueries that are then pulled and serviced by the queriers. # This value controls the overall number of simultaneous subqueries that the querier will service at once. It does # not distinguish between the types of queries. [max_concurrent_queries: | default = 5] + search: + # Timeout for search requests + [query_timeout: | default = 30s] + + # A list of external endpoints that the querier will use to offload backend search requests. They must + # take and return the same value as /api/search endpoint on the querier. This is intended to be + # used with serverless technologies for massive parrallelization of the search path. + # The default value of "" disables this feature. + [external_endpoints: | default = ] + + # If search_external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend + # you have configured. This setting allows the operator to have the querier prefer itself for a configurable + # number of subqueries. In the default case of 2 the querier will process up to 2 search requests subqueries before starting + # to reach out to search_external_endpoints. + # Setting this to 0 will disable this feature and the querier will proxy all search subqueries to search_external_endpoints. + [prefer_self: | default = 2 ] + + # If set to a non-zero value a second request will be issued at the provided duration. Recommended to + # be set to p99 of external search requests to reduce long tail latency. + # (default: 4s) + [external_hedge_requests_at: ] + + # The maximum number of requests to execute when hedging. Requires hedge_requests_at to be set. + # (default: 3) + [external_hedge_requests_up_to: ] + # config of the worker that connects to the query frontend frontend_worker: diff --git a/docs/tempo/website/configuration/manifest.md b/docs/tempo/website/configuration/manifest.md index 1a620cccc78..eda736abc90 100644 --- a/docs/tempo/website/configuration/manifest.md +++ b/docs/tempo/website/configuration/manifest.md @@ -16,7 +16,7 @@ go run ./cmd/tempo --storage.trace.backend=local --storage.trace.local.path=/tmp ## Complete Configuration -> **Note**: This manifest was generated on 2022-03-09. +> **Note**: This manifest was generated on 2022-03-23. ```yaml target: all @@ -151,10 +151,13 @@ metrics_generator_client: tls_server_name: "" tls_insecure_skip_verify: false querier: + search: + query_timeout: 30s + prefer_self: 2 + external_endpoints: [] + external_hedge_requests_at: 4s + external_hedge_requests_up_to: 3 query_timeout: 10s - search_query_timeout: 30s - search_external_endpoints: [] - search_prefer_self: 2 max_concurrent_queries: 5 frontend_worker: frontend_address: 127.0.0.1:9095 @@ -220,8 +223,6 @@ query_frontend: max_duration: 1h1m0s query_backend_after: 15m0s query_ingesters_until: 1h0m0s - hedge_requests_at: 5s - hedge_requests_up_to: 3 compactor: ring: kvstore: @@ -273,6 +274,7 @@ compactor: retention_concurrency: 10 iterator_buffer_size: 1000 max_time_per_tenant: 5m0s + compaction_cycle: 30s override_ring_key: compactor ingester: lifecycler: diff --git a/docs/tempo/website/operations/backend_search.md b/docs/tempo/website/operations/backend_search.md index 673b7168e06..7195f4e309d 100644 --- a/docs/tempo/website/operations/backend_search.md +++ b/docs/tempo/website/operations/backend_search.md @@ -45,10 +45,11 @@ querier: # Increase this greatly to permit needed throughput. max_concurrent_queries: 100 - # A list of endpoints to query. Load will be spread evenly across - # these multiple serverless functions. - search_external_endpoints: - - https:// + search: + # A list of endpoints to query. Load will be spread evenly across + # these multiple serverless functions. + external_endpoints: + - https:// ``` ### Query frontend @@ -94,18 +95,28 @@ settings to configure Tempo to use the serverless: ``` querier: + search: # A list of external endpoints that the querier will use to offload backend search requests. They must # take and return the same value as /api/search endpoint on the querier. This is intended to be # used with serverless technologies for massive parrallelization of the search path. # The default value of "" disables this feature. - [search_external_endpoints: | default = ] + [external_endpoints: | default = ] - # If search_external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend + # If external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend # you have configured. This setting allows the operator to have the querier prefer itself for a configurable # number of subqueries. In the default case of 2 the querier will process up to 2 search requests subqueries before starting - # to reach out to search_external_endpoints. - # Setting this to 0 will disable this feature and the querier will proxy all search subqueries to search_external_endpoints. - [search_prefer_self: | default = 2 ] + # to reach out to external_endpoints. + # Setting this to 0 will disable this feature and the querier will proxy all search subqueries to external_endpoints. + [prefer_self: | default = 2 ] + + # If set to a non-zero value a second request will be issued at the provided duration. Recommended to + # be set to p99 of external search requests to reduce long tail latency. + # (default: 4s) + [external_hedge_requests_at: ] + + # The maximum number of requests to execute when hedging. Requires hedge_requests_at to be set. + # (default: 3) + [external_hedge_requests_up_to: ] ``` See here for cloud-specific details: diff --git a/docs/tempo/website/operations/serverless_aws.md b/docs/tempo/website/operations/serverless_aws.md index 49dd1b49f28..814534382db 100644 --- a/docs/tempo/website/operations/serverless_aws.md +++ b/docs/tempo/website/operations/serverless_aws.md @@ -76,6 +76,7 @@ For more guidance on configuration options for full backend search [check here]( ``` querier: - search_external_endpoints: - - http:// + search: + external_endpoints: + - http:// ``` diff --git a/docs/tempo/website/operations/serverless_gcp.md b/docs/tempo/website/operations/serverless_gcp.md index f76f7bd4cb9..388db9a308a 100644 --- a/docs/tempo/website/operations/serverless_gcp.md +++ b/docs/tempo/website/operations/serverless_gcp.md @@ -72,6 +72,7 @@ The endpoint can be retrieved from the trigger tab in Google Cloud Functions: ``` querier: - search_external_endpoints: - - + search: + external_endpoints: + - ``` diff --git a/integration/e2e/serverless/config-serverless.yaml b/integration/e2e/serverless/config-serverless.yaml index 374a4d76447..20088742819 100644 --- a/integration/e2e/serverless/config-serverless.yaml +++ b/integration/e2e/serverless/config-serverless.yaml @@ -51,7 +51,8 @@ memberlist: - tempo_e2e-querier:7946 querier: - search_external_endpoints: - - http://tempo_e2e-serverless:8080/ + search: + external_endpoints: + - http://tempo_e2e-serverless:8080/ frontend_worker: frontend_address: tempo_e2e-query-frontend:9095 diff --git a/modules/frontend/config.go b/modules/frontend/config.go index 268d75eec84..2702557ac50 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -25,9 +25,7 @@ type Config struct { } type SearchConfig struct { - Sharder SearchSharderConfig `yaml:",inline"` - HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"` - HedgeRequestsUpTo int `yaml:"hedge_requests_up_to"` + Sharder SearchSharderConfig `yaml:",inline"` // jpe - what did this look like before } func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { @@ -38,8 +36,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.QueryShards = 20 cfg.TolerateFailedBlocks = 0 cfg.Search = SearchConfig{ - HedgeRequestsAt: 5 * time.Second, - HedgeRequestsUpTo: 3, Sharder: SearchSharderConfig{ QueryBackendAfter: 15 * time.Minute, QueryIngestersUntil: time.Hour, diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 9628aba6b80..e05af0dfef0 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -67,16 +67,7 @@ func New(cfg Config, next http.RoundTripper, store storage.Store, logger log.Log // tracebyid middleware traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare) - - // search pipeline and middleware - searchPipeline := []Middleware{ - newSearchMiddleware(cfg, store, logger), - } - if cfg.Search.HedgeRequestsAt > 0 { - searchPipeline = append(searchPipeline, newHedgedRequestWare(cfg.Search.HedgeRequestsAt, cfg.Search.HedgeRequestsUpTo)) - } - searchPipeline = append(searchPipeline, retryWare) - searchMiddleware := MergeMiddlewares(searchPipeline...) + searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, store, logger), retryWare) traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{ "op": traceByIDOp, diff --git a/modules/frontend/hedged_requests.go b/modules/frontend/hedged_requests.go deleted file mode 100644 index e3e120eea4b..00000000000 --- a/modules/frontend/hedged_requests.go +++ /dev/null @@ -1,50 +0,0 @@ -package frontend - -import ( - "net/http" - "time" - - "github.com/cristalhq/hedgedhttp" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -const ( - hedgedMetricsPublishDuration = 10 * time.Second -) - -var ( - hedgedRequestsMetrics = promauto.NewGauge( - prometheus.GaugeOpts{ - Namespace: "tempo", - Name: "query_frontend_hedged_roundtrips_total", - Help: "Total number of hedged search requests. Registered as a gauge for code sanity. This is a counter.", - }, - ) -) - -func newHedgedRequestWare(hedgeRequestsAt time.Duration, hedgeRequestsUpTo int) Middleware { - return MiddlewareFunc(func(r http.RoundTripper) http.RoundTripper { - ret, stats, err := hedgedhttp.NewRoundTripperAndStats(hedgeRequestsAt, hedgeRequestsUpTo, r) - if err != nil { - panic(err) - } - publishHedgedMetrics(stats) - return ret - }) -} - -// PublishHedgedMetrics flushes metrics from hedged requests every 10 seconds -func publishHedgedMetrics(s *hedgedhttp.Stats) { - ticker := time.NewTicker(hedgedMetricsPublishDuration) - go func() { - for range ticker.C { - snap := s.Snapshot() - hedgedRequests := int64(snap.ActualRoundTrips) - int64(snap.RequestedRoundTrips) - if hedgedRequests < 0 { - hedgedRequests = 0 - } - hedgedRequestsMetrics.Set(float64(hedgedRequests)) - } - }() -} diff --git a/modules/querier/config.go b/modules/querier/config.go index 66250503078..9a615c1df3e 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -11,22 +11,31 @@ import ( // Config for a querier. type Config struct { + Search SearchConfig `yaml:"search"` + TraceLookupQueryTimeout time.Duration `yaml:"query_timeout"` - SearchQueryTimeout time.Duration `yaml:"search_query_timeout"` - SearchExternalEndpoints []string `yaml:"search_external_endpoints"` - SearchPreferSelf int `yaml:"search_prefer_self"` ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` MaxConcurrentQueries int `yaml:"max_concurrent_queries"` Worker worker.Config `yaml:"frontend_worker"` } +type SearchConfig struct { + QueryTimeout time.Duration `yaml:"query_timeout"` + PreferSelf int `yaml:"prefer_self"` + ExternalEndpoints []string `yaml:"external_endpoints"` + HedgeRequestsAt time.Duration `yaml:"external_hedge_requests_at"` + HedgeRequestsUpTo int `yaml:"external_hedge_requests_up_to"` +} + // RegisterFlagsAndApplyDefaults register flags. func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { cfg.TraceLookupQueryTimeout = 10 * time.Second - cfg.SearchQueryTimeout = 30 * time.Second cfg.ExtraQueryDelay = 0 cfg.MaxConcurrentQueries = 5 - cfg.SearchPreferSelf = 2 + cfg.Search.PreferSelf = 2 + cfg.Search.HedgeRequestsAt = 4 * time.Second + cfg.Search.HedgeRequestsUpTo = 3 + cfg.Search.QueryTimeout = 30 * time.Second cfg.Worker = worker.Config{ MatchMaxConcurrency: true, MaxConcurrentRequests: cfg.MaxConcurrentQueries, diff --git a/modules/querier/http.go b/modules/querier/http.go index f01c1196864..da64b1e7481 100644 --- a/modules/querier/http.go +++ b/modules/querier/http.go @@ -149,7 +149,7 @@ func (q *Querier) SearchHandler(w http.ResponseWriter, r *http.Request) { isSearchBlock := api.IsSearchBlock(r) // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.SearchQueryTimeout)) + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.Search.QueryTimeout)) defer cancel() span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.SearchHandler") @@ -200,7 +200,7 @@ func (q *Querier) SearchHandler(w http.ResponseWriter, r *http.Request) { func (q *Querier) SearchTagsHandler(w http.ResponseWriter, r *http.Request) { // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.SearchQueryTimeout)) + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.Search.QueryTimeout)) defer cancel() span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.SearchTagsHandler") @@ -225,7 +225,7 @@ func (q *Querier) SearchTagsHandler(w http.ResponseWriter, r *http.Request) { func (q *Querier) SearchTagValuesHandler(w http.ResponseWriter, r *http.Request) { // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.SearchQueryTimeout)) + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.Search.QueryTimeout)) defer cancel() span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.SearchTagValuesHandler") diff --git a/modules/querier/querier.go b/modules/querier/querier.go index d2e0a1cf267..edf67aa4989 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -10,6 +10,7 @@ import ( "sort" "time" + "github.com/cristalhq/hedgedhttp" "github.com/go-kit/log/level" "github.com/gogo/protobuf/jsonpb" "github.com/google/uuid" @@ -65,6 +66,7 @@ type Querier struct { store storage.Store limits *overrides.Overrides + searchClient *http.Client searchPreferSelf *semaphore.Weighted subservices *services.Manager @@ -93,7 +95,17 @@ func New(cfg Config, clientCfg ingester_client.Config, ring ring.ReadRing, store log.Logger), store: store, limits: limits, - searchPreferSelf: semaphore.NewWeighted(int64(cfg.SearchPreferSelf)), + searchPreferSelf: semaphore.NewWeighted(int64(cfg.Search.PreferSelf)), + searchClient: http.DefaultClient, + } + + // + if cfg.Search.HedgeRequestsAt != 0 { + var err error + q.searchClient, err = hedgedhttp.NewClient(cfg.Search.HedgeRequestsAt, cfg.Search.HedgeRequestsUpTo, http.DefaultClient) + if err != nil { + return nil, err + } } q.Service = services.NewBasicService(q.starting, q.running, q.stopping) @@ -382,7 +394,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal // SearchBlock searches the specified subset of the block for the passed tags. func (q *Querier) SearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) { // if we have no external configuration always search in the querier - if len(q.cfg.SearchExternalEndpoints) == 0 { + if len(q.cfg.Search.ExternalEndpoints) == 0 { return q.internalSearchBlock(ctx, req) } @@ -399,8 +411,8 @@ func (q *Querier) SearchBlock(ctx context.Context, req *tempopb.SearchBlockReque } maxBytes := q.limits.MaxBytesPerTrace(tenantID) - endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))] - return searchExternalEndpoint(ctx, endpoint, maxBytes, req) + endpoint := q.cfg.Search.ExternalEndpoints[rand.Intn(len(q.cfg.Search.ExternalEndpoints))] + return q.searchExternalEndpoint(ctx, endpoint, maxBytes, req) } func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) { @@ -478,7 +490,7 @@ func (q *Querier) postProcessSearchResults(req *tempopb.SearchRequest, rr []resp return response } -func searchExternalEndpoint(ctx context.Context, externalEndpoint string, maxBytes int, searchReq *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) { +func (q *Querier) searchExternalEndpoint(ctx context.Context, externalEndpoint string, maxBytes int, searchReq *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) { req, err := http.NewRequest(http.MethodGet, externalEndpoint, nil) if err != nil { return nil, fmt.Errorf("external endpoint failed to make new request: %w", err) @@ -493,7 +505,7 @@ func searchExternalEndpoint(ctx context.Context, externalEndpoint string, maxByt return nil, fmt.Errorf("external endpoint failed to inject tenant id: %w", err) } start := time.Now() - resp, err := http.DefaultClient.Do(req) + resp, err := q.searchClient.Do(req) metricEndpointDuration.WithLabelValues(externalEndpoint).Observe(time.Since(start).Seconds()) if err != nil { return nil, fmt.Errorf("external endpoint failed to call http: %s, %w", externalEndpoint, err) diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index f6d950cc198..7bfcb09b863 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -32,7 +32,9 @@ func TestQuerierUsesSearchExternalEndpoint(t *testing.T) { // SearchExternalEndpoints is respected { cfg: Config{ - SearchExternalEndpoints: []string{srv.URL}, + Search: SearchConfig{ + ExternalEndpoints: []string{srv.URL}, + }, }, queriesToExecute: 3, externalExpected: 3,