Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sharding): use without() grouping when merging avg_over_time shard results #12176

Merged
merged 2 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func TestMappingEquivalence(t *testing.T) {
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) without (stream)`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{
`
Expand Down
18 changes: 16 additions & 2 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,18 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return m.mapSampleExpr(expr, r)
}

grouping := expr.Grouping
if grouping == nil {
grouping = &syntax.Grouping{Without: true}
}

// avg_over_time() by (foo) -> sum by (foo) (sum_over_time()) / sum by (foo) (count_over_time())
lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: expr.Left,
Operation: syntax.OpRangeTypeSum,
},
Grouping: expr.Grouping,
Grouping: grouping,
Operation: syntax.OpTypeSum,
}, r, false)
if err != nil {
Expand All @@ -416,12 +421,21 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return nil, 0, err
}

// labelSampleExtractor includes the unwrap identifier in without() list if no grouping is specified
// similar change is required for the RHS here to ensure the resulting label sets match
rhsGrouping := *grouping
if rhsGrouping.Without {
if expr.Left.Unwrap != nil {
rhsGrouping.Groups = append(rhsGrouping.Groups, expr.Left.Unwrap.Identifier)
}
}

rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: countOverTimeSelector,
Operation: syntax.OpRangeTypeCount,
},
Grouping: expr.Grouping,
Grouping: &rhsGrouping,
Operation: syntax.OpTypeSum,
}, r, false)
if err != nil {
Expand Down
32 changes: 32 additions & 0 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,38 @@ func TestMappingStrings(t *testing.T) {
)
)`,
},
{
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json | keep busy | unwrap busy [5m])`,
out: `(
sum without() (
downstream<sum without() (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=0_of_2>
++
downstream<sum without() (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=1_of_2>
)
/
sum without(busy) (
downstream<sum without(busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=0_of_2>
++
downstream<sum without(busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=1_of_2>
)
)`,
},
{
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json | keep busy | unwrap busy [5m]) without (foo)`,
out: `(
sum without(foo) (
downstream<sum without(foo) (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=0_of_2>
++
downstream<sum without(foo) (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=1_of_2>
)
/
sum without(foo,busy) (
downstream<sum without(foo,busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=0_of_2>
++
downstream<sum without(foo,busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=1_of_2>
)
)`,
},
// should be noop if VectorExpr
{
in: `vector(0)`,
Expand Down
Loading