From 4698c981fb258556ed6554449cbcff6a318c4324 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Wed, 9 Sep 2020 17:25:34 +0200 Subject: [PATCH 1/4] pkg/thanos: prefix gate metrics for concurrent selects Thanos query exposed `gate_queries_in_flight` and `gate_duration_seconds_bucket` metrics for concurrent selects. These metrics are now prefixed by `thanos_query_concurrent_selects_`. Signed-off-by: Simon Pasquier --- cmd/thanos/query.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index db2c7b7bf1..bb3ea9f888 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -304,8 +304,14 @@ func runQuery( ) proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) - queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects, queryTimeout) - engine = promql.NewEngine( + queryableCreator = query.NewQueryableCreator( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_concurrent_selects_", reg), + proxy, + maxConcurrentSelects, + queryTimeout, + ) + engine = promql.NewEngine( promql.EngineOpts{ Logger: logger, Reg: reg, From 0a6b36af5712ecb94956da694a4ac053abf8ed75 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 10 Sep 2020 17:17:24 +0200 Subject: [PATCH 2/4] *: refactor instrumentation of the gate package This change deprecates the gate.(*Keeper) struct. When Keeper is used to create several gates, the metric tracking the number of in-flight metric isn't meaningful because it is hard to say whether requests are being blocked or not. As such the `thanos_query_concurrent_selects_gate_queries_in_flight` is removed. The following metrics have been added to record the maximum number of concurrent requests per gate: * `thanos_query_gate_queries_max` * `thanos_bucket_store_series_gate_queries_max`, previously known as `thanos_bucket_store_queries_concurrent_max.` * `thanos_memcached_getmulti_gate_queries_max` Signed-off-by: Simon Pasquier --- cmd/thanos/query.go | 9 +- cmd/thanos/store.go | 7 +- pkg/api/query/v1.go | 9 +- pkg/api/query/v1_test.go | 7 +- pkg/cacheutil/memcached_client.go | 6 +- pkg/gate/gate.go | 143 +++++++++++++++++++++++------- pkg/query/querier.go | 33 ++++--- pkg/query/querier_test.go | 8 +- pkg/ui/query.go | 5 +- 9 files changed, 148 insertions(+), 79 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bb3ea9f888..0c0ef8d3b2 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -32,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" @@ -426,11 +427,10 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins) + ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins) api := v1.NewQueryAPI( logger, - reg, stores, engine, queryableCreator, @@ -442,7 +442,10 @@ func runQuery( queryReplicaLabels, flagsMap, instantDefaultMaxSourceResolution, - maxConcurrentQueries, + gate.New( + extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg), + maxConcurrentQueries, + ), ) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 2fd75a5b44..39105fe316 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -14,7 +14,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" @@ -286,11 +285,7 @@ func runStore( return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency) } - queriesGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrency) - promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_concurrent_max", - Help: "Number of maximum concurrent queries.", - }).Set(float64(maxConcurrency)) + queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency) bs, err := store.NewBucketStore( logger, diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index dbd76e067b..b674898ad6 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -30,7 +30,6 @@ import ( "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" @@ -40,7 +39,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/api" - "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" @@ -64,7 +62,6 @@ const ( type QueryAPI struct { baseAPI *api.BaseAPI logger log.Logger - reg prometheus.Registerer gate gate.Gate queryableCreate query.QueryableCreator queryEngine *promql.Engine @@ -82,7 +79,6 @@ type QueryAPI struct { // NewQueryAPI returns an initialized QueryAPI type. func NewQueryAPI( logger log.Logger, - reg *prometheus.Registry, storeSet *query.StoreSet, qe *promql.Engine, c query.QueryableCreator, @@ -93,15 +89,14 @@ func NewQueryAPI( replicaLabels []string, flagsMap map[string]string, defaultInstantQueryMaxSourceResolution time.Duration, - maxConcurrentQueries int, + gate gate.Gate, ) *QueryAPI { return &QueryAPI{ baseAPI: api.NewBaseAPI(logger, flagsMap), logger: logger, - reg: reg, queryEngine: qe, queryableCreate: c, - gate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg)).NewGate(maxConcurrentQueries), + gate: gate, ruleGroups: ruleGroups, enableAutodownsampling: enableAutodownsampling, diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 6c300e0197..d3ae8dc0d7 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/prometheus/common/route" + promgate "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" @@ -116,7 +117,7 @@ func TestEndpoints(t *testing.T) { MaxSamples: 10000, Timeout: timeout, }), - gate: gate.NewKeeper(nil).NewGate(4), + gate: gate.New(nil, 4), } start := time.Unix(0, 0) @@ -1053,7 +1054,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) { for i, test := range tests { api := QueryAPI{ enableAutodownsampling: test.enableAutodownsampling, - gate: gate.NewKeeper(nil).NewGate(4), + gate: gate.New(nil, 4), } v := url.Values{} v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam) @@ -1101,7 +1102,7 @@ func TestParseStoreMatchersParam(t *testing.T) { } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { api := QueryAPI{ - gate: gate.NewKeeper(nil).NewGate(4), + gate: promgate.New(4), } v := url.Values{} v.Set(StoreMatcherParam, tc.storeMatchers) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 1ac20d436b..d4857ec795 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -231,8 +231,10 @@ func newMemcachedClient( dnsProvider: dnsProvider, asyncQueue: make(chan func(), config.MaxAsyncBufferSize), stop: make(chan struct{}, 1), - getMultiGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg)). - NewGate(config.MaxGetMultiConcurrency), + getMultiGate: gate.New( + extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), + config.MaxGetMultiConcurrency, + ), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ diff --git a/pkg/gate/gate.go b/pkg/gate/gate.go index a87e3af5aa..a77045066a 100644 --- a/pkg/gate/gate.go +++ b/pkg/gate/gate.go @@ -12,67 +12,144 @@ import ( promgate "github.com/prometheus/prometheus/pkg/gate" ) -// Gate is an interface that mimics prometheus/pkg/gate behavior. +var ( + MaxGaugeOpts = prometheus.GaugeOpts{ + Name: "gate_queries_max", + Help: "Maximum number of concurrent queries.", + } + InFlightGaugeOpts = prometheus.GaugeOpts{ + Name: "gate_queries_in_flight", + Help: "Number of queries that are currently in flight.", + } + DurationHistogramOpts = prometheus.HistogramOpts{ + Name: "gate_duration_seconds", + Help: "How many seconds it took for queries to wait at the gate.", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, + } +) + +// Gate controls the maximum number of concurrently running and waiting queries. +// +// Example of use: +// +// g := gate.New(r, 5) +// +// if err := g.Start(ctx); err != nil { +// return +// } +// defer g.Done() +// type Gate interface { + // Start initiates a new request and waits until it's our turn to fulfill a request. Start(ctx context.Context) error + // Done finishes a query. Done() } -// Gate wraps the Prometheus gate with extra metrics. -type gate struct { - g *promgate.Gate - m *metrics -} - -type metrics struct { - inflightQueries prometheus.Gauge - gateTiming prometheus.Histogram -} - // Keeper is used to create multiple gates sharing the same metrics. +// +// Deprecated: when Keeper is used to create several gates, the metric tracking +// the number of in-flight metric isn't meaningful because it is hard to say +// whether requests are being blocked or not. For clients that call +// gate.(*Keeper).NewGate only once, it is recommended to use gate.New() +// instead. Otherwise it is recommended to use the +// github.com/prometheus/prometheus/pkg/gate package directly and wrap the +// returned gate with gate.InstrumentGateDuration(). type Keeper struct { - m *metrics + reg prometheus.Registerer } // NewKeeper creates a new Keeper. +// +// Deprecated: see Keeper. func NewKeeper(reg prometheus.Registerer) *Keeper { return &Keeper{ - m: &metrics{ - inflightQueries: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "gate_queries_in_flight", - Help: "Number of queries that are currently in flight.", - }), - gateTiming: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "gate_duration_seconds", - Help: "How many seconds it took for queries to wait at the gate.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - }), - }, + reg: reg, } } -// NewGate returns a new Gate that collects metrics. +// NewGate returns a new Gate ready for use. +// +// Deprecated: see Keeper. func (k *Keeper) NewGate(maxConcurrent int) Gate { - return &gate{g: promgate.New(maxConcurrent), m: k.m} + return New(k.reg, maxConcurrent) +} + +// New returns an instrumented gate limiting the number of requests being +// executed concurrently. +// +// The gate implementation is based on the +// github.com/prometheus/prometheus/pkg/gate package. +// +// It can be called several times but not with the same registerer otherwise it +// will panic when trying to register the same metric multiple times. +func New(reg prometheus.Registerer, maxConcurrent int) Gate { + promauto.With(reg).NewGauge(MaxGaugeOpts).Set(float64(maxConcurrent)) + + return InstrumentGateDuration( + promauto.With(reg).NewHistogram(DurationHistogramOpts), + InstrumentGateInFlight( + promauto.With(reg).NewGauge(InFlightGaugeOpts), + promgate.New(maxConcurrent), + ), + ) +} + +type instrumentedDurationGate struct { + g Gate + duration prometheus.Observer } -// Start initiates a new request and waits until it's our turn to fulfill a request. -func (g *gate) Start(ctx context.Context) error { +// InstrumentGateDuration instruments the provided Gate to track how much time +// the request has been waiting in the gate. +func InstrumentGateDuration(duration prometheus.Observer, g Gate) Gate { + return &instrumentedDurationGate{ + g: g, + duration: duration, + } +} + +// Start implements the Gate interface. +func (g *instrumentedDurationGate) Start(ctx context.Context) error { start := time.Now() defer func() { - g.m.gateTiming.Observe(time.Since(start).Seconds()) + g.duration.Observe(time.Since(start).Seconds()) }() + return g.g.Start(ctx) +} + +// Done implements the Gate interface. +func (g *instrumentedDurationGate) Done() { + g.g.Done() +} + +type instrumentedInFlightGate struct { + g Gate + inflight prometheus.Gauge +} + +// InstrumentGateInFlight instruments the provided Gate to track how many +// requests are currently in flight. +func InstrumentGateInFlight(inflight prometheus.Gauge, g Gate) Gate { + return &instrumentedInFlightGate{ + g: g, + inflight: inflight, + } +} + +// Start implements the Gate interface. +func (g *instrumentedInFlightGate) Start(ctx context.Context) error { if err := g.g.Start(ctx); err != nil { return err } - g.m.inflightQueries.Inc() + g.inflight.Inc() return nil } -// Done finishes a query. -func (g *gate) Done() { - g.m.inflightQueries.Dec() +// Done implements the Gate interface. +func (g *instrumentedInFlightGate) Done() { + g.inflight.Dec() g.g.Done() } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 8d8765471b..6624cd8dee 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -13,6 +13,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promgate "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -32,20 +34,21 @@ type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatche // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { - keeper := gate.NewKeeper(reg) + duration := promauto.With(reg).NewHistogram(gate.DurationHistogramOpts) return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ - logger: logger, - reg: reg, - replicaLabels: replicaLabels, - storeMatchers: storeMatchers, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponse: partialResponse, - skipChunks: skipChunks, - gateKeeper: keeper, + logger: logger, + replicaLabels: replicaLabels, + storeMatchers: storeMatchers, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponse: partialResponse, + skipChunks: skipChunks, + gateFn: func() gate.Gate { + return gate.InstrumentGateDuration(duration, promgate.New(maxConcurrentSelects)) + }, maxConcurrentSelects: maxConcurrentSelects, selectTimeout: selectTimeout, } @@ -54,7 +57,6 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto type queryable struct { logger log.Logger - reg prometheus.Registerer replicaLabels []string storeMatchers [][]storepb.LabelMatcher proxy storepb.StoreServer @@ -62,20 +64,19 @@ type queryable struct { maxResolutionMillis int64 partialResponse bool skipChunks bool - gateKeeper *gate.Keeper + gateFn func() gate.Gate maxConcurrentSelects int selectTimeout time.Duration } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateFn(), q.selectTimeout), nil } type querier struct { ctx context.Context logger log.Logger - reg prometheus.Registerer cancel func() mint, maxt int64 replicaLabels map[string]struct{} @@ -94,7 +95,6 @@ type querier struct { func newQuerier( ctx context.Context, logger log.Logger, - reg prometheus.Registerer, mint, maxt int64, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, @@ -117,7 +117,6 @@ func newQuerier( return &querier{ ctx: ctx, logger: logger, - reg: reg, cancel: cancel, selectGate: selectGate, selectTimeout: selectTimeout, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 58518aa791..02fca614f5 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -362,7 +362,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) }, } t.Cleanup(func() { @@ -606,7 +606,7 @@ func TestQuerier_Select(t *testing.T) { {dedup: true, expected: []series{tcase.expectedAfterDedup}}, } { g := gate.New(2) - q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { @@ -853,7 +853,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -923,7 +923,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout) + q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/ui/query.go b/pkg/ui/query.go index 53f7bd1339..57b25326b8 100644 --- a/pkg/ui/query.go +++ b/pkg/ui/query.go @@ -12,7 +12,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/route" @@ -31,11 +30,10 @@ type Query struct { cwd string birth time.Time version api.ThanosVersion - reg prometheus.Registerer now func() model.Time } -func NewQueryUI(logger log.Logger, reg prometheus.Registerer, storeSet *query.StoreSet, externalPrefix, prefixHeader string) *Query { +func NewQueryUI(logger log.Logger, storeSet *query.StoreSet, externalPrefix, prefixHeader string) *Query { tmplVariables := map[string]string{ "Component": component.Query.String(), } @@ -49,7 +47,6 @@ func NewQueryUI(logger log.Logger, reg prometheus.Registerer, storeSet *query.St cwd: runtimeInfo().CWD, birth: runtimeInfo().StartTime, version: *api.BuildInfo, - reg: reg, now: model.Now, } } From 8a7491382255b3f9ee71077773b2f050af07dd0a Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 14 Sep 2020 14:30:01 +0200 Subject: [PATCH 3/4] Decompose registry wrapping for concurrent selects Signed-off-by: Simon Pasquier --- cmd/thanos/query.go | 2 +- pkg/query/querier.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 0c0ef8d3b2..f3999e350e 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -307,7 +307,7 @@ func runQuery( rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) queryableCreator = query.NewQueryableCreator( logger, - extprom.WrapRegistererWithPrefix("thanos_query_concurrent_selects_", reg), + extprom.WrapRegistererWithPrefix("thanos_query_", reg), proxy, maxConcurrentSelects, queryTimeout, diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 6624cd8dee..bda188fae1 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -34,7 +35,9 @@ type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatche // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { - duration := promauto.With(reg).NewHistogram(gate.DurationHistogramOpts) + duration := promauto.With( + extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), + ).NewHistogram(gate.DurationHistogramOpts) return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ From f49633226285069bc6489c733b688966bdb35ad6 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 14 Sep 2020 15:12:50 +0200 Subject: [PATCH 4/4] Rename gateFn to gateProviderFn Signed-off-by: Simon Pasquier --- pkg/query/querier.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/query/querier.go b/pkg/query/querier.go index bda188fae1..68dd12c043 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -49,7 +49,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto maxResolutionMillis: maxResolutionMillis, partialResponse: partialResponse, skipChunks: skipChunks, - gateFn: func() gate.Gate { + gateProviderFn: func() gate.Gate { return gate.InstrumentGateDuration(duration, promgate.New(maxConcurrentSelects)) }, maxConcurrentSelects: maxConcurrentSelects, @@ -67,14 +67,14 @@ type queryable struct { maxResolutionMillis int64 partialResponse bool skipChunks bool - gateFn func() gate.Gate + gateProviderFn func() gate.Gate maxConcurrentSelects int selectTimeout time.Duration } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateFn(), q.selectTimeout), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil } type querier struct {