Skip to content

Commit

Permalink
fix(sharding): use without() grouping when merging avg_over_time shard results (#12176)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Jul 1, 2024
1 parent a457c5d commit eb8a363
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func TestMappingEquivalence(t *testing.T) {
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) without (stream)`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{
`
Expand Down
18 changes: 16 additions & 2 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,18 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return m.mapSampleExpr(expr, r)
}

grouping := expr.Grouping
if grouping == nil {
grouping = &syntax.Grouping{Without: true}
}

// avg_over_time() by (foo) -> sum by (foo) (sum_over_time()) / sum by (foo) (count_over_time())
lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: expr.Left,
Operation: syntax.OpRangeTypeSum,
},
Grouping: expr.Grouping,
Grouping: grouping,
Operation: syntax.OpTypeSum,
}, r, false)
if err != nil {
Expand All @@ -416,12 +421,21 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return nil, 0, err
}

// labelSampleExtractor includes the unwrap identifier in the without() list if no grouping is specified.
// A similar change is required for the RHS here, so that the label sets of the LHS and RHS results match.
rhsGrouping := *grouping
if rhsGrouping.Without {
if expr.Left.Unwrap != nil {
rhsGrouping.Groups = append(rhsGrouping.Groups, expr.Left.Unwrap.Identifier)
}
}

rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: countOverTimeSelector,
Operation: syntax.OpRangeTypeCount,
},
Grouping: expr.Grouping,
Grouping: &rhsGrouping,
Operation: syntax.OpTypeSum,
}, r, false)
if err != nil {
Expand Down
32 changes: 32 additions & 0 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,38 @@ func TestMappingStrings(t *testing.T) {
)
)`,
},
{
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json | keep busy | unwrap busy [5m])`,
out: `(
sum without() (
downstream<sum without() (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=0_of_2>
++
downstream<sum without() (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=1_of_2>
)
/
sum without(busy) (
downstream<sum without(busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=0_of_2>
++
downstream<sum without(busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=1_of_2>
)
)`,
},
{
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json | keep busy | unwrap busy [5m]) without (foo)`,
out: `(
sum without(foo) (
downstream<sum without(foo) (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=0_of_2>
++
downstream<sum without(foo) (sum_over_time({job=~"myapps.*"} |="stats" | json | keep busy | unwrap busy [5m])),shard=1_of_2>
)
/
sum without(foo,busy) (
downstream<sum without(foo,busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=0_of_2>
++
downstream<sum without(foo,busy) (count_over_time({job=~"myapps.*"} |="stats" | json | keep busy [5m])),shard=1_of_2>
)
)`,
},
// should be noop if VectorExpr
{
in: `vector(0)`,
Expand Down

0 comments on commit eb8a363

Please sign in to comment.