diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 92872c595f..b2aff5932b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -20,11 +20,8 @@ import ( "sync" "time" - "github.com/weaveworks/common/httpgrpc" - - "github.com/cespare/xxhash" - "github.com/alecthomas/units" + "github.com/cespare/xxhash" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/types" @@ -38,14 +35,13 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" + "github.com/weaveworks/common/httpgrpc" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/thanos-io/objstore" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -2296,11 +2292,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M } result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) - - if ctx.Err() != nil { - return nil, ctx.Err() - } - ps, err := index.ExpandPostings(result) + ps, err := ExpandPostingsWithContext(ctx, result) if err != nil { return nil, errors.Wrap(err, "expand") } @@ -2322,6 +2314,17 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M return ps, nil } +// ExpandPostingsWithContext returns the postings expanded as a slice and considers context. +func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []storage.SeriesRef, err error) { + for p.Next() { + if ctx.Err() != nil { + return nil, ctx.Err() + } + res = append(res, p.At()) + } + return res, p.Err() +} + // postingGroup keeps posting keys for single matcher. Logical result of the group is: // If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case. // If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels @@ -2458,7 +2461,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, return false, nil, nil } - ps, err := index.ExpandPostings(p) + ps, err := ExpandPostingsWithContext(ctx, p) if err != nil { level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String()) return false, nil, nil diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e59c6c0052..efd68f6e8d 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/encoding" + "github.com/prometheus/prometheus/tsdb/index" "go.uber.org/atomic" "github.com/thanos-io/objstore" @@ -2604,3 +2605,14 @@ func BenchmarkDownsampledBlockSeries(b *testing.B) { } } } + +func TestExpandPostingsWithContextCancel(t *testing.T) { + p := index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8}) + ctx, cancel := context.WithCancel(context.Background()) + + cancel() + res, err := ExpandPostingsWithContext(ctx, p) + testutil.NotOk(t, err) + testutil.Equals(t, context.Canceled, err) + testutil.Equals(t, []storage.SeriesRef(nil), res) +}