diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e1d2143c3..6e2e854ce2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI. - [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter. - [#5779](https://github.com/thanos-io/thanos/pull/5779) Objstore: Support specifying S3 storage class. +- [#5741](https://github.com/thanos-io/thanos/pull/5741) Query: add metrics on how much data is being selected by downstream Store APIs. - [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change. ### Changed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index e55bf4b520..38df8d8a4f 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -25,6 +25,8 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "google.golang.org/grpc" + v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/thanos-community/promql-engine/engine" apiv1 "github.com/thanos-io/thanos/pkg/api/query" @@ -54,7 +56,6 @@ import ( "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/ui" - "google.golang.org/grpc" ) const ( @@ -194,6 +195,10 @@ func registerQuery(app *extkingpin.App) { alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field.").String() grpcProxyStrategy := cmd.Flag("grpc.proxy-strategy", "Strategy to use when proxying Series requests to leaf nodes. 
Hidden and only used for testing, will be removed after lazy becomes the default.").Default(string(store.EagerRetrieval)).Hidden().Enum(string(store.EagerRetrieval), string(store.LazyRetrieval)) + queryTelemetryDurationQuantiles := cmd.Flag("query.telemetry.request-duration-seconds-quantiles", "The quantiles for exporting metrics about the request duration quantiles.").Default("0.1", "0.25", "0.75", "1.25", "1.75", "2.5", "3", "5", "10").Float64List() + queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List() + queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List() + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { selectorLset, err := parseFlagLabels(*selectorLabels) if err != nil { @@ -305,6 +310,9 @@ func registerQuery(app *extkingpin.App) { *alertQueryURL, *grpcProxyStrategy, component.Query, + *queryTelemetryDurationQuantiles, + *queryTelemetrySamplesQuantiles, + *queryTelemetrySeriesQuantiles, promqlEngineType(*promqlEngine), ) }) @@ -377,6 +385,9 @@ func runQuery( alertQueryURL string, grpcProxyStrategy string, comp component.Component, + queryTelemetryDurationQuantiles []float64, + queryTelemetrySamplesQuantiles []int64, + queryTelemetrySeriesQuantiles []int64, promqlEngine promqlEngineType, ) error { if alertQueryURL == "" { @@ -680,6 +691,12 @@ func runQuery( extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg), maxConcurrentQueries, ), + store.NewSeriesStatsAggregator( + reg, + queryTelemetryDurationQuantiles, + queryTelemetrySamplesQuantiles, + queryTelemetrySeriesQuantiles, + ), reg, ) diff --git 
a/docs/components/query.md b/docs/components/query.md index 8b64a4830b..e1c3ac2112 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -378,6 +378,15 @@ Flags: be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules. + --query.telemetry.request-duration-seconds-quantiles=0.1... ... + The quantiles for exporting metrics about the + request duration quantiles. + --query.telemetry.request-samples-quantiles=100... ... + The quantiles for exporting metrics about the + samples count quantiles. + --query.telemetry.request-series-seconds-quantiles=10... ... + The quantiles for exporting metrics about the + series count quantiles. --query.timeout=2m Maximum time to process query by query node. --request.logging-config= Alternative to 'request.logging-config-file' @@ -460,3 +469,13 @@ Flags: of Prometheus. ``` + +## Exported metrics + +Thanos Query also exports metrics about its own performance. You can find a list with these metrics below. + +**Disclaimer**: this list is incomplete. The remaining metrics will be added over time. + +| Name | Type | Labels | Description | +|-----------------------------------------|-----------|-----------------------|-------------------------------------------------------------------------------------------------------------------| +| thanos_store_api_query_duration_seconds | Histogram | samples_le, series_le | Duration of the Thanos Store API select phase for a query according to the amount of samples and series selected. 
| diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index 144166f57b..8848cd2ffe 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -94,6 +94,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer request.EnableQueryPushdown, false, request.ShardInfo, + query.NoopSeriesStatsReporter, ) qry, err := g.queryEngine.NewInstantQuery(queryable, &promql.QueryOpts{LookbackDelta: lookbackDelta}, request.Query, ts) if err != nil { @@ -168,6 +169,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que request.EnableQueryPushdown, false, request.ShardInfo, + query.NoopSeriesStatsReporter, ) startTime := time.Unix(request.StartTimeSeconds, 0) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index cbe1327a36..918bcbf5fd 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -41,10 +41,8 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" - v1 "github.com/prometheus/prometheus/web/api/v1" - "github.com/prometheus/prometheus/util/stats" - + v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/thanos-io/thanos/pkg/api" "github.com/thanos-io/thanos/pkg/exemplars" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" @@ -57,6 +55,7 @@ import ( "github.com/thanos-io/thanos/pkg/rules" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/targets/targetspb" @@ -107,6 +106,13 @@ type QueryAPI struct { defaultMetadataTimeRange time.Duration queryRangeHist prometheus.Histogram + + seriesStatsAggregator seriesQueryPerformanceMetricsAggregator +} + +type seriesQueryPerformanceMetricsAggregator interface { + Aggregate(seriesStats storepb.SeriesStatsCounter) + Observe(duration float64) } // 
NewQueryAPI returns an initialized QueryAPI type. @@ -134,8 +140,12 @@ func NewQueryAPI( defaultMetadataTimeRange time.Duration, disableCORS bool, gate gate.Gate, + statsAggregator seriesQueryPerformanceMetricsAggregator, reg *prometheus.Registry, ) *QueryAPI { + if statsAggregator == nil { + statsAggregator = &store.NoopSeriesStatsAggregator{} + } return &QueryAPI{ baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap), logger: logger, @@ -160,6 +170,7 @@ func NewQueryAPI( defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, defaultMetadataTimeRange: defaultMetadataTimeRange, disableCORS: disableCORS, + seriesStatsAggregator: statsAggregator, queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_query_range_requested_timespan_duration_seconds", @@ -396,7 +407,24 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo), &promql.QueryOpts{LookbackDelta: lookbackDelta}, r.FormValue("query"), ts) + var seriesStats []storepb.SeriesStatsCounter + qry, err := qapi.queryEngine.NewInstantQuery( + qapi.queryableCreate( + enableDedup, + replicaLabels, + storeDebugMatchers, + maxSourceResolution, + enablePartialResponse, + qapi.enableQueryPushdown, + false, + shardInfo, + query.NewAggregateStatsReporter(&seriesStats), + ), + &promql.QueryOpts{LookbackDelta: lookbackDelta}, + r.FormValue("query"), + ts, + ) + if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} } @@ -409,6 +437,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro } defer qapi.gate.Done() + beforeRange := time.Now() res := qry.Exec(ctx) if res.Err != nil { switch 
res.Err.(type) { @@ -421,6 +450,10 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro } return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close } + for i := range seriesStats { + qapi.seriesStatsAggregator.Aggregate(seriesStats[i]) + } + qapi.seriesStatsAggregator.Observe(time.Since(beforeRange).Seconds()) // Optional stats field in response if parameter "stats" is not empty. var qs stats.QueryStats @@ -525,8 +558,19 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap span, ctx := tracing.StartSpan(ctx, "promql_range_query") defer span.Finish() + var seriesStats []storepb.SeriesStatsCounter qry, err := qapi.queryEngine.NewRangeQuery( - qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo), + qapi.queryableCreate( + enableDedup, + replicaLabels, + storeDebugMatchers, + maxSourceResolution, + enablePartialResponse, + qapi.enableQueryPushdown, + false, + shardInfo, + query.NewAggregateStatsReporter(&seriesStats), + ), &promql.QueryOpts{LookbackDelta: lookbackDelta}, r.FormValue("query"), start, @@ -545,6 +589,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap } defer qapi.gate.Done() + beforeRange := time.Now() res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { @@ -555,6 +600,10 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap } return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close } + for i := range seriesStats { + qapi.seriesStatsAggregator.Aggregate(seriesStats[i]) + } + qapi.seriesStatsAggregator.Observe(time.Since(beforeRange).Seconds()) // Optional stats field in response if parameter "stats" is not empty. 
var qs stats.QueryStats @@ -600,8 +649,17 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil). - Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := qapi.queryableCreate( + true, + nil, + storeDebugMatchers, + 0, + enablePartialResponse, + qapi.enableQueryPushdown, + true, + nil, + query.NoopSeriesStatsReporter, + ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -687,8 +745,18 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr, func() {} } - q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true, nil). - Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := qapi.queryableCreate( + enableDedup, + replicaLabels, + storeDebugMatchers, + math.MaxInt64, + enablePartialResponse, + qapi.enableQueryPushdown, + true, + nil, + query.NoopSeriesStatsReporter, + ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -737,8 +805,17 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap matcherSets = append(matcherSets, matchers) } - q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil). 
- Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := qapi.queryableCreate( + true, + nil, + storeDebugMatchers, + 0, + enablePartialResponse, + qapi.enableQueryPushdown, + true, + nil, + query.NoopSeriesStatsReporter, + ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 000410ddbd..07c562af9c 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -44,9 +44,8 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" promgate "github.com/prometheus/prometheus/util/gate" "github.com/prometheus/prometheus/util/stats" - "github.com/thanos-io/thanos/pkg/compact" - baseAPI "github.com/thanos-io/thanos/pkg/api" + "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/query" @@ -198,6 +197,7 @@ func TestQueryEndpoints(t *testing.T) { queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), + seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, } start := time.Unix(0, 0) @@ -737,6 +737,7 @@ func TestMetadataEndpoints(t *testing.T) { queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), + seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, } apiWithLabelLookback := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ @@ -750,6 +751,7 @@ func TestMetadataEndpoints(t *testing.T) { queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), + seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, } var tests = []endpointTestCase{ diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 361834c07d..b094cbd45c 100644 --- 
a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -7,6 +7,7 @@ import ( "context" "sort" "strings" + "sync" "time" "github.com/go-kit/log" @@ -28,21 +29,60 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) +type seriesStatsReporter func(seriesStats storepb.SeriesStatsCounter) + +var NoopSeriesStatsReporter seriesStatsReporter = func(_ storepb.SeriesStatsCounter) {} + +func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsReporter { + var mutex sync.Mutex + return func(s storepb.SeriesStatsCounter) { + mutex.Lock() + defer mutex.Unlock() + *stats = append(*stats, s) + } +} + // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. // If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. // When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying // replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy. -type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool, shardInfo *storepb.ShardInfo) storage.Queryable +type QueryableCreator func( + deduplicate bool, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + maxResolutionMillis int64, + partialResponse, + enableQueryPushdown, + skipChunks bool, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, +) storage.Queryable // NewQueryableCreator creates QueryableCreator. 
-func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { +func NewQueryableCreator( + logger log.Logger, + reg prometheus.Registerer, + proxy storepb.StoreServer, + maxConcurrentSelects int, + selectTimeout time.Duration, +) QueryableCreator { duration := promauto.With( extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), ).NewHistogram(gate.DurationHistogramOpts) - return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool, shardInfo *storepb.ShardInfo) storage.Queryable { + return func( + deduplicate bool, + replicaLabels []string, + storeDebugMatchers [][]*labels.Matcher, + maxResolutionMillis int64, + partialResponse, + enableQueryPushdown, + skipChunks bool, + shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, + ) storage.Queryable { return &queryable{ logger: logger, replicaLabels: replicaLabels, @@ -59,6 +99,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto selectTimeout: selectTimeout, enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, + seriesStatsReporter: seriesStatsReporter, } } } @@ -77,11 +118,12 @@ type queryable struct { selectTimeout time.Duration enableQueryPushdown bool shardInfo *storepb.ShardInfo + seriesStatsReporter seriesStatsReporter } // Querier returns a new storage querier against the underlying proxy store API. 
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil } type querier struct { @@ -100,6 +142,7 @@ type querier struct { selectGate gate.Gate selectTimeout time.Duration shardInfo *storepb.ShardInfo + seriesStatsReporter seriesStatsReporter } // newQuerier creates implementation of storage.Querier that fetches data from the proxy @@ -107,16 +150,20 @@ type querier struct { func newQuerier( ctx context.Context, logger log.Logger, - mint, maxt int64, + mint, + maxt int64, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, proxy storepb.StoreServer, deduplicate bool, maxResolutionMillis int64, - partialResponse, enableQueryPushdown bool, skipChunks bool, + partialResponse, + enableQueryPushdown, + skipChunks bool, selectGate gate.Gate, selectTimeout time.Duration, shardInfo *storepb.ShardInfo, + seriesStatsReporter seriesStatsReporter, ) *querier { if logger == nil { logger = log.NewNopLogger() @@ -145,6 +192,7 @@ func newQuerier( skipChunks: skipChunks, enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, + seriesStatsReporter: seriesStatsReporter, } } @@ -157,8 +205,9 @@ type seriesServer struct { storepb.Store_SeriesServer ctx context.Context - seriesSet []storepb.Series - warnings []string + seriesSet []storepb.Series + seriesSetStats storepb.SeriesStatsCounter + warnings []string } func (s *seriesServer) Send(r *storepb.SeriesResponse) error { @@ -169,6 +218,7 @@ func (s *seriesServer) Send(r 
*storepb.SeriesResponse) error { if r.GetSeries() != nil { s.seriesSet = append(s.seriesSet, *r.GetSeries()) + s.seriesSetStats.Count(r.GetSeries()) return nil } @@ -257,11 +307,12 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn") defer span.Finish() - set, err := q.selectFn(ctx, hints, ms...) + set, stats, err := q.selectFn(ctx, hints, ms...) if err != nil { promise <- storage.ErrSeriesSet(err) return } + q.seriesStatsReporter(stats) promise <- set }() @@ -279,10 +330,10 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match }} } -func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) { +func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) { sms, err := storepb.PromMatchersToMatchers(ms...) if err != nil { - return nil, errors.Wrap(err, "convert matchers") + return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers") } aggrs := aggrsFromFunc(hints.Func) @@ -310,7 +361,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . Step: hints.Step, Range: hints.Range, }, resp); err != nil { - return nil, errors.Wrap(err, "proxy Series()") + return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()") } var warns storage.Warnings @@ -342,7 +393,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . set: newStoreSeriesSet(resp.seriesSet), aggrs: aggrs, warns: warns, - }, nil + }, resp.seriesSetStats, nil } // TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible. @@ -357,7 +408,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . 
// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one. // TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available. - return dedup.NewSeriesSet(set, q.replicaLabels, hints.Func, q.enableQueryPushdown), nil + return dedup.NewSeriesSet(set, q.replicaLabels, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil } // sortDedupLabels re-sorts the set so that the same series with different replica diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index a43c75e7a5..2e31fa65a0 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -44,7 +44,17 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false, nil) + queryable := queryableCreator( + false, + nil, + nil, + oneHourMillis, + false, + false, + false, + nil, + NoopSeriesStatsReporter, + ) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -71,7 +81,22 @@ func TestQuerier_DownsampledData(t *testing.T) { } timeout := 10 * time.Second - q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false, nil) + q := NewQueryableCreator( + nil, + nil, + testProxy, + 2, + timeout, + )(false, + nil, + nil, + 9999999, + false, + false, + false, + nil, + NoopSeriesStatsReporter, + ) engine := promql.NewEngine( promql.EngineOpts{ MaxSamples: math.MaxInt32, @@ -365,7 +390,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil) + return 
newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) }, } t.Cleanup(func() { @@ -609,7 +634,7 @@ func TestQuerier_Select(t *testing.T) { {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { g := gate.New(2) - q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil) + q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, func(i storepb.SeriesStatsCounter) {}) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -838,7 +863,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout, nil) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -908,7 +933,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout, nil) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/query/query_bench_test.go 
b/pkg/query/query_bench_test.go index 301c880877..84efb46820 100644 --- a/pkg/query/query_bench_test.go +++ b/pkg/query/query_bench_test.go @@ -80,12 +80,13 @@ func benchQuerySelect(t testutil.TB, totalSamples, totalSeries int, dedup bool) logger := log.NewNopLogger() q := &querier{ - ctx: context.Background(), - logger: logger, - proxy: &mockedStoreServer{responses: resps}, - replicaLabels: map[string]struct{}{"a_replica": {}}, - deduplicate: dedup, - selectGate: gate.NewNoop(), + ctx: context.Background(), + logger: logger, + proxy: &mockedStoreServer{responses: resps}, + replicaLabels: map[string]struct{}{"a_replica": {}}, + deduplicate: dedup, + selectGate: gate.NewNoop(), + seriesStatsReporter: NoopSeriesStatsReporter, } testSelect(t, q, expectedSeries) } diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 99e29be66f..060571fc70 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -54,7 +54,16 @@ func TestQuerier_Proxy(t *testing.T) { name: fmt.Sprintf("store number %v", i), }) } - return q(true, nil, nil, 0, false, false, false, nil) + return q(true, + nil, + nil, + 0, + false, + false, + false, + nil, + NoopSeriesStatsReporter, + ) } for _, fn := range files { diff --git a/pkg/store/telemetry.go b/pkg/store/telemetry.go new file mode 100644 index 0000000000..a854daaf0c --- /dev/null +++ b/pkg/store/telemetry.go @@ -0,0 +1,88 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their +// response's shape. 
+type seriesStatsAggregator struct { + queryDuration *prometheus.HistogramVec + + seriesLeBuckets []int64 + samplesLeBuckets []int64 + seriesStats storepb.SeriesStatsCounter +} + +// NewSeriesStatsAggregator is a constructor for seriesStatsAggregator. +func NewSeriesStatsAggregator( + reg prometheus.Registerer, + durationQuantiles []float64, + sampleQuantiles []int64, + seriesQuantiles []int64, +) *seriesStatsAggregator { + return &seriesStatsAggregator{ + queryDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_store_api_query_duration_seconds", + Help: "Duration of the Thanos Store API select phase for a query.", + Buckets: durationQuantiles, + }, []string{"series_le", "samples_le"}), + seriesLeBuckets: seriesQuantiles, + samplesLeBuckets: sampleQuantiles, + seriesStats: storepb.SeriesStatsCounter{}, + } +} + +// Aggregate is an aggregator for merging `storepb.SeriesStatsCounter` for each incoming fanned out query. +func (s *seriesStatsAggregator) Aggregate(stats storepb.SeriesStatsCounter) { + s.seriesStats.Series += stats.Series + s.seriesStats.Samples += stats.Samples + s.seriesStats.Chunks += stats.Chunks +} + +// Observe commits the aggregated SeriesStatsCounter as an observation. 
+func (s *seriesStatsAggregator) Observe(duration float64) { + if s.seriesStats.Series == 0 || s.seriesStats.Samples == 0 || s.seriesStats.Chunks == 0 { + return + } + // Bucket matching for series/labels matchSeriesBucket/matchSamplesBucket => float64, float64 + seriesLeBucket := s.findBucket(float64(s.seriesStats.Series), s.seriesLeBuckets) + samplesLeBucket := s.findBucket(float64(s.seriesStats.Samples), s.samplesLeBuckets) + s.queryDuration.With(prometheus.Labels{ + "series_le": strconv.Itoa(int(seriesLeBucket)), + "samples_le": strconv.Itoa(int(samplesLeBucket)), + }).Observe(duration) + s.reset() +} + +func (s *seriesStatsAggregator) reset() { + s.seriesStats = storepb.SeriesStatsCounter{} +} + +func (s *seriesStatsAggregator) findBucket(value float64, quantiles []int64) int64 { + if len(quantiles) == 0 { + return 0 + } + var foundBucket int64 + for _, bucket := range quantiles { + foundBucket = bucket + if value < float64(bucket) { + break + } + } + return foundBucket +} + +// NoopSeriesStatsAggregator is a query performance series aggregator that does nothing. +type NoopSeriesStatsAggregator struct{} + +func (s *NoopSeriesStatsAggregator) Aggregate(_ storepb.SeriesStatsCounter) {} + +func (s *NoopSeriesStatsAggregator) Observe(_ float64) {} diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 7fc56bda97..04b425061a 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -23,6 +23,7 @@ import ( "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" "github.com/efficientgo/e2e" + e2edb "github.com/efficientgo/e2e/db" e2emon "github.com/efficientgo/e2e/monitoring" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" @@ -578,6 +579,130 @@ func newSample(s fakeMetricSample) model.Sample { } } +func TestQueryStoreMetrics(t *testing.T) { + t.Parallel() + + // Build up. 
+ e, err := e2e.New(e2e.WithName("storemetrics01")) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + bucket := "store-gw-test" + minio := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, e2e.StartAndWaitReady(minio)) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("https"), minio.Dir()), "test") + testutil.Ok(t, err) + + // Preparing 2 different blocks for the tests. + { + blockSizes := []struct { + samples int + series int + name string + }{ + {samples: 10, series: 1, name: "one_series"}, + {samples: 10, series: 1001, name: "thousand_one_series"}, + } + now := time.Now() + externalLabels := labels.FromStrings("prometheus", "p1", "replica", "0") + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm)) + for _, blockSize := range blockSizes { + series := make([]labels.Labels, blockSize.series) + for i := 0; i < blockSize.series; i++ { + bigSeriesLabels := labels.FromStrings("__name__", blockSize.name, "instance", fmt.Sprintf("foo_%d", i)) + series[i] = bigSeriesLabels + } + blockID, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir, + series, + blockSize.samples, + timestamp.FromTime(now), + timestamp.FromTime(now.Add(2*time.Hour)), + 30*time.Minute, + externalLabels, + 0, + metadata.NoneFunc, + ) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, blockID.String()), blockID.String())) + } + } + + storeGW := e2ethanos.NewStoreGW( + e, + "s1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("https"), minio.InternalDir()), + }, + "", + nil, + ) + querier := e2ethanos.NewQuerierBuilder(e, "1", storeGW.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(storeGW, querier)) + testutil.Ok(t, 
storeGW.WaitSumMetrics(e2emon.Equals(2), "thanos_blocks_meta_synced")) + + // Querying the series in the previously created blocks to ensure we produce Store API query metrics. + { + instantQuery(t, ctx, querier.Endpoint("http"), func() string { + return "max_over_time(one_series{instance='foo_0'}[2h])" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, 1) + testutil.Ok(t, err) + + instantQuery(t, ctx, querier.Endpoint("http"), func() string { + return "max_over_time(thousand_one_series[2h])" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, 1001) + testutil.Ok(t, err) + } + + mon, err := e2emon.Start(e) + testutil.Ok(t, err) + + queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { + return "thanos_store_api_query_duration_seconds_count{samples_le='100000',series_le='10000'}" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{ + "__name__": "thanos_store_api_query_duration_seconds_count", + "instance": "storemetrics01-querier-1:8080", + "job": "querier-1", + "samples_le": "100000", + "series_le": "10000", + }, + Value: model.SampleValue(1), + }, + }) + + queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { + return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10'}" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{ + "__name__": "thanos_store_api_query_duration_seconds_count", + "instance": "storemetrics01-querier-1:8080", + "job": "querier-1", + "samples_le": "100", + "series_le": "10", + }, + Value: model.SampleValue(1), + }, + }) +} + // Regression test for https://github.com/thanos-io/thanos/issues/5033. 
// Tests whether queries work with mixed sources, and with functions // that we are pushing down: min, max, min_over_time, max_over_time, @@ -882,18 +1007,10 @@ func instantQuery(t testing.TB, ctx context.Context, addr string, q func() strin "msg", fmt.Sprintf("Waiting for %d results for query %s", expectedSeriesLen, q()), ) testutil.Ok(t, runutil.RetryWithLog(logger, 5*time.Second, ctx.Done(), func() error { - res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) + res, err := simpleInstantQuery(t, ctx, addr, q, ts, opts, expectedSeriesLen) if err != nil { return err } - - if len(warnings) > 0 { - return errors.Errorf("unexpected warnings %s", warnings) - } - - if len(res) != expectedSeriesLen { - return errors.Errorf("unexpected result size, expected %d; result %d: %v", expectedSeriesLen, len(res), res) - } result = res return nil })) @@ -901,6 +1018,24 @@ func instantQuery(t testing.TB, ctx context.Context, addr string, q func() strin return result } +func simpleInstantQuery(t testing.TB, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) (model.Vector, error) { // simpleInstantQuery issues one instant query for q() at ts() against http://addr, without retrying; it returns an error when the request fails, when any warning comes back, or when the result size differs from expectedSeriesLen. + res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) + if err != nil { + return nil, err + } + + if len(warnings) > 0 { // Any warning is reported as an error so the caller decides whether to retry. + return nil, errors.Errorf("unexpected warnings %s", warnings) + } + + if len(res) != expectedSeriesLen { + return nil, errors.Errorf("unexpected result size, expected %d; result %d: %v", expectedSeriesLen, len(res), res) + } + + sortResults(res) // NOTE(review): sortResults is defined outside this view; presumably it orders res in place so results compare deterministically. Confirm. + return res, nil +} + func queryWaitAndAssert(t *testing.T, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expected model.Vector) { t.Helper() @@ -912,7 +1047,7 @@ func queryWaitAndAssert(t *testing.T, ctx context.Context, addr string, q func() "caller", "queryWaitAndAssert", "msg", 
fmt.Sprintf("Waiting for %d results for query %s", len(expected), q()), ) - testutil.Ok(t, runutil.RetryWithLog(logger, 5*time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.RetryWithLog(logger, 10*time.Second, ctx.Done(), func() error { res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) if err != nil { return err