Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

physicalplan: add support for multi-stage execution of aggregate func… #59174

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,13 @@ message AggregatorSpec {
REGR_COUNT = 43;
REGR_AVGX = 44;
REGR_AVGY = 45;
REGR_SX = 46;
REGR_SY = 47;
FINAL_SQRDIFF = 48;
FINAL_STDDEV_POP = 49;
FINAL_VAR_POP = 50;
FINAL_REGR_SXX = 51;
FINAL_REGR_SYY = 52;
}

enum Type {
Expand Down
43 changes: 43 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/distsql_agg
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,46 @@ ALTER TABLE table58683_1 EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_
CREATE TABLE table58683_2 (col2 BOOL);
ALTER TABLE table58683_2 EXPERIMENTAL_RELOCATE SELECT ARRAY[2], 2;
SELECT every(col2) FROM table58683_1 JOIN table58683_2 ON col1 = (table58683_2.tableoid)::INT8 GROUP BY col2 HAVING bool_and(col2);

# Test muti-stage aggregate functions for #58347 (add support for multi-stage execution of aggregate functions).
statement ok
CREATE TABLE t (a INT, b INT, c FLOAT, d DECIMAL, PRIMARY KEY (a, b, c, d))

# Generate all combinations of values 1 to 10.
statement ok
INSERT INTO t SELECT a, b, c::FLOAT, d::DECIMAL FROM
generate_series(1, 10) AS a(a),
generate_series(1, 10) AS b(b),
generate_series(1, 10) AS c(c),
generate_series(1, 10) AS d(d)

# Test aggregrate functions without muti-stage.
query FFF
select sqrdiff(d), stddev_pop(d), var_pop(d) from t
----
82499.99999999999999999976 2.8722813232690143299 8.25

query IFFFF
select regr_count(c,d), regr_sxx(c,d), regr_syy(c,d), regr_avgx(c,d), regr_avgy(c,d) from t
----
10000 82499.99999999996 82500.00000000012 5.5 5.5

# Split into ten parts.
statement ok
ALTER TABLE t SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i)

# Relocate the ten parts to the five nodes.
statement ok
ALTER TABLE t EXPERIMENTAL_RELOCATE
SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i)

# Test aggregrate functions in muti-stage.
query FFF
select sqrdiff(d), stddev_pop(d), var_pop(d) from t
----
82499.99999999999999999952 2.8724249481071304094 8.25

query IFFFF
select regr_count(c,d), regr_sxx(c,d), regr_syy(c,d), regr_avgx(c,d), regr_avgy(c,d) from t
----
10000 82499.99999999996 82500.00000000012 5.5 5.5
3 changes: 3 additions & 0 deletions pkg/sql/opt/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ var AggregateOpReverseMap = map[Operator]string{
RegressionInterceptOp: "regr_intercept",
RegressionR2Op: "regr_r2",
RegressionSlopeOp: "regr_slope",
RegressionSXOp: "regr_sx",
RegressionSYOp: "regr_sy",
RegressionSXXOp: "regr_sxx",
RegressionSXYOp: "regr_sxy",
RegressionSYYOp: "regr_syy",
Expand Down Expand Up @@ -375,6 +377,7 @@ func AggregateIsNeverNullOnNonNullInput(op Operator) bool {
StringAggOp, SumOp, SumIntOp, XorAggOp, PercentileDiscOp, PercentileContOp,
JsonObjectAggOp, JsonbObjectAggOp, StdDevPopOp, STCollectOp, STExtentOp, STUnionOp,
VarPopOp, CovarPopOp, RegressionAvgXOp, RegressionAvgYOp, RegressionSXXOp,
RegressionSXOp, RegressionSYOp,
RegressionSXYOp, RegressionSYYOp, RegressionCountOp:
return true

Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/opt/ops/scalar.opt
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,18 @@ define RegressionSXX {
X ScalarExpr
}

[Scalar, Aggregate]
define RegressionSX {
Y ScalarExpr
X ScalarExpr
}

[Scalar, Aggregate]
define RegressionSY {
Y ScalarExpr
X ScalarExpr
}

[Scalar, Aggregate]
define RegressionSXY {
Y ScalarExpr
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt/optbuilder/groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,10 @@ func (b *Builder) constructAggregate(name string, args []opt.ScalarExpr) opt.Sca
return b.factory.ConstructRegressionR2(args[0], args[1])
case "regr_slope":
return b.factory.ConstructRegressionSlope(args[0], args[1])
case "regr_sx":
return b.factory.ConstructRegressionSX(args[0], args[1])
case "regr_sy":
return b.factory.ConstructRegressionSY(args[0], args[1])
case "regr_sxx":
return b.factory.ConstructRegressionSXX(args[0], args[1])
case "regr_sxy":
Expand Down
119 changes: 119 additions & 0 deletions pkg/sql/physicalplan/aggregator_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,123 @@ var DistAggregationTable = map[execinfrapb.AggregatorSpec_Func]DistAggregationIn
},
},
},

execinfrapb.AggregatorSpec_STDDEV_POP: {
LocalStage: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_SQRDIFF,
execinfrapb.AggregatorSpec_SUM,
execinfrapb.AggregatorSpec_COUNT,
},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_FINAL_STDDEV_POP,
LocalIdxs: []uint32{0, 1, 2},
},
},
},

execinfrapb.AggregatorSpec_VAR_POP: {
LocalStage: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_SQRDIFF,
execinfrapb.AggregatorSpec_SUM,
execinfrapb.AggregatorSpec_COUNT,
},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_FINAL_VAR_POP,
LocalIdxs: []uint32{0, 1, 2},
},
},
},

execinfrapb.AggregatorSpec_REGR_COUNT: {
LocalStage: []execinfrapb.AggregatorSpec_Func{execinfrapb.AggregatorSpec_REGR_COUNT},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_SUM_INT,
LocalIdxs: passThroughLocalIdxs,
},
},
},

execinfrapb.AggregatorSpec_REGR_AVGX: {
LocalStage: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_REGR_SX,
execinfrapb.AggregatorSpec_REGR_COUNT,
},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_SUM,
LocalIdxs: []uint32{0},
},
{
Fn: execinfrapb.AggregatorSpec_SUM_INT,
LocalIdxs: []uint32{1},
},
},
FinalRendering: func(h *tree.IndexedVarHelper, varIdxs []int) (tree.TypedExpr, error) {
if len(varIdxs) < 2 {
panic("fewer than two final aggregation values passed into final render")
}
sum := h.IndexedVar(varIdxs[0])
count := h.IndexedVar(varIdxs[1])

expr := &tree.BinaryExpr{
Operator: tree.Div,
Left: sum,
Right: count,
}
if sum.ResolvedType().Family() == types.FloatFamily {
expr.Right = &tree.CastExpr{
Expr: count,
Type: types.Float,
}
}
semaCtx := tree.MakeSemaContext()
semaCtx.IVarContainer = h.Container()
return expr.TypeCheck(context.TODO(), &semaCtx, types.Any)
},
},

execinfrapb.AggregatorSpec_SQRDIFF: {
LocalStage: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_SQRDIFF,
execinfrapb.AggregatorSpec_SUM,
execinfrapb.AggregatorSpec_COUNT,
},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_FINAL_SQRDIFF,
LocalIdxs: []uint32{0, 1, 2},
},
},
},

execinfrapb.AggregatorSpec_REGR_SXX: {
LocalStage: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_REGR_SXX,
execinfrapb.AggregatorSpec_REGR_SX,
execinfrapb.AggregatorSpec_REGR_COUNT,
},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_FINAL_REGR_SXX,
LocalIdxs: []uint32{0, 1, 2},
},
},
},

execinfrapb.AggregatorSpec_REGR_SYY: {
LocalStage: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_REGR_SYY,
execinfrapb.AggregatorSpec_REGR_SY,
execinfrapb.AggregatorSpec_REGR_COUNT,
},
FinalStage: []FinalStageInfo{
{
Fn: execinfrapb.AggregatorSpec_FINAL_REGR_SYY,
LocalIdxs: []uint32{0, 1, 2},
},
},
},
}
Loading