Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check context when expanding postings #6363

Merged
merged 2 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6327](https://github.com/thanos-io/thanos/pull/6327) *: *breaking :warning:* Use histograms instead of summaries for instrumented handlers.
- [#6322](https://github.com/thanos-io/thanos/pull/6322) Logging: Avoid expensive log.Valuer evaluation for disallowed levels.
- [#6358](https://github.com/thanos-io/thanos/pull/6358) Query: Add +Inf bucket to query duration metrics
- [#6363](https://github.com/thanos-io/thanos/pull/6363) Store: Check context error when expanding postings.

### Removed

Expand Down
16 changes: 14 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,8 +2151,11 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Each group is separate to tell later what postings are intersecting with what.
pg, err := toPostingGroup(r.block.indexHeaderReader.LabelValues, m)
pg, err := toPostingGroup(ctx, r.block.indexHeaderReader.LabelValues, m)
if err != nil {
return nil, errors.Wrap(err, "toPostingGroup")
}
Expand Down Expand Up @@ -2226,6 +2229,9 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M

result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))

if ctx.Err() != nil {
return nil, ctx.Err()
}
ps, err := index.ExpandPostings(result)
if err != nil {
return nil, errors.Wrap(err, "expand")
Expand Down Expand Up @@ -2273,7 +2279,7 @@ func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
}

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
func toPostingGroup(ctx context.Context, lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
if m.Type == labels.MatchRegexp {
if vals := findSetMatches(m.Value); len(vals) > 0 {
// Sorting will improve the performance dramatically if the dataset is relatively large
Expand All @@ -2299,6 +2305,9 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match

var toRemove []labels.Label
for _, val := range vals {
if ctx.Err() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a double edged sword because vals is often a high number, so we would end up checking for cancellation thousands of times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. Do you have any suggestions on where we should check context?

Copy link
Contributor

@alanprot alanprot May 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... =/

I did not find a better way as well to do on the prometheus PR. Maybe we should check the ctx.done channel somehow?

cancelled := false

go func() {
     <- ctx.Done()
     cancelled = true
}()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would still need a mutex for the cancelled variable, which would be the same as checking ctx.Err(). Is there any downside to just checking the context before the vals loop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If u do a regex in a label that has lots of values (ex: __name__), the vals loop is the one the keeps running even after the request got cancelled.

Copy link
Contributor

@fpetkovski fpetkovski May 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. As a last thing, can we check the benchmark difference for BenchmarkBucketSeries? That should give us some indication if there is any regression.

return nil, ctx.Err()
}
if !m.Matches(val) {
toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
}
Expand All @@ -2319,6 +2328,9 @@ func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Match

var toAdd []labels.Label
for _, val := range vals {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if m.Matches(val) {
toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
}
Expand Down