diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 948aef03876b..ae313ea1fc48 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -57,6 +57,9 @@ func TestMappingEquivalence(t *testing.T) { {`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil}, {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil}, {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil}, + {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) without (stream)`, true, nil}, + {`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil}, + {`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}}, { ` diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index e55b01504537..003362913171 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -397,13 +397,18 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return m.mapSampleExpr(expr, r) } + grouping := expr.Grouping + if grouping == nil { + grouping = &syntax.Grouping{Without: true} + } + // avg_over_time() by (foo) -> sum by (foo) (sum_over_time()) / sum by (foo) (count_over_time()) lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{ Left: &syntax.RangeAggregationExpr{ Left: expr.Left, Operation: syntax.OpRangeTypeSum, }, - Grouping: expr.Grouping, + Grouping: grouping, Operation: syntax.OpTypeSum, }, r, false) if err != nil { @@ -416,12 +421,21 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return nil, 0, err } + // labelSampleExtractor includes the unwrap identifier in without() list if no grouping is specified + // similar change is required for the RHS here to ensure the resulting label sets match + rhsGrouping := *grouping + if rhsGrouping.Without { + if expr.Left.Unwrap != nil { + rhsGrouping.Groups = append(rhsGrouping.Groups, expr.Left.Unwrap.Identifier) + } + } + rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{ Left: &syntax.RangeAggregationExpr{ Left: countOverTimeSelector, Operation: syntax.OpRangeTypeCount, }, - Grouping: expr.Grouping, + Grouping: &rhsGrouping, Operation: syntax.OpTypeSum, }, r, false) if err != nil { diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 784301928583..9bdd128b6e49 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -392,6 +392,38 @@ func TestMappingStrings(t *testing.T) { ) )`, }, + { + in: `avg_over_time({job=~"myapps.*"} |= "stats" | json | keep busy | unwrap busy [5m])`, + out: `( + sum without() ( + downstream + ++ + downstream + ) + / + sum without(busy) ( + downstream + ++ + downstream + ) + )`, + }, + { + in: `avg_over_time({job=~"myapps.*"} |= "stats" | json | keep busy | unwrap busy [5m]) without (foo)`, + out: `( + sum without(foo) ( + downstream + ++ + downstream + ) + / + sum without(foo,busy) ( + downstream + ++ + downstream + ) + )`, + }, // should be noop if VectorExpr { in: `vector(0)`,