From 1bc1f5944450d7fef4bf5bdcd0f8773434c2caa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 1 Feb 2019 17:20:14 +0200 Subject: [PATCH 01/52] store: add ability to limit max samples / conc. queries --- Gopkg.lock | 3 ++ cmd/thanos/store.go | 11 +++++ docs/components/store.md | 6 +++ pkg/compact/downsample/downsample_test.go | 3 +- pkg/store/bucket.go | 50 ++++++++++++++++++----- pkg/store/bucket_e2e_test.go | 31 ++++++++++++-- pkg/store/gate.go | 43 +++++++++++++++++++ 7 files changed, 133 insertions(+), 14 deletions(-) create mode 100644 pkg/store/gate.go diff --git a/Gopkg.lock b/Gopkg.lock index 907ab5fca0..1461563fde 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -537,6 +537,7 @@ "fileutil", "index", "labels", + "testutil", "wal", ] pruneopts = "" @@ -853,6 +854,7 @@ "github.com/prometheus/common/version", "github.com/prometheus/prometheus/discovery/file", "github.com/prometheus/prometheus/discovery/targetgroup", + "github.com/prometheus/prometheus/pkg/gate", "github.com/prometheus/prometheus/pkg/labels", "github.com/prometheus/prometheus/pkg/timestamp", "github.com/prometheus/prometheus/pkg/value", @@ -867,6 +869,7 @@ "github.com/prometheus/tsdb/fileutil", "github.com/prometheus/tsdb/index", "github.com/prometheus/tsdb/labels", + "github.com/prometheus/tsdb/testutil", "golang.org/x/net/context", "golang.org/x/sync/errgroup", "golang.org/x/text/language", diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index ddf9893061..25f8512abf 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -36,6 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks."). Default("2GB").Bytes() + maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit."). + Default("50000000").Uint() + + maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() + objStoreConfig := regCommonObjStoreFlags(cmd, "", true) syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). @@ -63,6 +68,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string peer, uint64(*indexCacheSize), uint64(*chunkPoolSize), + uint64(*maxSampleCount), + int(*maxConcurrent), name, debugLogging, *syncInterval, @@ -87,6 +94,8 @@ func runStore( peer cluster.Peer, indexCacheSizeBytes uint64, chunkPoolSizeBytes uint64, + maxSampleCount uint64, + maxConcurrent int, component string, verbose bool, syncInterval time.Duration, @@ -117,6 +126,8 @@ func runStore( dataDir, indexCacheSizeBytes, chunkPoolSizeBytes, + maxSampleCount, + maxConcurrent, verbose, blockSyncConcurrency, ) diff --git a/docs/components/store.md b/docs/components/store.md index e32aa11d16..866cfe2642 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -104,6 +104,12 @@ Flags: --index-cache-size=250MB Maximum size of items held in the index cache. --chunk-pool-size=2GB Maximum size of concurrently allocatable bytes for chunks. + --grpc-sample-limit=50000000 + Maximum amount of samples returned via a single + Series call. 0 means no limit. + --grpc-concurrent-limit=20 + Maximum number of concurrent Series calls. 0 + means no limit. --objstore.config-file= Path to YAML file that contains object store configuration. diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index bb2c38b17a..2a56426c96 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -1,13 +1,14 @@ package downsample import ( - "github.com/prometheus/tsdb" "io/ioutil" "math" "os" "path/filepath" "testing" + "github.com/prometheus/tsdb" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/prometheus/prometheus/pkg/value" diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e006f5c70c..090d4367c5 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -171,6 +171,13 @@ type BucketStore struct { debugLogging bool // Number of goroutines to use when syncing blocks from object storage. blockSyncConcurrency int + + // The maximum of samples Thanos Store could return in one Series() call. + // Set to 0 to remove this limit (not recommended). + maxSampleCount uint64 + + // Query gate which limits the maximum amount of concurrent queries. + queryGate *Gate } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -182,6 +189,8 @@ func NewBucketStore( dir string, indexCacheSizeBytes uint64, maxChunkPoolBytes uint64, + maxSampleCount uint64, + maxConcurrent int, debugLogging bool, blockSyncConcurrency int, ) (*BucketStore, error) { @@ -204,8 +213,10 @@ func NewBucketStore( chunkPool: chunkPool, blocks: map[ulid.ULID]*bucketBlock{}, blockSets: map[uint64]*bucketBlockSet{}, + maxSampleCount: maxSampleCount, debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, + queryGate: NewGate(maxConcurrent, reg), } s.metrics = newBucketStoreMetrics(reg) @@ -463,7 +474,7 @@ func (s *bucketSeriesSet) Err() error { return s.err } -func (s *BucketStore) blockSeries( +func (bs *BucketStore) blockSeries( ctx context.Context, ulid ulid.ULID, extLset map[string]string, @@ -471,6 +482,8 @@ func (s *BucketStore) blockSeries( chunkr *bucketChunkReader, matchers []labels.Matcher, req *storepb.SeriesRequest, + samples *uint64, + samplesLock *sync.Mutex, ) (storepb.SeriesSet, *queryStats, error) { ps, err := indexr.ExpandedPostings(matchers) if err != nil { @@ -559,7 +572,7 @@ func (s *BucketStore) blockSeries( if err != nil { return nil, nil, errors.Wrap(err, "get chunk") } - if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil { + if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil { return nil, nil, errors.Wrap(err, "populate chunk") } } @@ -568,7 +581,9 @@ func (s *BucketStore) blockSeries( return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { +func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, + samples *uint64, samplesLock *sync.Mutex) error { + if in.Encoding() == chunkenc.EncXOR { out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} return nil @@ -579,6 +594,14 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag ac := downsample.AggrChunk(in.Bytes()) + samplesLock.Lock() + *samples += uint64(ac.NumSamples()) + if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount { + samplesLock.Unlock() + return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount) + } + samplesLock.Unlock() + for _, at := range aggrs { switch at { case storepb.Aggr_COUNT: @@ -652,19 +675,24 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels } // Series implements the storepb.StoreServer interface. -// TODO(bwplotka): It buffers all chunks in memory and only then streams to client. -// 1. Either count chunk sizes and error out too big query. -// 2. Stream posting -> series -> chunk all together. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + err := s.queryGate.Start(srv.Context()) + if err != nil { + return errors.Wrapf(err, "gate Start failed") + } + defer s.queryGate.Done() + matchers, err := translateMatchers(req.Matchers) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } var ( - stats = &queryStats{} - g run.Group - res []storepb.SeriesSet - mtx sync.Mutex + stats = &queryStats{} + g run.Group + res []storepb.SeriesSet + mtx sync.Mutex + samples uint64 + samplesLock sync.Mutex ) s.mtx.RLock() @@ -701,6 +729,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie chunkr, blockMatchers, req, + &samples, + &samplesLock, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 28122ac367..0f3850dc0d 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -35,7 +35,7 @@ func (s *storeSuite) Close() { s.wg.Wait() } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, maxSampleCount uint64) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) * testutil.Ok(t, os.RemoveAll(dir2)) } - store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20) + store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20) testutil.Ok(t, err) s.store = store @@ -126,7 +126,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt) + s := prepareStoreWithTestBlocks(t, dir, bkt, 0) defer s.Close() mint, maxt := s.store.TimeRange() @@ -215,6 +215,31 @@ func TestBucketStore_e2e(t *testing.T) { MaxTime: maxt, }, srv)) testutil.Equals(t, 0, len(srv.SeriesSet)) + + // Test the samples limit. + testutil.Ok(t, os.RemoveAll(dir)) + s = prepareStoreWithTestBlocks(t, dir, bkt, 10) + mint, maxt = s.store.TimeRange() + defer s.Close() + + srv = newStoreSeriesServer(ctx) + + testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + MinTime: mint, + MaxTime: maxt, + }, srv)) + + testutil.NotOk(t, s.store.Series(&storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"}, + }, + MinTime: mint, + MaxTime: maxt, + }, srv)) + }) } diff --git a/pkg/store/gate.go b/pkg/store/gate.go new file mode 100644 index 0000000000..555aa1828f --- /dev/null +++ b/pkg/store/gate.go @@ -0,0 +1,43 @@ +package store + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/gate" +) + +// Gate wraps the Prometheus gate with extra metrics. +type Gate struct { + g *gate.Gate + currentQueries prometheus.Gauge +} + +// NewGate returns a new gate. +func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { + g := &Gate{ + g: gate.New(maxConcurrent), + } + g.currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_bucket_store_queries_total", + Help: "Total number of currently executing queries.", + }) + + if reg != nil { + reg.MustRegister(g.currentQueries) + } + + return g +} + +// Start iniates a new query. +func (g *Gate) Start(ctx context.Context) error { + g.currentQueries.Inc() + return g.g.Start(ctx) +} + +// Done finishes a query. +func (g *Gate) Done() { + g.currentQueries.Dec() + g.g.Done() +} From e87f763c356b953831f39cbb4600ccb1a25e0b34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 5 Feb 2019 11:18:33 +0200 Subject: [PATCH 02/52] store/bucket: account for the RawChunk case Convert raw chunks into XOR encoded chunks and call the NumSamples() method on them to calculate the number of samples. Rip out the samples calculation into a different function because it is used in two different places. --- pkg/store/bucket.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 090d4367c5..cdce5e4e41 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -581,10 +581,29 @@ func (bs *BucketStore) blockSeries( return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } +func (bs *BucketStore) checkSamples(gotSamples uint64, samples *uint64, samplesLock *sync.Mutex) error { + samplesLock.Lock() + *samples += gotSamples + if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount { + samplesLock.Unlock() + return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount) + } + samplesLock.Unlock() + return nil +} + func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, samples *uint64, samplesLock *sync.Mutex) error { if in.Encoding() == chunkenc.EncXOR { + ch, err := chunkenc.FromData(in.Encoding(), in.Bytes()) + if err != nil { + return errors.Errorf("failed to create a chunk") + } + err = bs.checkSamples(uint64(ch.NumSamples()), samples, samplesLock) + if err != nil { + return errors.Wrapf(err, "check samples") + } out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} return nil } @@ -593,14 +612,10 @@ func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, } ac := downsample.AggrChunk(in.Bytes()) - - samplesLock.Lock() - *samples += uint64(ac.NumSamples()) - if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount { - samplesLock.Unlock() - return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount) + err := bs.checkSamples(uint64(ac.NumSamples()), samples, samplesLock) + if err != nil { + return errors.Wrapf(err, "check samples") } - samplesLock.Unlock() for _, at := range aggrs { switch at { From 1ab1dc6e36f74f1325e7818014bb9dcbcff03e46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 5 Feb 2019 11:36:09 +0200 Subject: [PATCH 03/52] store/bucket_e2e_test: adjust sample limit size It should be actually 30 - I miscalculated this. --- pkg/store/bucket_e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 0f3850dc0d..e5e4f8db26 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -218,7 +218,7 @@ func TestBucketStore_e2e(t *testing.T) { // Test the samples limit. testutil.Ok(t, os.RemoveAll(dir)) - s = prepareStoreWithTestBlocks(t, dir, bkt, 10) + s = prepareStoreWithTestBlocks(t, dir, bkt, 30) mint, maxt = s.store.TimeRange() defer s.Close() From d7c3ade73559991714e556bc90485727bf2c7149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 5 Feb 2019 11:40:48 +0200 Subject: [PATCH 04/52] store/bucket: add metric thanos_bucket_store_queries_limited_total --- pkg/store/bucket.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index cdce5e4e41..e42e84234c 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -56,6 +56,7 @@ type bucketStoreMetrics struct { seriesMergeDuration prometheus.Histogram resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram + queriesLimited prometheus.Counter } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -131,6 +132,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }, }) + m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_queries_limited_total", + Help: "Total number of queries that were dropped due to the sample limit.", + }) + if reg != nil { reg.MustRegister( m.blockLoads, @@ -586,6 +592,7 @@ func (bs *BucketStore) checkSamples(gotSamples uint64, samples *uint64, samplesL *samples += gotSamples if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount { samplesLock.Unlock() + bs.metrics.queriesLimited.Inc() return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount) } samplesLock.Unlock() From 12db24a00108d09ea4bca737458001157a6ec7f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 5 Feb 2019 11:47:16 +0200 Subject: [PATCH 05/52] store/bucket: register queriesLimited metric --- pkg/store/bucket.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e42e84234c..c1411c15fc 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -153,6 +153,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { m.seriesMergeDuration, m.resultSeriesCount, m.chunkSizeBytes, + m.queriesLimited, ) } return &m From 9d0b8a76dd1031d1249491ee74244b74d1a5ae73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 11:24:27 +0200 Subject: [PATCH 06/52] store: make changes according to the review comments --- cmd/thanos/store.go | 3 +- pkg/store/bucket.go | 87 ++++++++++++++++-------------------- pkg/store/bucket_e2e_test.go | 2 +- pkg/store/limiter.go | 24 ++++++++++ 4 files changed, 66 insertions(+), 50 deletions(-) create mode 100644 pkg/store/limiter.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 25f8512abf..7ea51a13cc 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -36,7 +36,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks."). Default("2GB").Bytes() - maxSampleCount := cmd.Flag("grpc-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit."). + maxSampleCount := cmd.Flag("grpc-sample-limit", + "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may unlikely underestimate the number of samples that would be needed to download."). Default("50000000").Uint() maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index c1411c15fc..a6d1fd89eb 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -41,6 +41,14 @@ import ( "google.golang.org/grpc/status" ) +// Approximately this is the max number of samples that we may have in any given chunk. This is needed +// for precalculating the number of samples that we may have to retrieve and decode for any given query +// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know +// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way +// because you barely get any improvements in compression when the number of samples is beyond this. +// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. +const maxSamplesPerSeriesRef uint64 = 120 + type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge blockLoads prometheus.Counter @@ -57,6 +65,7 @@ type bucketStoreMetrics struct { resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram queriesLimited prometheus.Counter + queriesLimit prometheus.Gauge } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -133,8 +142,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }) m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_queries_limited_total", - Help: "Total number of queries that were dropped due to the sample limit.", + Name: "thanos_bucket_store_queries_limited", + Help: "Number of queries that were dropped due to the sample limit.", + }) + m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_bucket_store_queries_limit", + Help: "Number of maximum concurrent queries.", }) if reg != nil { @@ -154,6 +167,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { m.resultSeriesCount, m.chunkSizeBytes, m.queriesLimited, + m.queriesLimit, ) } return &m @@ -179,12 +193,11 @@ type BucketStore struct { // Number of goroutines to use when syncing blocks from object storage. blockSyncConcurrency int - // The maximum of samples Thanos Store could return in one Series() call. - // Set to 0 to remove this limit (not recommended). - maxSampleCount uint64 - // Query gate which limits the maximum amount of concurrent queries. queryGate *Gate + + // Samples limiter which limits the number of samples per each Series() call. + samplesLimiter *Limiter } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -220,10 +233,10 @@ func NewBucketStore( chunkPool: chunkPool, blocks: map[ulid.ULID]*bucketBlock{}, blockSets: map[uint64]*bucketBlockSet{}, - maxSampleCount: maxSampleCount, debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, queryGate: NewGate(maxConcurrent, reg), + samplesLimiter: NewLimiter(maxSampleCount), } s.metrics = newBucketStoreMetrics(reg) @@ -231,6 +244,8 @@ func NewBucketStore( return nil, errors.Wrap(err, "create dir") } + s.metrics.queriesLimit.Set(float64(maxConcurrent)) + return s, nil } @@ -481,7 +496,7 @@ func (s *bucketSeriesSet) Err() error { return s.err } -func (bs *BucketStore) blockSeries( +func blockSeries( ctx context.Context, ulid ulid.ULID, extLset map[string]string, @@ -489,8 +504,7 @@ func (bs *BucketStore) blockSeries( chunkr *bucketChunkReader, matchers []labels.Matcher, req *storepb.SeriesRequest, - samples *uint64, - samplesLock *sync.Mutex, + samplesLimiter *Limiter, ) (storepb.SeriesSet, *queryStats, error) { ps, err := indexr.ExpandedPostings(matchers) if err != nil { @@ -568,7 +582,7 @@ func (bs *BucketStore) blockSeries( } // Preload all chunks that were marked in the previous stage. - if err := chunkr.preload(); err != nil { + if err := chunkr.preload(samplesLimiter); err != nil { return nil, nil, errors.Wrap(err, "preload chunks") } @@ -579,7 +593,7 @@ func (bs *BucketStore) blockSeries( if err != nil { return nil, nil, errors.Wrap(err, "get chunk") } - if err := bs.populateChunk(&s.chks[i], chk, req.Aggregates, samples, samplesLock); err != nil { + if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil { return nil, nil, errors.Wrap(err, "populate chunk") } } @@ -588,30 +602,9 @@ func (bs *BucketStore) blockSeries( return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func (bs *BucketStore) checkSamples(gotSamples uint64, samples *uint64, samplesLock *sync.Mutex) error { - samplesLock.Lock() - *samples += gotSamples - if bs.maxSampleCount > 0 && *samples > bs.maxSampleCount { - samplesLock.Unlock() - bs.metrics.queriesLimited.Inc() - return errors.Errorf("sample limit violated (got %v, limit %v)", *samples, bs.maxSampleCount) - } - samplesLock.Unlock() - return nil -} - -func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, - samples *uint64, samplesLock *sync.Mutex) error { +func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { if in.Encoding() == chunkenc.EncXOR { - ch, err := chunkenc.FromData(in.Encoding(), in.Bytes()) - if err != nil { - return errors.Errorf("failed to create a chunk") - } - err = bs.checkSamples(uint64(ch.NumSamples()), samples, samplesLock) - if err != nil { - return errors.Wrapf(err, "check samples") - } out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} return nil } @@ -620,10 +613,6 @@ func (bs *BucketStore) populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, } ac := downsample.AggrChunk(in.Bytes()) - err := bs.checkSamples(uint64(ac.NumSamples()), samples, samplesLock) - if err != nil { - return errors.Wrapf(err, "check samples") - } for _, at := range aggrs { switch at { @@ -710,12 +699,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie return status.Error(codes.InvalidArgument, err.Error()) } var ( - stats = &queryStats{} - g run.Group - res []storepb.SeriesSet - mtx sync.Mutex - samples uint64 - samplesLock sync.Mutex + stats = &queryStats{} + g run.Group + res []storepb.SeriesSet + mtx sync.Mutex ) s.mtx.RLock() @@ -745,15 +732,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block") g.Add(func() error { - part, pstats, err := s.blockSeries(ctx, + part, pstats, err := blockSeries(ctx, b.meta.ULID, b.meta.Thanos.Labels, indexr, chunkr, blockMatchers, req, - &samples, - &samplesLock, + s.samplesLimiter, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -1570,12 +1556,17 @@ func (r *bucketChunkReader) addPreload(id uint64) error { } // preload all added chunk IDs. Must be called before the first call to Chunk is made. -func (r *bucketChunkReader) preload() error { +func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { const maxChunkSize = 16000 const maxGapSize = 512 * 1024 var g run.Group + numSamples := uint64(len(r.preloads)) * maxSamplesPerSeriesRef + if err := samplesLimiter.Check(numSamples); err != nil { + return err + } + for seq, offsets := range r.preloads { sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index e5e4f8db26..9ae5098397 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -218,7 +218,7 @@ func TestBucketStore_e2e(t *testing.T) { // Test the samples limit. testutil.Ok(t, os.RemoveAll(dir)) - s = prepareStoreWithTestBlocks(t, dir, bkt, 30) + s = prepareStoreWithTestBlocks(t, dir, bkt, 120) mint, maxt = s.store.TimeRange() defer s.Close() diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go new file mode 100644 index 0000000000..d27f4ce05f --- /dev/null +++ b/pkg/store/limiter.go @@ -0,0 +1,24 @@ +package store + +import "github.com/pkg/errors" + +// Limiter is a simple mechanism for checking if something has passed a certain threshold. +type Limiter struct { + limit uint64 +} + +// NewLimiter returns a new limiter with a specified limit. 0 disables the limit. +func NewLimiter(limit uint64) *Limiter { + return &Limiter{limit: limit} +} + +// Check checks if the passed number exceeds the limits or not. +func (l *Limiter) Check(num uint64) error { + if l.limit == 0 { + return nil + } + if num > l.limit { + return errors.Errorf("limit %v violated", l.limit) + } + return nil +} From 9727072695df074dd4f9a6135538f5759ee6d078 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 11:41:43 +0200 Subject: [PATCH 07/52] docs/store: update --- docs/components/store.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/components/store.md b/docs/components/store.md index 866cfe2642..4677a5bcd4 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -106,7 +106,9 @@ Flags: for chunks. --grpc-sample-limit=50000000 Maximum amount of samples returned via a single - Series call. 0 means no limit. + Series call. 0 means no limit. NOTE: may + unlikely underestimate the number of samples + that would be needed to download. --grpc-concurrent-limit=20 Maximum number of concurrent Series calls. 0 means no limit. From d9c733a13b38668ac19218c2aa1249fa0736d8c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:04:40 +0200 Subject: [PATCH 08/52] store: gating naming changes, add span/extra metric --- pkg/store/bucket.go | 22 ++++++++++++++++------ pkg/store/gate.go | 19 +++++++++++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a6d1fd89eb..ed06ddc728 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -47,7 +47,7 @@ import ( // where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way // because you barely get any improvements in compression when the number of samples is beyond this. // Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. -const maxSamplesPerSeriesRef uint64 = 120 +const maxSamplesPerChunk uint64 = 120 type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge @@ -688,11 +688,15 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - err := s.queryGate.Start(srv.Context()) - if err != nil { - return errors.Wrapf(err, "gate Start failed") + { + span, _ := tracing.StartSpan(srv.Context(), "bucket_store_gate") + err := s.queryGate.IsMyTurn(srv.Context()) + span.Finish() + if err != nil { + return errors.Wrapf(err, "failed to wait for turn") + } + defer s.queryGate.Done() } - defer s.queryGate.Done() matchers, err := translateMatchers(req.Matchers) if err != nil { @@ -1562,7 +1566,13 @@ func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { var g run.Group - numSamples := uint64(len(r.preloads)) * maxSamplesPerSeriesRef + numChunks := uint64(0) + for _, offsets := range r.preloads { + for range offsets { + numChunks++ + } + } + numSamples := numChunks * maxSamplesPerChunk if err := samplesLimiter.Check(numSamples); err != nil { return err } diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 555aa1828f..4e36c9a6a7 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -2,6 +2,7 @@ package store import ( "context" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/gate" @@ -11,6 +12,7 @@ import ( type Gate struct { g *gate.Gate currentQueries prometheus.Gauge + gateTiming prometheus.Summary } // NewGate returns a new gate. @@ -22,18 +24,27 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { Name: "thanos_bucket_store_queries_total", Help: "Total number of currently executing queries.", }) + g.gateTiming = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_gate_seconds", + Help: "How many time it took for a query to pass through the gate.", + }) if reg != nil { - reg.MustRegister(g.currentQueries) + reg.MustRegister(g.currentQueries, g.gateTiming) } return g } -// Start iniates a new query. -func (g *Gate) Start(ctx context.Context) error { +// IsMyTurn iniates a new query and wait untils its our turn to fulfill a query request. +func (g *Gate) IsMyTurn(ctx context.Context) error { g.currentQueries.Inc() - return g.g.Start(ctx) + start := time.Now() + if err := g.g.Start(ctx); err != nil { + return err + } + g.gateTiming.Observe(float64(time.Now().Sub(start))) + return nil } // Done finishes a query. From c4ce7355cca886719c6fbc11ca176f632c25462e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:15:58 +0200 Subject: [PATCH 09/52] store: improve error messages --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ed06ddc728..0a003c968d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1574,7 +1574,7 @@ func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { } numSamples := numChunks * maxSamplesPerChunk if err := samplesLimiter.Check(numSamples); err != nil { - return err + return errors.Wrapf(err, "exceeded samples limit") } for seq, offsets := range r.preloads { From 30eef193b9c22ec7ae712cad459d18b9590a5af1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:21:41 +0200 Subject: [PATCH 10/52] store/limiter: improve error messages --- pkg/store/limiter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index d27f4ce05f..325b668b50 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -18,7 +18,7 @@ func (l *Limiter) Check(num uint64) error { return nil } if num > l.limit { - return errors.Errorf("limit %v violated", l.limit) + return errors.Errorf("limit %v violated (got %v)", l.limit, num) } return nil } From 194394d4bf7be024c9b5cf290fe72ddeaf8539b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:26:35 +0200 Subject: [PATCH 11/52] store/gate: time -> seconds --- pkg/store/gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 4e36c9a6a7..90d85a2d7f 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -26,7 +26,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { }) g.gateTiming = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "thanos_bucket_store_gate_seconds", - Help: "How many time it took for a query to pass through the gate.", + Help: "How many seconds it took for a query to pass through the gate.", }) if reg != nil { From 2e51c2ead598babf74b62927d26590343b277ad9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:28:31 +0200 Subject: [PATCH 12/52] store/bucket_e2e_test: narrow down the first query --- pkg/store/bucket_e2e_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 9ae5098397..84733e996d 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -227,6 +227,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{ Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + {Type: storepb.LabelMatcher_EQ, Name: "b", Value: "1"}, }, MinTime: mint, MaxTime: maxt, From 58a14faf252a49e9b36b0dddf0b6f11b005c50cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:30:56 +0200 Subject: [PATCH 13/52] store/bucket: check for negative maxConcurrent --- pkg/store/bucket.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 0a003c968d..4595e6451d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -217,6 +217,11 @@ func NewBucketStore( if logger == nil { logger = log.NewNopLogger() } + + if maxConcurrent < 0 { + return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent) + } + indexCache, err := newIndexCache(reg, indexCacheSizeBytes) if err != nil { return nil, errors.Wrap(err, "create index cache") From 4d1b7ed8eacf7b632aa42ef3807dd5747e4bc4b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:38:23 +0200 Subject: [PATCH 14/52] cmd/store: clarify help message --- cmd/thanos/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7ea51a13cc..79f505d854 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -37,7 +37,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string Default("2GB").Bytes() maxSampleCount := cmd.Flag("grpc-sample-limit", - "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may unlikely underestimate the number of samples that would be needed to download."). + "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may overestimate the number of samples that would be needed to respond to a query."). Default("50000000").Uint() maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() From b149f743fd6d6bae3f295c4215395dfa8cb67ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:46:39 +0200 Subject: [PATCH 15/52] pkg/store: hook thanos_bucket_store_queries_limited into Limiter --- pkg/store/bucket.go | 5 +++-- pkg/store/limiter.go | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3d91635c36..826fb3f5c0 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -231,6 +231,7 @@ func NewBucketStore( if err != nil { return nil, errors.Wrap(err, "create chunk pool") } + metrics := newBucketStoreMetrics(reg) s := &BucketStore{ logger: logger, bucket: bucket, @@ -242,9 +243,9 @@ func NewBucketStore( debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, queryGate: NewGate(maxConcurrent, reg), - samplesLimiter: NewLimiter(maxSampleCount), + samplesLimiter: NewLimiter(maxSampleCount, &metrics.queriesLimited), } - s.metrics = newBucketStoreMetrics(reg) + s.metrics = metrics if err := os.MkdirAll(dir, 0777); err != nil { return nil, errors.Wrap(err, "create dir") diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 325b668b50..0f54ead301 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -1,15 +1,21 @@ package store -import "github.com/pkg/errors" +import ( + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" +) // Limiter is a simple mechanism for checking if something has passed a certain threshold. type Limiter struct { limit uint64 + + // A ptr to a counter metric which we will increase if Check() fails. + failedCounter *prometheus.Counter } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. -func NewLimiter(limit uint64) *Limiter { - return &Limiter{limit: limit} +func NewLimiter(limit uint64, ctr *prometheus.Counter) *Limiter { + return &Limiter{limit: limit, failedCounter: ctr} } // Check checks if the passed number exceeds the limits or not. @@ -18,6 +24,9 @@ func (l *Limiter) Check(num uint64) error { return nil } if num > l.limit { + if l.failedCounter != nil { + (*l.failedCounter).Inc() + } return errors.Errorf("limit %v violated (got %v)", l.limit, num) } return nil From e79c56d728b440784616a5315543aee8f66e7d43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:51:11 +0200 Subject: [PATCH 16/52] store/bucket_test: fix NewBucketStore call --- pkg/store/bucket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 7cc57da4cf..b79a50af52 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "prometheus-test") testutil.Ok(t, err) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20) + bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) From 1d07515cefed06084331e9a4cd530628f4d6e39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 8 Feb 2019 13:55:11 +0200 Subject: [PATCH 17/52] docs: update again --- docs/components/store.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 4677a5bcd4..259ce90d58 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -107,8 +107,8 @@ Flags: --grpc-sample-limit=50000000 Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may - unlikely underestimate the number of samples - that would be needed to download. + overestimate the number of samples that would + be needed to respond to a query. --grpc-concurrent-limit=20 Maximum number of concurrent Series calls. 0 means no limit. From 38a093bb021d95173cf58be107bd7e46091fe650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Sat, 9 Feb 2019 17:21:23 +0200 Subject: [PATCH 18/52] store/gate: spelling fix --- pkg/store/gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 90d85a2d7f..cebdc4bc52 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -36,7 +36,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { return g } -// IsMyTurn iniates a new query and wait untils its our turn to fulfill a query request. +// IsMyTurn iniates a new query and wait until it's our turn to fulfill a query request. func (g *Gate) IsMyTurn(ctx context.Context) error { g.currentQueries.Inc() start := time.Now() From 7b13f7e2c4933cba303169a645c493b7bbd09ba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Sat, 9 Feb 2019 17:22:02 +0200 Subject: [PATCH 19/52] store/gate: spelling fix #2 --- pkg/store/gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index cebdc4bc52..8275aa1317 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -36,7 +36,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { return g } -// IsMyTurn iniates a new query and wait until it's our turn to fulfill a query request. +// IsMyTurn iniates a new query and waits until it's our turn to fulfill a query request. func (g *Gate) IsMyTurn(ctx context.Context) error { g.currentQueries.Inc() start := time.Now() From 3e532fe8786739900a21e6da5086d58495861ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Feb 2019 23:56:15 +0200 Subject: [PATCH 20/52] store/bucket: remove pointless newline --- pkg/store/bucket.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 626b477799..4f08f60a1e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -617,7 +617,6 @@ func blockSeries( } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { - if in.Encoding() == chunkenc.EncXOR { out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()} return nil From cff979c4f98dc67958cad245a1dd688e3ba40859 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 16:28:33 +0200 Subject: [PATCH 21/52] store/gate: generalize gate timing Make the metric show in general how much time it takes for queries to wait at the gate. --- pkg/store/gate.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 8275aa1317..8193011db8 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -26,7 +26,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { }) g.gateTiming = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "thanos_bucket_store_gate_seconds", - Help: "How many seconds it took for a query to pass through the gate.", + Help: "How many seconds it took for a query to wait at the gate.", }) if reg != nil { @@ -38,12 +38,16 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { // IsMyTurn iniates a new query and waits until it's our turn to fulfill a query request. func (g *Gate) IsMyTurn(ctx context.Context) error { - g.currentQueries.Inc() start := time.Now() + defer func() { + g.gateTiming.Observe(float64(time.Now().Sub(start))) + }() + if err := g.g.Start(ctx); err != nil { return err } - g.gateTiming.Observe(float64(time.Now().Sub(start))) + + g.currentQueries.Inc() return nil } From e7ea64be58dff8d3ca610d9c8f44e1dfa77db541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 16:30:56 +0200 Subject: [PATCH 22/52] store/gate: convert the g.gateTiming metric into a histogram --- pkg/store/gate.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 8193011db8..732bf40f6d 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -12,7 +12,7 @@ import ( type Gate struct { g *gate.Gate currentQueries prometheus.Gauge - gateTiming prometheus.Summary + gateTiming prometheus.Histogram } // NewGate returns a new gate. @@ -24,9 +24,12 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { Name: "thanos_bucket_store_queries_total", Help: "Total number of currently executing queries.", }) - g.gateTiming = prometheus.NewSummary(prometheus.SummaryOpts{ + g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "thanos_bucket_store_gate_seconds", Help: "How many seconds it took for a query to wait at the gate.", + Buckets: []float64{ + 0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 30, 60, 120, + }, }) if reg != nil { From 23c7368c97ff183b93431aad105ed5c8ba964337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 16:43:34 +0200 Subject: [PATCH 23/52] store/bucket: change comment wording --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4f08f60a1e..41637235b2 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -42,7 +42,7 @@ import ( "google.golang.org/grpc/status" ) -// Approximately this is the max number of samples that we may have in any given chunk. This is needed +// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed // for precalculating the number of samples that we may have to retrieve and decode for any given query // without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know // where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way From da575e43cbd9a67d92820447efe824f7e35cf0ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 16:47:00 +0200 Subject: [PATCH 24/52] store/bucket: remove type from maxSamplesPerChunk Let Go decide by itself what kind of type it needs. --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 41637235b2..c211862a4b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -48,7 +48,7 @@ import ( // where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way // because you barely get any improvements in compression when the number of samples is beyond this. // Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. -const maxSamplesPerChunk uint64 = 120 +const maxSamplesPerChunk = 120 type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge From e3908463a225724c0158add5af1c838706f17fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 16:50:19 +0200 Subject: [PATCH 25/52] store/bucket: rename metric into thanos_bucket_store_queries_dropped --- pkg/store/bucket.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index c211862a4b..720427f509 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -65,7 +65,7 @@ type bucketStoreMetrics struct { seriesMergeDuration prometheus.Histogram resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram - queriesLimited prometheus.Counter + queriesDropped prometheus.Counter queriesLimit prometheus.Gauge } @@ -142,8 +142,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }, }) - m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_queries_limited", + m.queriesDropped = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_queries_dropped", Help: "Number of queries that were dropped due to the sample limit.", }) m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -167,7 +167,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { m.seriesMergeDuration, m.resultSeriesCount, m.chunkSizeBytes, - m.queriesLimited, + m.queriesDropped, m.queriesLimit, ) } @@ -247,7 +247,7 @@ func NewBucketStore( debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, queryGate: NewGate(maxConcurrent, reg), - samplesLimiter: NewLimiter(maxSampleCount, &metrics.queriesLimited), + samplesLimiter: NewLimiter(maxSampleCount, &metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, } s.metrics = metrics From 24e8e1fc36586b16dbe8fee9d5630c675e98aefe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 16:56:18 +0200 Subject: [PATCH 26/52] thanos/store: clarify help message Literally explain what it means in the help message so that it would be clearer. --- cmd/thanos/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 79f505d854..fec2fdbd41 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -37,7 +37,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string Default("2GB").Bytes() maxSampleCount := cmd.Flag("grpc-sample-limit", - "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may overestimate the number of samples that would be needed to respond to a query."). + "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as number of samples in chunk, so the actual number of samples might be lower, even though maximum could be hit. Cannot be bigger than 120."). Default("50000000").Uint() maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() From 4012eca2f076a6f30156c12db2ae7edab65c1dd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 17:00:15 +0200 Subject: [PATCH 27/52] store/gate: rename metric to thanos_bucket_store_queries_in_flight More fitting as decided by everyone. --- pkg/store/gate.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 732bf40f6d..bf2bfa91cd 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -10,9 +10,9 @@ import ( // Gate wraps the Prometheus gate with extra metrics. type Gate struct { - g *gate.Gate - currentQueries prometheus.Gauge - gateTiming prometheus.Histogram + g *gate.Gate + inflightQueries prometheus.Gauge + gateTiming prometheus.Histogram } // NewGate returns a new gate. @@ -20,9 +20,9 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { g := &Gate{ g: gate.New(maxConcurrent), } - g.currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_total", - Help: "Total number of currently executing queries.", + g.inflightQueries = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_bucket_store_queries_in_flight", + Help: "Total number of queries that are currently in flight.", }) g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "thanos_bucket_store_gate_seconds", @@ -50,12 +50,12 @@ func (g *Gate) IsMyTurn(ctx context.Context) error { return err } - g.currentQueries.Inc() + g.inflightQueries.Inc() return nil } // Done finishes a query. func (g *Gate) Done() { - g.currentQueries.Dec() + g.inflightQueries.Dec() g.g.Done() } From e7be55dc352677936651dd342db9accd04b9a584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 17:03:55 +0200 Subject: [PATCH 28/52] store/gate: fix MustRegister() call --- pkg/store/gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index bf2bfa91cd..9a9a5f0c0c 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -33,7 +33,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { }) if reg != nil { - reg.MustRegister(g.currentQueries, g.gateTiming) + reg.MustRegister(g.inflightQueries, g.gateTiming) } return g From 3e8150d5ddb0cd9337f9866cb240e0b97c45cc4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 17:05:31 +0200 Subject: [PATCH 29/52] docs: update --- docs/components/store.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 259ce90d58..04f16a53f6 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -106,9 +106,11 @@ Flags: for chunks. --grpc-sample-limit=50000000 Maximum amount of samples returned via a single - Series call. 0 means no limit. NOTE: may - overestimate the number of samples that would - be needed to respond to a query. + Series call. 0 means no limit. NOTE: for + efficiency we take 120 as number of samples in + chunk, so the actual number of samples might be + lower, even though maximum could be hit. Cannot + be bigger than 120. --grpc-concurrent-limit=20 Maximum number of concurrent Series calls. 0 means no limit. From 5ec5ce999f58c784f1e943fcea28fbb5ea9564c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 17:06:31 +0200 Subject: [PATCH 30/52] store/bucket: clarify the name of the span Make it more clearer about what it is for. --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 720427f509..e8705ce860 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -702,7 +702,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { { - span, _ := tracing.StartSpan(srv.Context(), "bucket_store_gate") + span, _ := tracing.StartSpan(srv.Context(), "store_query_gate_ismyturn") err := s.queryGate.IsMyTurn(srv.Context()) span.Finish() if err != nil { From 810a131db6ad3bea6bb664c04e7dc92c92265fb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 17:07:53 +0200 Subject: [PATCH 31/52] store/bucket: inline calculation into the function call No need to create an extra variable in a hot path in the code if we can inline it and it will be just as clear. --- pkg/store/bucket.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e8705ce860..8413b9e608 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1643,8 +1643,7 @@ func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { numChunks++ } } - numSamples := numChunks * maxSamplesPerChunk - if err := samplesLimiter.Check(numSamples); err != nil { + if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil { return errors.Wrapf(err, "exceeded samples limit") } From ae8e425aebf5005e5512ef8a7a96e861ef87715f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 14 Mar 2019 17:16:18 +0200 Subject: [PATCH 32/52] CHANGELOG: add item about this --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dd5fdd7a4..41af548944 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,24 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver +- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples. + +New options: + +* `--grpc-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 5e7. Consider increasing this limit if you run a huge deployment. Helps a lot to capacity plan your Thanos Store instance if you are running on a virtual machine, for example; +* `--grpc-concurrent-limit` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Consider increasing this limit if needed. + +New metrics: +* `thanos_bucket_store_queries_dropped` shows how many queries were dropped due to the samples limit; +* `thanos_bucket_store_queries_limit` is a constant metric which shows how many concurrent queries can come into Thanos Store; +* `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed; +* `thanos_bucket_store_gate_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. + +New tracing span: +* `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate + +:warning: **WARNING** :warning: #798 adds some new default limits. Consider increasing them if you have a very huge deployment. + ### Fixed - [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far From de8a234f29e2a75495d43ce848f110335bdcc7c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 13:56:28 +0200 Subject: [PATCH 33/52] store/gate: reduce number of buckets --- pkg/store/gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 9a9a5f0c0c..8a8ce27892 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -28,7 +28,7 @@ func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { Name: "thanos_bucket_store_gate_seconds", Help: "How many seconds it took for a query to wait at the gate.", Buckets: []float64{ - 0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 30, 60, 120, + 0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 10, }, }) From 07b4658178fba81ee9afeb1b6ceb2f5b24640bc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 13:57:18 +0200 Subject: [PATCH 34/52] store/bucket: rename metric to thanos_bucket_store_queries_dropped_total --- CHANGELOG.md | 2 +- pkg/store/bucket.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41af548944..d5b34ba484 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ New options: * `--grpc-concurrent-limit` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Consider increasing this limit if needed. New metrics: -* `thanos_bucket_store_queries_dropped` shows how many queries were dropped due to the samples limit; +* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; * `thanos_bucket_store_queries_limit` is a constant metric which shows how many concurrent queries can come into Thanos Store; * `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed; * `thanos_bucket_store_gate_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8cd8faaa2b..2456da47d1 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -143,7 +143,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }) m.queriesDropped = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_queries_dropped", + Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the sample limit.", }) m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{ From 61d6ecdb8098beda70ce333af43f6bd36290b6ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 13:57:53 +0200 Subject: [PATCH 35/52] store/bucket: move defer out of code block --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 2456da47d1..72ce77fa56 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -708,8 +708,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie if err != nil { return errors.Wrapf(err, "failed to wait for turn") } - defer s.queryGate.Done() } + defer s.queryGate.Done() matchers, err := translateMatchers(req.Matchers) if err != nil { From 70b115d0e2b9ae85eeb5c9d961e5af37a01898b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 14:09:41 +0200 Subject: [PATCH 36/52] store/gate: generalize gate for different kinds of subsystems --- pkg/store/bucket.go | 3 ++- pkg/store/gate.go | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 72ce77fa56..7e37e0da52 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -22,6 +22,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/component" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/pool" "github.com/improbable-eng/thanos/pkg/runutil" @@ -246,7 +247,7 @@ func NewBucketStore( blockSets: map[uint64]*bucketBlockSet{}, debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, - queryGate: NewGate(maxConcurrent, reg), + queryGate: NewGate(maxConcurrent, extprom.NewSubsystem(reg, "thanos_bucket_store")), samplesLimiter: NewLimiter(maxSampleCount, &metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, } diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 8a8ce27892..e5b40db3ea 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/improbable-eng/thanos/pkg/extprom" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/gate" ) @@ -15,25 +16,27 @@ type Gate struct { gateTiming prometheus.Histogram } -// NewGate returns a new gate. -func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate { +// NewGate returns a new query gate. +func NewGate(maxConcurrent int, reg *extprom.SubsystemRegisterer) *Gate { g := &Gate{ g: gate.New(maxConcurrent), } g.inflightQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_in_flight", - Help: "Total number of queries that are currently in flight.", + Name: "queries_in_flight", + Help: "Total number of queries that are currently in flight.", + Subsystem: reg.Subsystem(), }) g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "thanos_bucket_store_gate_seconds", - Help: "How many seconds it took for a query to wait at the gate.", + Name: "gate_seconds", + Help: "How many seconds it took for queries to wait at the gate.", Buckets: []float64{ 0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 10, }, + Subsystem: reg.Subsystem(), }) - if reg != nil { - reg.MustRegister(g.inflightQueries, g.gateTiming) + if r := reg.Registerer(); r != nil { + r.MustRegister(g.inflightQueries, g.gateTiming) } return g From 36f1153625c8606ea26d636c3cbf67f1dcd346d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 14:16:00 +0200 Subject: [PATCH 37/52] store/limiter: remove non-nil check --- pkg/store/limiter.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 0f54ead301..5f85bef522 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -14,6 +14,7 @@ type Limiter struct { } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. +// Caller *must* ensure that ctr is non-nil. func NewLimiter(limit uint64, ctr *prometheus.Counter) *Limiter { return &Limiter{limit: limit, failedCounter: ctr} } @@ -24,9 +25,7 @@ func (l *Limiter) Check(num uint64) error { return nil } if num > l.limit { - if l.failedCounter != nil { - (*l.failedCounter).Inc() - } + (*l.failedCounter).Inc() return errors.Errorf("limit %v violated (got %v)", l.limit, num) } return nil From 9b74bbe8a5ab52c5116d61284ce7bbe8b42da83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 15:30:12 +0200 Subject: [PATCH 38/52] CHANGELOG: fixes --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d5b34ba484..57a98157b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,9 +27,9 @@ New metrics: * `thanos_bucket_store_gate_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. New tracing span: -* `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate +* `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate. -:warning: **WARNING** :warning: #798 adds some new default limits. Consider increasing them if you have a very huge deployment. +:warning: **WARNING** :warning: #798 adds new default limits for max samples per one Series() gRPC method call and the maximum number of concurrent Series() gRPC method calls. Consider increasing them if you have a very huge deployment. ### Fixed From 82bdb3cbbefc424f79a806f79aa4e1de11ccec31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 15:36:20 +0200 Subject: [PATCH 39/52] store/limiter: convert failedCounter to non-ptr --- pkg/store/bucket.go | 2 +- pkg/store/limiter.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7e37e0da52..e739d0484f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -248,7 +248,7 @@ func NewBucketStore( debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, queryGate: NewGate(maxConcurrent, extprom.NewSubsystem(reg, "thanos_bucket_store")), - samplesLimiter: NewLimiter(maxSampleCount, &metrics.queriesDropped), + samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, } s.metrics = metrics diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 5f85bef522..e4fd8eacf2 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -9,13 +9,13 @@ import ( type Limiter struct { limit uint64 - // A ptr to a counter metric which we will increase if Check() fails. - failedCounter *prometheus.Counter + // Counter metric which we will increase if Check() fails. + failedCounter prometheus.Counter } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. // Caller *must* ensure that ctr is non-nil. -func NewLimiter(limit uint64, ctr *prometheus.Counter) *Limiter { +func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter { return &Limiter{limit: limit, failedCounter: ctr} } @@ -25,7 +25,7 @@ func (l *Limiter) Check(num uint64) error { return nil } if num > l.limit { - (*l.failedCounter).Inc() + l.failedCounter.Inc() return errors.Errorf("limit %v violated (got %v)", l.limit, num) } return nil From 4d8420f12edb2308f33d3dbe62b76d042745060a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 15 Mar 2019 16:02:44 +0200 Subject: [PATCH 40/52] store/limiter: remove invalid comment --- pkg/store/limiter.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index e4fd8eacf2..2c332a2c6b 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -14,7 +14,6 @@ type Limiter struct { } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. -// Caller *must* ensure that ctr is non-nil. func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter { return &Limiter{limit: limit, failedCounter: ctr} } From 590b9a6b89e11ed326b678c6b8b9dcc317160a6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 11:23:16 +0200 Subject: [PATCH 41/52] *: update according to review comments --- cmd/thanos/store.go | 2 +- docs/components/store.md | 8 ++++---- pkg/store/bucket.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index fec2fdbd41..f5b5d2f02f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -37,7 +37,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string Default("2GB").Bytes() maxSampleCount := cmd.Flag("grpc-sample-limit", - "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as number of samples in chunk, so the actual number of samples might be lower, even though maximum could be hit. Cannot be bigger than 120."). + "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). Default("50000000").Uint() maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() diff --git a/docs/components/store.md b/docs/components/store.md index fec7ff659d..9fada8b96a 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -107,10 +107,10 @@ Flags: --grpc-sample-limit=50000000 Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for - efficiency we take 120 as number of samples in - chunk, so the actual number of samples might be - lower, even though maximum could be hit. Cannot - be bigger than 120. + efficiency we take 120 as the number of samples + in chunk (it cannot be bigger than that), so + the actual number of samples might be lower, + even though the maximum could be hit. --grpc-concurrent-limit=20 Maximum number of concurrent Series calls. 0 means no limit. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e739d0484f..4d1262e7b4 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -148,7 +148,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Number of queries that were dropped due to the sample limit.", }) m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_limit", + Name: "thanos_bucket_store_queries_concurrent_max", Help: "Number of maximum concurrent queries.", }) From 3f40bacd64e21c70beed8b88a18a1e968f9a795c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 11:24:53 +0200 Subject: [PATCH 42/52] CHANGELOG: update --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57a98157b5..259ed42c22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ New options: New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; -* `thanos_bucket_store_queries_limit` is a constant metric which shows how many concurrent queries can come into Thanos Store; +* `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many queries can concurrently be executed by Thanos Store; * `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed; * `thanos_bucket_store_gate_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. From f4734e568ffbf290a2f50684e6f0e08fe614f3d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 18:07:59 +0200 Subject: [PATCH 43/52] *: fix according to review --- CHANGELOG.md | 7 ++----- cmd/thanos/store.go | 6 +++--- pkg/store/bucket.go | 2 +- pkg/store/gate.go | 4 ++-- 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 259ed42c22..246ba8be8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,8 +17,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel New options: -* `--grpc-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 5e7. Consider increasing this limit if you run a huge deployment. Helps a lot to capacity plan your Thanos Store instance if you are running on a virtual machine, for example; -* `--grpc-concurrent-limit` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Consider increasing this limit if needed. +* `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources. +* `--store.grpc.max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if needed. New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; @@ -29,9 +29,6 @@ New metrics: New tracing span: * `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate. -:warning: **WARNING** :warning: #798 adds new default limits for max samples per one Series() gRPC method call and the maximum number of concurrent Series() gRPC method calls. Consider increasing them if you have a very huge deployment. - - ### Fixed - [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index f5b5d2f02f..41d1d4176a 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -36,11 +36,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks."). Default("2GB").Bytes() - maxSampleCount := cmd.Flag("grpc-sample-limit", + maxSampleCount := cmd.Flag("store.grpc.series-sample-limit", "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). - Default("50000000").Uint() + Default("0").Uint() - maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int() + maxConcurrent := cmd.Flag("store.grpc.max-concurrency", "Maximum number of concurrent Series calls. 0 means no limit.").Default("0").Int() objStoreConfig := regCommonObjStoreFlags(cmd, "", true) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4d1262e7b4..b1af047e91 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -198,7 +198,7 @@ type BucketStore struct { // Query gate which limits the maximum amount of concurrent queries. queryGate *Gate - // Samples limiter which limits the number of samples per each Series() call. + // samplesLimiter limits the number of samples per each Series() call. samplesLimiter *Limiter partitioner partitioner } diff --git a/pkg/store/gate.go b/pkg/store/gate.go index e5b40db3ea..231e6b8aa9 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -22,12 +22,12 @@ func NewGate(maxConcurrent int, reg *extprom.SubsystemRegisterer) *Gate { g: gate.New(maxConcurrent), } g.inflightQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "queries_in_flight", + Name: "queries_in_flight_total", Help: "Total number of queries that are currently in flight.", Subsystem: reg.Subsystem(), }) g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "gate_seconds", + Name: "gate_duration_seconds", Help: "How many seconds it took for queries to wait at the gate.", Buckets: []float64{ 0.01, 0.05, 0.1, 0.25, 0.6, 1, 2, 3.5, 5, 10, From d6c15341ae5daad0b7190f135aefde131a4e9fb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 18:09:16 +0200 Subject: [PATCH 44/52] *: fix according to review --- CHANGELOG.md | 2 +- cmd/thanos/store.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 246ba8be8e..9b0c702ac5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel New options: * `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources. -* `--store.grpc.max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if needed. +* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if needed. New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 41d1d4176a..cb81279a69 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -40,7 +40,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() - maxConcurrent := cmd.Flag("store.grpc.max-concurrency", "Maximum number of concurrent Series calls. 0 means no limit.").Default("0").Int() + maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls. 0 means no limit.").Default("0").Int() objStoreConfig := regCommonObjStoreFlags(cmd, "", true) From 1147acdc48d2670c152ba2d66fc0744df76d013e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 18:11:50 +0200 Subject: [PATCH 45/52] *: make docs --- docs/components/store.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/components/store.md b/docs/components/store.md index 9fada8b96a..33f8c58e8c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -104,14 +104,14 @@ Flags: --index-cache-size=250MB Maximum size of items held in the index cache. --chunk-pool-size=2GB Maximum size of concurrently allocatable bytes for chunks. - --grpc-sample-limit=50000000 + --store.grpc.series-sample-limit=0 Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit. - --grpc-concurrent-limit=20 + --store.grpc.series-max-concurrency=0 Maximum number of concurrent Series calls. 0 means no limit. --objstore.config-file= From 1d0fad3d853e345848df179b7728bea08fc1d1a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 18:14:14 +0200 Subject: [PATCH 46/52] CHANGELOG: clean up --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b0c702ac5..17740fe829 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel New options: * `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources. -* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if needed. +* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if you want to limit the maximum of concurrent Series() calls. New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; -* `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many queries can concurrently be executed by Thanos Store; +* `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many Series() calls can concurrently be executed by Thanos Store; * `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed; * `thanos_bucket_store_gate_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. From ef4a51ebcfa19f7527d124c1e2749dac5bd93dfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 18 Mar 2019 19:06:30 +0200 Subject: [PATCH 47/52] CHANGELOG: update --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17740fe829..f6e22b288a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,8 +23,8 @@ New options: New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; * `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many Series() calls can concurrently be executed by Thanos Store; -* `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed; -* `thanos_bucket_store_gate_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. +* `thanos_bucket_store_queries_in_flight_total` shows how many queries are currently "in flight" i.e. they are being executed; +* `thanos_bucket_store_gate_duration_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. New tracing span: * `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate. From c9a7d83a469ec08bda56afd3c98894e4f024998e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 21 Mar 2019 09:01:47 +0200 Subject: [PATCH 48/52] *: queries_in_flight_total -> queries_in_flight --- CHANGELOG.md | 2 +- pkg/store/gate.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6e22b288a..800b72b78f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ New options: New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; * `thanos_bucket_store_queries_concurrent_max` is a constant metric which shows how many Series() calls can concurrently be executed by Thanos Store; -* `thanos_bucket_store_queries_in_flight_total` shows how many queries are currently "in flight" i.e. they are being executed; +* `thanos_bucket_store_queries_in_flight` shows how many queries are currently "in flight" i.e. they are being executed; * `thanos_bucket_store_gate_duration_seconds` shows how many seconds it took for queries to pass through the gate in both cases - when that fails and when it does not. New tracing span: diff --git a/pkg/store/gate.go b/pkg/store/gate.go index 231e6b8aa9..dbbcb7d72a 100644 --- a/pkg/store/gate.go +++ b/pkg/store/gate.go @@ -22,8 +22,8 @@ func NewGate(maxConcurrent int, reg *extprom.SubsystemRegisterer) *Gate { g: gate.New(maxConcurrent), } g.inflightQueries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "queries_in_flight_total", - Help: "Total number of queries that are currently in flight.", + Name: "queries_in_flight", + Help: "Number of queries that are currently in flight.", Subsystem: reg.Subsystem(), }) g.gateTiming = prometheus.NewHistogram(prometheus.HistogramOpts{ From 280a8ca7a8050a5ad214daa14c1e88b8ee313f37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 22 Mar 2019 17:21:50 +0200 Subject: [PATCH 49/52] store/bucket: do not wraper samplesLimiter error The original error already informs us about what is going wrong. --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index db0647fd90..2c1f448518 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1649,7 +1649,7 @@ func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { } } if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil { - return errors.Wrapf(err, "exceeded samples limit") + return err } for seq, offsets := range r.preloads { From 11c4b18db79d42afe942ea7903ae5b9bfbf7aaa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 22 Mar 2019 17:28:51 +0200 Subject: [PATCH 50/52] store/bucket: err -> errors.Wrap It's still useful to know that we are talking about samples here exactly. --- pkg/store/bucket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 2c1f448518..d03167499e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1649,7 +1649,7 @@ func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error { } } if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil { - return err + return errors.Wrap(err, "exceeded samples limit") } for seq, offsets := range r.preloads { From 31a8346744672e9774b6652e5f8e85bb10e2bafb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Sat, 23 Mar 2019 11:31:56 +0200 Subject: [PATCH 51/52] store: make store.grpc.series-max-concurrency 20 by default Setting it to 0 by default doesn't make sense since the Go channel becomes unbuffered and all queries will timeout. Set it to 20 by default since that's the limit on Thanos Query and naturally there won't be more than 20 by default so it's good. --- CHANGELOG.md | 4 ++-- cmd/thanos/store.go | 2 +- docs/components/store.md | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25dfe43a2d..dfd9942460 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel New options: * `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources. -* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 0. Consider enabling it by setting it to more than 0 if you want to limit the maximum of concurrent Series() calls. +* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Considering making it lower or bigger depending on the scale of your deployment. New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; @@ -38,7 +38,7 @@ New tracing span: - [#851](https://github.com/improbable-eng/thanos/pull/851) New read API endpoint for api/v1/rules and api/v1/alerts. - [#873](https://github.com/improbable-eng/thanos/pull/873) Store: fix set index cache LRU -:warning: **WARING** :warning: #873 fix fixes actual handling of `index-cache-size`. Handling of limit for this cache was +:warning: **WARNING** :warning: #873 fix fixes actual handling of `index-cache-size`. Handling of limit for this cache was broken so it was unbounded all the time. From this release actual value matters and is extremely low by default. To "revert" the old behaviour (no boundary), use a large enough value. diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index cb81279a69..7f77e15412 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -40,7 +40,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() - maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls. 0 means no limit.").Default("0").Int() + maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() objStoreConfig := regCommonObjStoreFlags(cmd, "", true) diff --git a/docs/components/store.md b/docs/components/store.md index afbf93f5c6..7d26d79627 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -111,9 +111,8 @@ Flags: in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit. - --store.grpc.series-max-concurrency=0 - Maximum number of concurrent Series calls. 0 - means no limit. + --store.grpc.series-max-concurrency=20 + Maximum number of concurrent Series calls. --objstore.config-file= Path to YAML file that contains object store configuration. From 6e98dfd9c4327ee0c326fb9db09908fbd57404f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Sat, 23 Mar 2019 11:35:56 +0200 Subject: [PATCH 52/52] CHANGELOG: add warning about new limit --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfd9942460..0f573b42ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ New metrics: New tracing span: * `store_query_gate_ismyturn` shows how long it took for a query to pass (or not) through the gate. +:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query. + ### Fixed - [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far