From 29661d028ad7e1a813bd45fda67225687006c555 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 9 Jun 2023 11:44:43 -0700 Subject: [PATCH] address review comments Signed-off-by: Ben Ye --- pkg/store/bucket.go | 85 ++++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 81d8c57f4b8..6ad519aae88 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2166,42 +2166,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M } return ms[i].Type < ms[j].Type }) - dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms) + hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter) + if err != nil { + return nil, err + } if hit { - if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { - return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err) - } - r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache)) - r.stats.postingsTouched++ - r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache)) - p, closeFns, err := r.decodeCachedPostings(dataFromCache) - defer func() { - for _, closeFn := range closeFns { - closeFn() - } - }() - if err == nil { - ps, err := index.ExpandPostings(p) - if err != nil { - return nil, errors.Wrap(err, "expand") - } - - if len(ps) > 0 { - // As of version two all series entries are 16 byte padded. All references - // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() - if err != nil { - return nil, errors.Wrap(err, "get index version") - } - if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 - } - } - } - return ps, nil - } - // If failed to decode cached postings, try to expand postings again. + return postings, nil } var ( postingGroups []*postingGroup @@ -2437,6 +2407,51 @@ type postingPtr struct { ptr index.Range } +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) { + dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms) + if !hit { + return false, nil, nil + } + if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { + return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err) + } + r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache)) + r.stats.postingsTouched++ + r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache)) + p, closeFns, err := r.decodeCachedPostings(dataFromCache) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() + // If failed to decode or expand cached postings, return and expand postings again. + if err != nil { + level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String()) + return false, nil, nil + } + + ps, err := index.ExpandPostings(p) + if err != nil { + level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String()) + return false, nil, nil + } + + if len(ps) > 0 { + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return false, nil, errors.Wrap(err, "get index version") + } + if version >= 2 { + for i, id := range ps { + ps[i] = id * 16 + } + } + } + return true, ps, nil +} + // fetchPostings fill postings requested by posting groups. // It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil.