From 20b5fbde5c59514b557557c2f4af123d61a20d0d Mon Sep 17 00:00:00 2001 From: HuaiyuXu Date: Fri, 15 Feb 2019 18:35:10 +0800 Subject: [PATCH] planner, executor: inject proj below TopN and Sort if byItem contains scalarFunc (#9197) (#9319) --- executor/aggregate_test.go | 26 ++++ executor/sort.go | 128 ++----------------- planner/core/physical_plan_test.go | 4 +- planner/core/rule_inject_extra_projection.go | 95 ++++++++++++-- 4 files changed, 125 insertions(+), 128 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index da7ab15cea6b8..9fdd7c0d8c3e3 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -603,6 +603,32 @@ func (s *testSuite) TestBuildProjBelowAgg(c *C) { "4 3 18 7,7,7 8")) } +func (s *testSuite) TestInjectProjBelowTopN(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (i int);") + tk.MustExec("insert into t values (1), (1), (1),(2),(3),(2),(3),(2),(3);") + tk.MustQuery("explain select * from t order by i + 1").Check(testkit.Rows( + "Projection_8 10000.00 root test.t.i", + "└─Sort_4 10000.00 root col_1:asc", + " └─Projection_9 10000.00 root test.t.i, plus(test.t.i, 1)", + " └─TableReader_7 10000.00 root data:TableScan_6", + " └─TableScan_6 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo")) + rs := tk.MustQuery("select * from t order by i + 1 ") + rs.Check(testkit.Rows( + "1", "1", "1", "2", "2", "2", "3", "3", "3")) + tk.MustQuery("explain select * from t order by i + 1 limit 2").Check(testkit.Rows( + "Projection_15 2.00 root test.t.i", + "└─TopN_7 2.00 root col_1:asc, offset:0, count:2", + " └─Projection_16 2.00 root test.t.i, plus(test.t.i, 1)", + " └─TableReader_12 2.00 root data:TopN_11", + " └─TopN_11 2.00 cop plus(test.t.i, 1):asc, offset:0, count:2", + " └─TableScan_10 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo")) + rs = tk.MustQuery("select * from t order by i + 1 limit 2") + rs.Check(testkit.Rows("1", "1")) + tk.MustQuery("select i, i, i from t order by i + 1").Check(testkit.Rows("1 1 1", "1 1 1", "1 1 1", "2 2 2", "2 2 2", "2 2 2", "3 3 3", "3 3 3", "3 3 3")) +} + func (s *testSuite) TestFirstRowEnum(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec(`use test;`) diff --git a/executor/sort.go b/executor/sort.go index 0625567423c1e..f34759c07fdf7 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -42,8 +42,6 @@ type SortExec struct { keyColumns []int // keyCmpFuncs is used to compare each ByItem. keyCmpFuncs []chunk.CompareFunc - // keyChunks is used to store ByItems values when not all ByItems are column. - keyChunks *chunk.List // rowChunks is the chunks to store row values. rowChunks *chunk.List // rowPointer store the chunk index and row index for each row. @@ -86,17 +84,8 @@ func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error { } e.initPointers() e.initCompareFuncs() - allColumnExpr := e.buildKeyColumns() - if allColumnExpr { - sort.Slice(e.rowPtrs, e.keyColumnsLess) - } else { - e.buildKeyExprsAndTypes() - err = e.buildKeyChunks() - if err != nil { - return errors.Trace(err) - } - sort.Slice(e.rowPtrs, e.keyChunksLess) - } + e.buildKeyColumns() + sort.Slice(e.rowPtrs, e.keyColumnsLess) e.fetched = true } for chk.NumRows() < e.maxChunkSize { @@ -149,49 +138,14 @@ func (e *SortExec) initCompareFuncs() { } } -func (e *SortExec) buildKeyColumns() (allColumnExpr bool) { +func (e *SortExec) buildKeyColumns() { e.keyColumns = make([]int, 0, len(e.ByItems)) for _, by := range e.ByItems { - if col, ok := by.Expr.(*expression.Column); ok { - e.keyColumns = append(e.keyColumns, col.Index) - } else { - e.keyColumns = e.keyColumns[:0] - for i := range e.ByItems { - e.keyColumns = append(e.keyColumns, i) - } - return false - } - } - return true -} - -func (e *SortExec) buildKeyExprsAndTypes() { - keyLen := len(e.ByItems) - e.keyTypes = make([]*types.FieldType, keyLen) - e.keyExprs = make([]expression.Expression, keyLen) - for keyColIdx := range e.ByItems { - e.keyExprs[keyColIdx] = e.ByItems[keyColIdx].Expr - e.keyTypes[keyColIdx] = e.ByItems[keyColIdx].Expr.GetType() + col := by.Expr.(*expression.Column) + e.keyColumns = append(e.keyColumns, col.Index) } } -func (e *SortExec) buildKeyChunks() error { - e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) - e.keyChunks.GetMemTracker().SetLabel("keyChunks") - e.keyChunks.GetMemTracker().AttachTo(e.memTracker) - - for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ { - keyChk := chunk.NewChunkWithCapacity(e.keyTypes, e.rowChunks.GetChunk(chkIdx).NumRows()) - childIter := chunk.NewIterator4Chunk(e.rowChunks.GetChunk(chkIdx)) - err := expression.VectorizedExecute(e.ctx, e.keyExprs, childIter, keyChk) - if err != nil { - return errors.Trace(err) - } - e.keyChunks.Add(keyChk) - } - return nil -} - func (e *SortExec) lessRow(rowI, rowJ chunk.Row) bool { for i, colIdx := range e.keyColumns { cmpFunc := e.keyCmpFuncs[i] @@ -215,13 +169,6 @@ func (e *SortExec) keyColumnsLess(i, j int) bool { return e.lessRow(rowI, rowJ) } -// keyChunksLess is the less function for key chunk. -func (e *SortExec) keyChunksLess(i, j int) bool { - keyRowI := e.keyChunks.GetRow(e.rowPtrs[i]) - keyRowJ := e.keyChunks.GetRow(e.rowPtrs[j]) - return e.lessRow(keyRowI, keyRowJ) -} - // TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT. // Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage. type TopNExec struct { @@ -240,19 +187,6 @@ type topNChunkHeap struct { // Less implement heap.Interface, but since we mantains a max heap, // this function returns true if row i is greater than row j. func (h *topNChunkHeap) Less(i, j int) bool { - if h.keyChunks != nil { - return h.keyChunksGreater(i, j) - } - return h.keyColumnsGreater(i, j) -} - -func (h *topNChunkHeap) keyChunksGreater(i, j int) bool { - keyRowI := h.keyChunks.GetRow(h.rowPtrs[i]) - keyRowJ := h.keyChunks.GetRow(h.rowPtrs[j]) - return h.greaterRow(keyRowI, keyRowJ) -} - -func (h *topNChunkHeap) keyColumnsGreater(i, j int) bool { rowI := h.rowChunks.GetRow(h.rowPtrs[i]) rowJ := h.rowChunks.GetRow(h.rowPtrs[j]) return h.greaterRow(rowI, rowJ) @@ -348,14 +282,7 @@ func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error { } e.initPointers() e.initCompareFuncs() - allColumnExpr := e.buildKeyColumns() - if !allColumnExpr { - e.buildKeyExprsAndTypes() - err := e.buildKeyChunks() - if err != nil { - return errors.Trace(err) - } - } + e.buildKeyColumns() return nil } @@ -367,10 +294,6 @@ func (e *TopNExec) executeTopN(ctx context.Context) error { // The number of rows we loaded may exceeds total limit, remove greatest rows by Pop. heap.Pop(e.chkHeap) } - var childKeyChk *chunk.Chunk - if e.keyChunks != nil { - childKeyChk = chunk.NewChunkWithCapacity(e.keyTypes, e.maxChunkSize) - } childRowChk := e.children[0].newFirstChunk() for { err := e.children[0].Next(ctx, childRowChk) @@ -380,7 +303,7 @@ func (e *TopNExec) executeTopN(ctx context.Context) error { if childRowChk.NumRows() == 0 { break } - err = e.processChildChk(childRowChk, childKeyChk) + err = e.processChildChk(childRowChk) if err != nil { return errors.Trace(err) } @@ -391,38 +314,19 @@ func (e *TopNExec) executeTopN(ctx context.Context) error { } } } - if e.keyChunks != nil { - sort.Slice(e.rowPtrs, e.keyChunksLess) - } else { - sort.Slice(e.rowPtrs, e.keyColumnsLess) - } + sort.Slice(e.rowPtrs, e.keyColumnsLess) return nil } -func (e *TopNExec) processChildChk(childRowChk, childKeyChk *chunk.Chunk) error { - if childKeyChk != nil { - childKeyChk.Reset() - err := expression.VectorizedExecute(e.ctx, e.keyExprs, chunk.NewIterator4Chunk(childRowChk), childKeyChk) - if err != nil { - return errors.Trace(err) - } - } +func (e *TopNExec) processChildChk(childRowChk *chunk.Chunk) error { for i := 0; i < childRowChk.NumRows(); i++ { heapMaxPtr := e.rowPtrs[0] var heapMax, next chunk.Row - if childKeyChk != nil { - heapMax = e.keyChunks.GetRow(heapMaxPtr) - next = childKeyChk.GetRow(i) - } else { - heapMax = e.rowChunks.GetRow(heapMaxPtr) - next = childRowChk.GetRow(i) - } + heapMax = e.rowChunks.GetRow(heapMaxPtr) + next = childRowChk.GetRow(i) if e.chkHeap.greaterRow(heapMax, next) { // Evict heap max, keep the next row. e.rowPtrs[0] = e.rowChunks.AppendRow(childRowChk.GetRow(i)) - if childKeyChk != nil { - e.keyChunks.AppendRow(childKeyChk.GetRow(i)) - } heap.Fix(e.chkHeap, 0) } } @@ -444,16 +348,6 @@ func (e *TopNExec) doCompaction() error { e.memTracker.ReplaceChild(e.rowChunks.GetMemTracker(), newRowChunks.GetMemTracker()) e.rowChunks = newRowChunks - if e.keyChunks != nil { - newKeyChunks := chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) - for _, rowPtr := range e.rowPtrs { - newKeyChunks.AppendRow(e.keyChunks.GetRow(rowPtr)) - } - newKeyChunks.GetMemTracker().SetLabel("keyChunks") - e.memTracker.ReplaceChild(e.keyChunks.GetMemTracker(), newKeyChunks.GetMemTracker()) - e.keyChunks = newKeyChunks - } - e.memTracker.Consume(int64(-8 * len(e.rowPtrs))) e.memTracker.Consume(int64(8 * len(newRowPtrs))) e.rowPtrs = newRowPtrs diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 2eb775fd8907f..17fc636a8b327 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -113,7 +113,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) { // Test TopN push down in table single read. { sql: "select c from t order by t.a + t.b limit 1", - best: "TableReader(Table(t)->TopN([plus(test.t.a, test.t.b)],0,1))->TopN([plus(test.t.a, test.t.b)],0,1)->Projection", + best: "TableReader(Table(t)->TopN([plus(test.t.a, test.t.b)],0,1))->Projection->TopN([col_3],0,1)->Projection", }, // Test Limit push down in table single read. { @@ -1215,7 +1215,7 @@ func (s *testPlanSuite) TestAggEliminater(c *C) { // If max/min contains scalar function, we can still do transformation. { sql: "select max(a+1) from t;", - best: "TableReader(Table(t)->Sel([not(isnull(plus(test.t.a, 1)))])->TopN([plus(test.t.a, 1) true],0,1))->TopN([plus(test.t.a, 1) true],0,1)->Projection->StreamAgg", + best: "TableReader(Table(t)->Sel([not(isnull(plus(test.t.a, 1)))])->TopN([plus(test.t.a, 1) true],0,1))->Projection->TopN([col_1 true],0,1)->Projection->StreamAgg", }, // Do nothing to max+min. { diff --git a/planner/core/rule_inject_extra_projection.go b/planner/core/rule_inject_extra_projection.go index 8dae3d3ef9b94..057d774073d4f 100644 --- a/planner/core/rule_inject_extra_projection.go +++ b/planner/core/rule_inject_extra_projection.go @@ -24,9 +24,10 @@ import ( "github.com/pingcap/tidb/types" ) -// injectExtraProjection is used to extract the expressions of specific operators into a -// physical Projection operator and inject the Projection below the operators. -// Thus we can accelerate the expression evaluation by eager evaluation. +// injectExtraProjection is used to extract the expressions of specific +// operators into a physical Projection operator and inject the Projection below +// the operators. Thus we can accelerate the expression evaluation by eager +// evaluation. func injectExtraProjection(plan PhysicalPlan) PhysicalPlan { return NewProjInjector().inject(plan) } @@ -43,18 +44,23 @@ func (pe *projInjector) inject(plan PhysicalPlan) PhysicalPlan { for _, child := range plan.Children() { pe.inject(child) } + switch p := plan.(type) { case *PhysicalHashAgg: plan = injectProjBelowAgg(plan, p.AggFuncs, p.GroupByItems) case *PhysicalStreamAgg: plan = injectProjBelowAgg(plan, p.AggFuncs, p.GroupByItems) + case *PhysicalSort: + plan = injectProjBelowSort(p, p.ByItems) + case *PhysicalTopN: + plan = injectProjBelowSort(p, p.ByItems) } return plan } -// injectProjBelowAgg injects a ProjOperator below AggOperator. -// If all the args of `aggFuncs`, and all the item of `groupByItems` -// are columns or constants, we do not need to build the `proj`. +// injectProjBelowAgg injects a ProjOperator below AggOperator. If all the args +// of `aggFuncs`, and all the item of `groupByItems` are columns or constants, +// we do not need to build the `proj`. func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression) PhysicalPlan { hasScalarFunc := false @@ -105,9 +111,10 @@ func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes } projExprs = append(projExprs, item) newArg := &expression.Column{ - RetType: item.GetType(), - ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(projSchemaCols))), - Index: cursor, + UniqueID: aggPlan.context().GetSessionVars().AllocPlanColumnID(), + RetType: item.GetType(), + ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(projSchemaCols))), + Index: cursor, } projSchemaCols = append(projSchemaCols, newArg) groupByItems[i] = newArg @@ -174,3 +181,73 @@ func wrapCastForAggArgs(ctx sessionctx.Context, funcs []*aggregation.AggFuncDesc } } } + +// injectProjBelowSort extracts the ScalarFunctions of `orderByItems` into a +// PhysicalProjection and injects it below PhysicalTopN/PhysicalSort. The schema +// of PhysicalSort and PhysicalTopN are the same as the schema of their +// children. When a projection is injected as the child of PhysicalSort and +// PhysicalTopN, some extra columns will be added into the schema of the +// Projection, thus we need to add another Projection upon them to prune the +// redundant columns. +func injectProjBelowSort(p PhysicalPlan, orderByItems []*ByItems) PhysicalPlan { + hasScalarFunc, numOrderByItems := false, len(orderByItems) + for i := 0; !hasScalarFunc && i < numOrderByItems; i++ { + _, isScalarFunc := orderByItems[i].Expr.(*expression.ScalarFunction) + hasScalarFunc = hasScalarFunc || isScalarFunc + } + if !hasScalarFunc { + return p + } + + topProjExprs := make([]expression.Expression, 0, p.Schema().Len()) + for i := range p.Schema().Columns { + col := p.Schema().Columns[i] + col.Index = i + topProjExprs = append(topProjExprs, col) + } + topProj := PhysicalProjection{ + Exprs: topProjExprs, + AvoidColumnEvaluator: false, + }.init(p.context(), p.statsInfo(), nil) + topProj.SetSchema(p.Schema().Clone()) + topProj.SetChildren(p) + + childPlan := p.Children()[0] + bottomProjSchemaCols := make([]*expression.Column, 0, len(childPlan.Schema().Columns)+numOrderByItems) + bottomProjExprs := make([]expression.Expression, 0, len(childPlan.Schema().Columns)+numOrderByItems) + for i, col := range childPlan.Schema().Columns { + col.Index = i + bottomProjSchemaCols = append(bottomProjSchemaCols, col) + bottomProjExprs = append(bottomProjExprs, col) + } + + for _, item := range orderByItems { + itemExpr := item.Expr + if _, isScalarFunc := itemExpr.(*expression.ScalarFunction); !isScalarFunc { + continue + } + bottomProjExprs = append(bottomProjExprs, itemExpr) + newArg := &expression.Column{ + UniqueID: p.context().GetSessionVars().AllocPlanColumnID(), + RetType: itemExpr.GetType(), + ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(bottomProjSchemaCols))), + Index: len(bottomProjSchemaCols), + } + bottomProjSchemaCols = append(bottomProjSchemaCols, newArg) + item.Expr = newArg + } + + childProp := p.getChildReqProps(0).Clone() + bottomProj := PhysicalProjection{ + Exprs: bottomProjExprs, + AvoidColumnEvaluator: false, + }.init(p.context(), childPlan.statsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), childProp) + bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...)) + bottomProj.SetChildren(childPlan) + p.SetChildren(bottomProj) + + if origChildProj, isChildProj := childPlan.(*PhysicalProjection); isChildProj { + refine4NeighbourProj(bottomProj, origChildProj) + } + return topProj +}