diff --git a/.gitignore b/.gitignore index e5068f1580..b11d066506 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,9 @@ kube/.minikube data/ test/e2e/e2e_* +# Ignore benchmarks dir. +benchmarks/ + # Ignore promu artifacts. /.build /.release diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 644091650e..8aa6d28bf6 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -29,6 +29,7 @@ import ( v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/thanos-community/promql-engine/engine" + apiv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" @@ -97,6 +98,7 @@ func registerQuery(app *extkingpin.App) { queryTimeout := extkingpin.ModelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node."). Default("2m")) + promqlEngine := cmd.Flag("query.promql-engine", "PromQL engine to use.").Default(string(promqlEnginePrometheus)). Enum(string(promqlEnginePrometheus), string(promqlEngineThanos)) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 1f5d638aea..30df09ba5e 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -6,6 +6,7 @@ package main import ( "context" "fmt" + "strconv" "time" "github.com/alecthomas/units" @@ -56,6 +57,7 @@ type storeConfig struct { httpConfig httpConfig indexCacheSizeBytes units.Base2Bytes chunkPoolSize units.Base2Bytes + seriesBatchSize int maxSampleCount uint64 maxTouchedSeriesCount uint64 maxDownloadedBytes units.Base2Bytes @@ -129,6 +131,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&sc.blockMetaFetchConcurrency) + cmd.Flag("debug.series-batch-size", "The batch size when fetching series from TSDB blocks. Setting the number too high can lead to slower retrieval, while setting it too low can lead to throttling caused by too many calls made to object storage."). + Hidden().Default(strconv.Itoa(store.SeriesBatchSize)).IntVar(&sc.seriesBatchSize) + sc.filterConf = &store.FilterConfig{} cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). @@ -340,6 +345,7 @@ func runStore( store.WithChunkPool(chunkPool), store.WithFilterConfig(conf.filterConf), store.WithChunkHashCalculation(true), + store.WithSeriesBatchSize(conf.seriesBatchSize), } if conf.debugLogging { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 973642d06d..c8aecdc352 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -37,6 +37,8 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/sync/errgroup" + + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -101,6 +103,9 @@ const ( minBlockSyncConcurrency = 1 enableChunkHashCalculation = true + + // SeriesBatchSize is the default batch size when fetching series from object storage. + SeriesBatchSize = 10000 ) var ( @@ -137,6 +142,7 @@ type bucketStoreMetrics struct { seriesFetchDuration prometheus.Histogram postingsFetchDuration prometheus.Histogram + chunkFetchDuration prometheus.Histogram } func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { @@ -275,6 +281,12 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, }) + m.chunkFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_chunks_fetch_duration_seconds", + Help: "The total time spent fetching chunks within a single request a store gateway.", + Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, + }) + m.emptyPostingCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_empty_postings_total", Help: "Total number of empty postings when fetching block series.", @@ -305,6 +317,7 @@ type BucketStore struct { indexReaderPool *indexheader.ReaderPool buffers sync.Pool chunkPool pool.Bytes + seriesBatchSize int // Sets of blocks that have the same labels. They are indexed by a hash over their label set. mtx sync.RWMutex @@ -419,6 +432,12 @@ func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption } } +func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption { + return func(s *BucketStore) { + s.seriesBatchSize = seriesBatchSize + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -460,6 +479,7 @@ func NewBucketStore( postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: SeriesBatchSize, } for _, option := range options { @@ -786,140 +806,218 @@ type seriesEntry struct { chks []storepb.AggrChunk } -type bucketSeriesSet struct { - set []seriesEntry - i int - err error -} +// blockSeriesClient is a storepb.Store_SeriesClient for a +// single TSDB block in object storage. +type blockSeriesClient struct { + grpc.ClientStream + ctx context.Context + logger log.Logger + extLset labels.Labels + + mint int64 + maxt int64 + indexr *bucketIndexReader + chunkr *bucketChunkReader + loadAggregates []storepb.Aggr + chunksLimiter ChunksLimiter + bytesLimiter BytesLimiter + + skipChunks bool + shardMatcher *storepb.ShardMatcher + calculateChunkHash bool + chunkFetchDuration prometheus.Histogram + + // Internal state. + i uint64 + postings []storage.SeriesRef + chkMetas []chunks.Meta + lset labels.Labels + symbolizedLset []symbolizedLabel + entries []seriesEntry + hasMorePostings bool + batchSize int +} + +func newBlockSeriesClient( + ctx context.Context, + logger log.Logger, + b *bucketBlock, + req *storepb.SeriesRequest, + limiter ChunksLimiter, + bytesLimiter BytesLimiter, + shardMatcher *storepb.ShardMatcher, + calculateChunkHash bool, + batchSize int, + chunkFetchDuration prometheus.Histogram, +) *blockSeriesClient { + var chunkr *bucketChunkReader + if !req.SkipChunks { + chunkr = b.chunkReader() + } + + return &blockSeriesClient{ + ctx: ctx, + logger: logger, + extLset: b.extLset, + mint: req.MinTime, + maxt: req.MaxTime, + indexr: b.indexReader(), + chunkr: chunkr, + chunksLimiter: limiter, + bytesLimiter: bytesLimiter, + skipChunks: req.SkipChunks, + chunkFetchDuration: chunkFetchDuration, -func newBucketSeriesSet(set []seriesEntry) *bucketSeriesSet { - return &bucketSeriesSet{ - set: set, - i: -1, + loadAggregates: req.Aggregates, + shardMatcher: shardMatcher, + calculateChunkHash: calculateChunkHash, + hasMorePostings: true, + batchSize: batchSize, } } -func (s *bucketSeriesSet) Next() bool { - if s.i >= len(s.set)-1 { - return false +func (b *blockSeriesClient) Close() { + if !b.skipChunks { + runutil.CloseWithLogOnErr(b.logger, b.chunkr, "series block") } - s.i++ - return true -} -func (s *bucketSeriesSet) At() (labels.Labels, []storepb.AggrChunk) { - return s.set[s.i].lset, s.set[s.i].chks + runutil.CloseWithLogOnErr(b.logger, b.indexr, "series block") } -func (s *bucketSeriesSet) Err() error { - return s.err +func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats { + stats = stats.merge(b.indexr.stats) + if !b.skipChunks { + stats = stats.merge(b.chunkr.stats) + } + return stats } -// blockSeries returns series matching given matchers, that have some data in given time range. -func blockSeries( - ctx context.Context, - extLset labels.Labels, - indexr *bucketIndexReader, - chunkr *bucketChunkReader, +func (b *blockSeriesClient) ExpandPostings( matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, - bytesLimiter BytesLimiter, // Rate limiter for used bytes. - skipChunks bool, - minTime, maxTime int64, - loadAggregates []storepb.Aggr, - shardMatcher *storepb.ShardMatcher, - emptyPostingsCount prometheus.Counter, - calculateChunkHash bool, -) (storepb.SeriesSet, *queryStats, error) { - ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter) +) error { + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter) if err != nil { - return nil, nil, errors.Wrap(err, "expanded matching posting") + return errors.Wrap(err, "expanded matching posting") } if len(ps) == 0 { - emptyPostingsCount.Inc() - return storepb.EmptySeriesSet(), indexr.stats, nil + return nil } - // Reserve series seriesLimiter if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { - return nil, nil, errors.Wrap(err, "exceeded series limit") + return errors.Wrap(err, "exceeded series limit") } - // Preload all series index data. - // TODO(bwplotka): Consider not keeping all series in memory all the time. - // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. - if err := indexr.PreloadSeries(ctx, ps, bytesLimiter); err != nil { - return nil, nil, errors.Wrap(err, "preload series") + b.postings = ps + if b.batchSize > len(ps) { + b.batchSize = len(ps) } + b.entries = make([]seriesEntry, 0, b.batchSize) + return nil +} - // Transform all series into the response types and mark their relevant chunks - // for preloading. - var ( - res []seriesEntry - symbolizedLset []symbolizedLabel - lset labels.Labels - chks []chunks.Meta - ) +func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { + for len(b.entries) == 0 && b.hasMorePostings { + if err := b.nextBatch(); err != nil { + return nil, err + } + } + + if len(b.entries) == 0 { + if b.chunkr != nil { + b.chunkFetchDuration.Observe(float64(b.chunkr.stats.ChunksFetchDurationSum)) + } + return nil, io.EOF + } + + next := b.entries[0] + b.entries = b.entries[1:] + + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(next.lset), + Chunks: next.chks, + }), nil +} + +func (b *blockSeriesClient) nextBatch() error { + start := b.i + end := start + SeriesBatchSize + if end > uint64(len(b.postings)) { + end = uint64(len(b.postings)) + } + b.i = end - for _, id := range ps { - ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime) + postingsBatch := b.postings[start:end] + if len(postingsBatch) == 0 { + b.hasMorePostings = false + return nil + } + + b.indexr.reset() + if !b.skipChunks { + b.chunkr.reset() + } + + if err := b.indexr.PreloadSeries(b.ctx, postingsBatch, b.bytesLimiter); err != nil { + return errors.Wrap(err, "preload series") + } + + b.entries = b.entries[:0] + for i := 0; i < len(postingsBatch); i++ { + ok, err := b.indexr.LoadSeriesForTime(postingsBatch[i], &b.symbolizedLset, &b.chkMetas, b.skipChunks, b.mint, b.maxt) if err != nil { - return nil, nil, errors.Wrap(err, "read series") + return errors.Wrap(err, "read series") } if !ok { - // No matching chunks for this time duration, skip series. continue } - if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil { - return nil, nil, errors.Wrap(err, "Lookup labels symbols") + if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &b.lset); err != nil { + return errors.Wrap(err, "Lookup labels symbols") } - completeLabelset := labelpb.ExtendSortedLabels(lset, extLset) - if !shardMatcher.MatchesLabels(completeLabelset) { + completeLabelset := labelpb.ExtendSortedLabels(b.lset, b.extLset) + if !b.shardMatcher.MatchesLabels(completeLabelset) { continue } - s := seriesEntry{} - s.lset = completeLabelset + s := seriesEntry{lset: completeLabelset} + if b.skipChunks { + b.entries = append(b.entries, s) + continue + } - if !skipChunks { - // Schedule loading chunks. - s.refs = make([]chunks.ChunkRef, 0, len(chks)) - s.chks = make([]storepb.AggrChunk, 0, len(chks)) - for j, meta := range chks { - // seriesEntry s is appended to res, but not at every outer loop iteration, - // therefore len(res) is the index we need here, not outer loop iteration number. - if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil { - return nil, nil, errors.Wrap(err, "add chunk load") - } - s.chks = append(s.chks, storepb.AggrChunk{ - MinTime: meta.MinTime, - MaxTime: meta.MaxTime, - }) - s.refs = append(s.refs, meta.Ref) - } + // Schedule loading chunks. + s.refs = make([]chunks.ChunkRef, 0, len(b.chkMetas)) + s.chks = make([]storepb.AggrChunk, 0, len(b.chkMetas)) - // Ensure sample limit through chunksLimiter if we return chunks. - if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil { - return nil, nil, errors.Wrap(err, "exceeded chunks limit") + for j, meta := range b.chkMetas { + if err := b.chunkr.addLoad(meta.Ref, len(b.entries), j); err != nil { + return errors.Wrap(err, "add chunk load") } + s.chks = append(s.chks, storepb.AggrChunk{ + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + }) + s.refs = append(s.refs, meta.Ref) } - res = append(res, s) - } + // Ensure sample limit through chunksLimiter if we return chunks. + if err := b.chunksLimiter.Reserve(uint64(len(b.chkMetas))); err != nil { + return errors.Wrap(err, "exceeded chunks limit") + } - if skipChunks { - return newBucketSeriesSet(res), indexr.stats, nil + b.entries = append(b.entries, s) } - if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash, bytesLimiter); err != nil { - return nil, nil, errors.Wrap(err, "load chunks") + if !b.skipChunks { + if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { + return errors.Wrap(err, "load chunks") + } } - return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil + return nil } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { @@ -1066,7 +1164,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ctx = srv.Context() stats = &queryStats{} - res []storepb.SeriesSet + respSets []respSet mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} @@ -1101,64 +1199,63 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } for _, b := range blocks { - b := b + blk := b gctx := gctx if s.enableSeriesResponseHints { // Keep track of queried blocks. - resHints.AddQueriedBlock(b.meta.ULID) + resHints.AddQueriedBlock(blk.meta.ULID) } - var chunkr *bucketChunkReader - // We must keep the readers open until all their data has been sent. - indexr := b.indexReader() - if !req.SkipChunks { - chunkr = b.chunkReader() - defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block") - } - - // Defer all closes to the end of Series method. - defer runutil.CloseWithLogOnErr(s.logger, indexr, "series block") + shardMatcher := req.ShardInfo.Matcher(&s.buffers) + blockClient := newBlockSeriesClient( + srv.Context(), + s.logger, + blk, + req, + chunksLimiter, + bytesLimiter, + shardMatcher, + s.enableChunkHashCalculation, + s.seriesBatchSize, + s.metrics.chunkFetchDuration, + ) + defer blockClient.Close() g.Go(func() error { - span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ - "block.id": b.meta.ULID, - "block.mint": b.meta.MinTime, - "block.maxt": b.meta.MaxTime, - "block.resolution": b.meta.Thanos.Downsample.Resolution, + span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ + "block.id": blk.meta.ULID, + "block.mint": blk.meta.MinTime, + "block.maxt": blk.meta.MaxTime, + "block.resolution": blk.meta.Thanos.Downsample.Resolution, }) - defer span.Finish() - - shardMatcher := req.ShardInfo.Matcher(&s.buffers) - defer shardMatcher.Close() - part, pstats, err := blockSeries( - newCtx, - b.extLset, - indexr, - chunkr, - blockMatchers, - chunksLimiter, - seriesLimiter, - bytesLimiter, - req.SkipChunks, - req.MinTime, req.MaxTime, - req.Aggregates, + + if err := blockClient.ExpandPostings(blockMatchers, seriesLimiter); err != nil { + span.Finish() + return errors.Wrapf(err, "fetch series for block %s", blk.meta.ULID) + } + onClose := func() { + mtx.Lock() + stats = blockClient.MergeStats(stats) + mtx.Unlock() + } + part := newLazyRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, shardMatcher, + false, s.metrics.emptyPostingCount, - s.enableChunkHashCalculation, ) - if err != nil { - return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) - } mtx.Lock() - res = append(res, part) - stats = stats.merge(pstats) + respSets = append(respSets, part) mtx.Unlock() - // No info about samples exactly, so pass at least chunks. - span.SetTag("processed.series", len(indexr.loadedSeries)) - span.SetTag("processed.chunks", pstats.chunksFetched) return nil }) } @@ -1208,47 +1305,53 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } return status.Error(code, err.Error()) } - stats.blocksQueried = len(res) + stats.blocksQueried = len(respSets) stats.GetAllDuration = time.Since(begin) s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) } + // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { + defer func() { + for _, resp := range respSets { + resp.Close() + } + }() begin := time.Now() - - // NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by - // blockSeries method. In worst case deduplication logic won't deduplicate correctly, which will be accounted later. - set := storepb.MergeSeriesSets(res...) + set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...)) for set.Next() { - var series storepb.Series - - stats.mergedSeriesCount++ - - var lset labels.Labels - if req.SkipChunks { - lset, _ = set.At() - } else { - lset, series.Chunks = set.At() + at := set.At() + warn := at.GetWarning() + if warn != "" { + // TODO(fpetkovski): Consider deprecating string based warnings in favor of a + // separate protobuf message containing the grpc code and + // a human readable error message. + err = status.Error(storepb.GRPCCodeFromWarn(warn), at.GetWarning()) + return + } - stats.mergedChunksCount += len(series.Chunks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + series := at.GetSeries() + if series != nil { + stats.mergedSeriesCount++ + if !req.SkipChunks { + stats.mergedChunksCount += len(series.Chunks) + s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + } } - series.Labels = labelpb.ZLabelsFromPromLabels(lset) - if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil { + if err = srv.Send(at); err != nil { err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) return } } - if set.Err() != nil { - err = status.Error(codes.Unknown, errors.Wrap(set.Err(), "expand series set").Error()) - return - } stats.MergeDuration = time.Since(begin) s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds()) err = nil }) + if err != nil { + return err + } if s.enableSeriesResponseHints { var anyHints *types.Any @@ -1346,7 +1449,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq } // Add a set for the external labels as well. - // We're not adding them directly to res because there could be duplicates. + // We're not adding them directly to refs because there could be duplicates. // b.extLset is already sorted by label name, no need to sort it again. extRes := make([]string, 0, len(b.extLset)) for _, l := range b.extLset { @@ -1355,40 +1458,43 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq result = strutil.MergeSlices(res, extRes) } else { - seriesSet, _, err := blockSeries( - newCtx, - b.extLset, - indexr, - nil, + seriesReq := &storepb.SeriesRequest{ + MinTime: req.Start, + MaxTime: req.End, + SkipChunks: true, + } + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration) + + if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, - nil, seriesLimiter, - bytesLimiter, - true, - req.Start, - req.End, - nil, - nil, - s.metrics.emptyPostingCount, - false, - ) - if err != nil { - return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) + ); err != nil { + return err } // Extract label names from all series. Many label names will be the same, so we need to deduplicate them. // Note that label names will already include external labels (passed to blockSeries), so we don't need // to add them again. labelNames := map[string]struct{}{} - for seriesSet.Next() { - ls, _ := seriesSet.At() - for _, l := range ls { + for { + ls, err := blockClient.Recv() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrapf(err, "iterate series for block %s", b.meta.ULID) + } + + if ls.GetWarning() != "" { + return errors.Wrapf(errors.New(ls.GetWarning()), "iterate series for block %s", b.meta.ULID) + } + if ls.GetSeries() == nil { + continue + } + for _, l := range ls.GetSeries().Labels { labelNames[l.Name] = struct{}{} } } - if seriesSet.Err() != nil { - return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) - } result = make([]string, 0, len(labelNames)) for n := range labelNames { @@ -1526,40 +1632,44 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } result = res } else { - seriesSet, _, err := blockSeries( - newCtx, - b.extLset, - indexr, - nil, + seriesReq := &storepb.SeriesRequest{ + MinTime: req.Start, + MaxTime: req.End, + SkipChunks: true, + } + blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration) + + if err := blockClient.ExpandPostings( reqSeriesMatchersNoExtLabels, - nil, seriesLimiter, - bytesLimiter, - true, - req.Start, - req.End, - nil, - nil, - s.metrics.emptyPostingCount, - false, - ) - if err != nil { - return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) + ); err != nil { + return err } // Extract given label's value from all series and deduplicate them. // We don't need to deal with external labels, since they are already added by blockSeries. values := map[string]struct{}{} - for seriesSet.Next() { - ls, _ := seriesSet.At() - val := ls.Get(req.Label) + for { + ls, err := blockClient.Recv() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrapf(err, "iterate series for block %s", b.meta.ULID) + } + + if ls.GetWarning() != "" { + return errors.Wrapf(errors.New(ls.GetWarning()), "iterate series for block %s", b.meta.ULID) + } + if ls.GetSeries() == nil { + continue + } + + val := labelpb.ZLabelsToPromLabels(ls.GetSeries().Labels).Get(req.Label) if val != "" { // Should never be empty since we added labelName!="" matcher to the list of matchers. values[val] = struct{}{} } } - if seriesSet.Err() != nil { - return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) - } result = make([]string, 0, len(values)) for n := range values { @@ -1919,6 +2029,9 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { } return r } +func (r *bucketIndexReader) reset() { + r.loadedSeries = map[storage.SeriesRef][]byte{} +} // ExpandedPostings returns postings in expanded list instead of index.Postings. // This is because we need to have them buffered anyway to perform efficient lookup @@ -2614,6 +2727,12 @@ func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { } } +func (r *bucketChunkReader) reset() { + for i := range r.toLoad { + r.toLoad[i] = r.toLoad[i][:0] + } +} + func (r *bucketChunkReader) Close() error { r.block.pendingReaders.Done() @@ -2624,7 +2743,7 @@ func (r *bucketChunkReader) Close() error { } // addLoad adds the chunk with id to the data set to be fetched. -// Chunk will be fetched and saved to res[seriesEntry][chunk] upon r.load(res, <...>) call. +// Chunk will be fetched and saved to refs[seriesEntry][chunk] upon r.load(refs, <...>) call. func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) error { var ( seq = int(id >> 32) @@ -2637,7 +2756,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) return nil } -// load loads all added chunks and saves resulting aggrs to res. +// load loads all added chunks and saves resulting aggrs to refs. func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a186b32376..f643222c57 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/cespare/xxhash" - "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -31,8 +30,8 @@ import ( "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -2409,10 +2408,6 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // No limits. chunksLimiter := NewChunksLimiterFactory(0)(nil) seriesLimiter := NewSeriesLimiterFactory(0)(nil) - dummyCounter := promauto.NewCounter(prometheus.CounterOpts{ - Name: "dummy", - Help: "dummy help", - }) ctx := context.Background() // Run multiple workers to execute the queries. @@ -2444,17 +2439,14 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet // must be called only from the goroutine running the Benchmark function. testutil.Ok(b, err) - indexReader := blk.indexReader() - chunkReader := blk.chunkReader() - - seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, NewBytesLimiterFactory(0)(nil), req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) - testutil.Ok(b, err) + dummyHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{}) + blockClient := newBlockSeriesClient(ctx, nil, blk, req, chunksLimiter, NewBytesLimiterFactory(0)(nil), nil, false, SeriesBatchSize, dummyHistogram) + testutil.Ok(b, blockClient.ExpandPostings(matchers, seriesLimiter)) + defer blockClient.Close() // Ensure at least 1 series has been returned (as expected). - testutil.Equals(b, true, seriesSet.Next()) - - testutil.Ok(b, indexReader.Close()) - testutil.Ok(b, chunkReader.Close()) + _, err = blockClient.Recv() + testutil.Ok(b, err) } }() } diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d354d4db06..5cdb5a0b78 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -260,11 +260,11 @@ func (h *ProxyResponseHeap) At() *storepb.SeriesResponse { } func (l *lazyRespSet) StoreID() string { - return l.st.String() + return l.storeName } func (l *lazyRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.st.LabelSets()) + return labelpb.PromLabelSetsToString(l.storeLabelSets) } // lazyRespSet is a lazy storepb.SeriesSet that buffers @@ -273,12 +273,13 @@ func (l *lazyRespSet) Labelset() string { // in Next(). type lazyRespSet struct { // Generic parameters. - span opentracing.Span - cl storepb.Store_SeriesClient - closeSeries context.CancelFunc - st Client - frameTimeout time.Duration - ctx context.Context + span opentracing.Span + cl storepb.Store_SeriesClient + closeSeries context.CancelFunc + storeName string + storeLabelSets []labels.Labels + frameTimeout time.Duration + ctx context.Context // Internal bookkeeping. dataOrFinishEvent *sync.Cond @@ -358,13 +359,13 @@ func newLazyRespSet( ctx context.Context, span opentracing.Span, frameTimeout time.Duration, - st Client, + storeName string, + storeLabelSets []labels.Labels, closeSeries context.CancelFunc, cl storepb.Store_SeriesClient, shardMatcher *storepb.ShardMatcher, applySharding bool, emptyStreamResponses prometheus.Counter, - ) respSet { bufferedResponses := []*storepb.SeriesResponse{} bufferedResponsesMtx := &sync.Mutex{} @@ -373,7 +374,8 @@ func newLazyRespSet( respSet := &lazyRespSet{ frameTimeout: frameTimeout, cl: cl, - st: st, + storeName: storeName, + storeLabelSets: storeLabelSets, closeSeries: closeSeries, span: span, ctx: ctx, @@ -383,7 +385,7 @@ func newLazyRespSet( shardMatcher: shardMatcher, } - go func(st Client, l *lazyRespSet) { + go func(st string, l *lazyRespSet) { bytesProcessed := 0 seriesStats := &storepb.SeriesStatsCounter{} @@ -409,7 +411,7 @@ func newLazyRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) + err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st) l.span.SetTag("err", err.Error()) l.bufferedResponsesMtx.Lock() @@ -434,9 +436,9 @@ func newLazyRespSet( // Most likely the per-Recv timeout has been reached. // There's a small race between canceling and the Recv() // but this is most likely true. - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String()) + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) } else { - rerr = errors.Wrapf(err, "receive series from %s", st.String()) + rerr = errors.Wrapf(err, "receive series from %s", st) } l.span.SetTag("err", rerr.Error()) @@ -478,7 +480,7 @@ func newLazyRespSet( return } } - }(st, respSet) + }(storeName, respSet) return respSet } @@ -552,7 +554,8 @@ func newAsyncRespSet(ctx context.Context, seriesCtx, span, frameTimeout, - st, + st.String(), + st.LabelSets(), closeSeries, cl, shardMatcher, diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index eaa96f1ede..c1f4b9b8bf 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "google.golang.org/grpc/codes" "github.com/thanos-io/thanos/pkg/store/labelpb" ) @@ -51,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { } } +func GRPCCodeFromWarn(warn string) codes.Code { + if strings.Contains(warn, "rpc error: code = ResourceExhausted") { + return codes.ResourceExhausted + } + if strings.Contains(warn, "rpc error: code = Code(422)") { + return 422 + } + return codes.Unknown +} + type emptySeriesSet struct{} func (emptySeriesSet) Next() bool { return false }