Skip to content

Commit

Permalink
Check context when expanding postings 2nd attempt (thanos-io#6471)
Browse files Browse the repository at this point in the history
* check context when expanding postings

Signed-off-by: Ben Ye <[email protected]>

* import

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored and GiedriusS committed Jul 27, 2023
1 parent fe79262 commit 3b199c8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
27 changes: 15 additions & 12 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ import (
"sync"
"time"

"github.com/weaveworks/common/httpgrpc"

"github.com/cespare/xxhash"

"github.com/alecthomas/units"
"github.com/cespare/xxhash"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/types"
Expand All @@ -38,14 +35,13 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/sync/errgroup"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -2296,11 +2292,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
}

result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))

if ctx.Err() != nil {
return nil, ctx.Err()
}
ps, err := index.ExpandPostings(result)
ps, err := ExpandPostingsWithContext(ctx, result)
if err != nil {
return nil, errors.Wrap(err, "expand")
}
Expand All @@ -2322,6 +2314,17 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
return ps, nil
}

// ExpandPostingsWithContext returns the postings expanded as a slice and considers context.
func ExpandPostingsWithContext(ctx context.Context, p index.Postings) (res []storage.SeriesRef, err error) {
for p.Next() {
if ctx.Err() != nil {
return nil, ctx.Err()
}
res = append(res, p.At())
}
return res, p.Err()
}

// postingGroup keeps posting keys for single matcher. Logical result of the group is:
// If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case.
// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels
Expand Down Expand Up @@ -2458,7 +2461,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
return false, nil, nil
}

ps, err := index.ExpandPostings(p)
ps, err := ExpandPostingsWithContext(ctx, p)
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
return false, nil, nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
"go.uber.org/atomic"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -2604,3 +2605,14 @@ func BenchmarkDownsampledBlockSeries(b *testing.B) {
}
}
}

func TestExpandPostingsWithContextCancel(t *testing.T) {
p := index.NewListPostings([]storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8})
ctx, cancel := context.WithCancel(context.Background())

cancel()
res, err := ExpandPostingsWithContext(ctx, p)
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
testutil.Equals(t, []storage.SeriesRef(nil), res)
}

0 comments on commit 3b199c8

Please sign in to comment.