diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a7a5d3abd96..e0fc7104821 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -37,6 +37,8 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/sync/errgroup" + + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -128,6 +130,7 @@ type bucketStoreMetrics struct { queriesDropped *prometheus.CounterVec seriesRefetches prometheus.Counter emptyPostingCount prometheus.Counter + emptyStreamResponses prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec cachedPostingsCompressionErrors *prometheus.CounterVec @@ -276,6 +279,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Help: "Total number of empty postings when fetching block series.", }) + m.emptyStreamResponses = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_empty_stream_responses_total", + Help: "Total number of empty responses received.", + }) + return &m } @@ -782,33 +790,133 @@ type seriesEntry struct { chks []storepb.AggrChunk } -type bucketSeriesSet struct { - set []seriesEntry - i int - err error +// blockSeriesClient is a storepb.Store_SeriesClient for a +// single TSDB block in object storage. +type blockSeriesClient struct { + grpc.ClientStream + ctx context.Context + extLset labels.Labels + + ps []storage.SeriesRef + i int + mint int64 + maxt int64 + indexr *bucketIndexReader + chunkr *bucketChunkReader + chkLimiter ChunksLimiter + bytesLimiter BytesLimiter + loadAggregates []storepb.Aggr + skipChunks bool + + shardMatcher *storepb.ShardMatcher + calculateChunkHash bool + + // Transform all series into the response types and mark their relevant chunks + // for preloading. 
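+	//
+	// symbolizedLset is scratch space reused across Recv calls; it holds the
+	// symbol references of the current series until they are resolved into labels.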
+ symbolizedLset []symbolizedLabel } -func newBucketSeriesSet(set []seriesEntry) *bucketSeriesSet { - return &bucketSeriesSet{ - set: set, - i: -1, +func emptyBlockSeriesClient() *blockSeriesClient { + return &blockSeriesClient{ + ps: nil, } } -func (s *bucketSeriesSet) Next() bool { - if s.i >= len(s.set)-1 { - return false +func newBlockSeriesClient( + ctx context.Context, + extLset labels.Labels, + ps []storage.SeriesRef, + minTime int64, + maxTime int64, + indexr *bucketIndexReader, + chunkr *bucketChunkReader, + limiter ChunksLimiter, + bytesLimiter BytesLimiter, + skipChunks bool, + loadAggregates []storepb.Aggr, + shardMatcher *storepb.ShardMatcher, + calculateChunkHash bool, +) *blockSeriesClient { + return &blockSeriesClient{ + ctx: ctx, + extLset: extLset, + ps: ps, + i: -1, + mint: minTime, + maxt: maxTime, + indexr: indexr, + chunkr: chunkr, + chkLimiter: limiter, + bytesLimiter: bytesLimiter, + skipChunks: skipChunks, + + loadAggregates: loadAggregates, + shardMatcher: shardMatcher, + calculateChunkHash: calculateChunkHash, + } +} + +func (b *blockSeriesClient) Recv() (*storepb.SeriesResponse, error) { + b.i++ + if b.i >= len(b.ps) { + return nil, io.EOF + } + + var chks []chunks.Meta + ok, err := b.indexr.LoadSeriesForTime(b.ps[b.i], &b.symbolizedLset, &chks, b.skipChunks, b.mint, b.maxt) + if err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "read series")), nil + } + if !ok { + return b.Recv() } - s.i++ - return true -} -func (s *bucketSeriesSet) At() (labels.Labels, []storepb.AggrChunk) { - return s.set[s.i].lset, s.set[s.i].chks -} + var lset labels.Labels + if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &lset); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "Lookup labels symbols")), nil + } -func (s *bucketSeriesSet) Err() error { - return s.err + completeLabelset := labelpb.ExtendSortedLabels(lset, b.extLset) + if !b.shardMatcher.MatchesLabels(completeLabelset) { + return b.Recv() + } + + if b.skipChunks { + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), + }), nil + } + + s := seriesEntry{lset: completeLabelset} + //entries := []seriesEntry{s} + + // Schedule loading chunks. + s.refs = make([]chunks.ChunkRef, 0, len(chks)) + s.chks = make([]storepb.AggrChunk, 0, len(chks)) + b.chunkr.reset() + for j, meta := range chks { + if err := b.chunkr.addLoad(meta.Ref, 0, j); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "add chunk load")), nil + } + s.chks = append(s.chks, storepb.AggrChunk{ + MinTime: meta.MinTime, + MaxTime: meta.MaxTime, + }) + s.refs = append(s.refs, meta.Ref) + } + + // Ensure sample limit through chunksLimiter if we return chunks. + if err := b.chkLimiter.Reserve(uint64(len(chks))); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "exceeded chunks limit")), nil + } + if err := b.chunkr.load(b.ctx, []seriesEntry{s}, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter); err != nil { + return storepb.NewWarnSeriesResponse(errors.Wrap(err, "load chunks")), nil + } + + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(s.lset), + Chunks: s.chks, + }), nil } // blockSeries returns series matching given matchers, that have some data in given time range. 
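// A minimal sketch of how a caller is expected to drain the new client. The
// helper name drainBlockSeriesClient is illustrative only and assumes nothing
// beyond the Recv/io.EOF contract implemented above; note that per-series
// failures arrive as warning frames rather than as errors.
func drainBlockSeriesClient(c *blockSeriesClient) ([]*storepb.Series, error) {
	var out []*storepb.Series
	for {
		resp, err := c.Recv()
		if err == io.EOF {
			// Postings are exhausted; the stream is complete.
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		if w := resp.GetWarning(); w != "" {
			return nil, errors.New(w)
		}
		if s := resp.GetSeries(); s != nil {
			out = append(out, s)
		}
	}
}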
@@ -827,7 +935,7 @@ func blockSeries( shardMatcher *storepb.ShardMatcher, emptyPostingsCount prometheus.Counter, calculateChunkHash bool, -) (storepb.SeriesSet, *queryStats, error) { +) (*blockSeriesClient, *queryStats, error) { ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter) if err != nil { return nil, nil, errors.Wrap(err, "expanded matching posting") @@ -835,7 +943,7 @@ func blockSeries( if len(ps) == 0 { emptyPostingsCount.Inc() - return storepb.EmptySeriesSet(), indexr.stats, nil + return emptyBlockSeriesClient(), indexr.stats, nil } // Reserve series seriesLimiter @@ -850,72 +958,7 @@ func blockSeries( return nil, nil, errors.Wrap(err, "preload series") } - // Transform all series into the response types and mark their relevant chunks - // for preloading. - var ( - res []seriesEntry - symbolizedLset []symbolizedLabel - lset labels.Labels - chks []chunks.Meta - ) - - for _, id := range ps { - ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime) - if err != nil { - return nil, nil, errors.Wrap(err, "read series") - } - if !ok { - // No matching chunks for this time duration, skip series. - continue - } - - if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil { - return nil, nil, errors.Wrap(err, "Lookup labels symbols") - } - - completeLabelset := labelpb.ExtendSortedLabels(lset, extLset) - if !shardMatcher.MatchesLabels(completeLabelset) { - continue - } - - s := seriesEntry{} - s.lset = completeLabelset - - if !skipChunks { - // Schedule loading chunks. - s.refs = make([]chunks.ChunkRef, 0, len(chks)) - s.chks = make([]storepb.AggrChunk, 0, len(chks)) - for j, meta := range chks { - // seriesEntry s is appended to res, but not at every outer loop iteration, - // therefore len(res) is the index we need here, not outer loop iteration number. - if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil { - return nil, nil, errors.Wrap(err, "add chunk load") - } - s.chks = append(s.chks, storepb.AggrChunk{ - MinTime: meta.MinTime, - MaxTime: meta.MaxTime, - }) - s.refs = append(s.refs, meta.Ref) - } - - // Ensure sample limit through chunksLimiter if we return chunks. 
- if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil { - return nil, nil, errors.Wrap(err, "exceeded chunks limit") - } - } - - res = append(res, s) - } - - if skipChunks { - return newBucketSeriesSet(res), indexr.stats, nil - } - - if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash, bytesLimiter); err != nil { - return nil, nil, errors.Wrap(err, "load chunks") - } - - return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil + return newBlockSeriesClient(ctx, extLset, ps, minTime, maxTime, indexr, chunkr, chunksLimiter, bytesLimiter, skipChunks, loadAggregates, shardMatcher, calculateChunkHash), indexr.stats, nil } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { @@ -1062,7 +1105,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ctx = srv.Context() stats = &queryStats{} - res []storepb.SeriesSet + res []respSet mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} @@ -1125,9 +1168,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie }) defer span.Finish() + ctx, cancel := context.WithCancel(ctx) + shardMatcher := req.ShardInfo.Matcher(&s.buffers) defer shardMatcher.Close() - part, pstats, err := blockSeries( + blockClient, pstats, err := blockSeries( newCtx, b.extLset, indexr, @@ -1144,8 +1189,21 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.enableChunkHashCalculation, ) if err != nil { + defer cancel() return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) } + part := newLazyRespSet( + ctx, + span, + 10*time.Minute, + "object-store-block", + []labels.Labels{b.extLset}, + cancel, + blockClient, + shardMatcher, + true, + b.metrics.emptyStreamResponses, + ) mtx.Lock() res = append(res, part) @@ -1155,6 +1213,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie // No info about samples exactly, so pass at least chunks. span.SetTag("processed.series", len(indexr.loadedSeries)) span.SetTag("processed.chunks", pstats.chunksFetched) + return nil }) } @@ -1209,37 +1268,29 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) } + // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { begin := time.Now() - - // NOTE: We "carefully" assume series and chunks are sorted within each SeriesSet. This should be guaranteed by - // blockSeries method. In worst case deduplication logic won't deduplicate correctly, which will be accounted later. - set := storepb.MergeSeriesSets(res...) 
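+	// Responses from all selected blocks are merged through the same k-way merge
+	// heap (wrapped in deduplication) that the querier's proxy uses for remote stores.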
+	set := NewDedupResponseHeap(NewProxyResponseHeap(res...))
 	for set.Next() {
-		var series storepb.Series
 		stats.mergedSeriesCount++
 
-		var lset labels.Labels
-		if req.SkipChunks {
-			lset, _ = set.At()
-		} else {
-			lset, series.Chunks = set.At()
-
+		at := set.At()
+		if at == nil {
+			continue
+		}
+		if series := at.GetSeries(); series != nil && !req.SkipChunks {
 			stats.mergedChunksCount += len(series.Chunks)
 			s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
 		}
-		series.Labels = labelpb.ZLabelsFromPromLabels(lset)
-		if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
+		if err = srv.Send(at); err != nil {
 			err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
 			return
 		}
 	}
-	if set.Err() != nil {
-		err = status.Error(codes.Unknown, errors.Wrap(set.Err(), "expand series set").Error())
-		return
-	}
 	stats.MergeDuration = time.Since(begin)
 	s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds())
@@ -1342,7 +1393,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 	}
 
 	// Add a set for the external labels as well.
-	// We're not adding them directly to res because there could be duplicates.
+	// We're not adding them directly to refs because there could be duplicates.
 	// b.extLset is already sorted by label name, no need to sort it again.
 	extRes := make([]string, 0, len(b.extLset))
 	for _, l := range b.extLset {
@@ -1376,15 +1427,25 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
 		// Note that label names will already include external labels (passed to blockSeries), so we don't need
 		// to add them again.
 		labelNames := map[string]struct{}{}
-		for seriesSet.Next() {
-			ls, _ := seriesSet.At()
-			for _, l := range ls {
+		for {
+			ls, err := seriesSet.Recv()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				return errors.Wrapf(err, "iterate series for block %s", b.meta.ULID)
+			}
+
+			if ls.GetWarning() != "" {
+				return errors.Wrapf(errors.New(ls.GetWarning()), "iterate series for block %s", b.meta.ULID)
+			}
+			if ls.GetSeries() == nil {
+				continue
+			}
+			for _, l := range ls.GetSeries().Labels {
 				labelNames[l.Name] = struct{}{}
 			}
 		}
-		if seriesSet.Err() != nil {
-			return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID)
-		}
 
 		result = make([]string, 0, len(labelNames))
 		for n := range labelNames {
@@ -1546,16 +1607,27 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
 		// Extract given label's value from all series and deduplicate them.
 		// We don't need to deal with external labels, since they are already added by blockSeries.
 		values := map[string]struct{}{}
-		for seriesSet.Next() {
-			ls, _ := seriesSet.At()
-			val := ls.Get(req.Label)
+		for {
+			ls, err := seriesSet.Recv()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				return errors.Wrapf(err, "iterate series for block %s", b.meta.ULID)
+			}
+
+			if ls.GetWarning() != "" {
+				return errors.Wrapf(errors.New(ls.GetWarning()), "iterate series for block %s", b.meta.ULID)
+			}
+			if ls.GetSeries() == nil {
+				continue
+			}
+
+			val := labelpb.ZLabelsToPromLabels(ls.GetSeries().Labels).Get(req.Label)
 			if val != "" {
 				// Should never be empty since we added labelName!="" matcher to the list of matchers.
values[val] = struct{}{} } } - if seriesSet.Err() != nil { - return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) - } result = make([]string, 0, len(values)) for n := range values { @@ -2610,6 +2682,10 @@ func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { } } +func (r *bucketChunkReader) reset() { + r.toLoad = make([][]loadIdx, len(r.block.chunkObjs)) +} + func (r *bucketChunkReader) Close() error { r.block.pendingReaders.Done() @@ -2620,7 +2696,7 @@ func (r *bucketChunkReader) Close() error { } // addLoad adds the chunk with id to the data set to be fetched. -// Chunk will be fetched and saved to res[seriesEntry][chunk] upon r.load(res, <...>) call. +// Chunk will be fetched and saved to refs[seriesEntry][chunk] upon r.load(refs, <...>) call. func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) error { var ( seq = int(id >> 32) @@ -2633,7 +2709,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) return nil } -// load loads all added chunks and saves resulting aggrs to res. +// load loads all added chunks and saves resulting aggrs to refs. func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a186b323763..bff4c4b6bbd 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2451,7 +2451,8 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet testutil.Ok(b, err) // Ensure at least 1 series has been returned (as expected). - testutil.Equals(b, true, seriesSet.Next()) + _, err = seriesSet.Recv() + testutil.Ok(b, err) testutil.Ok(b, indexReader.Close()) testutil.Ok(b, chunkReader.Close()) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d354d4db066..a63e54dd24b 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -260,11 +260,11 @@ func (h *ProxyResponseHeap) At() *storepb.SeriesResponse { } func (l *lazyRespSet) StoreID() string { - return l.st.String() + return l.storeName } func (l *lazyRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.st.LabelSets()) + return labelpb.PromLabelSetsToString(l.storeLabelSets) } // lazyRespSet is a lazy storepb.SeriesSet that buffers @@ -273,12 +273,13 @@ func (l *lazyRespSet) Labelset() string { // in Next(). type lazyRespSet struct { // Generic parameters. - span opentracing.Span - cl storepb.Store_SeriesClient - closeSeries context.CancelFunc - st Client - frameTimeout time.Duration - ctx context.Context + span opentracing.Span + cl storepb.Store_SeriesClient + closeSeries context.CancelFunc + storeName string + storeLabelSets []labels.Labels + frameTimeout time.Duration + ctx context.Context // Internal bookkeeping. 
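+	// dataOrFinishEvent is signalled each time a response is buffered or the
+	// underlying stream terminates, waking up any reader blocked in Next().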
dataOrFinishEvent *sync.Cond @@ -358,7 +359,8 @@ func newLazyRespSet( ctx context.Context, span opentracing.Span, frameTimeout time.Duration, - st Client, + storeName string, + storeLabelSets []labels.Labels, closeSeries context.CancelFunc, cl storepb.Store_SeriesClient, shardMatcher *storepb.ShardMatcher, @@ -373,7 +375,8 @@ func newLazyRespSet( respSet := &lazyRespSet{ frameTimeout: frameTimeout, cl: cl, - st: st, + storeName: storeName, + storeLabelSets: storeLabelSets, closeSeries: closeSeries, span: span, ctx: ctx, @@ -383,7 +386,7 @@ func newLazyRespSet( shardMatcher: shardMatcher, } - go func(st Client, l *lazyRespSet) { + go func(st string, l *lazyRespSet) { bytesProcessed := 0 seriesStats := &storepb.SeriesStatsCounter{} @@ -409,7 +412,7 @@ func newLazyRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) + err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st) l.span.SetTag("err", err.Error()) l.bufferedResponsesMtx.Lock() @@ -434,9 +437,9 @@ func newLazyRespSet( // Most likely the per-Recv timeout has been reached. // There's a small race between canceling and the Recv() // but this is most likely true. - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String()) + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) } else { - rerr = errors.Wrapf(err, "receive series from %s", st.String()) + rerr = errors.Wrapf(err, "receive series from %s", st) } l.span.SetTag("err", rerr.Error()) @@ -478,7 +481,7 @@ func newLazyRespSet( return } } - }(st, respSet) + }(storeName, respSet) return respSet } @@ -552,7 +555,8 @@ func newAsyncRespSet(ctx context.Context, seriesCtx, span, frameTimeout, - st, + st.String(), + st.LabelSets(), closeSeries, cl, shardMatcher,
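// With lazyRespSet now parameterized by a plain store name, label sets, and a
// storepb.Store_SeriesClient, any in-process stream can back it, not only a gRPC
// client. A hypothetical sketch (staticSeriesClient is not part of this patch)
// of the minimal contract, mirroring blockSeriesClient above:

// staticSeriesClient replays a fixed slice of responses and then reports io.EOF,
// which is the termination signal the lazyRespSet receive loop expects.
type staticSeriesClient struct {
	grpc.ClientStream // Embedded only to satisfy storepb.Store_SeriesClient; never used.

	i     int
	resps []*storepb.SeriesResponse
}

func (c *staticSeriesClient) Recv() (*storepb.SeriesResponse, error) {
	if c.i >= len(c.resps) {
		return nil, io.EOF
	}
	c.i++
	return c.resps[c.i-1], nil
}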