Skip to content

Commit

Permalink
MQE: Add support for group aggregation (#9343)
Browse files Browse the repository at this point in the history
* MQE: Add support for group aggregation

* Extend tests

* Enable upstream group tests

* Merge count and group

* Add extra test case

* Update CHANGELOG
  • Loading branch information
jhesketh authored Sep 20, 2024
1 parent 9febb6a commit 3e06d8a
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
3 changes: 2 additions & 1 deletion pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type AggregationGroupFactory func() AggregationGroup

var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
parser.COUNT: func() AggregationGroup { return &CountAggregationGroup{} },
parser.COUNT: func() AggregationGroup { return NewCountGroupAggregationGroup(true) },
parser.GROUP: func() AggregationGroup { return NewCountGroupAggregationGroup(false) },
parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
Expand Down
31 changes: 26 additions & 5 deletions pkg/streamingpromql/aggregations/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,32 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type CountAggregationGroup struct {
// count represents whether this aggregation is `count` (true), or `group` (false)
func NewCountGroupAggregationGroup(count bool) *CountGroupAggregationGroup {
g := &CountGroupAggregationGroup{}
if count {
g.accumulatePoint = g.countAccumulatePoint
} else {
g.accumulatePoint = g.groupAccumulatePoint
}
return g
}

type CountGroupAggregationGroup struct {
values []float64

accumulatePoint func(idx int64)
}

func (g *CountGroupAggregationGroup) countAccumulatePoint(idx int64) {
g.values[idx]++
}

func (g *CountGroupAggregationGroup) groupAccumulatePoint(idx int64) {
g.values[idx] = 1
}

func (g *CountAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.values == nil {
var err error
// First series with values for this group, populate it.
Expand All @@ -34,19 +55,19 @@ func (g *CountAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesD

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.values[idx]++
g.accumulatePoint(idx)
}

for _, p := range data.Histograms {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.values[idx]++
g.accumulatePoint(idx)
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
return nil
}

func (g *CountAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *CountGroupAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, fv := range g.values {
if fv > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) {
for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
// Aggregations
for _, aggFunc := range []string{"avg", "count", "min", "max", "sum"} {
for _, aggFunc := range []string{"avg", "count", "group", "min", "max", "sum"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
Expand Down
5 changes: 4 additions & 1 deletion pkg/streamingpromql/testdata/ours/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,13 @@ eval range from 0m to 4m step 1m avg(series)

clear

# Make sure count doesn't emit a 0 when there are no values
# Make sure count and group don't emit a 0 when there are no values
load 1m
series{label="a"} _ 9
series{label="b"} _ 9

eval range from 0m to 1m step 1m count(series)
{} _ 2

eval range from 0m to 1m step 1m group(series)
{} _ 1
5 changes: 2 additions & 3 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,8 @@ load 10s
# {test="three samples"} 1
# {test="uneven samples"} 1

# Unsupported by streaming engine.
# eval instant at 1m group(foo)
# {} 1
eval instant at 1m group(foo)
{} 1

# Tests for avg.
clear
Expand Down

0 comments on commit 3e06d8a

Please sign in to comment.