Skip to content

Commit

Permalink
Check context when expanding postings (thanos-io#6363)
Browse files Browse the repository at this point in the history
* check context when expanding postings

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored and HC Zhu committed Jun 27, 2023
1 parent 5c0c9fc commit 40a5909
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6327](https://github.com/thanos-io/thanos/pull/6327) *: *breaking :warning:* Use histograms instead of summaries for instrumented handlers.
- [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels.
- [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics
- [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings.

### Removed

Expand Down
16 changes: 14 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,8 +2151,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Each group is separate to tell later what postings are intersecting with what.
pg, err := toPostingGroup(r.block.indexHeaderReader.LabelValues, m)
pg, err := toPostingGroup(ctx, r.block.indexHeaderReader.LabelValues, m)
if err != nil {
return nil, errors.Wrap(err, "toPostingGroup")
}
Expand Down Expand Up @@ -2226,6 +2229,9 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M

result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))

if ctx.Err() != nil {
return nil, ctx.Err()
}
ps, err := index.ExpandPostings(result)
if err != nil {
return nil, errors.Wrap(err, "expand")
Expand Down Expand Up @@ -2273,7 +2279,7 @@ func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
}

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
func toPostingGroup(ctx context.Context, lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
if m.Type == labels.MatchRegexp {
if vals := findSetMatches(m.Value); len(vals) > 0 {
// Sorting will improve the performance dramatically if the dataset is relatively large
Expand All @@ -2299,6 +2305,9 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match

var toRemove []labels.Label
for _, val := range vals {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if !m.Matches(val) {
toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
}
Expand All @@ -2319,6 +2328,9 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match

var toAdd []labels.Label
for _, val := range vals {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if m.Matches(val) {
toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
}
Expand Down

0 comments on commit 40a5909

Please sign in to comment.