diff --git a/expression/util.go b/expression/util.go index 7b34ef442067b..d7b92329d51f6 100644 --- a/expression/util.go +++ b/expression/util.go @@ -166,6 +166,29 @@ func extractColumns(result []*Column, expr Expression, filter func(*Column) bool return result } +// ExtractColumnsAndCorColumns extracts columns and correlated columns from `expr` and appends them to `result`. +func ExtractColumnsAndCorColumns(result []*Column, expr Expression) []*Column { + switch v := expr.(type) { + case *Column: + result = append(result, v) + case *CorrelatedColumn: + result = append(result, &v.Column) + case *ScalarFunction: + for _, arg := range v.GetArgs() { + result = ExtractColumnsAndCorColumns(result, arg) + } + } + return result +} + +// ExtractColumnsAndCorColumnsFromExpressions extracts columns and correlated columns from expressions and appends them to `result`. +func ExtractColumnsAndCorColumnsFromExpressions(result []*Column, list []Expression) []*Column { + for _, expr := range list { + result = ExtractColumnsAndCorColumns(result, expr) + } + return result +} + // ExtractColumnSet extracts the different values of `UniqueId` for columns in expressions. func ExtractColumnSet(exprs []Expression) *intsets.Sparse { set := &intsets.Sparse{} diff --git a/planner/core/collect_column_stats_usage.go b/planner/core/collect_column_stats_usage.go new file mode 100644 index 0000000000000..6396b1ddad34f --- /dev/null +++ b/planner/core/collect_column_stats_usage.go @@ -0,0 +1,252 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/parser/model" +) + +// predicateColumnCollector collects predicate columns from logical plan. Predicate columns are the columns whose statistics +// are utilized when making query plans, which usually occur in where conditions, join conditions and so on. +type predicateColumnCollector struct { + // colMap maps expression.Column.UniqueID to the table columns whose statistics are utilized to calculate statistics of the column. + colMap map[int64]map[model.TableColumnID]struct{} + // predicateCols records predicate columns. + predicateCols map[model.TableColumnID]struct{} + // cols is used to store columns collected from expressions and saves some allocation. + cols []*expression.Column +} + +func newPredicateColumnCollector() *predicateColumnCollector { + return &predicateColumnCollector{ + colMap: make(map[int64]map[model.TableColumnID]struct{}), + predicateCols: make(map[model.TableColumnID]struct{}), + // Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning. + cols: make([]*expression.Column, 0, 8), + } +} + +func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) { + tblColIDs, ok := c.colMap[col.UniqueID] + if !ok { + // It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs. 
+ return + } + for tblColID := range tblColIDs { + c.predicateCols[tblColID] = struct{}{} + } +} + +func (c *predicateColumnCollector) addPredicateColumnsFromExpression(expr expression.Expression) { + cols := expression.ExtractColumnsAndCorColumns(c.cols[:0], expr) + for _, col := range cols { + c.addPredicateColumn(col) + } +} + +func (c *predicateColumnCollector) addPredicateColumnsFromExpressions(list []expression.Expression) { + cols := expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list) + for _, col := range cols { + c.addPredicateColumn(col) + } +} + +func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) { + if _, ok := c.colMap[col.UniqueID]; !ok { + c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{} + } + for _, relatedCol := range relatedCols { + tblColIDs, ok := c.colMap[relatedCol.UniqueID] + if !ok { + // It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs. + continue + } + for tblColID := range tblColIDs { + c.colMap[col.UniqueID][tblColID] = struct{}{} + } + } +} + +func (c *predicateColumnCollector) updateColMapFromExpression(col *expression.Column, expr expression.Expression) { + c.updateColMap(col, expression.ExtractColumnsAndCorColumns(c.cols[:0], expr)) +} + +func (c *predicateColumnCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) { + c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list)) +} + +func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) { + tblID := ds.TableInfo().ID + for _, col := range ds.Schema().Columns { + tblColID := model.TableColumnID{TableID: tblID, ColumnID: col.ID} + c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{tblColID: {}} + } + // We should use `pushedDownConds` here. `allConds` is used for partition pruning, which doesn't need stats. 
+ c.addPredicateColumnsFromExpressions(ds.pushedDownConds) +} + +func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) { + // The only schema change is merging two schemas so there is no new column. + // Assume statistics of all the columns in EqualConditions/LeftConditions/RightConditions/OtherConditions are needed. + exprs := make([]expression.Expression, 0, len(p.EqualConditions)+len(p.LeftConditions)+len(p.RightConditions)+len(p.OtherConditions)) + for _, cond := range p.EqualConditions { + exprs = append(exprs, cond) + } + for _, cond := range p.LeftConditions { + exprs = append(exprs, cond) + } + for _, cond := range p.RightConditions { + exprs = append(exprs, cond) + } + for _, cond := range p.OtherConditions { + exprs = append(exprs, cond) + } + c.addPredicateColumnsFromExpressions(exprs) +} + +func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) { + // statistics of the ith column of UnionAll come from statistics of the ith column of each child. + schemas := make([]*expression.Schema, 0, len(p.Children())) + relatedCols := make([]*expression.Column, 0, len(p.Children())) + for _, child := range p.Children() { + schemas = append(schemas, child.Schema()) + } + for i, col := range p.Schema().Columns { + relatedCols = relatedCols[:0] + for j := range p.Children() { + relatedCols = append(relatedCols, schemas[j].Columns[i]) + } + c.updateColMap(col, relatedCols) + } +} + +func (c *predicateColumnCollector) collectFromPlan(lp LogicalPlan) { + for _, child := range lp.Children() { + c.collectFromPlan(child) + } + switch x := lp.(type) { + case *DataSource: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalIndexScan: + x.Source.updateColMapAndAddPredicateColumns(c) + // TODO: Is it redundant to add predicate columns from LogicalIndexScan.AccessConds? Is LogicalIndexScan.AccessConds a subset of LogicalIndexScan.Source.pushedDownConds. 
+ c.addPredicateColumnsFromExpressions(x.AccessConds) + case *LogicalTableScan: + x.Source.updateColMapAndAddPredicateColumns(c) + // TODO: Is it redundant to add predicate columns from LogicalTableScan.AccessConds? Is LogicalTableScan.AccessConds a subset of LogicalTableScan.Source.pushedDownConds. + c.addPredicateColumnsFromExpressions(x.AccessConds) + case *TiKVSingleGather: + // TODO: Is it redundant? + x.Source.updateColMapAndAddPredicateColumns(c) + case *LogicalProjection: + // Schema change from children to self. + schema := x.Schema() + for i, expr := range x.Exprs { + c.updateColMapFromExpression(schema.Columns[i], expr) + } + case *LogicalSelection: + // Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still + // regard statistics of the columns in the conditions as needed. + c.addPredicateColumnsFromExpressions(x.Conditions) + case *LogicalAggregation: + // Just assume statistics of all the columns in GroupByItems are needed. + c.addPredicateColumnsFromExpressions(x.GroupByItems) + // Schema change from children to self. + schema := x.Schema() + for i, aggFunc := range x.AggFuncs { + c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args) + } + case *LogicalWindow: + // Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window. + // It seems that we don't use statistics of the columns in LogicalWindow.OrderBy currently? + for _, item := range x.PartitionBy { + c.addPredicateColumn(item.Col) + } + // Schema change from children to self. + windowColumns := x.GetWindowResultColumns() + for i, col := range windowColumns { + c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args) + } + case *LogicalJoin: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalApply: + x.updateColMapAndAddPredicateColumns(c) + // Assume statistics of correlated columns are needed. + // Correlated columns can be found in LogicalApply.Children()[0].Schema(). 
Since we already visit LogicalApply.Children()[0], + // correlated columns must have existed in predicateColumnCollector.colMap. + for _, corCols := range x.CorCols { + c.addPredicateColumn(&corCols.Column) + } + case *LogicalSort: + // Assume statistics of all the columns in ByItems are needed. + for _, item := range x.ByItems { + c.addPredicateColumnsFromExpression(item.Expr) + } + case *LogicalTopN: + // Assume statistics of all the columns in ByItems are needed. + for _, item := range x.ByItems { + c.addPredicateColumnsFromExpression(item.Expr) + } + case *LogicalUnionAll: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalPartitionUnionAll: + x.updateColMapAndAddPredicateColumns(c) + case *LogicalCTE: + // Visit seedPartLogicalPlan and recursivePartLogicalPlan first. + c.collectFromPlan(x.cte.seedPartLogicalPlan) + if x.cte.recursivePartLogicalPlan != nil { + c.collectFromPlan(x.cte.recursivePartLogicalPlan) + } + // Schema change from seedPlan/recursivePlan to self. + columns := x.Schema().Columns + seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns + var recursiveColumns []*expression.Column + if x.cte.recursivePartLogicalPlan != nil { + recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns + } + relatedCols := make([]*expression.Column, 0, 2) + for i, col := range columns { + relatedCols = append(relatedCols[:0], seedColumns[i]) + if recursiveColumns != nil { + relatedCols = append(relatedCols, recursiveColumns[i]) + } + c.updateColMap(col, relatedCols) + } + // If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case + // statistics of all the columns are needed. + if x.cte.IsDistinct { + for _, col := range columns { + c.addPredicateColumn(col) + } + } + case *LogicalCTETable: + // Schema change from seedPlan to self. 
+ for i, col := range x.Schema().Columns { + c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]}) + } + } +} + +// CollectPredicateColumnsForTest collects predicate columns from logical plan. It is only for test. +func CollectPredicateColumnsForTest(lp LogicalPlan) []model.TableColumnID { + collector := newPredicateColumnCollector() + collector.collectFromPlan(lp) + tblColIDs := make([]model.TableColumnID, 0, len(collector.predicateCols)) + for tblColID := range collector.predicateCols { + tblColIDs = append(tblColIDs, tblColID) + } + return tblColIDs +} diff --git a/planner/core/collect_column_stats_usage_test.go b/planner/core/collect_column_stats_usage_test.go new file mode 100644 index 0000000000000..912765549fa7b --- /dev/null +++ b/planner/core/collect_column_stats_usage_test.go @@ -0,0 +1,222 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package core_test + +import ( + "context" + "fmt" + "testing" + + "github.com/pingcap/tidb/parser/model" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/hint" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" +) + +func TestCollectPredicateColumns(t *testing.T) { + t.Parallel() + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("set @@session.tidb_partition_prune_mode = 'static'") + tk.MustExec("create table t1(a int, b int, c int)") + tk.MustExec("create table t2(a int, b int, c int)") + tk.MustExec("create table t3(a int, b int, c int) partition by range(a) (partition p0 values less than (10), partition p1 values less than (20), partition p2 values less than maxvalue)") + + tests := []struct { + sql string + res []string + }{ + { + // DataSource + sql: "select * from t1 where a > 2", + res: []string{"t1.a"}, + }, + { + // DataSource + sql: "select * from t1 where b in (2, 5) or c = 5", + res: []string{"t1.b", "t1.c"}, + }, + { + // LogicalProjection + sql: "select * from (select a + b as ab, c from t1) as tmp where ab > 4", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalAggregation + sql: "select b, count(*) from t1 group by b", + res: []string{"t1.b"}, + }, + { + // LogicalAggregation + sql: "select b, sum(a) from t1 group by b having sum(a) > 3", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalAggregation + sql: "select count(*), sum(a), sum(c) from t1", + res: []string{}, + }, + { + // LogicalAggregation + sql: "(select a, b from t1) union (select a, c from t2)", + res: []string{"t1.a", "t1.b", "t2.a", "t2.c"}, + }, + { + // LogicalWindow + sql: "select avg(b) over(partition by a) from t1", + res: []string{"t1.a"}, + }, + { + // LogicalWindow + sql: "select * from (select avg(b) over(partition by a) 
as w from t1) as tmp where w > 4", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalWindow + sql: "select row_number() over(partition by a order by c) from t1", + res: []string{"t1.a"}, + }, + { + // LogicalJoin + sql: "select * from t1, t2 where t1.a = t2.a", + res: []string{"t1.a", "t2.a"}, + }, + { + // LogicalJoin + sql: "select * from t1 as x join t2 as y on x.b + y.c > 2", + res: []string{"t1.b", "t2.c"}, + }, + { + // LogicalJoin + sql: "select * from t1 as x join t2 as y on x.a = y.a and x.b < 3 and y.c > 2", + res: []string{"t1.a", "t1.b", "t2.a", "t2.c"}, + }, + { + // LogicalJoin + sql: "select x.b, y.c, sum(x.c), sum(y.b) from t1 as x join t2 as y on x.a = y.a group by x.b, y.c order by x.b", + res: []string{"t1.a", "t1.b", "t2.a", "t2.c"}, + }, + { + // LogicalApply + sql: "select * from t1 where t1.b > all(select b from t2 where t2.c > 2)", + res: []string{"t1.b", "t2.b", "t2.c"}, + }, + { + // LogicalApply + sql: "select * from t1 where t1.b > (select count(b) from t2 where t2.c > t1.a)", + res: []string{"t1.a", "t1.b", "t2.b", "t2.c"}, + }, + { + // LogicalApply + sql: "select * from t1 where t1.b > (select count(*) from t2 where t2.c > t1.a)", + res: []string{"t1.a", "t1.b", "t2.c"}, + }, + { + // LogicalSort + sql: "select * from t1 order by c", + res: []string{"t1.c"}, + }, + { + // LogicalTopN + sql: "select * from t1 order by a + b limit 10", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalUnionAll + sql: "select * from ((select a, b from t1) union all (select a, c from t2)) as tmp where tmp.b > 2", + res: []string{"t1.b", "t2.c"}, + }, + { + // LogicalPartitionUnionAll + sql: "select * from t3 where a < 15 and b > 1", + res: []string{"t3.a", "t3.b"}, + }, + { + // LogicalCTE + sql: "with cte(x, y) as (select a + 1, b from t1 where b > 1) select * from cte where x > 3", + res: []string{"t1.a", "t1.b"}, + }, + { + // LogicalCTE, LogicalCTETable + sql: "with recursive cte(x, y) as (select c, 1 from t1 union all select x + 1, y from cte 
where x < 5) select * from cte", + res: []string{"t1.c"}, + }, + { + // LogicalCTE, LogicalCTETable + sql: "with recursive cte(x, y) as (select 1, c from t1 union all select x + 1, y from cte where x < 5) select * from cte where y > 1", + res: []string{"t1.c"}, + }, + { + // LogicalCTE, LogicalCTETable + sql: "with recursive cte(x, y) as (select a, b from t1 union select x + 1, y from cte where x < 5) select * from cte", + res: []string{"t1.a", "t1.b"}, + }, + } + + ctx := context.Background() + sctx := tk.Session() + is := dom.InfoSchema() + getColName := func(tblColID model.TableColumnID) (string, bool) { + tbl, ok := is.TableByID(tblColID.TableID) + if !ok { + return "", false + } + tblInfo := tbl.Meta() + for _, col := range tblInfo.Columns { + if tblColID.ColumnID == col.ID { + return tblInfo.Name.L + "." + col.Name.L, true + } + } + return "", false + } + checkPredicateColumns := func(lp plannercore.LogicalPlan, expected []string, comment string) { + tblColIDs := plannercore.CollectPredicateColumnsForTest(lp) + cols := make([]string, 0, len(tblColIDs)) + for _, tblColID := range tblColIDs { + col, ok := getColName(tblColID) + require.True(t, ok, comment) + cols = append(cols, col) + } + require.ElementsMatch(t, expected, cols, comment) + } + + for _, tt := range tests { + comment := fmt.Sprintf("for %s", tt.sql) + logutil.BgLogger().Info(comment) + stmts, err := tk.Session().Parse(ctx, tt.sql) + require.NoError(t, err, comment) + stmt := stmts[0] + err = plannercore.Preprocess(sctx, stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is})) + require.NoError(t, err, comment) + builder, _ := plannercore.NewPlanBuilder().Init(sctx, is, &hint.BlockHintProcessor{}) + p, err := builder.Build(ctx, stmt) + require.NoError(t, err, comment) + lp, ok := p.(plannercore.LogicalPlan) + require.True(t, ok, comment) + // We check predicate columns twice, before and after logical optimization. 
Some logical plan patterns may occur before + // logical optimization while others may occur after logical optimization. + // logutil.BgLogger().Info("before logical opt", zap.String("lp", plannercore.ToString(lp))) + checkPredicateColumns(lp, tt.res, comment) + lp, err = plannercore.LogicalOptimize(ctx, builder.GetOptFlag(), lp) + require.NoError(t, err, comment) + // logutil.BgLogger().Info("after logical opt", zap.String("lp", plannercore.ToString(lp))) + checkPredicateColumns(lp, tt.res, comment) + } +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index b40db2fe4ef5d..c3b1239d4ffac 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3871,7 +3871,7 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName } cte.recursiveRef = true - p := LogicalCTETable{name: cte.def.Name.String(), idForStorage: cte.storageID, seedStat: cte.seedStat}.Init(b.ctx, b.getSelectOffset()) + p := LogicalCTETable{name: cte.def.Name.String(), idForStorage: cte.storageID, seedStat: cte.seedStat, seedSchema: cte.seedLP.Schema()}.Init(b.ctx, b.getSelectOffset()) p.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars())) p.SetOutputNames(cte.seedLP.OutputNames()) return p, nil diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 3d5bbeeaa9216..212f10d65346a 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1309,6 +1309,9 @@ type LogicalCTETable struct { seedStat *property.StatsInfo name string idForStorage int + + // seedSchema is only used in predicateColumnCollector to get column mapping + seedSchema *expression.Schema } // ExtractCorrelatedCols implements LogicalPlan interface.