Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) #33180

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ func (s *testSuiteAgg) TestHaving(c *C) {
tk.MustQuery("select 1 from t group by c1 having sum(abs(c2 + c3)) = c1").Check(testkit.Rows("1"))
}

<<<<<<< HEAD
func (s *testSuiteAgg) TestIssue26496(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

Expand All @@ -930,6 +931,13 @@ func (s *testSuiteAgg) TestIssue26496(c *C) {

func (s *testSuiteAgg) TestAggEliminator(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
=======
func TestAggEliminator(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)

tk.MustExec("create table t(a int primary key, b int)")
tk.MustQuery("select min(a), min(a) from t").Check(testkit.Rows("<nil> <nil>"))
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ func (b *PlanBuilder) buildSelection(ctx context.Context, p LogicalPlan, where a

conditions := splitWhere(where)
expressions := make([]expression.Expression, 0, len(conditions))
selection := LogicalSelection{buildByHaving: aggMapper != nil}.Init(b.ctx, b.getSelectOffset())
selection := LogicalSelection{}.Init(b.ctx, b.getSelectOffset())
for _, cond := range conditions {
expr, np, err := b.rewrite(ctx, cond, p, aggMapper, false)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,6 @@ type LogicalSelection struct {
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression

// having selection can't be pushed down, because it must above the aggregation.
buildByHaving bool
}

// ExtractCorrelatedCols implements LogicalPlan interface.
Expand Down
201 changes: 201 additions & 0 deletions planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression)
p.Conditions = DeleteTrueExprs(p, p.Conditions)
var child LogicalPlan
var retConditions []expression.Expression
<<<<<<< HEAD
if p.buildByHaving {
retConditions, child = p.children[0].PredicatePushDown(predicates)
retConditions = append(retConditions, p.Conditions...)
Expand All @@ -91,6 +92,13 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression)
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...))
retConditions = append(retConditions, canNotBePushDown...)
}
=======
var originConditions []expression.Expression
canBePushDown, canNotBePushDown := splitSetGetVarFunc(p.Conditions)
originConditions = canBePushDown
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...), opt)
retConditions = append(retConditions, canNotBePushDown...)
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
if len(retConditions) > 0 {
p.Conditions = expression.PropagateConstant(p.ctx, retConditions)
// Return table dual when filter is constant false or null.
Expand Down Expand Up @@ -632,3 +640,196 @@ func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression)
func (*ppdSolver) name() string {
return "predicate_push_down"
}
<<<<<<< HEAD
=======

func appendTableDualTraceStep(replaced LogicalPlan, dual LogicalPlan, conditions []expression.Expression, opt *logicalOptimizeOp) {
action := func() string {
return fmt.Sprintf("%v_%v is replaced by %v_%v", replaced.TP(), replaced.ID(), dual.TP(), dual.ID())
}
reason := func() string {
buffer := bytes.NewBufferString("The conditions[")
for i, cond := range conditions {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(cond.String())
}
buffer.WriteString("] are constant false or null")
return buffer.String()
}
opt.appendStepToCurrent(dual.ID(), dual.TP(), reason, action)
}

func appendSelectionPredicatePushDownTraceStep(p *LogicalSelection, conditions []expression.Expression, opt *logicalOptimizeOp) {
action := func() string {
return fmt.Sprintf("%v_%v is removed", p.TP(), p.ID())
}
reason := func() string {
return ""
}
if len(conditions) > 0 {
reason = func() string {
buffer := bytes.NewBufferString("The conditions[")
for i, cond := range conditions {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(cond.String())
}
buffer.WriteString(fmt.Sprintf("] in %v_%v are pushed down", p.TP(), p.ID()))
return buffer.String()
}
}
opt.appendStepToCurrent(p.ID(), p.TP(), reason, action)
}

func appendDataSourcePredicatePushDownTraceStep(ds *DataSource, opt *logicalOptimizeOp) {
if len(ds.pushedDownConds) < 1 {
return
}
reason := func() string {
return ""
}
action := func() string {
buffer := bytes.NewBufferString("The conditions[")
for i, cond := range ds.pushedDownConds {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(cond.String())
}
buffer.WriteString(fmt.Sprintf("] are pushed down across %v_%v", ds.TP(), ds.ID()))
return buffer.String()
}
opt.appendStepToCurrent(ds.ID(), ds.TP(), reason, action)
}

func appendAddSelectionTraceStep(p LogicalPlan, child LogicalPlan, sel *LogicalSelection, opt *logicalOptimizeOp) {
reason := func() string {
return ""
}
action := func() string {
return fmt.Sprintf("add %v_%v to connect %v_%v and %v_%v", sel.TP(), sel.ID(), p.TP(), p.ID(), child.TP(), child.ID())
}
opt.appendStepToCurrent(sel.ID(), sel.TP(), reason, action)
}

// AddPrefix4ShardIndexes add expression prefix for shard index. e.g. an index is test.uk(tidb_shard(a), a).
// It transforms the sql "SELECT * FROM test WHERE a = 10" to
// "SELECT * FROM test WHERE tidb_shard(a) = val AND a = 10", val is the value of tidb_shard(10).
// It also transforms the sql "SELECT * FROM test WHERE a IN (10, 20, 30)" to
// "SELECT * FROM test WHERE tidb_shard(a) = val1 AND a = 10 OR tidb_shard(a) = val2 AND a = 20"
// @param[in] conds the original condtion of this datasource
// @retval - the new condition after adding expression prefix
func (ds *DataSource) AddPrefix4ShardIndexes(sc sessionctx.Context, conds []expression.Expression) []expression.Expression {
if !ds.containExprPrefixUk {
return conds
}

var err error
newConds := conds

for _, path := range ds.possibleAccessPaths {
if !path.IsUkShardIndexPath {
continue
}
newConds, err = ds.addExprPrefixCond(sc, path, newConds)
if err != nil {
logutil.BgLogger().Error("Add tidb_shard expression failed",
zap.Error(err),
zap.Uint64("connection id", sc.GetSessionVars().ConnectionID),
zap.String("database name", ds.DBName.L),
zap.String("table name", ds.tableInfo.Name.L),
zap.String("index name", path.Index.Name.L))
return conds
}
}

return newConds
}

func (ds *DataSource) addExprPrefixCond(sc sessionctx.Context, path *util.AccessPath,
conds []expression.Expression) ([]expression.Expression, error) {
IdxCols, IdxColLens :=
expression.IndexInfo2PrefixCols(ds.Columns, ds.schema.Columns, path.Index)
if len(IdxCols) == 0 {
return conds, nil
}

adder := &exprPrefixAdder{
sctx: sc,
OrigConds: conds,
cols: IdxCols,
lengths: IdxColLens,
}

return adder.addExprPrefix4ShardIndex()
}

// AddExprPrefix4ShardIndex
// if original condition is a LogicOr expression, such as `WHERE a = 1 OR a = 10`,
// call the function AddExprPrefix4DNFCond to add prefix expression tidb_shard(a) = xxx for shard index.
// Otherwise, if the condition is `WHERE a = 1`, `WHERE a = 1 AND b = 10`, `WHERE a IN (1, 2, 3)`......,
// call the function AddExprPrefix4CNFCond to add prefix expression for shard index.
func (adder *exprPrefixAdder) addExprPrefix4ShardIndex() ([]expression.Expression, error) {
if len(adder.OrigConds) == 1 {
if sf, ok := adder.OrigConds[0].(*expression.ScalarFunction); ok && sf.FuncName.L == ast.LogicOr {
return adder.addExprPrefix4DNFCond(sf)
}
}
return adder.addExprPrefix4CNFCond(adder.OrigConds)
}

// AddExprPrefix4CNFCond
// add the prefix expression for CNF condition, e.g. `WHERE a = 1`, `WHERE a = 1 AND b = 10`, ......
// @param[in] conds the original condtion of the datasoure. e.g. `WHERE t1.a = 1 AND t1.b = 10 AND t2.a = 20`.
// if current datasource is `t1`, conds is {t1.a = 1, t1.b = 10}. if current datasource is
// `t2`, conds is {t2.a = 20}
// @return - the new condition after adding expression prefix
func (adder *exprPrefixAdder) addExprPrefix4CNFCond(conds []expression.Expression) ([]expression.Expression, error) {
newCondtionds, err := ranger.AddExpr4EqAndInCondition(adder.sctx,
conds, adder.cols)

return newCondtionds, err
}

// AddExprPrefix4DNFCond
// add the prefix expression for DNF condition, e.g. `WHERE a = 1 OR a = 10`, ......
// The condition returned is `WHERE (tidb_shard(a) = 214 AND a = 1) OR (tidb_shard(a) = 142 AND a = 10)`
// @param[in] condition the original condtion of the datasoure. e.g. `WHERE a = 1 OR a = 10`.
// condtion is `a = 1 OR a = 10`
// @return - the new condition after adding expression prefix. It's still a LogicOr expression.
func (adder *exprPrefixAdder) addExprPrefix4DNFCond(condition *expression.ScalarFunction) ([]expression.Expression, error) {
var err error
dnfItems := expression.FlattenDNFConditions(condition)
newAccessItems := make([]expression.Expression, 0, len(dnfItems))

for _, item := range dnfItems {
if sf, ok := item.(*expression.ScalarFunction); ok {
var accesses []expression.Expression
if sf.FuncName.L == ast.LogicAnd {
cnfItems := expression.FlattenCNFConditions(sf)
accesses, err = adder.addExprPrefix4CNFCond(cnfItems)
if err != nil {
return []expression.Expression{condition}, err
}
newAccessItems = append(newAccessItems, expression.ComposeCNFCondition(adder.sctx, accesses...))
} else if sf.FuncName.L == ast.EQ || sf.FuncName.L == ast.In {
// only add prefix expression for EQ or IN function
accesses, err = adder.addExprPrefix4CNFCond([]expression.Expression{sf})
if err != nil {
return []expression.Expression{condition}, err
}
newAccessItems = append(newAccessItems, expression.ComposeCNFCondition(adder.sctx, accesses...))
} else {
newAccessItems = append(newAccessItems, item)
}
} else {
newAccessItems = append(newAccessItems, item)
}
}

return []expression.Expression{expression.ComposeDNFCondition(adder.sctx, newAccessItems...)}, nil
}
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
12 changes: 6 additions & 6 deletions planner/core/testdata/ordered_result_mode_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,12 @@
},
{
"Plan": [
"Selection_8 6400.00 root lt(Column#6, 20)",
"└─Sort_9 8000.00 root Column#5, Column#6, Column#7",
" └─HashAgg_15 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7",
" └─TableReader_16 8000.00 root data:HashAgg_11",
" └─HashAgg_11 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13",
" └─TableFullScan_14 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
"Sort_9 6400.00 root Column#5, Column#6, Column#7",
"└─Selection_11 6400.00 root lt(Column#6, 20)",
" └─HashAgg_16 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7",
" └─TableReader_17 8000.00 root data:HashAgg_12",
" └─HashAgg_12 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13",
" └─TableFullScan_15 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
]
},
{
Expand Down
16 changes: 12 additions & 4 deletions planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"Name": "TestEagerAggregation",
"Cases": [
"DataScan(t)->Aggr(sum(test.t.a),sum(plus(test.t.a, 1)),count(test.t.a))->Projection",
"DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Projection->Sel([gt(Column#13, 0)])->Sort->Projection",
"DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Sel([gt(Column#13, 0)])->Projection->Sort->Projection",
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26),firstrow(test.t.a))->Projection",
Expand Down Expand Up @@ -89,7 +89,7 @@
"DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(cast(test.t.a, decimal(20,0) BINARY), Column#13)])->Projection->Projection",
"DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(test.t.a, 1)])->Projection->Projection",
"Dual->Sel([gt(test.t.a, 1)])->Projection",
"DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Projection->Sel([lt(Column#13, 1)])",
"DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Sel([lt(Column#13, 1)])->Projection",
"Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection",
"Dual->Projection",
"DataScan(t)->Projection->Projection->Window(min(test.t.a)->Column#14)->Sel([lt(test.t.a, 10) eq(test.t.b, Column#14)])->Projection->Projection",
Expand Down Expand Up @@ -194,7 +194,11 @@
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection",
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection",
"TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection",
<<<<<<< HEAD
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection",
=======
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection",
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
"[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'",
"[planner:1247]Reference 'sum_a' not supported (reference to window function)",
"[planner:3579]Window name 'w2' is not defined.",
Expand Down Expand Up @@ -267,7 +271,11 @@
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection",
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection",
"TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection",
<<<<<<< HEAD
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection",
=======
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection",
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
"[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'",
"[planner:1247]Reference 'sum_a' not supported (reference to window function)",
"[planner:3579]Window name 'w2' is not defined.",
Expand Down Expand Up @@ -482,12 +490,12 @@
"test.t.f"
]
],
"4": [
"5": [
[
"test.t.f"
]
],
"5": [
"6": [
[
"test.t.f"
]
Expand Down
Loading