Skip to content

Commit

Permalink
planner: trace predicate push down (#30902)
Browse files Browse the repository at this point in the history
ref #29661
  • Loading branch information
Yisaer committed Dec 24, 2021
1 parent 6e6db1f commit 30c5f5b
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 33 deletions.
38 changes: 38 additions & 0 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,44 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
assertRuleName string
assertRuleSteps []assertTraceStep
}{
{
sql: "select * from t as t1 join t as t2 on t1.a = t2.a where t1.a < 1;",
flags: []uint64{flagPredicatePushDown, flagBuildKeyInfo, flagPrunColumns},
assertRuleName: "predicate_push_down",
assertRuleSteps: []assertTraceStep{
{
assertReason: "",
assertAction: "The conditions[lt(test.t.a, 1)] are pushed down across DataSource_1",
},
{
assertReason: "",
assertAction: "The conditions[lt(test.t.a, 1)] are pushed down across DataSource_2",
},
{
assertAction: "Selection_4 is removed",
assertReason: "The conditions[eq(test.t.a, test.t.a)] in Selection_4 are pushed down",
},
{
assertAction: "Selection_5 is removed",
assertReason: "The conditions[lt(test.t.a, 1)] in Selection_5 are pushed down",
},
},
},
{
sql: "select * from t where a < 1;",
flags: []uint64{flagPredicatePushDown, flagBuildKeyInfo, flagPrunColumns},
assertRuleName: "predicate_push_down",
assertRuleSteps: []assertTraceStep{
{
assertReason: "",
assertAction: "The conditions[lt(test.t.a, 1)] are pushed down across DataSource_1",
},
{
assertReason: "The conditions[lt(test.t.a, 1)] in Selection_2 are pushed down",
assertAction: "Selection_2 is removed",
},
},
},
{
sql: "select * from t as t1 left join t as t2 on t1.a = t2.a order by t1.a limit 10;",
flags: []uint64{flagPrunColumns, flagBuildKeyInfo, flagPushDownTopN},
Expand Down
2 changes: 1 addition & 1 deletion planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ type LogicalPlan interface {
// PredicatePushDown pushes down the predicates in the where/on/having clauses as deeply as possible.
// It will accept a predicate that is an expression slice, and return the expressions that can't be pushed.
// Because it might change the root if the having clause exists, we need to return a plan that represents a new root.
PredicatePushDown([]expression.Expression) ([]expression.Expression, LogicalPlan)
PredicatePushDown([]expression.Expression, *logicalOptimizeOp) ([]expression.Expression, LogicalPlan)

// PruneColumns prunes the unused columns.
PruneColumns([]*expression.Column) error
Expand Down
149 changes: 117 additions & 32 deletions planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package core

import (
"bytes"
"context"
"fmt"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand All @@ -28,11 +30,11 @@ import (
type ppdSolver struct{}

func (s *ppdSolver) optimize(ctx context.Context, lp LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) {
_, p := lp.PredicatePushDown(nil)
_, p := lp.PredicatePushDown(nil, opt)
return p, nil
}

func addSelection(p LogicalPlan, child LogicalPlan, conditions []expression.Expression, chIdx int) {
func addSelection(p LogicalPlan, child LogicalPlan, conditions []expression.Expression, chIdx int, opt *logicalOptimizeOp) {
if len(conditions) == 0 {
p.Children()[chIdx] = child
return
Expand All @@ -42,6 +44,7 @@ func addSelection(p LogicalPlan, child LogicalPlan, conditions []expression.Expr
dual := Conds2TableDual(child, conditions)
if dual != nil {
p.Children()[chIdx] = dual
appendTableDualTraceStep(child, dual, conditions, opt)
return
}

Expand All @@ -53,16 +56,17 @@ func addSelection(p LogicalPlan, child LogicalPlan, conditions []expression.Expr
selection := LogicalSelection{Conditions: conditions}.Init(p.SCtx(), p.SelectBlockOffset())
selection.SetChildren(child)
p.Children()[chIdx] = selection
appendAddSelectionTraceStep(p, child, selection, opt)
}

// PredicatePushDown implements LogicalPlan interface.
func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
if len(p.children) == 0 {
return predicates, p.self
}
child := p.children[0]
rest, newChild := child.PredicatePushDown(predicates)
addSelection(p.self, newChild, rest, 0)
rest, newChild := child.PredicatePushDown(predicates, opt)
addSelection(p.self, newChild, rest, 0, opt)
return nil, p.self
}

Expand All @@ -80,56 +84,61 @@ func splitSetGetVarFunc(filters []expression.Expression) ([]expression.Expressio
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
predicates = DeleteTrueExprs(p, predicates)
p.Conditions = DeleteTrueExprs(p, p.Conditions)
var child LogicalPlan
var retConditions []expression.Expression
var originConditions []expression.Expression
if p.buildByHaving {
retConditions, child = p.children[0].PredicatePushDown(predicates)
retConditions, child = p.children[0].PredicatePushDown(predicates, opt)
retConditions = append(retConditions, p.Conditions...)
} else {
canBePushDown, canNotBePushDown := splitSetGetVarFunc(p.Conditions)
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...))
originConditions = canBePushDown
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...), opt)
retConditions = append(retConditions, canNotBePushDown...)
}
if len(retConditions) > 0 {
p.Conditions = expression.PropagateConstant(p.ctx, retConditions)
// Return table dual when filter is constant false or null.
dual := Conds2TableDual(p, p.Conditions)
if dual != nil {
appendTableDualTraceStep(p, dual, p.Conditions, opt)
return nil, dual
}
return nil, p
}
appendSelectionPredicatePushDownTraceStep(p, originConditions, opt)
return nil, child
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalUnionScan) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
retainedPredicates, _ := p.children[0].PredicatePushDown(predicates)
func (p *LogicalUnionScan) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
retainedPredicates, _ := p.children[0].PredicatePushDown(predicates, opt)
p.conditions = make([]expression.Expression, 0, len(predicates))
p.conditions = append(p.conditions, predicates...)
// The conditions in UnionScan is only used for added rows, so parent Selection should not be removed.
return retainedPredicates, p
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (ds *DataSource) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (ds *DataSource) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
predicates = expression.PropagateConstant(ds.ctx, predicates)
predicates = DeleteTrueExprs(ds, predicates)
ds.allConds = predicates
ds.pushedDownConds, predicates = expression.PushDownExprs(ds.ctx.GetSessionVars().StmtCtx, predicates, ds.ctx.GetClient(), kv.UnSpecified)
appendDataSourcePredicatePushDownTraceStep(ds, opt)
return predicates, ds
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalTableDual) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalTableDual) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
return predicates, p
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret []expression.Expression, retPlan LogicalPlan) {
func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) (ret []expression.Expression, retPlan LogicalPlan) {
simplifyOuterJoin(p, predicates)
var equalCond []*expression.ScalarFunction
var leftPushCond, rightPushCond, otherCond, leftCond, rightCond []expression.Expression
Expand All @@ -138,6 +147,7 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
predicates = p.outerJoinPropConst(predicates)
dual := Conds2TableDual(p, predicates)
if dual != nil {
appendTableDualTraceStep(p, dual, predicates, opt)
return ret, dual
}
// Handle where conditions
Expand All @@ -156,6 +166,7 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
predicates = p.outerJoinPropConst(predicates)
dual := Conds2TableDual(p, predicates)
if dual != nil {
appendTableDualTraceStep(p, dual, predicates, opt)
return ret, dual
}
// Handle where conditions
Expand All @@ -182,6 +193,7 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
// Return table dual when filter is constant false or null.
dual := Conds2TableDual(p, tempCond)
if dual != nil {
appendTableDualTraceStep(p, dual, tempCond, opt)
return ret, dual
}
equalCond, leftPushCond, rightPushCond, otherCond = p.extractOnCondition(tempCond, true, true)
Expand All @@ -196,6 +208,7 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
// Return table dual when filter is constant false or null.
dual := Conds2TableDual(p, predicates)
if dual != nil {
appendTableDualTraceStep(p, dual, predicates, opt)
return ret, dual
}
// `predicates` should only contain left conditions or constant filters.
Expand All @@ -212,10 +225,10 @@ func (p *LogicalJoin) PredicatePushDown(predicates []expression.Expression) (ret
}
leftCond = expression.RemoveDupExprs(p.ctx, leftCond)
rightCond = expression.RemoveDupExprs(p.ctx, rightCond)
leftRet, lCh := p.children[0].PredicatePushDown(leftCond)
rightRet, rCh := p.children[1].PredicatePushDown(rightCond)
addSelection(p, lCh, leftRet, 0)
addSelection(p, rCh, rightRet, 1)
leftRet, lCh := p.children[0].PredicatePushDown(leftCond, opt)
rightRet, rCh := p.children[1].PredicatePushDown(rightCond, opt)
addSelection(p, lCh, leftRet, 0, opt)
addSelection(p, rCh, rightRet, 1, opt)
p.updateEQCond()
buildKeyInfo(p)
return ret, p.self
Expand Down Expand Up @@ -380,12 +393,12 @@ func isNullRejected(ctx sessionctx.Context, schema *expression.Schema, expr expr
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalProjection) PredicatePushDown(predicates []expression.Expression) (ret []expression.Expression, retPlan LogicalPlan) {
func (p *LogicalProjection) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) (ret []expression.Expression, retPlan LogicalPlan) {
canBePushed := make([]expression.Expression, 0, len(predicates))
canNotBePushed := make([]expression.Expression, 0, len(predicates))
for _, expr := range p.Exprs {
if expression.HasAssignSetVarFunc(expr) {
_, child := p.baseLogicalPlan.PredicatePushDown(nil)
_, child := p.baseLogicalPlan.PredicatePushDown(nil, opt)
return predicates, child
}
}
Expand All @@ -397,23 +410,23 @@ func (p *LogicalProjection) PredicatePushDown(predicates []expression.Expression
canNotBePushed = append(canNotBePushed, cond)
}
}
remained, child := p.baseLogicalPlan.PredicatePushDown(canBePushed)
remained, child := p.baseLogicalPlan.PredicatePushDown(canBePushed, opt)
return append(remained, canNotBePushed...), child
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalUnionAll) PredicatePushDown(predicates []expression.Expression) (ret []expression.Expression, retPlan LogicalPlan) {
func (p *LogicalUnionAll) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) (ret []expression.Expression, retPlan LogicalPlan) {
for i, proj := range p.children {
newExprs := make([]expression.Expression, 0, len(predicates))
newExprs = append(newExprs, predicates...)
retCond, newChild := proj.PredicatePushDown(newExprs)
addSelection(p, newChild, retCond, i)
retCond, newChild := proj.PredicatePushDown(newExprs, opt)
addSelection(p, newChild, retCond, i, opt)
}
return nil, p
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expression) (ret []expression.Expression, retPlan LogicalPlan) {
func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) (ret []expression.Expression, retPlan LogicalPlan) {
var condsToPush []expression.Expression
exprsOriginal := make([]expression.Expression, 0, len(la.AggFuncs))
for _, fun := range la.AggFuncs {
Expand Down Expand Up @@ -447,21 +460,21 @@ func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expressi
ret = append(ret, cond)
}
}
la.baseLogicalPlan.PredicatePushDown(condsToPush)
la.baseLogicalPlan.PredicatePushDown(condsToPush, opt)
return ret, la
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalLimit) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalLimit) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
// Limit forbids any condition to push down.
p.baseLogicalPlan.PredicatePushDown(nil)
p.baseLogicalPlan.PredicatePushDown(nil, opt)
return predicates, p
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalMaxOneRow) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalMaxOneRow) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
// MaxOneRow forbids any condition to push down.
p.baseLogicalPlan.PredicatePushDown(nil)
p.baseLogicalPlan.PredicatePushDown(nil, opt)
return predicates, p
}

Expand Down Expand Up @@ -610,7 +623,7 @@ func (p *LogicalWindow) GetPartitionByCols() []*expression.Column {
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalWindow) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalWindow) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
canBePushed := make([]expression.Expression, 0, len(predicates))
canNotBePushed := make([]expression.Expression, 0, len(predicates))
partitionCols := expression.NewSchema(p.GetPartitionByCols()...)
Expand All @@ -623,12 +636,12 @@ func (p *LogicalWindow) PredicatePushDown(predicates []expression.Expression) ([
canNotBePushed = append(canNotBePushed, cond)
}
}
p.baseLogicalPlan.PredicatePushDown(canBePushed)
p.baseLogicalPlan.PredicatePushDown(canBePushed, opt)
return canNotBePushed, p
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
if p.Extractor != nil {
predicates = p.Extractor.Extract(p.ctx, p.schema, p.names, predicates)
}
Expand All @@ -638,3 +651,75 @@ func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression)
func (*ppdSolver) name() string {
return "predicate_push_down"
}

func appendTableDualTraceStep(replaced LogicalPlan, dual LogicalPlan, conditions []expression.Expression, opt *logicalOptimizeOp) {
action := func() string {
return fmt.Sprintf("%v_%v is replaced by %v_%v", replaced.TP(), replaced.ID(), dual.TP(), dual.ID())
}
reason := func() string {
buffer := bytes.NewBufferString("The conditions[")
for i, cond := range conditions {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(cond.String())
}
buffer.WriteString("] are constant false or null")
return buffer.String()
}
opt.appendStepToCurrent(dual.ID(), dual.TP(), reason, action)
}

func appendSelectionPredicatePushDownTraceStep(p *LogicalSelection, conditions []expression.Expression, opt *logicalOptimizeOp) {
action := func() string {
return fmt.Sprintf("%v_%v is removed", p.TP(), p.ID())
}
reason := func() string {
return ""
}
if len(conditions) > 0 && !p.buildByHaving {
reason = func() string {
buffer := bytes.NewBufferString("The conditions[")
for i, cond := range conditions {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(cond.String())
}
buffer.WriteString(fmt.Sprintf("] in %v_%v are pushed down", p.TP(), p.ID()))
return buffer.String()
}
}
opt.appendStepToCurrent(p.ID(), p.TP(), reason, action)
}

func appendDataSourcePredicatePushDownTraceStep(ds *DataSource, opt *logicalOptimizeOp) {
if len(ds.pushedDownConds) < 1 {
return
}
reason := func() string {
return ""
}
action := func() string {
buffer := bytes.NewBufferString("The conditions[")
for i, cond := range ds.pushedDownConds {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(cond.String())
}
buffer.WriteString(fmt.Sprintf("] are pushed down across %v_%v", ds.TP(), ds.ID()))
return buffer.String()
}
opt.appendStepToCurrent(ds.ID(), ds.TP(), reason, action)
}

func appendAddSelectionTraceStep(p LogicalPlan, child LogicalPlan, sel *LogicalSelection, opt *logicalOptimizeOp) {
reason := func() string {
return ""
}
action := func() string {
return fmt.Sprintf("add %v_%v to connect %v_%v and %v_%v", sel.TP(), sel.ID(), p.TP(), p.ID(), child.TP(), child.ID())
}
opt.appendStepToCurrent(sel.ID(), sel.TP(), reason, action)
}

0 comments on commit 30c5f5b

Please sign in to comment.