Skip to content

Commit

Permalink
planner: move logical optimizing trace logic out of core pkg (#52161)
Browse files Browse the repository at this point in the history
ref #51664
  • Loading branch information
AilinKid committed Mar 28, 2024
1 parent f58f5e1 commit 927f3c6
Show file tree
Hide file tree
Showing 28 changed files with 292 additions and 237 deletions.
2 changes: 1 addition & 1 deletion pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ go_library(
"plan_cost_detail.go",
"plan_cost_ver1.go",
"plan_cost_ver2.go",
"plan_stats.go",
"plan_to_pb.go",
"planbuilder.go",
"point_get_plan.go",
Expand All @@ -50,6 +49,7 @@ go_library(
"rule_aggregation_push_down.go",
"rule_aggregation_skew_rewrite.go",
"rule_build_key_info.go",
"rule_collect_plan_stats.go",
"rule_column_pruning.go",
"rule_constant_propagation.go",
"rule_decorrelate.go",
Expand Down
47 changes: 6 additions & 41 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,41 +128,6 @@ var optRuleList = []logicalOptRule{
*/
var optInteractionRuleList = map[logicalOptRule]logicalOptRule{}

type logicalOptimizeOp struct {
// tracer is goring to track optimize steps during rule optimizing
tracer *tracing.LogicalOptimizeTracer
}

func defaultLogicalOptimizeOption() *logicalOptimizeOp {
return &logicalOptimizeOp{}
}

func (op *logicalOptimizeOp) withEnableOptimizeTracer(tracer *tracing.LogicalOptimizeTracer) *logicalOptimizeOp {
op.tracer = tracer
return op
}

func (op *logicalOptimizeOp) appendBeforeRuleOptimize(index int, name string, before LogicalPlan) {
if op == nil || op.tracer == nil {
return
}
op.tracer.AppendRuleTracerBeforeRuleOptimize(index, name, before.BuildPlanTrace())
}

func (op *logicalOptimizeOp) appendStepToCurrent(id int, tp string, reason, action func() string) {
if op == nil || op.tracer == nil {
return
}
op.tracer.AppendRuleTracerStepToCurrent(id, tp, reason(), action())
}

func (op *logicalOptimizeOp) recordFinalLogicalPlan(final LogicalPlan) {
if op == nil || op.tracer == nil {
return
}
op.tracer.RecordFinalLogicalPlan(final.BuildPlanTrace())
}

// logicalOptRule means a logical optimizing rule, which contains decorrelate, ppd, column pruning, etc.
type logicalOptRule interface {
/* Return Parameters:
Expand All @@ -172,7 +137,7 @@ type logicalOptRule interface {
The default value is false. It means that no interaction rule will be triggered.
3. error: If there is error during the rule optimizer, it will be thrown
*/
optimize(context.Context, LogicalPlan, *logicalOptimizeOp) (LogicalPlan, bool, error)
optimize(context.Context, LogicalPlan, *plannerutil.LogicalOptimizeOp) (LogicalPlan, bool, error)
name() string
}

Expand Down Expand Up @@ -1157,14 +1122,14 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic
debugtrace.EnterContextCommon(logic.SCtx())
defer debugtrace.LeaveContextCommon(logic.SCtx())
}
opt := defaultLogicalOptimizeOption()
opt := plannerutil.DefaultLogicalOptimizeOption()
vars := logic.SCtx().GetSessionVars()
if vars.StmtCtx.EnableOptimizeTrace {
vars.StmtCtx.OptimizeTracer = &tracing.OptimizeTracer{}
tracer := &tracing.LogicalOptimizeTracer{
Steps: make([]*tracing.LogicalRuleOptimizeTracer, 0),
}
opt = opt.withEnableOptimizeTracer(tracer)
opt = opt.WithEnableOptimizeTracer(tracer)
defer func() {
vars.StmtCtx.OptimizeTracer.Logical = tracer
}()
Expand All @@ -1178,7 +1143,7 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic
if flag&(1<<uint(i)) == 0 || isLogicalRuleDisabled(rule) {
continue
}
opt.appendBeforeRuleOptimize(i, rule.name(), logic)
opt.AppendBeforeRuleOptimize(i, rule.name(), logic.BuildPlanTrace)
var planChanged bool
logic, planChanged, err = rule.optimize(ctx, logic, opt)
if err != nil {
Expand All @@ -1193,14 +1158,14 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic

// Trigger the interaction rule
for i, rule := range againRuleList {
opt.appendBeforeRuleOptimize(i, rule.name(), logic)
opt.AppendBeforeRuleOptimize(i, rule.name(), logic.BuildPlanTrace)
logic, _, err = rule.optimize(ctx, logic, opt)
if err != nil {
return nil, err
}
}

opt.recordFinalLogicalPlan(logic)
opt.RecordFinalLogicalPlan(logic.BuildPlanTrace)
return logic, err
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ type LogicalPlan interface {
// PredicatePushDown pushes down the predicates in the where/on/having clauses as deeply as possible.
// It will accept a predicate that is an expression slice, and return the expressions that can't be pushed.
// Because it might change the root if the having clause exists, we need to return a plan that represents a new root.
PredicatePushDown([]expression.Expression, *logicalOptimizeOp) ([]expression.Expression, LogicalPlan)
PredicatePushDown([]expression.Expression, *util.LogicalOptimizeOp) ([]expression.Expression, LogicalPlan)

// PruneColumns prunes the unused columns, and return the new logical plan if changed, otherwise it's same.
PruneColumns([]*expression.Column, *logicalOptimizeOp) (LogicalPlan, error)
PruneColumns([]*expression.Column, *util.LogicalOptimizeOp) (LogicalPlan, error)

// findBestTask converts the logical plan to the physical plan. It's a new interface.
// It is called recursively from the parent to the children to create the result physical plan.
Expand All @@ -292,16 +292,16 @@ type LogicalPlan interface {
BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema)

// pushDownTopN will push down the topN or limit operator during logical optimization.
pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan
pushDownTopN(topN *LogicalTopN, opt *util.LogicalOptimizeOp) LogicalPlan

// deriveTopN derives an implicit TopN from a filter on row_number window function..
deriveTopN(opt *logicalOptimizeOp) LogicalPlan
deriveTopN(opt *util.LogicalOptimizeOp) LogicalPlan

// predicateSimplification consolidates different predcicates on a column and its equivalence classes.
predicateSimplification(opt *logicalOptimizeOp) LogicalPlan
predicateSimplification(opt *util.LogicalOptimizeOp) LogicalPlan

// constantPropagation generate new constant predicate according to column equivalence relation
constantPropagation(parentPlan LogicalPlan, currentChildIdx int, opt *logicalOptimizeOp) (newRoot LogicalPlan)
constantPropagation(parentPlan LogicalPlan, currentChildIdx int, opt *util.LogicalOptimizeOp) (newRoot LogicalPlan)

// pullUpConstantPredicates recursive find constant predicate, used for the constant propagation rule
pullUpConstantPredicates() []expression.Expression
Expand Down Expand Up @@ -775,7 +775,7 @@ func (*baseLogicalPlan) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
}

// PruneColumns implements LogicalPlan interface.
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp) (LogicalPlan, error) {
func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column, opt *util.LogicalOptimizeOp) (LogicalPlan, error) {
if len(p.children) == 0 {
return p.self, nil
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/planner/core/rule_aggregation_elimination.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/types"
)

Expand All @@ -47,7 +48,7 @@ type aggregationEliminateChecker struct {
// e.g. select min(b) from t group by a. If a is a unique key, then this sql is equal to `select b from t group by a`.
// For count(expr), sum(expr), avg(expr), count(distinct expr, [expr...]) we may need to rewrite the expr. Details are shown below.
// If we can eliminate agg successful, we return a projection. Else we return a nil pointer.
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *logicalOptimizeOp) *LogicalProjection {
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *util.LogicalOptimizeOp) *LogicalProjection {
for _, af := range agg.AggFuncs {
// TODO(issue #9968): Actually, we can rewrite GROUP_CONCAT when all the
// arguments it accepts are promised to be NOT-NULL.
Expand Down Expand Up @@ -88,7 +89,7 @@ func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggr

// tryToEliminateDistinct will eliminate distinct in the aggregation function if the aggregation args
// have unique key column. see detail example in https://github.com/pingcap/tidb/issues/23436
func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *logicalOptimizeOp) {
func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *util.LogicalOptimizeOp) {
for _, af := range agg.AggFuncs {
if af.HasDistinct {
cols := make([]*expression.Column, 0, len(af.Args))
Expand Down Expand Up @@ -128,26 +129,26 @@ func (*aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregati
}
}

func appendAggregationEliminateTraceStep(agg *LogicalAggregation, proj *LogicalProjection, uniqueKey expression.KeyInfo, opt *logicalOptimizeOp) {
func appendAggregationEliminateTraceStep(agg *LogicalAggregation, proj *LogicalProjection, uniqueKey expression.KeyInfo, opt *util.LogicalOptimizeOp) {
reason := func() string {
return fmt.Sprintf("%s is a unique key", uniqueKey.String())
}
action := func() string {
return fmt.Sprintf("%v_%v is simplified to a %v_%v", agg.TP(), agg.ID(), proj.TP(), proj.ID())
}

opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func appendDistinctEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, af *aggregation.AggFuncDesc,
opt *logicalOptimizeOp) {
opt *util.LogicalOptimizeOp) {
reason := func() string {
return fmt.Sprintf("%s is a unique key", uniqueKey.String())
}
action := func() string {
return fmt.Sprintf("%s(distinct ...) is simplified to %s(...)", af.Name, af.Name)
}
opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

// CheckCanConvertAggToProj check whether a special old aggregation (which has already been pushed down) to projection.
Expand Down Expand Up @@ -253,7 +254,7 @@ func wrapCastFunction(ctx expression.BuildContext, arg expression.Expression, ta
return expression.BuildCastFunction(ctx, arg, targetTp)
}

func (a *aggregationEliminator) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (a *aggregationEliminator) optimize(ctx context.Context, p LogicalPlan, opt *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
Expand Down
20 changes: 10 additions & 10 deletions pkg/planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (*aggregationPushDownSolver) decompose(ctx PlanContext, aggFunc *aggregatio
// process it temporarily. If not, We will add additional group by columns and first row functions. We make a new aggregation operator.
// If the pushed aggregation is grouped by unique key, it's no need to push it down.
func (a *aggregationPushDownSolver) tryToPushDownAgg(oldAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column,
join *LogicalJoin, childIdx int, blockOffset int, opt *logicalOptimizeOp) (_ LogicalPlan, err error) {
join *LogicalJoin, childIdx int, blockOffset int, opt *util.LogicalOptimizeOp) (_ LogicalPlan, err error) {
child := join.children[childIdx]
if aggregation.IsAllFirstRow(aggFuncs) {
return child, nil
Expand Down Expand Up @@ -433,13 +433,13 @@ func (*aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, uni
return newAgg, nil
}

func (a *aggregationPushDownSolver) optimize(_ context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (a *aggregationPushDownSolver) optimize(_ context.Context, p LogicalPlan, opt *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
newLogicalPlan, err := a.aggPushDown(p, opt)
return newLogicalPlan, planChanged, err
}

func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll, agg *LogicalAggregation, opt *logicalOptimizeOp) error {
func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll, agg *LogicalAggregation, opt *util.LogicalOptimizeOp) error {
for _, aggFunc := range agg.AggFuncs {
if !a.isDecomposableWithUnion(aggFunc) {
return nil
Expand Down Expand Up @@ -474,7 +474,7 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAl
}

// aggPushDown tries to push down aggregate functions to join paths.
func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *logicalOptimizeOp) (_ LogicalPlan, err error) {
func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan, opt *util.LogicalOptimizeOp) (_ LogicalPlan, err error) {
if agg, ok := p.(*LogicalAggregation); ok {
proj := a.tryToEliminateAggregation(agg, opt)
if proj != nil {
Expand Down Expand Up @@ -683,7 +683,7 @@ func (*aggregationPushDownSolver) name() string {
}

func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *LogicalAggregation, aggFuncs []*aggregation.AggFuncDesc, join *LogicalJoin,
childIdx int, opt *logicalOptimizeOp) {
childIdx int, opt *util.LogicalOptimizeOp) {
reason := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v's functions[", oldAgg.TP(), oldAgg.ID()))
for i, aggFunc := range aggFuncs {
Expand All @@ -705,10 +705,10 @@ func appendAggPushDownAcrossJoinTraceStep(oldAgg, newAgg *LogicalAggregation, ag
}(), newAgg.TP(), newAgg.ID())
return buffer.String()
}
opt.appendStepToCurrent(join.ID(), join.TP(), reason, action)
opt.AppendStepToCurrent(join.ID(), join.TP(), reason, action)
}

func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *LogicalProjection, opt *logicalOptimizeOp) {
func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *LogicalProjection, opt *util.LogicalOptimizeOp) {
action := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v is eliminated, and %v_%v's functions changed into[", proj.TP(), proj.ID(), agg.TP(), agg.ID()))
for i, aggFunc := range agg.AggFuncs {
Expand All @@ -723,10 +723,10 @@ func appendAggPushDownAcrossProjTraceStep(agg *LogicalAggregation, proj *Logical
reason := func() string {
return fmt.Sprintf("%v_%v is directly below an %v_%v and has no side effects", proj.TP(), proj.ID(), agg.TP(), agg.ID())
}
opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalAggregation, opt *logicalOptimizeOp) {
func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalAggregation, opt *util.LogicalOptimizeOp) {
reason := func() string {
buffer := bytes.NewBufferString(fmt.Sprintf("%v_%v functions[", agg.TP(), agg.ID()))
for i, aggFunc := range agg.AggFuncs {
Expand All @@ -749,5 +749,5 @@ func appendAggPushDownAcrossUnionTraceStep(union *LogicalUnionAll, agg *LogicalA
buffer.WriteString("]")
return buffer.String()
}
opt.appendStepToCurrent(union.ID(), union.TP(), reason, action)
opt.AppendStepToCurrent(union.ID(), union.TP(), reason, action)
}
9 changes: 5 additions & 4 deletions pkg/planner/core/rule_aggregation_skew_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/util/intset"
)

Expand All @@ -46,7 +47,7 @@ type skewDistinctAggRewriter struct {
// - The aggregate has 1 and only 1 distinct aggregate function (limited to count, avg, sum)
//
// This rule is disabled by default. Use tidb_opt_skew_distinct_agg to enable the rule.
func (a *skewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation, opt *logicalOptimizeOp) LogicalPlan {
func (a *skewDistinctAggRewriter) rewriteSkewDistinctAgg(agg *LogicalAggregation, opt *util.LogicalOptimizeOp) LogicalPlan {
// only group aggregate is applicable
if len(agg.GroupByItems) == 0 {
return nil
Expand Down Expand Up @@ -262,18 +263,18 @@ func (*skewDistinctAggRewriter) isQualifiedAgg(aggFunc *aggregation.AggFuncDesc)
}
}

func appendSkewDistinctAggRewriteTraceStep(agg *LogicalAggregation, result LogicalPlan, opt *logicalOptimizeOp) {
func appendSkewDistinctAggRewriteTraceStep(agg *LogicalAggregation, result LogicalPlan, opt *util.LogicalOptimizeOp) {
reason := func() string {
return fmt.Sprintf("%v_%v has a distinct agg function", agg.TP(), agg.ID())
}
action := func() string {
return fmt.Sprintf("%v_%v is rewritten to a %v_%v", agg.TP(), agg.ID(), result.TP(), result.ID())
}

opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
opt.AppendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}

func (a *skewDistinctAggRewriter) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (a *skewDistinctAggRewriter) optimize(ctx context.Context, p LogicalPlan, opt *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/rule_build_key_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/util"
)

type buildKeySolver struct{}

func (*buildKeySolver) optimize(_ context.Context, p LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (*buildKeySolver) optimize(_ context.Context, p LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
buildKeyInfo(p)
return p, planChanged, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/table"
Expand All @@ -31,7 +32,7 @@ import (

type collectPredicateColumnsPoint struct{}

func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, planChanged, nil
Expand Down Expand Up @@ -77,7 +78,7 @@ func (collectPredicateColumnsPoint) name() string {

type syncWaitStatsLoadPoint struct{}

func (syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, bool, error) {
func (syncWaitStatsLoadPoint) optimize(_ context.Context, plan LogicalPlan, _ *util.LogicalOptimizeOp) (LogicalPlan, bool, error) {
planChanged := false
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, planChanged, nil
Expand Down
Loading

0 comments on commit 927f3c6

Please sign in to comment.