Skip to content

Commit

Permalink
planner: implement aggregation eliminate optimize trace (#30114)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed Nov 29, 2021
1 parent eb7672f commit 3baa95f
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 75 deletions.
15 changes: 10 additions & 5 deletions expression/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ func (ki KeyInfo) Clone() KeyInfo {
return result
}

// String implements fmt.Stringer interface.
func (ki KeyInfo) String() string {
ukColStrs := make([]string, 0, len(ki))
for _, col := range ki {
ukColStrs = append(ukColStrs, col.String())
}
return "[" + strings.Join(ukColStrs, ",") + "]"
}

// Schema stands for the row schema and unique key information get from input.
type Schema struct {
Columns []*Column
Expand All @@ -47,11 +56,7 @@ func (s *Schema) String() string {
}
ukStrs := make([]string, 0, len(s.Keys))
for _, key := range s.Keys {
ukColStrs := make([]string, 0, len(key))
for _, col := range key {
ukColStrs = append(ukColStrs, col.String())
}
ukStrs = append(ukStrs, "["+strings.Join(ukColStrs, ",")+"]")
ukStrs = append(ukStrs, key.String())
}
return "Column: [" + strings.Join(colStrs, ",") + "] Unique key: [" + strings.Join(ukStrs, ",") + "]"
}
Expand Down
55 changes: 0 additions & 55 deletions planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,58 +2071,3 @@ func (s *testPlanSuite) TestWindowLogicalPlanAmbiguous(c *C) {
}
}
}

func (s *testPlanSuite) TestLogicalOptimizeWithTraceEnabled(c *C) {
sql := "select * from t where a in (1,2)"
defer testleak.AfterTest(c)()
tt := []struct {
flags []uint64
steps int
}{
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg},
steps: 2,
},
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg,
flagPrunColumns,
flagBuildKeyInfo,
},
steps: 4,
},
{
flags: []uint64{},
steps: 0,
},
}

for i, tc := range tt {
comment := Commentf("case:%v sql:%s", i, sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
flag := uint64(0)
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
c.Assert(len(otrace.Steps), Equals, tc.steps)
}
}
147 changes: 147 additions & 0 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/testleak"
)

func (s *testPlanSuite) TestLogicalOptimizeWithTraceEnabled(c *C) {
sql := "select * from t where a in (1,2)"
defer testleak.AfterTest(c)()
tt := []struct {
flags []uint64
steps int
}{
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg},
steps: 2,
},
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg,
flagPrunColumns,
flagBuildKeyInfo,
},
steps: 4,
},
{
flags: []uint64{},
steps: 0,
},
}

for i, tc := range tt {
comment := Commentf("case:%v sql:%s", i, sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
flag := uint64(0)
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
c.Assert(len(otrace.Steps), Equals, tc.steps)
}
}

func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
defer testleak.AfterTest(c)()
tt := []struct {
sql string
flags []uint64
assertRuleName string
assertRuleSteps []assertTraceStep
}{
{
sql: "select min(distinct a) from t group by a",
flags: []uint64{flagBuildKeyInfo, flagEliminateAgg},
assertRuleName: "aggregation_eliminate",
assertRuleSteps: []assertTraceStep{
{
assertReason: "[test.t.a] is a unique key",
assertAction: "min(distinct ...) is simplified to min(...)",
},
{
assertReason: "[test.t.a] is a unique key",
assertAction: "aggregation is simplified to a projection",
},
},
},
}

for i, tc := range tt {
sql := tc.sql
comment := Commentf("case:%v sql:%s", i, sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
flag := uint64(0)
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
assert := false
for _, step := range otrace.Steps {
if step.RuleName == tc.assertRuleName {
assert = true
for i, ruleStep := range step.Steps {
c.Assert(ruleStep.Action, Equals, tc.assertRuleSteps[i].assertAction)
c.Assert(ruleStep.Reason, Equals, tc.assertRuleSteps[i].assertReason)
}
}
}
c.Assert(assert, IsTrue)
}
}

type assertTraceStep struct {
assertReason string
assertAction string
}
4 changes: 2 additions & 2 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (op *logicalOptimizeOp) appendBeforeRuleOptimize(index int, name string, be
if op.tracer == nil {
return
}
op.tracer.AppendRuleTracerBeforeRuleOptimize(index, name, before.buildLogicalPlanTrace())
op.tracer.AppendRuleTracerBeforeRuleOptimize(index, name, before.buildLogicalPlanTrace(before))
}

func (op *logicalOptimizeOp) appendStepToCurrent(id int, tp, reason, action string) {
Expand All @@ -116,7 +116,7 @@ func (op *logicalOptimizeOp) trackAfterRuleOptimize(after LogicalPlan) {
if op.tracer == nil {
return
}
op.tracer.TrackLogicalPlanAfterRuleOptimize(after.buildLogicalPlanTrace())
op.tracer.TrackLogicalPlanAfterRuleOptimize(after.buildLogicalPlanTrace(after))
}

// logicalOptRule means a logical optimizing rule, which contains decorrelate, ppd, column pruning, etc.
Expand Down
8 changes: 4 additions & 4 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ type LogicalPlan interface {
canPushToCop(store kv.StoreType) bool

// buildLogicalPlanTrace clone necessary information from LogicalPlan
buildLogicalPlanTrace() *tracing.LogicalPlanTrace
buildLogicalPlanTrace(p Plan) *tracing.LogicalPlanTrace
}

// PhysicalPlan is a tree of the physical operators.
Expand Down Expand Up @@ -382,10 +382,10 @@ func (p *baseLogicalPlan) ExplainInfo() string {
}

// buildLogicalPlanTrace implements LogicalPlan
func (p *baseLogicalPlan) buildLogicalPlanTrace() *tracing.LogicalPlanTrace {
planTrace := &tracing.LogicalPlanTrace{ID: p.ID(), TP: p.TP()}
func (p *baseLogicalPlan) buildLogicalPlanTrace(plan Plan) *tracing.LogicalPlanTrace {
planTrace := &tracing.LogicalPlanTrace{ID: p.ID(), TP: p.TP(), ExplainInfo: plan.ExplainInfo()}
for _, child := range p.Children() {
planTrace.Children = append(planTrace.Children, child.buildLogicalPlanTrace())
planTrace.Children = append(planTrace.Children, child.buildLogicalPlanTrace(child))
}
return planTrace
}
Expand Down
29 changes: 25 additions & 4 deletions planner/core/rule_aggregation_elimination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"context"
"fmt"
"math"

"github.com/pingcap/tidb/expression"
Expand All @@ -37,7 +38,7 @@ type aggregationEliminateChecker struct {
// e.g. select min(b) from t group by a. If a is a unique key, then this sql is equal to `select b from t group by a`.
// For count(expr), sum(expr), avg(expr), count(distinct expr, [expr...]) we may need to rewrite the expr. Details are shown below.
// If we can eliminate agg successful, we return a projection. Else we return a nil pointer.
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation) *LogicalProjection {
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *logicalOptimizeOp) *LogicalProjection {
for _, af := range agg.AggFuncs {
// TODO(issue #9968): Actually, we can rewrite GROUP_CONCAT when all the
// arguments it accepts are promised to be NOT-NULL.
Expand All @@ -54,22 +55,27 @@ func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggr
}
schemaByGroupby := expression.NewSchema(agg.GetGroupByCols()...)
coveredByUniqueKey := false
var uniqueKey expression.KeyInfo
for _, key := range agg.children[0].Schema().Keys {
if schemaByGroupby.ColumnsIndices(key) != nil {
coveredByUniqueKey = true
uniqueKey = key
break
}
}
if coveredByUniqueKey {
// GroupByCols has unique key, so this aggregation can be removed.
if ok, proj := ConvertAggToProj(agg, agg.schema); ok {
proj.SetChildren(agg.children[0])
appendAggregationEliminateTraceStep(agg, uniqueKey, opt)
return proj
}
}
return nil
}

// tryToEliminateDistinct will eliminate distinct in the aggregation function if the aggregation args
// have unique key column. see detail example in https://github.com/pingcap/tidb/issues/23436
func (a *aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *logicalOptimizeOp) {
for _, af := range agg.AggFuncs {
if af.HasDistinct {
Expand All @@ -86,28 +92,43 @@ func (a *aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggrega
if canEliminate {
distinctByUniqueKey := false
schemaByDistinct := expression.NewSchema(cols...)
var uniqueKey expression.KeyInfo
for _, key := range agg.children[0].Schema().Keys {
if schemaByDistinct.ColumnsIndices(key) != nil {
distinctByUniqueKey = true
uniqueKey = key
break
}
}
for _, key := range agg.children[0].Schema().UniqueKeys {
if schemaByDistinct.ColumnsIndices(key) != nil {
distinctByUniqueKey = true
uniqueKey = key
break
}
}
if distinctByUniqueKey {
af.HasDistinct = false
// TODO: fulfill in future pr
opt.appendStepToCurrent(agg.ID(), agg.TP(), "", "")
appendDistinctEliminateTraceStep(agg, uniqueKey, af, opt)
}
}
}
}
}

func appendAggregationEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, opt *logicalOptimizeOp) {
opt.appendStepToCurrent(agg.ID(), agg.TP(),
fmt.Sprintf("%s is a unique key", uniqueKey.String()),
"aggregation is simplified to a projection")
}

func appendDistinctEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, af *aggregation.AggFuncDesc,
opt *logicalOptimizeOp) {
opt.appendStepToCurrent(agg.ID(), agg.TP(),
fmt.Sprintf("%s is a unique key", uniqueKey.String()),
fmt.Sprintf("%s(distinct ...) is simplified to %s(...)", af.Name, af.Name))
}

// ConvertAggToProj convert aggregation to projection.
func ConvertAggToProj(agg *LogicalAggregation, schema *expression.Schema) (bool, *LogicalProjection) {
proj := LogicalProjection{
Expand Down Expand Up @@ -196,7 +217,7 @@ func (a *aggregationEliminator) optimize(ctx context.Context, p LogicalPlan, opt
return p, nil
}
a.tryToEliminateDistinct(agg, opt)
if proj := a.tryToEliminateAggregation(agg); proj != nil {
if proj := a.tryToEliminateAggregation(agg, opt); proj != nil {
return proj, nil
}
return p, nil
Expand Down
Loading

0 comments on commit 3baa95f

Please sign in to comment.