Skip to content

Commit

Permalink
MQE: Add support for count aggregation (#9342)
Browse files Browse the repository at this point in the history
* MQE: Add support for count aggregation

* Extend tests

* Enable upstream count tests

* Update CHANGELOG

* Fix linting

* Remove floatPresent

* Rename floatValues
  • Loading branch information
jhesketh authored Sep 20, 2024
1 parent aa08b49 commit 9febb6a
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 26 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
9 changes: 5 additions & 4 deletions pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ type AggregationGroup interface {
type AggregationGroupFactory func() AggregationGroup

var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
parser.COUNT: func() AggregationGroup { return &CountAggregationGroup{} },
parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
}

// Sentinel value used to indicate a sample has seen an invalid combination of histograms and should be ignored.
Expand Down
75 changes: 75 additions & 0 deletions pkg/streamingpromql/aggregations/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package aggregations

import (
"github.com/prometheus/prometheus/promql"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type CountAggregationGroup struct {
values []float64
}

func (g *CountAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.values == nil {
var err error
// First series with values for this group, populate it.
g.values, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

if err != nil {
return err
}
g.values = g.values[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.values[idx]++
}

for _, p := range data.Histograms {
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.values[idx]++
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
return nil
}

func (g *CountAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, fv := range g.values {
if fv > 0 {
floatPointCount++
}
}
var floatPoints []promql.FPoint
var err error
if floatPointCount > 0 {
floatPoints, err = types.FPointSlicePool.Get(floatPointCount, memoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, false, err
}

for i, fv := range g.values {
if fv > 0 {
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
floatPoints = append(floatPoints, promql.FPoint{T: t, F: fv})
}
}
}

types.Float64SlicePool.Put(g.values, memoryConsumptionTracker)

return types.InstantVectorSeriesData{Floats: floatPoints}, false, nil
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr for range vectors",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"count(metric{})": "aggregation operation with 'count'",
"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
}

for expression, expectedError := range unsupportedExpressions {
Expand Down Expand Up @@ -1786,7 +1786,7 @@ func TestCompareVariousMixedMetrics(t *testing.T) {
for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
// Aggregations
for _, aggFunc := range []string{"sum", "avg", "min", "max"} {
for _, aggFunc := range []string{"avg", "count", "min", "max", "sum"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
Expand Down
10 changes: 10 additions & 0 deletions pkg/streamingpromql/testdata/ours/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,13 @@ load 1m

eval range from 0m to 4m step 1m avg(series)
{} {{schema:4 count:9 sum:13.5 buckets:[2.5 5 1.5]}} {{schema:4 count:12 sum:17 buckets:[3.5 7 1.5]}} {{schema:4 count:14.5 sum:19.5 buckets:[4 9 1.5]}} {{schema:4 count:17.5 sum:23 buckets:[4.5 11.5 1.5]}} {{schema:4 count:20.5 sum:26.5 buckets:[5.5 13.5 1.5]}}

clear

# Make sure count doesn't emit a 0 when there are no values
load 1m
series{label="a"} _ 9
series{label="b"} _ 9

eval range from 0m to 1m step 1m count(series)
{} _ 2
14 changes: 6 additions & 8 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ eval instant at 50m avg by (group) (http_requests{job="api-server"})
{group="production"} 150

# Simple count.
# Unsupported by streaming engine.
# eval instant at 50m count by (group) (http_requests{job="api-server"})
# {group="canary"} 2
# {group="production"} 2
eval instant at 50m count by (group) (http_requests{job="api-server"})
{group="canary"} 2
{group="production"} 2

# Simple without.
eval instant at 50m sum without (instance) (http_requests{job="api-server"})
Expand Down Expand Up @@ -100,10 +99,9 @@ eval instant at 50m SUM(http_requests) BY (job, nonexistent)
{job="api-server"} 1000
{job="app-server"} 2600

# Unsupported by streaming engine.
# eval instant at 50m COUNT(http_requests) BY (job)
# {job="api-server"} 4
# {job="app-server"} 4
eval instant at 50m COUNT(http_requests) BY (job)
{job="api-server"} 4
{job="app-server"} 4

eval instant at 50m SUM(http_requests) BY (job, group)
{group="canary", job="api-server"} 700
Expand Down
5 changes: 2 additions & 3 deletions pkg/streamingpromql/testdata/upstream/native_histograms.test
Original file line number Diff line number Diff line change
Expand Up @@ -1125,9 +1125,8 @@ eval_warn instant at 10m sum({idx="0"})
eval instant at 10m sum(histogram_sum{idx="0"} + ignoring(idx) histogram_sum{idx="1"} + ignoring(idx) histogram_sum{idx="2"} + ignoring(idx) histogram_sum{idx="3"})
{} {{schema:0 count:107 sum:4691.2 z_bucket:14 z_bucket_w:0.001 buckets:[3 8 2 5 3 2 2] n_buckets:[2 6 8 4 15 9 0 0 0 10 10 4]}}

# Unsupported by streaming engine.
# eval instant at 10m count(histogram_sum)
# {} 4
eval instant at 10m count(histogram_sum)
{} 4

eval instant at 10m avg(histogram_sum)
{} {{schema:0 count:26.75 sum:1172.8 z_bucket:3.5 z_bucket_w:0.001 buckets:[0.75 2 0.5 1.25 0.75 0.5 0.5] n_buckets:[0.5 1.5 2 1 3.75 2.25 0 0 0 2.5 2.5 1]}}
Expand Down
14 changes: 6 additions & 8 deletions pkg/streamingpromql/testdata/upstream/operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ load 5m
vector_matching_b{l="x"} 0+4x25


# Unsupported by streaming engine.
# eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job)
# {job="api-server"} 996
# {job="app-server"} 2596
eval instant at 50m SUM(http_requests) BY (job) - COUNT(http_requests) BY (job)
{job="api-server"} 996
{job="app-server"} 2596

eval instant at 50m 2 - SUM(http_requests) BY (job)
{job="api-server"} -998
Expand Down Expand Up @@ -88,10 +87,9 @@ eval instant at 50m SUM(http_requests) BY (job) % 2 ^ 3 ^ 2 ^ 2
{job="api-server"} 1000
{job="app-server"} 2600

# Unsupported by streaming engine.
# eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job)
# {job="api-server"} 256
# {job="app-server"} 256
eval instant at 50m COUNT(http_requests) BY (job) ^ COUNT(http_requests) BY (job)
{job="api-server"} 256
{job="app-server"} 256

eval instant at 50m SUM(http_requests) BY (job) / 0
{job="api-server"} +Inf
Expand Down

0 comments on commit 9febb6a

Please sign in to comment.