From d7f739b6eb504857d2e2ebb64f8ec5a27afda628 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 3 Jan 2019 19:15:32 +0800 Subject: [PATCH 01/13] executor: support window func for aggregate without frame clause --- executor/aggregate.go | 23 +++- executor/builder.go | 77 ++++------- executor/window.go | 158 ++++++++++++++++++++++ executor/window_test.go | 41 ++++++ executor/windowfuncs/window_funcs.go | 85 ++++++++++++ expression/aggregation/base_func.go | 46 +++++++ planner/core/exhaust_physical_plans.go | 7 +- planner/core/logical_plan_builder.go | 8 +- planner/core/logical_plan_test.go | 18 +-- planner/core/logical_plans.go | 3 +- planner/core/physical_plans.go | 4 + planner/core/resolve_indices.go | 21 +++ planner/core/rule_build_proj_below_agg.go | 55 +------- planner/core/rule_column_pruning.go | 5 +- planner/core/rule_eliminate_projection.go | 5 +- util/chunk/chunk.go | 10 ++ 16 files changed, 444 insertions(+), 122 deletions(-) create mode 100644 executor/window.go create mode 100644 executor/window_test.go create mode 100644 executor/windowfuncs/window_funcs.go diff --git a/executor/aggregate.go b/executor/aggregate.go index f32f5de50ce22..a43108d69ef67 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -760,10 +760,7 @@ type StreamAggExec struct { // isChildReturnEmpty indicates whether the child executor only returns an empty input. isChildReturnEmpty bool defaultVal *chunk.Chunk - StmtCtx *stmtctx.StatementContext - GroupByItems []expression.Expression - curGroupKey []types.Datum - tmpGroupKey []types.Datum + groupChecker *groupChecker inputIter *chunk.Iterator4Chunk inputRow chunk.Row aggFuncs []aggfuncs.AggFunc @@ -824,7 +821,7 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e return errors.Trace(err) } for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() { - meetNewGroup, err := e.meetNewGroup(e.inputRow) + meetNewGroup, err := e.groupChecker.meetNewGroup(e.inputRow) if err != nil { return errors.Trace(err) } @@ -911,8 +908,22 @@ func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error { return nil } +type groupChecker struct { + StmtCtx *stmtctx.StatementContext + GroupByItems []expression.Expression + curGroupKey []types.Datum + tmpGroupKey []types.Datum +} + +func newGroupChecker(stmtCtx *stmtctx.StatementContext, items []expression.Expression) *groupChecker { + return &groupChecker{ + StmtCtx: stmtCtx, + GroupByItems: items, + } +} + // meetNewGroup returns a value that represents if the new group is different from last group. -func (e *StreamAggExec) meetNewGroup(row chunk.Row) (bool, error) { +func (e *groupChecker) meetNewGroup(row chunk.Row) (bool, error) { if len(e.GroupByItems) == 0 { return false, nil } diff --git a/executor/builder.go b/executor/builder.go index 27ba1b9360c16..59cc78b03dfdd 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" + "github.com/pingcap/tidb/executor/windowfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -167,6 +168,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildIndexReader(v) case *plannercore.PhysicalIndexLookUpReader: return b.buildIndexLookUpReader(v) + case *plannercore.PhysicalWindow: + return b.buildWindow(v) default: b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p) return nil @@ -918,54 +921,6 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo return e } -// wrapCastForAggArgs wraps the args of an aggregate function with a cast function. -func (b *executorBuilder) wrapCastForAggArgs(funcs []*aggregation.AggFuncDesc) { - for _, f := range funcs { - // We do not need to wrap cast upon these functions, - // since the EvalXXX method called by the arg is determined by the corresponding arg type. - if f.Name == ast.AggFuncCount || f.Name == ast.AggFuncMin || f.Name == ast.AggFuncMax || f.Name == ast.AggFuncFirstRow { - continue - } - var castFunc func(ctx sessionctx.Context, expr expression.Expression) expression.Expression - switch retTp := f.RetTp; retTp.EvalType() { - case types.ETInt: - castFunc = expression.WrapWithCastAsInt - case types.ETReal: - castFunc = expression.WrapWithCastAsReal - case types.ETString: - castFunc = expression.WrapWithCastAsString - case types.ETDecimal: - castFunc = expression.WrapWithCastAsDecimal - default: - panic("should never happen in executorBuilder.wrapCastForAggArgs") - } - for i := range f.Args { - f.Args[i] = castFunc(b.ctx, f.Args[i]) - if f.Name != ast.AggFuncAvg && f.Name != ast.AggFuncSum { - continue - } - // After wrapping cast on the argument, flen etc. may not the same - // as the type of the aggregation function. The following part set - // the type of the argument exactly as the type of the aggregation - // function. - // Note: If the `Tp` of argument is the same as the `Tp` of the - // aggregation function, it will not wrap cast function on it - // internally. The reason of the special handling for `Column` is - // that the `RetType` of `Column` refers to the `infoschema`, so we - // need to set a new variable for it to avoid modifying the - // definition in `infoschema`. - if col, ok := f.Args[i].(*expression.Column); ok { - col.RetType = types.NewFieldType(col.RetType.Tp) - } - // originTp is used when the the `Tp` of column is TypeFloat32 while - // the type of the aggregation function is TypeFloat64. - originTp := f.Args[i].GetType().Tp - *(f.Args[i].GetType()) = *(f.RetTp) - f.Args[i].GetType().Tp = originTp - } - } -} - func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor { src := b.build(v.Children()[0]) if b.err != nil { @@ -1055,9 +1010,8 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu } e := &StreamAggExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), - StmtCtx: b.ctx.GetSessionVars().StmtCtx, + groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, v.GroupByItems), aggFuncs: make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)), - GroupByItems: v.GroupByItems, } if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) { e.defaultVal = nil @@ -1922,3 +1876,26 @@ func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, tableID, indexID in }) return kvRanges, nil } + +func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec { + childExec := b.build(v.Children()[0]) + if b.err != nil { + return nil + } + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec) + var groupByItems []expression.Expression + for _, item := range v.PartitionBy { + groupByItems = append(groupByItems, item.Col) + } + windowFunc, err := windowfuncs.BuildWindowFunc(b.ctx, v.WindowFuncDesc, len(v.Schema().Columns)-1) + if err != nil { + b.err = err + return nil + } + e := &WindowExec{baseExecutor: base, + windowFunc: windowFunc, + groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), + childCols: v.ChildCols, + } + return e +} diff --git a/executor/window.go b/executor/window.go new file mode 100644 index 0000000000000..9e5b2246dbf30 --- /dev/null +++ b/executor/window.go @@ -0,0 +1,158 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/executor/windowfuncs" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/util/chunk" +) + +// WindowExec is the executor for window functions. +type WindowExec struct { + baseExecutor + + groupChecker *groupChecker + inputIter *chunk.Iterator4Chunk + inputRow chunk.Row + groupRows []chunk.Row + childResults []*chunk.Chunk + windowFunc windowfuncs.WindowFunc + executed bool + childCols []*expression.Column +} + +// Close implements the Executor Close interface. +func (e *WindowExec) Close() error { + e.childResults = nil + return errors.Trace(e.baseExecutor.Close()) +} + +// Next implements the Executor Next interface. +func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("windowExec.Next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } + if e.runtimeStats != nil { + start := time.Now() + defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() + } + chk.Reset() + if e.windowFunc.HasRemainingResults() { + e.appendResult2Chunk(chk) + } + for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(chk.NumCols()-1) > 0) { + err := e.consumeOneGroup(ctx, chk) + if err != nil { + e.executed = true + return errors.Trace(err) + } + } + return nil +} + +func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) error { + for !e.executed { + if err := e.fetchChildIfNecessary(ctx, chk); err != nil { + return errors.Trace(err) + } + for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() { + meetNewGroup, err := e.groupChecker.meetNewGroup(e.inputRow) + if err != nil { + return errors.Trace(err) + } + if meetNewGroup { + err := e.consumeGroupRows(chk) + if err != nil { + return errors.Trace(err) + } + err = e.appendResult2Chunk(chk) + if err != nil { + return errors.Trace(err) + } + } + e.groupRows = append(e.groupRows, e.inputRow) + if meetNewGroup { + e.inputRow = e.inputIter.Next() + return nil + } + } + } + return nil +} + +func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { + if len(e.groupRows) == 0 { + return nil + } + e.copyChk(chk) + var err error + e.groupRows, err = e.windowFunc.ProcessOneChunk(e.ctx, e.groupRows, chk) + return err +} + +func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) (err error) { + if e.inputIter != nil && e.inputRow != e.inputIter.End() { + return nil + } + + // Before fetching a new batch of input, we should consume the last groupChecker. + err = e.consumeGroupRows(chk) + if err != nil { + return errors.Trace(err) + } + + childResult := e.children[0].newFirstChunk() + err = e.children[0].Next(ctx, childResult) + if err != nil { + return errors.Trace(err) + } + e.childResults = append(e.childResults, childResult) + // No more data. + if childResult.NumRows() == 0 { + e.executed = true + err = e.appendResult2Chunk(chk) + return errors.Trace(err) + } + + e.inputIter = chunk.NewIterator4Chunk(childResult) + e.inputRow = e.inputIter.Begin() + return nil +} + +// appendResult2Chunk appends result of all the aggregation functions to the +// result chunk, and reset the evaluation context for each aggregation. +func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { + e.copyChk(chk) + var err error + e.groupRows, err = e.windowFunc.ExhaustResult(e.ctx, e.groupRows, chk) + return err +} + +func (e *WindowExec) copyChk(chk *chunk.Chunk) { + if len(e.childResults) == 0 || chk.NumRows() > 0 { + return + } + childResult := e.childResults[0] + e.childResults = e.childResults[1:] + for i, col := range e.childCols { + chk.CopyColumns(childResult, i, col.Index) + } +} diff --git a/executor/window_test.go b/executor/window_test.go new file mode 100644 index 0000000000000..4bda1bd23e2a0 --- /dev/null +++ b/executor/window_test.go @@ -0,0 +1,41 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/util/testkit" +) + +func (s *testSuite2) TestWindowFunctions(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int)") + tk.MustExec("set @@tidb_enable_window_function = 1") + defer func() { + tk.MustExec("set @@tidb_enable_window_function = 0") + }() + tk.MustExec("insert into t values (1,2,3),(4,3,2),(2,3,4)") + result := tk.MustQuery("select count(a) over () from t") + result.Check(testkit.Rows("3", "3", "3")) + result = tk.MustQuery("select sum(a) over () + count(a) over () from t") + result.Check(testkit.Rows("10", "10", "10")) + result = tk.MustQuery("select sum(a) over (partition by a) from t") + result.Check(testkit.Rows("1", "2", "4")) + result = tk.MustQuery("select 1 + sum(a) over (), count(a) over () from t") + result.Check(testkit.Rows("8 3", "8 3", "8 3")) + result = tk.MustQuery("select sum(t1.a) over() from t t1, t t2") + result.Check(testkit.Rows("21", "21", "21", "21", "21", "21", "21", "21", "21")) +} diff --git a/executor/windowfuncs/window_funcs.go b/executor/windowfuncs/window_funcs.go new file mode 100644 index 0000000000000..59e99009091bb --- /dev/null +++ b/executor/windowfuncs/window_funcs.go @@ -0,0 +1,85 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package windowfuncs + +import ( + "errors" + + "github.com/pingcap/tidb/executor/aggfuncs" + "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/chunk" +) + +// WindowFunc is the interface for processing window functions. +type WindowFunc interface { + // ProcessOneChunk processes one chunk. + ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) + // ExhaustResult exhausts result to the result chunk. + ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) + // HasRemainingResults checks if there are some remained results to be exhausted. + HasRemainingResults() bool +} + +// aggWithoutFrame deals with agg functions with no frame specification. +type aggWithoutFrame struct { + result aggfuncs.PartialResult + agg aggfuncs.AggFunc + remained int64 +} + +// ProcessOneChunk implements the WindowFunc interface. +func (wf *aggWithoutFrame) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) { + err := wf.agg.UpdatePartialResult(sctx, rows, wf.result) + if err != nil { + return nil, err + } + wf.remained += int64(len(rows)) + rows = rows[:0] + return rows, nil +} + +// ExhaustResult implements the WindowFunc interface. +func (wf *aggWithoutFrame) ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) { + rows = rows[:0] + for wf.remained > 0 && dest.RemainedRows(dest.NumCols()-1) > 0 { + err := wf.agg.AppendFinalResult2Chunk(sctx, wf.result, dest) + if err != nil { + return rows, err + } + wf.remained-- + } + if wf.remained == 0 { + wf.agg.ResetPartialResult(wf.result) + } + return rows, nil +} + +// HasRemainingResults implements the WindowFunc interface. +func (wf *aggWithoutFrame) HasRemainingResults() bool { + return wf.remained > 0 +} + +// BuildWindowFunc builds window functions according to the window functions description. +func BuildWindowFunc(ctx sessionctx.Context, window *aggregation.WindowFuncDesc, ordinal int) (WindowFunc, error) { + aggDesc := aggregation.NewAggFuncDesc(ctx, window.Name, window.Args, false) + agg := aggfuncs.Build(ctx, aggDesc, ordinal) + if agg == nil { + return nil, errors.New("window evaluator only support aggregation functions without frame now") + } + return &aggWithoutFrame{ + agg: agg, + result: agg.AllocPartialResult(), + }, nil +} diff --git a/expression/aggregation/base_func.go b/expression/aggregation/base_func.go index d7ab7a3d0816a..400ca2210a902 100644 --- a/expression/aggregation/base_func.go +++ b/expression/aggregation/base_func.go @@ -211,3 +211,49 @@ func (a *baseFuncDesc) GetDefaultValue() (v types.Datum) { } return } + +// WrapCastForAggArgs wraps the args of an aggregate function with a cast function. +func (a *baseFuncDesc) WrapCastForAggArgs(ctx sessionctx.Context) { + // We do not need to wrap cast upon these functions, + // since the EvalXXX method called by the arg is determined by the corresponding arg type. + if a.Name == ast.AggFuncCount || a.Name == ast.AggFuncMin || a.Name == ast.AggFuncMax || a.Name == ast.AggFuncFirstRow { + return + } + var castFunc func(ctx sessionctx.Context, expr expression.Expression) expression.Expression + switch retTp := a.RetTp; retTp.EvalType() { + case types.ETInt: + castFunc = expression.WrapWithCastAsInt + case types.ETReal: + castFunc = expression.WrapWithCastAsReal + case types.ETString: + castFunc = expression.WrapWithCastAsString + case types.ETDecimal: + castFunc = expression.WrapWithCastAsDecimal + default: + panic("should never happen in executorBuilder.wrapCastForAggArgs") + } + for i := range a.Args { + a.Args[i] = castFunc(ctx, a.Args[i]) + if a.Name != ast.AggFuncAvg && a.Name != ast.AggFuncSum { + continue + } + // After wrapping cast on the argument, flen etc. may not the same + // as the type of the aggregation function. The following part set + // the type of the argument exactly as the type of the aggregation + // function. + // Note: If the `Tp` of argument is the same as the `Tp` of the + // aggregation function, it will not wrap cast function on it + // internally. The reason of the special handling for `Column` is + // that the `RetType` of `Column` refers to the `infoschema`, so we + // need to set a new variable for it to avoid modifying the + // definition in `infoschema`. + if col, ok := a.Args[i].(*expression.Column); ok { + col.RetType = types.NewFieldType(col.RetType.Tp) + } + // originTp is used when the the `Tp` of column is TypeFloat32 while + // the type of the aggregation function is TypeFloat64. + originTp := a.Args[i].GetType().Tp + *(a.Args[i].GetType()) = *(a.RetTp) + a.Args[i].GetType().Tp = originTp + } +} diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index ee8796fb19994..fba5ee6e906d3 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -790,12 +790,17 @@ func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) [] } func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan { - childProperty := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, Items: p.ByItems, Enforced: true} + var byItems []property.Item + byItems = append(byItems, p.PartitionBy...) + byItems = append(byItems, p.OrderBy...) + childProperty := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, Items: byItems, Enforced: true} if !prop.IsPrefix(childProperty) { return nil } window := PhysicalWindow{ WindowFuncDesc: p.WindowFuncDesc, + PartitionBy: p.PartitionBy, + OrderBy: p.OrderBy, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProperty) window.SetSchema(p.Schema()) return []PhysicalPlan{window} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 20e76e036e787..34add28defaaf 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2662,9 +2662,15 @@ func (b *PlanBuilder) buildWindowFunction(p LogicalPlan, expr *ast.WindowFuncExp } desc := aggregation.NewWindowFuncDesc(b.ctx, expr.F, args) + desc.WrapCastForAggArgs(b.ctx) + lenPartition := 0 + if expr.Spec.PartitionBy != nil { + lenPartition = len(expr.Spec.PartitionBy.Items) + } window := LogicalWindow{ WindowFuncDesc: desc, - ByItems: byItems, + PartitionBy: byItems[0:lenPartition], + OrderBy: byItems[lenPartition:], }.Init(b.ctx) schema := p.Schema().Clone() schema.Append(&expression.Column{ diff --git a/planner/core/logical_plan_test.go b/planner/core/logical_plan_test.go index 43a00e4cff1c1..d59b11f37acb7 100644 --- a/planner/core/logical_plan_test.go +++ b/planner/core/logical_plan_test.go @@ -1906,23 +1906,23 @@ func (s *testPlanSuite) TestWindowFunction(c *C) { }{ { sql: "select a, avg(a) over(partition by a) from t", - result: "TableReader(Table(t))->Window(avg(test.t.a))->Projection", + result: "TableReader(Table(t))->Window(avg(cast(test.t.a)))->Projection", }, { sql: "select a, avg(a) over(partition by b) from t", - result: "TableReader(Table(t))->Sort->Window(avg(test.t.a))->Projection", + result: "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a)))->Projection", }, { sql: "select a, avg(a+1) over(partition by (a+1)) from t", - result: "TableReader(Table(t))->Projection->Sort->Window(avg(2_proj_window_3))->Projection", + result: "TableReader(Table(t))->Projection->Sort->Window(avg(cast(2_proj_window_3)))->Projection", }, { sql: "select a, avg(a) over(order by a asc, b desc) from t order by a asc, b desc", - result: "TableReader(Table(t))->Sort->Window(avg(test.t.a))->Projection", + result: "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a)))->Projection", }, { sql: "select a, b as a, avg(a) over(partition by a) from t", - result: "TableReader(Table(t))->Window(avg(test.t.a))->Projection", + result: "TableReader(Table(t))->Window(avg(cast(test.t.a)))->Projection", }, { sql: "select a, b as z, sum(z) over() from t", @@ -1930,7 +1930,7 @@ func (s *testPlanSuite) TestWindowFunction(c *C) { }, { sql: "select a, b as z from t order by (sum(z) over())", - result: "TableReader(Table(t))->Window(sum(test.t.z))->Sort->Projection", + result: "TableReader(Table(t))->Window(sum(cast(test.t.z)))->Sort->Projection", }, { sql: "select sum(avg(a)) over() from t", @@ -1938,11 +1938,11 @@ func (s *testPlanSuite) TestWindowFunction(c *C) { }, { sql: "select b from t order by(sum(a) over())", - result: "TableReader(Table(t))->Window(sum(test.t.a))->Sort->Projection", + result: "TableReader(Table(t))->Window(sum(cast(test.t.a)))->Sort->Projection", }, { sql: "select b from t order by(sum(a) over(partition by a))", - result: "TableReader(Table(t))->Window(sum(test.t.a))->Sort->Projection", + result: "TableReader(Table(t))->Window(sum(cast(test.t.a)))->Sort->Projection", }, { sql: "select b from t order by(sum(avg(a)) over())", @@ -1950,7 +1950,7 @@ func (s *testPlanSuite) TestWindowFunction(c *C) { }, { sql: "select a from t having (select sum(a) over() as w from t tt where a > t.a)", - result: "Apply{TableReader(Table(t))->TableReader(Table(t)->Sel([gt(tt.a, test.t.a)]))->Window(sum(tt.a))->MaxOneRow->Sel([w])}->Projection", + result: "Apply{TableReader(Table(t))->TableReader(Table(t)->Sel([gt(tt.a, test.t.a)]))->Window(sum(cast(tt.a)))->MaxOneRow->Sel([w])}->Projection", }, { sql: "select avg(a) over() as w from t having w > 1", diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 1a625a93ad709..b6f7463ee3d65 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -625,7 +625,8 @@ type LogicalWindow struct { logicalSchemaProducer WindowFuncDesc *aggregation.WindowFuncDesc - ByItems []property.Item // ByItems is composed of `PARTITION BY` and `ORDER BY` items. + PartitionBy []property.Item + OrderBy []property.Item // TODO: add frame clause } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 4f404e88d9f2c..bb9f6188c2c33 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" @@ -380,4 +381,7 @@ type PhysicalWindow struct { physicalSchemaProducer WindowFuncDesc *aggregation.WindowFuncDesc + PartitionBy []property.Item + OrderBy []property.Item + ChildCols []*expression.Column } diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index 27562bc21838b..23c0a3e07bb9f 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -184,6 +184,27 @@ func (p *PhysicalSort) ResolveIndices() { } } +// ResolveIndices implements Plan interface. +func (p *PhysicalWindow) ResolveIndices() { + p.physicalSchemaProducer.ResolveIndices() + p.ChildCols = p.Schema().Columns[:len(p.Schema().Columns)-1] + for i, col := range p.ChildCols { + newCol := col.ResolveIndices(p.children[0].Schema()) + p.ChildCols[i] = newCol.(*expression.Column) + } + for i, item := range p.PartitionBy { + newCol := item.Col.ResolveIndices(p.children[0].Schema()) + p.PartitionBy[i].Col = newCol.(*expression.Column) + } + for i, item := range p.OrderBy { + newCol := item.Col.ResolveIndices(p.children[0].Schema()) + p.OrderBy[i].Col = newCol.(*expression.Column) + } + for i, arg := range p.WindowFuncDesc.Args { + p.WindowFuncDesc.Args[i] = arg.ResolveIndices(p.children[0].Schema()) + } +} + // ResolveIndices implements Plan interface. func (p *PhysicalTopN) ResolveIndices() { p.basePhysicalPlan.ResolveIndices() diff --git a/planner/core/rule_build_proj_below_agg.go b/planner/core/rule_build_proj_below_agg.go index 69d1a00cdb79d..fda998581f878 100644 --- a/planner/core/rule_build_proj_below_agg.go +++ b/planner/core/rule_build_proj_below_agg.go @@ -16,12 +16,9 @@ package core import ( "fmt" - "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/types" ) // buildProjBelowAgg builds a ProjOperator below AggOperator. @@ -53,7 +50,9 @@ func doBuildProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDe // If the mode is FinalMode, we do not need to wrap cast upon the args, // since the types of the args are already the expected. if len(aggFuncs) > 0 && aggFuncs[0].Mode != aggregation.FinalMode { - wrapCastForAggArgs(aggPlan.context(), aggFuncs) + for _, agg := range aggFuncs { + agg.WrapCastForAggArgs(aggPlan.context()) + } } for i := 0; !hasScalarFunc && i < len(aggFuncs); i++ { @@ -118,51 +117,3 @@ func doBuildProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDe aggPlan.SetChildren(proj) return aggPlan } - -// wrapCastForAggArgs wraps the args of an aggregate function with a cast function. -func wrapCastForAggArgs(ctx sessionctx.Context, funcs []*aggregation.AggFuncDesc) { - for _, f := range funcs { - // We do not need to wrap cast upon these functions, - // since the EvalXXX method called by the arg is determined by the corresponding arg type. - if f.Name == ast.AggFuncCount || f.Name == ast.AggFuncMin || f.Name == ast.AggFuncMax || f.Name == ast.AggFuncFirstRow { - continue - } - var castFunc func(ctx sessionctx.Context, expr expression.Expression) expression.Expression - switch retTp := f.RetTp; retTp.EvalType() { - case types.ETInt: - castFunc = expression.WrapWithCastAsInt - case types.ETReal: - castFunc = expression.WrapWithCastAsReal - case types.ETString: - castFunc = expression.WrapWithCastAsString - case types.ETDecimal: - castFunc = expression.WrapWithCastAsDecimal - default: - panic("should never happen in executorBuilder.wrapCastForAggArgs") - } - for i := range f.Args { - f.Args[i] = castFunc(ctx, f.Args[i]) - if f.Name != ast.AggFuncAvg && f.Name != ast.AggFuncSum { - continue - } - // After wrapping cast on the argument, flen etc. may not the same - // as the type of the aggregation function. The following part set - // the type of the argument exactly as the type of the aggregation - // function. - // Note: If the `Tp` of argument is the same as the `Tp` of the - // aggregation function, it will not wrap cast function on it - // internally. The reason of the special handling for `Column` is - // that the `RetType` of `Column` refers to the `infoschema`, so we - // need to set a new variable for it to avoid modifying the - // definition in `infoschema`. - if col, ok := f.Args[i].(*expression.Column); ok { - col.RetType = types.NewFieldType(col.RetType.Tp) - } - // originTp is used when the the `Tp` of column is TypeFloat32 while - // the type of the aggregation function is TypeFloat64. - originTp := f.Args[i].GetType().Tp - *(f.Args[i].GetType()) = *(f.RetTp) - f.Args[i].GetType().Tp = originTp - } - } -} diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index bf2d86a916575..07f6e7fd7b38c 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -286,7 +286,10 @@ func (p *LogicalWindow) extractUsedCols(parentUsedCols []*expression.Column) []* for _, arg := range p.WindowFuncDesc.Args { parentUsedCols = append(parentUsedCols, expression.ExtractColumns(arg)...) } - for _, by := range p.ByItems { + for _, by := range p.PartitionBy { + parentUsedCols = append(parentUsedCols, by.Col) + } + for _, by := range p.OrderBy { parentUsedCols = append(parentUsedCols, by.Col) } return parentUsedCols diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go index 597ce50a9e8d6..65cc99218e071 100644 --- a/planner/core/rule_eliminate_projection.go +++ b/planner/core/rule_eliminate_projection.go @@ -211,7 +211,10 @@ func (p *LogicalWindow) replaceExprColumns(replace map[string]*expression.Column for _, arg := range p.WindowFuncDesc.Args { resolveExprAndReplace(arg, replace) } - for _, item := range p.ByItems { + for _, item := range p.PartitionBy { + resolveColumnAndReplace(item.Col, replace) + } + for _, item := range p.OrderBy { resolveColumnAndReplace(item.Col, replace) } } diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index 52ad133c4fd2b..aa68763eec57d 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -187,6 +187,11 @@ func (c *Chunk) SwapColumns(other *Chunk) { c.numVirtualRows, other.numVirtualRows = other.numVirtualRows, c.numVirtualRows } +// CopyColumns copies columns `other.columns[from]` to `c.columns[dst]`. +func (c *Chunk) CopyColumns(other *Chunk, dst, from int) { + c.columns[dst] = other.columns[from] +} + // SetNumVirtualRows sets the virtual row number for a Chunk. // It should only be used when there exists no column in the Chunk. func (c *Chunk) SetNumVirtualRows(numVirtualRows int) { @@ -537,3 +542,8 @@ func readTime(buf []byte) types.Time { Fsp: fsp, } } + +// RemainedRows returns the number of rows needs to be appended in specific column. +func (c *Chunk) RemainedRows(colIdx int) int { + return c.columns[0].length - c.columns[colIdx].length +} From 9736ba35e26a9e2ebb6c7e037bb4e738cfa28647 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 3 Jan 2019 19:21:51 +0800 Subject: [PATCH 02/13] update license year --- executor/window_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/window_test.go b/executor/window_test.go index 4bda1bd23e2a0..b5a4ae647243a 100644 --- a/executor/window_test.go +++ b/executor/window_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 3eca104f40713837377dbf95ec8f7dc3db4b299a Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 3 Jan 2019 19:34:35 +0800 Subject: [PATCH 03/13] fix error check --- executor/window.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executor/window.go b/executor/window.go index 9e5b2246dbf30..7d19afaa90215 100644 --- a/executor/window.go +++ b/executor/window.go @@ -56,7 +56,10 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { } chk.Reset() if e.windowFunc.HasRemainingResults() { - e.appendResult2Chunk(chk) + err := e.appendResult2Chunk(chk) + if err != nil { + return err + } } for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(chk.NumCols()-1) > 0) { err := e.consumeOneGroup(ctx, chk) From 1dbee2dc6d33cc652897b63c4fbfbcc2e8abca1b Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Thu, 3 Jan 2019 19:44:22 +0800 Subject: [PATCH 04/13] refine code comments --- executor/window.go | 3 +-- expression/aggregation/base_func.go | 2 +- planner/core/logical_plan_builder.go | 1 + 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/window.go b/executor/window.go index 7d19afaa90215..99ee1005b151e 100644 --- a/executor/window.go +++ b/executor/window.go @@ -140,8 +140,7 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk return nil } -// appendResult2Chunk appends result of all the aggregation functions to the -// result chunk, and reset the evaluation context for each aggregation. +// appendResult2Chunk appends result of the window function to the result chunk. func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { e.copyChk(chk) var err error diff --git a/expression/aggregation/base_func.go b/expression/aggregation/base_func.go index 400ca2210a902..803856304f029 100644 --- a/expression/aggregation/base_func.go +++ b/expression/aggregation/base_func.go @@ -230,7 +230,7 @@ func (a *baseFuncDesc) WrapCastForAggArgs(ctx sessionctx.Context) { case types.ETDecimal: castFunc = expression.WrapWithCastAsDecimal default: - panic("should never happen in executorBuilder.wrapCastForAggArgs") + panic("should never happen in baseFuncDesc.WrapCastForAggArgs") } for i := range a.Args { a.Args[i] = castFunc(ctx, a.Args[i]) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 34add28defaaf..68ba0b96bd4f0 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2662,6 +2662,7 @@ func (b *PlanBuilder) buildWindowFunction(p LogicalPlan, expr *ast.WindowFuncExp } desc := aggregation.NewWindowFuncDesc(b.ctx, expr.F, args) + // TODO: Check if the function is aggregation function after we support more functions. desc.WrapCastForAggArgs(b.ctx) lenPartition := 0 if expr.Spec.PartitionBy != nil { From b4e733683ee34003ed940761ce731becd94bc065 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 7 Jan 2019 17:00:14 +0800 Subject: [PATCH 05/13] address comments --- executor/aggregate.go | 1 + executor/builder.go | 3 +- executor/window.go | 21 ++++++------ executor/windowfuncs/builder.go | 31 +++++++++++++++++ executor/windowfuncs/window_funcs.go | 51 +++++++++++++++------------- planner/core/logical_plan_builder.go | 20 +++++------ 6 files changed, 81 insertions(+), 46 deletions(-) create mode 100644 executor/windowfuncs/builder.go diff --git a/executor/aggregate.go b/executor/aggregate.go index a43108d69ef67..480da96700719 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -923,6 +923,7 @@ func newGroupChecker(stmtCtx *stmtctx.StatementContext, items []expression.Expre } // meetNewGroup returns a value that represents if the new group is different from last group. +// TODO: Since all the group by items are only a column reference, guaranteed by building projection below aggregation, we can directly compare data in a chunk. func (e *groupChecker) meetNewGroup(row chunk.Row) (bool, error) { if len(e.GroupByItems) == 0 { return false, nil diff --git a/executor/builder.go b/executor/builder.go index 59cc78b03dfdd..6e4bde9b42e7f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1887,13 +1887,14 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec for _, item := range v.PartitionBy { groupByItems = append(groupByItems, item.Col) } - windowFunc, err := windowfuncs.BuildWindowFunc(b.ctx, v.WindowFuncDesc, len(v.Schema().Columns)-1) + windowFunc, err := windowfuncs.Build(b.ctx, v.WindowFuncDesc, len(v.Schema().Columns)-1) if err != nil { b.err = err return nil } e := &WindowExec{baseExecutor: base, windowFunc: windowFunc, + partialResult: windowFunc.AllocPartialResult(), groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), childCols: v.ChildCols, } diff --git a/executor/window.go b/executor/window.go index 99ee1005b151e..257140d1aa42a 100644 --- a/executor/window.go +++ b/executor/window.go @@ -28,14 +28,15 @@ import ( type WindowExec struct { baseExecutor - groupChecker *groupChecker - inputIter *chunk.Iterator4Chunk - inputRow chunk.Row - groupRows []chunk.Row - childResults []*chunk.Chunk - windowFunc windowfuncs.WindowFunc - executed bool - childCols []*expression.Column + groupChecker *groupChecker + inputIter *chunk.Iterator4Chunk + inputRow chunk.Row + groupRows []chunk.Row + childResults []*chunk.Chunk + windowFunc windowfuncs.WindowFunc + partialResult windowfuncs.PartialResult + executed bool + childCols []*expression.Column } // Close implements the Executor Close interface. @@ -107,7 +108,7 @@ func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { } e.copyChk(chk) var err error - e.groupRows, err = e.windowFunc.ProcessOneChunk(e.ctx, e.groupRows, chk) + e.groupRows, err = e.windowFunc.ProcessOneChunk(e.ctx, e.groupRows, chk, e.partialResult) return err } @@ -144,7 +145,7 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { e.copyChk(chk) var err error - e.groupRows, err = e.windowFunc.ExhaustResult(e.ctx, e.groupRows, chk) + e.groupRows, err = e.windowFunc.ExhaustResult(e.ctx, e.groupRows, chk, e.partialResult) return err } diff --git a/executor/windowfuncs/builder.go b/executor/windowfuncs/builder.go new file mode 100644 index 0000000000000..8243e64a524f1 --- /dev/null +++ b/executor/windowfuncs/builder.go @@ -0,0 +1,31 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package windowfuncs + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/executor/aggfuncs" + "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/sessionctx" +) + +// Build builds window functions according to the window functions description. +func Build(sctx sessionctx.Context, windowFuncDesc *aggregation.WindowFuncDesc, ordinal int) (WindowFunc, error) { + aggDesc := aggregation.NewAggFuncDesc(sctx, windowFuncDesc.Name, windowFuncDesc.Args, false) + agg := aggfuncs.Build(sctx, aggDesc, ordinal) + if agg == nil { + return nil, errors.New("window evaluator only support aggregation functions without frame now") + } + return &aggWithoutFrame{agg: agg}, nil +} diff --git a/executor/windowfuncs/window_funcs.go b/executor/windowfuncs/window_funcs.go index 59e99009091bb..6591f764dff2f 100644 --- a/executor/windowfuncs/window_funcs.go +++ b/executor/windowfuncs/window_funcs.go @@ -14,34 +14,44 @@ package windowfuncs import ( - "errors" + "unsafe" "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" ) +// PartialResult represents data structure to store the partial result for the +// aggregate functions. Here we use unsafe.Pointer to allow the partial result +// to be any type. +type PartialResult unsafe.Pointer + // WindowFunc is the interface for processing window functions. type WindowFunc interface { // ProcessOneChunk processes one chunk. - ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) + ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) // ExhaustResult exhausts result to the result chunk. - ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) + ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) // HasRemainingResults checks if there are some remained results to be exhausted. HasRemainingResults() bool + // AllocPartialResult allocates a specific data structure to store the partial result. + AllocPartialResult() PartialResult } // aggWithoutFrame deals with agg functions with no frame specification. type aggWithoutFrame struct { - result aggfuncs.PartialResult - agg aggfuncs.AggFunc + agg aggfuncs.AggFunc remained int64 } +type partialResult4AggWithoutFrame struct { + result aggfuncs.PartialResult +} + // ProcessOneChunk implements the WindowFunc interface. -func (wf *aggWithoutFrame) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) { - err := wf.agg.UpdatePartialResult(sctx, rows, wf.result) +func (wf *aggWithoutFrame) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) { + p := (*partialResult4AggWithoutFrame)(pr) + err := wf.agg.UpdatePartialResult(sctx, rows, p.result) if err != nil { return nil, err } @@ -51,35 +61,28 @@ func (wf *aggWithoutFrame) ProcessOneChunk(sctx sessionctx.Context, rows []chunk } // ExhaustResult implements the WindowFunc interface. -func (wf *aggWithoutFrame) ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk) ([]chunk.Row, error) { +func (wf *aggWithoutFrame) ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) { rows = rows[:0] + p := (*partialResult4AggWithoutFrame)(pr) for wf.remained > 0 && dest.RemainedRows(dest.NumCols()-1) > 0 { - err := wf.agg.AppendFinalResult2Chunk(sctx, wf.result, dest) + err := wf.agg.AppendFinalResult2Chunk(sctx, p.result, dest) if err != nil { return rows, err } wf.remained-- } if wf.remained == 0 { - wf.agg.ResetPartialResult(wf.result) + wf.agg.ResetPartialResult(p.result) } return rows, nil } +// AllocPartialResult implements the WindowFunc interface. +func (wf *aggWithoutFrame) AllocPartialResult() PartialResult{ + return PartialResult(&partialResult4AggWithoutFrame{wf.agg.AllocPartialResult()}) +} + // HasRemainingResults implements the WindowFunc interface. func (wf *aggWithoutFrame) HasRemainingResults() bool { return wf.remained > 0 } - -// BuildWindowFunc builds window functions according to the window functions description. -func BuildWindowFunc(ctx sessionctx.Context, window *aggregation.WindowFuncDesc, ordinal int) (WindowFunc, error) { - aggDesc := aggregation.NewAggFuncDesc(ctx, window.Name, window.Args, false) - agg := aggfuncs.Build(ctx, aggDesc, ordinal) - if agg == nil { - return nil, errors.New("window evaluator only support aggregation functions without frame now") - } - return &aggWithoutFrame{ - agg: agg, - result: agg.AllocPartialResult(), - }, nil -} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 68ba0b96bd4f0..8899813e7adc1 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2586,13 +2586,15 @@ func (b *PlanBuilder) buildDelete(delete *ast.DeleteStmt) (Plan, error) { // buildProjectionForWindow builds the projection for expressions in the window specification that is not an column, // so after the projection, window functions only needs to deal with columns. -func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFuncExpr, aggMap map[*ast.AggregateFuncExpr]int) (LogicalPlan, []property.Item, []expression.Expression, error) { +func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFuncExpr, aggMap map[*ast.AggregateFuncExpr]int) (LogicalPlan, []property.Item, []property.Item, []expression.Expression, error) { b.optFlag |= flagEliminateProjection var items []*ast.ByItem spec := expr.Spec + lenPartition := 0 if spec.PartitionBy != nil { items = append(items, spec.PartitionBy.Items...) + lenPartition = len(spec.PartitionBy.Items) } if spec.OrderBy != nil { items = append(items, spec.OrderBy.Items...) @@ -2612,7 +2614,7 @@ func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFu item.Expr = newExpr.(ast.ExprNode) it, np, err := b.rewrite(item.Expr, p, aggMap, true) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } p = np if col, ok := it.(*expression.Column); ok { @@ -2633,7 +2635,7 @@ func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFu for _, arg := range expr.Args { newArg, np, err := b.rewrite(arg, p, aggMap, true) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } p = np if col, ok := newArg.(*expression.Column); ok { @@ -2652,11 +2654,11 @@ func (b *PlanBuilder) buildProjectionForWindow(p LogicalPlan, expr *ast.WindowFu proj.SetSchema(schema) proj.SetChildren(p) - return proj, propertyItems, newArgList, nil + return proj, propertyItems[:lenPartition], propertyItems[lenPartition:], newArgList, nil } func (b *PlanBuilder) buildWindowFunction(p LogicalPlan, expr *ast.WindowFuncExpr, aggMap map[*ast.AggregateFuncExpr]int) (*LogicalWindow, error) { - p, byItems, args, err := b.buildProjectionForWindow(p, expr, aggMap) + p, partitionBy, orderBy, args, err := b.buildProjectionForWindow(p, expr, aggMap) if err != nil { return nil, err } @@ -2664,14 +2666,10 @@ func (b *PlanBuilder) buildWindowFunction(p LogicalPlan, expr *ast.WindowFuncExp desc := aggregation.NewWindowFuncDesc(b.ctx, expr.F, args) // TODO: Check if the function is aggregation function after we support more functions. desc.WrapCastForAggArgs(b.ctx) - lenPartition := 0 - if expr.Spec.PartitionBy != nil { - lenPartition = len(expr.Spec.PartitionBy.Items) - } window := LogicalWindow{ WindowFuncDesc: desc, - PartitionBy: byItems[0:lenPartition], - OrderBy: byItems[lenPartition:], + PartitionBy: partitionBy, + OrderBy: orderBy, }.Init(b.ctx) schema := p.Schema().Clone() schema.Append(&expression.Column{ From 59c86e34e630b7bdb082ab6b5af4ae5213b8a0de Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 7 Jan 2019 17:15:19 +0800 Subject: [PATCH 06/13] fix ci --- executor/builder.go | 6 +++--- executor/windowfuncs/window_funcs.go | 6 +++--- planner/core/logical_plan_test.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 4e2479fb88c33..1893bab0ebf5f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1894,10 +1894,10 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec return nil } e := &WindowExec{baseExecutor: base, - windowFunc: windowFunc, + windowFunc: windowFunc, partialResult: windowFunc.AllocPartialResult(), - groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), - childCols: v.ChildCols, + groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), + childCols: v.ChildCols, } return e } diff --git a/executor/windowfuncs/window_funcs.go b/executor/windowfuncs/window_funcs.go index 6591f764dff2f..3c502afc23830 100644 --- a/executor/windowfuncs/window_funcs.go +++ b/executor/windowfuncs/window_funcs.go @@ -40,12 +40,12 @@ type WindowFunc interface { // aggWithoutFrame deals with agg functions with no frame specification. type aggWithoutFrame struct { - agg aggfuncs.AggFunc + agg aggfuncs.AggFunc remained int64 } type partialResult4AggWithoutFrame struct { - result aggfuncs.PartialResult + result aggfuncs.PartialResult } // ProcessOneChunk implements the WindowFunc interface. @@ -78,7 +78,7 @@ func (wf *aggWithoutFrame) ExhaustResult(sctx sessionctx.Context, rows []chunk.R } // AllocPartialResult implements the WindowFunc interface. -func (wf *aggWithoutFrame) AllocPartialResult() PartialResult{ +func (wf *aggWithoutFrame) AllocPartialResult() PartialResult { return PartialResult(&partialResult4AggWithoutFrame{wf.agg.AllocPartialResult()}) } diff --git a/planner/core/logical_plan_test.go b/planner/core/logical_plan_test.go index 46c02250df02e..3abf7e6caad04 100644 --- a/planner/core/logical_plan_test.go +++ b/planner/core/logical_plan_test.go @@ -1990,11 +1990,11 @@ func (s *testPlanSuite) TestWindowFunction(c *C) { }, { sql: "select sum(a) over(w1), avg(a) over(w2) from t window w1 as (partition by a), w2 as (w1)", - result: "TableReader(Table(t))->Window(sum(test.t.a))->Window(avg(test.t.a))->Projection", + result: "TableReader(Table(t))->Window(sum(cast(test.t.a)))->Window(avg(cast(test.t.a)))->Projection", }, { sql: "select a from t window w1 as (partition by a) order by (sum(a) over(w1))", - result: "TableReader(Table(t))->Window(sum(test.t.a))->Sort->Projection", + result: "TableReader(Table(t))->Window(sum(cast(test.t.a)))->Sort->Projection", }, } From 313de1fd7f1415bc0ab9d2e9f121e9c9b2c8213a Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 7 Jan 2019 18:33:32 +0800 Subject: [PATCH 07/13] address comments --- executor/window.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/executor/window.go b/executor/window.go index 257140d1aa42a..55aa7efcff53c 100644 --- a/executor/window.go +++ b/executor/window.go @@ -73,31 +73,29 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { } func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) error { - for !e.executed { - if err := e.fetchChildIfNecessary(ctx, chk); err != nil { + if err := e.fetchChildIfNecessary(ctx, chk); err != nil { + return errors.Trace(err) + } + for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() { + meetNewGroup, err := e.groupChecker.meetNewGroup(e.inputRow) + if err != nil { return errors.Trace(err) } - for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() { - meetNewGroup, err := e.groupChecker.meetNewGroup(e.inputRow) + if meetNewGroup { + err := e.consumeGroupRows(chk) if err != nil { return errors.Trace(err) } - if meetNewGroup { - err := e.consumeGroupRows(chk) - if err != nil { - return errors.Trace(err) - } - err = e.appendResult2Chunk(chk) - if err != nil { - return errors.Trace(err) - } - } - e.groupRows = append(e.groupRows, e.inputRow) - if meetNewGroup { - e.inputRow = e.inputIter.Next() - return nil + err = e.appendResult2Chunk(chk) + if err != nil { + return errors.Trace(err) } } + e.groupRows = append(e.groupRows, e.inputRow) + if meetNewGroup { + e.inputRow = e.inputIter.Next() + return nil + } } return nil } From d29cc6896de6aa2ac55b64f2db603e27aefec7ef Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 7 Jan 2019 20:24:24 +0800 Subject: [PATCH 08/13] fix bug --- executor/window.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/executor/window.go b/executor/window.go index 55aa7efcff53c..7c0ddeb8309b1 100644 --- a/executor/window.go +++ b/executor/window.go @@ -37,6 +37,7 @@ type WindowExec struct { partialResult windowfuncs.PartialResult executed bool childCols []*expression.Column + meetNewGroup bool } // Close implements the Executor Close interface. @@ -56,7 +57,7 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - if e.windowFunc.HasRemainingResults() { + if e.meetNewGroup && e.windowFunc.HasRemainingResults() { err := e.appendResult2Chunk(chk) if err != nil { return err @@ -73,15 +74,16 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { } func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) error { - if err := e.fetchChildIfNecessary(ctx, chk); err != nil { + var err error + if err = e.fetchChildIfNecessary(ctx, chk); err != nil { return errors.Trace(err) } for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() { - meetNewGroup, err := e.groupChecker.meetNewGroup(e.inputRow) + e.meetNewGroup, err = e.groupChecker.meetNewGroup(e.inputRow) if err != nil { return errors.Trace(err) } - if meetNewGroup { + if e.meetNewGroup { err := e.consumeGroupRows(chk) if err != nil { return errors.Trace(err) @@ -92,7 +94,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro } } e.groupRows = append(e.groupRows, e.inputRow) - if meetNewGroup { + if e.meetNewGroup { e.inputRow = e.inputIter.Next() return nil } From cec25b3290b0bab28a19c9fc8bbb623a923cc3da Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Wed, 9 Jan 2019 14:31:32 +0800 Subject: [PATCH 09/13] resolve schema column index instead --- executor/builder.go | 1 - executor/window.go | 5 ++--- executor/window_test.go | 2 ++ planner/core/physical_plans.go | 1 - planner/core/resolve_indices.go | 6 +++--- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 1893bab0ebf5f..7a245ca3ebd9d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1897,7 +1897,6 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec windowFunc: windowFunc, partialResult: windowFunc.AllocPartialResult(), groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), - childCols: v.ChildCols, } return e } diff --git a/executor/window.go b/executor/window.go index 7c0ddeb8309b1..de615f84a9f30 100644 --- a/executor/window.go +++ b/executor/window.go @@ -20,7 +20,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/executor/windowfuncs" - "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/util/chunk" ) @@ -36,7 +35,6 @@ type WindowExec struct { windowFunc windowfuncs.WindowFunc partialResult windowfuncs.PartialResult executed bool - childCols []*expression.Column meetNewGroup bool } @@ -155,7 +153,8 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) { } childResult := e.childResults[0] e.childResults = e.childResults[1:] - for i, col := range e.childCols { + columns := e.Schema().Columns[:len(e.Schema().Columns)-1] + for i, col := range columns { chk.CopyColumns(childResult, i, col.Index) } } diff --git a/executor/window_test.go b/executor/window_test.go index b5a4ae647243a..0e966664add41 100644 --- a/executor/window_test.go +++ b/executor/window_test.go @@ -38,4 +38,6 @@ func (s *testSuite2) TestWindowFunctions(c *C) { result.Check(testkit.Rows("8 3", "8 3", "8 3")) result = tk.MustQuery("select sum(t1.a) over() from t t1, t t2") result.Check(testkit.Rows("21", "21", "21", "21", "21", "21", "21", "21", "21")) + result = tk.MustQuery("select _tidb_rowid, sum(t.a) over() from t") + result.Check(testkit.Rows("1 7", "2 7", "3 7")) } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index bb9f6188c2c33..12d829cf3f9d6 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -383,5 +383,4 @@ type PhysicalWindow struct { WindowFuncDesc *aggregation.WindowFuncDesc PartitionBy []property.Item OrderBy []property.Item - ChildCols []*expression.Column } diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index 23c0a3e07bb9f..3145c45f26cdb 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -187,10 +187,10 @@ func (p *PhysicalSort) ResolveIndices() { // ResolveIndices implements Plan interface. func (p *PhysicalWindow) ResolveIndices() { p.physicalSchemaProducer.ResolveIndices() - p.ChildCols = p.Schema().Columns[:len(p.Schema().Columns)-1] - for i, col := range p.ChildCols { + for i := 0; i < len(p.Schema().Columns)-1; i++ { + col := p.Schema().Columns[i] newCol := col.ResolveIndices(p.children[0].Schema()) - p.ChildCols[i] = newCol.(*expression.Column) + p.Schema().Columns[i] = newCol.(*expression.Column) } for i, item := range p.PartitionBy { newCol := item.Col.ResolveIndices(p.children[0].Schema()) From 8a1d0ffb0b9e7fc02ffa2f15f11457841a3d5a91 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Fri, 11 Jan 2019 12:49:35 +0800 Subject: [PATCH 10/13] let window exec only handles agg without frame now. --- executor/builder.go | 14 +++-- executor/window.go | 40 +++++++++---- executor/windowfuncs/builder.go | 31 ---------- executor/windowfuncs/window_funcs.go | 88 ---------------------------- 4 files changed, 35 insertions(+), 138 deletions(-) delete mode 100644 executor/windowfuncs/builder.go delete mode 100644 executor/windowfuncs/window_funcs.go diff --git a/executor/builder.go b/executor/builder.go index 7a245ca3ebd9d..9aff073e5a59c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/executor/windowfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -1888,14 +1887,17 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec for _, item := range v.PartitionBy { groupByItems = append(groupByItems, item.Col) } - windowFunc, err := windowfuncs.Build(b.ctx, v.WindowFuncDesc, len(v.Schema().Columns)-1) - if err != nil { - b.err = err + aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false) + resultColIdx := len(v.Schema().Columns) - 1 + agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx) + if agg == nil { + b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now")) return nil } e := &WindowExec{baseExecutor: base, - windowFunc: windowFunc, - partialResult: windowFunc.AllocPartialResult(), + windowFunc: agg, + partialResult: agg.AllocPartialResult(), + resultColIdx: resultColIdx, groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), } return e diff --git a/executor/window.go b/executor/window.go index de615f84a9f30..e814be65bfaea 100644 --- a/executor/window.go +++ b/executor/window.go @@ -19,11 +19,11 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/tidb/executor/windowfuncs" + "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/util/chunk" ) -// WindowExec is the executor for window functions. +// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now. type WindowExec struct { baseExecutor @@ -32,10 +32,12 @@ type WindowExec struct { inputRow chunk.Row groupRows []chunk.Row childResults []*chunk.Chunk - windowFunc windowfuncs.WindowFunc - partialResult windowfuncs.PartialResult + windowFunc aggfuncs.AggFunc + partialResult aggfuncs.PartialResult + resultColIdx int executed bool meetNewGroup bool + remainingRows int64 } // Close implements the Executor Close interface. @@ -55,13 +57,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - if e.meetNewGroup && e.windowFunc.HasRemainingResults() { + if e.meetNewGroup && e.remainingRows > 0 { err := e.appendResult2Chunk(chk) if err != nil { return err } } - for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(chk.NumCols()-1) > 0) { + for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(e.resultColIdx) > 0) { err := e.consumeOneGroup(ctx, chk) if err != nil { e.executed = true @@ -104,10 +106,13 @@ func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { if len(e.groupRows) == 0 { return nil } - e.copyChk(chk) - var err error - e.groupRows, err = e.windowFunc.ProcessOneChunk(e.ctx, e.groupRows, chk, e.partialResult) - return err + err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult) + if err != nil { + return errors.Trace(err) + } + e.remainingRows += int64(len(e.groupRows)) + e.groupRows = e.groupRows[:0] + return nil } func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) (err error) { @@ -142,9 +147,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk // appendResult2Chunk appends result of the window function to the result chunk. func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { e.copyChk(chk) - var err error - e.groupRows, err = e.windowFunc.ExhaustResult(e.ctx, e.groupRows, chk, e.partialResult) - return err + for e.remainingRows > 0 && chk.RemainedRows(e.resultColIdx) > 0 { + // TODO: We can extend the agg func interface to avoid the `for` loop here. + err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk) + if err != nil { + return err + } + e.remainingRows-- + } + if e.remainingRows == 0 { + e.windowFunc.ResetPartialResult(e.partialResult) + } + return nil } func (e *WindowExec) copyChk(chk *chunk.Chunk) { diff --git a/executor/windowfuncs/builder.go b/executor/windowfuncs/builder.go deleted file mode 100644 index 8243e64a524f1..0000000000000 --- a/executor/windowfuncs/builder.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package windowfuncs - -import ( - "github.com/pingcap/errors" - "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/expression/aggregation" - "github.com/pingcap/tidb/sessionctx" -) - -// Build builds window functions according to the window functions description. -func Build(sctx sessionctx.Context, windowFuncDesc *aggregation.WindowFuncDesc, ordinal int) (WindowFunc, error) { - aggDesc := aggregation.NewAggFuncDesc(sctx, windowFuncDesc.Name, windowFuncDesc.Args, false) - agg := aggfuncs.Build(sctx, aggDesc, ordinal) - if agg == nil { - return nil, errors.New("window evaluator only support aggregation functions without frame now") - } - return &aggWithoutFrame{agg: agg}, nil -} diff --git a/executor/windowfuncs/window_funcs.go b/executor/windowfuncs/window_funcs.go deleted file mode 100644 index 3c502afc23830..0000000000000 --- a/executor/windowfuncs/window_funcs.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package windowfuncs - -import ( - "unsafe" - - "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/util/chunk" -) - -// PartialResult represents data structure to store the partial result for the -// aggregate functions. Here we use unsafe.Pointer to allow the partial result -// to be any type. -type PartialResult unsafe.Pointer - -// WindowFunc is the interface for processing window functions. -type WindowFunc interface { - // ProcessOneChunk processes one chunk. - ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) - // ExhaustResult exhausts result to the result chunk. - ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) - // HasRemainingResults checks if there are some remained results to be exhausted. - HasRemainingResults() bool - // AllocPartialResult allocates a specific data structure to store the partial result. - AllocPartialResult() PartialResult -} - -// aggWithoutFrame deals with agg functions with no frame specification. -type aggWithoutFrame struct { - agg aggfuncs.AggFunc - remained int64 -} - -type partialResult4AggWithoutFrame struct { - result aggfuncs.PartialResult -} - -// ProcessOneChunk implements the WindowFunc interface. -func (wf *aggWithoutFrame) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) { - p := (*partialResult4AggWithoutFrame)(pr) - err := wf.agg.UpdatePartialResult(sctx, rows, p.result) - if err != nil { - return nil, err - } - wf.remained += int64(len(rows)) - rows = rows[:0] - return rows, nil -} - -// ExhaustResult implements the WindowFunc interface. -func (wf *aggWithoutFrame) ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) { - rows = rows[:0] - p := (*partialResult4AggWithoutFrame)(pr) - for wf.remained > 0 && dest.RemainedRows(dest.NumCols()-1) > 0 { - err := wf.agg.AppendFinalResult2Chunk(sctx, p.result, dest) - if err != nil { - return rows, err - } - wf.remained-- - } - if wf.remained == 0 { - wf.agg.ResetPartialResult(p.result) - } - return rows, nil -} - -// AllocPartialResult implements the WindowFunc interface. -func (wf *aggWithoutFrame) AllocPartialResult() PartialResult { - return PartialResult(&partialResult4AggWithoutFrame{wf.agg.AllocPartialResult()}) -} - -// HasRemainingResults implements the WindowFunc interface. -func (wf *aggWithoutFrame) HasRemainingResults() bool { - return wf.remained > 0 -} From c655de1d5ce21aff4e1905a4cdf328684d8a9db8 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Fri, 11 Jan 2019 16:29:52 +0800 Subject: [PATCH 11/13] address comments --- executor/builder.go | 1 - executor/window.go | 44 +++++++++++++++++++++++--------------------- util/chunk/chunk.go | 5 ----- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 9aff073e5a59c..81151e7efd03c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1897,7 +1897,6 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec e := &WindowExec{baseExecutor: base, windowFunc: agg, partialResult: agg.AllocPartialResult(), - resultColIdx: resultColIdx, groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), } return e diff --git a/executor/window.go b/executor/window.go index e814be65bfaea..d80a413c75bd3 100644 --- a/executor/window.go +++ b/executor/window.go @@ -27,17 +27,17 @@ import ( type WindowExec struct { baseExecutor - groupChecker *groupChecker - inputIter *chunk.Iterator4Chunk - inputRow chunk.Row - groupRows []chunk.Row - childResults []*chunk.Chunk - windowFunc aggfuncs.AggFunc - partialResult aggfuncs.PartialResult - resultColIdx int - executed bool - meetNewGroup bool - remainingRows int64 + groupChecker *groupChecker + inputIter *chunk.Iterator4Chunk + inputRow chunk.Row + groupRows []chunk.Row + childResults []*chunk.Chunk + windowFunc aggfuncs.AggFunc + partialResult aggfuncs.PartialResult + executed bool + meetNewGroup bool + remainingRowsInGroup int64 + remainingRowsInChunk int } // Close implements the Executor Close interface. @@ -57,13 +57,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - if e.meetNewGroup && e.remainingRows > 0 { + if e.meetNewGroup && e.remainingRowsInGroup > 0 { err := e.appendResult2Chunk(chk) if err != nil { return err } } - for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(e.resultColIdx) > 0) { + for !e.executed && (chk.NumRows() == 0 || e.remainingRowsInChunk > 0) { err := e.consumeOneGroup(ctx, chk) if err != nil { e.executed = true @@ -84,7 +84,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro return errors.Trace(err) } if e.meetNewGroup { - err := e.consumeGroupRows(chk) + err := e.consumeGroupRows() if err != nil { return errors.Trace(err) } @@ -102,7 +102,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { +func (e *WindowExec) consumeGroupRows() error { if len(e.groupRows) == 0 { return nil } @@ -110,7 +110,7 @@ func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { if err != nil { return errors.Trace(err) } - e.remainingRows += int64(len(e.groupRows)) + e.remainingRowsInGroup += int64(len(e.groupRows)) e.groupRows = e.groupRows[:0] return nil } @@ -120,8 +120,8 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk return nil } - // Before fetching a new batch of input, we should consume the last groupChecker. - err = e.consumeGroupRows(chk) + // Before fetching a new batch of input, we should consume the last group rows. + err = e.consumeGroupRows() if err != nil { return errors.Trace(err) } @@ -147,15 +147,16 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk // appendResult2Chunk appends result of the window function to the result chunk. func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { e.copyChk(chk) - for e.remainingRows > 0 && chk.RemainedRows(e.resultColIdx) > 0 { + for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 { // TODO: We can extend the agg func interface to avoid the `for` loop here. err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk) if err != nil { return err } - e.remainingRows-- + e.remainingRowsInGroup-- + e.remainingRowsInChunk-- } - if e.remainingRows == 0 { + if e.remainingRowsInGroup == 0 { e.windowFunc.ResetPartialResult(e.partialResult) } return nil @@ -167,6 +168,7 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) { } childResult := e.childResults[0] e.childResults = e.childResults[1:] + e.remainingRowsInChunk = childResult.NumRows() columns := e.Schema().Columns[:len(e.Schema().Columns)-1] for i, col := range columns { chk.CopyColumns(childResult, i, col.Index) diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index aa68763eec57d..dd459d757b1be 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -542,8 +542,3 @@ func readTime(buf []byte) types.Time { Fsp: fsp, } } - -// RemainedRows returns the number of rows needs to be appended in specific column. -func (c *Chunk) RemainedRows(colIdx int) int { - return c.columns[0].length - c.columns[colIdx].length -} From 7db14b1f9d7272fe2a05c2645e6feae9c18710ee Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 14 Jan 2019 17:13:16 +0800 Subject: [PATCH 12/13] rename and add tests --- executor/window.go | 2 +- util/chunk/chunk.go | 10 +++++----- util/chunk/chunk_test.go | 17 +++++++++++++++++ 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/executor/window.go b/executor/window.go index d80a413c75bd3..e29c6219ac258 100644 --- a/executor/window.go +++ b/executor/window.go @@ -171,6 +171,6 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) { e.remainingRowsInChunk = childResult.NumRows() columns := e.Schema().Columns[:len(e.Schema().Columns)-1] for i, col := range columns { - chk.CopyColumns(childResult, i, col.Index) + chk.MakeRefTo(i, childResult, col.Index) } } diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index dd459d757b1be..c59afc45cebe4 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -138,6 +138,11 @@ func (c *Chunk) MakeRef(srcColIdx, dstColIdx int) { c.columns[dstColIdx] = c.columns[srcColIdx] } +// MakeRefTo copies columns `src.columns[srcColIdx]` to `c.columns[dstColIdx]`. +func (c *Chunk) MakeRefTo(dstColIdx int, src *Chunk, srcColIdx int) { + c.columns[dstColIdx] = src.columns[srcColIdx] +} + // SwapColumn swaps column "c.columns[colIdx]" with column // "other.columns[otherIdx]". If there exists columns refer to the column to be // swapped, we need to re-build the reference. @@ -187,11 +192,6 @@ func (c *Chunk) SwapColumns(other *Chunk) { c.numVirtualRows, other.numVirtualRows = other.numVirtualRows, c.numVirtualRows } -// CopyColumns copies columns `other.columns[from]` to `c.columns[dst]`. -func (c *Chunk) CopyColumns(other *Chunk, dst, from int) { - c.columns[dst] = other.columns[from] -} - // SetNumVirtualRows sets the virtual row number for a Chunk. // It should only be used when there exists no column in the Chunk. func (c *Chunk) SetNumVirtualRows(numVirtualRows int) { diff --git a/util/chunk/chunk_test.go b/util/chunk/chunk_test.go index b78be4ad67f5c..e4f0975a6e763 100644 --- a/util/chunk/chunk_test.go +++ b/util/chunk/chunk_test.go @@ -620,6 +620,23 @@ func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) { } } +func (s *testChunkSuite) TestMakeRefTo(c *check.C) { + fieldTypes := make([]*types.FieldType, 0, 2) + fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat}) + fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat}) + + chk1 := NewChunkWithCapacity(fieldTypes, 1) + chk1.AppendFloat64(0, 1) + chk1.AppendFloat64(1, 3) + + chk2 := NewChunkWithCapacity(fieldTypes, 1) + chk2.MakeRefTo(0, chk1, 1) + chk2.MakeRefTo(1, chk1, 0) + + c.Assert(chk2.columns[0] == chk1.columns[1], check.IsTrue) + c.Assert(chk2.columns[1] == chk1.columns[0], check.IsTrue) +} + func BenchmarkAppendInt(b *testing.B) { b.ReportAllocs() chk := newChunk(8) From db3289ab1f2387efdeaab060067a640775c7c580 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Mon, 14 Jan 2019 17:32:23 +0800 Subject: [PATCH 13/13] fix merge errors --- executor/window.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/window.go b/executor/window.go index e29c6219ac258..2e476067822a3 100644 --- a/executor/window.go +++ b/executor/window.go @@ -47,7 +47,7 @@ func (e *WindowExec) Close() error { } // Next implements the Executor Next interface. -func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { +func (e *WindowExec) Next(ctx context.Context, chk *chunk.RecordBatch) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("windowExec.Next", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -58,13 +58,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { } chk.Reset() if e.meetNewGroup && e.remainingRowsInGroup > 0 { - err := e.appendResult2Chunk(chk) + err := e.appendResult2Chunk(chk.Chunk) if err != nil { return err } } for !e.executed && (chk.NumRows() == 0 || e.remainingRowsInChunk > 0) { - err := e.consumeOneGroup(ctx, chk) + err := e.consumeOneGroup(ctx, chk.Chunk) if err != nil { e.executed = true return errors.Trace(err) @@ -127,7 +127,7 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk } childResult := e.children[0].newFirstChunk() - err = e.children[0].Next(ctx, childResult) + err = e.children[0].Next(ctx, &chunk.RecordBatch{Chunk: childResult}) if err != nil { return errors.Trace(err) }