diff --git a/executor/builder.go b/executor/builder.go index 7a245ca3ebd9d..9aff073e5a59c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/executor/windowfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -1888,14 +1887,17 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec for _, item := range v.PartitionBy { groupByItems = append(groupByItems, item.Col) } - windowFunc, err := windowfuncs.Build(b.ctx, v.WindowFuncDesc, len(v.Schema().Columns)-1) - if err != nil { - b.err = err + aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false) + resultColIdx := len(v.Schema().Columns) - 1 + agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx) + if agg == nil { + b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now")) return nil } e := &WindowExec{baseExecutor: base, - windowFunc: windowFunc, - partialResult: windowFunc.AllocPartialResult(), + windowFunc: agg, + partialResult: agg.AllocPartialResult(), + resultColIdx: resultColIdx, groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), } return e diff --git a/executor/window.go b/executor/window.go index de615f84a9f30..e814be65bfaea 100644 --- a/executor/window.go +++ b/executor/window.go @@ -19,11 +19,11 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/tidb/executor/windowfuncs" + "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/util/chunk" ) -// WindowExec is the executor for window functions. +// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now. type WindowExec struct { baseExecutor @@ -32,10 +32,12 @@ type WindowExec struct { inputRow chunk.Row groupRows []chunk.Row childResults []*chunk.Chunk - windowFunc windowfuncs.WindowFunc - partialResult windowfuncs.PartialResult + windowFunc aggfuncs.AggFunc + partialResult aggfuncs.PartialResult + resultColIdx int executed bool meetNewGroup bool + remainingRows int64 } // Close implements the Executor Close interface. @@ -55,13 +57,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - if e.meetNewGroup && e.windowFunc.HasRemainingResults() { + if e.meetNewGroup && e.remainingRows > 0 { err := e.appendResult2Chunk(chk) if err != nil { return err } } - for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(chk.NumCols()-1) > 0) { + for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(e.resultColIdx) > 0) { err := e.consumeOneGroup(ctx, chk) if err != nil { e.executed = true @@ -104,10 +106,13 @@ func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { if len(e.groupRows) == 0 { return nil } - e.copyChk(chk) - var err error - e.groupRows, err = e.windowFunc.ProcessOneChunk(e.ctx, e.groupRows, chk, e.partialResult) - return err + err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult) + if err != nil { + return errors.Trace(err) + } + e.remainingRows += int64(len(e.groupRows)) + e.groupRows = e.groupRows[:0] + return nil } func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) (err error) { @@ -142,9 +147,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk // appendResult2Chunk appends result of the window function to the result chunk. func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { e.copyChk(chk) - var err error - e.groupRows, err = e.windowFunc.ExhaustResult(e.ctx, e.groupRows, chk, e.partialResult) - return err + for e.remainingRows > 0 && chk.RemainedRows(e.resultColIdx) > 0 { + // TODO: We can extend the agg func interface to avoid the `for` loop here. + err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk) + if err != nil { + return err + } + e.remainingRows-- + } + if e.remainingRows == 0 { + e.windowFunc.ResetPartialResult(e.partialResult) + } + return nil } func (e *WindowExec) copyChk(chk *chunk.Chunk) { diff --git a/executor/windowfuncs/builder.go b/executor/windowfuncs/builder.go deleted file mode 100644 index 8243e64a524f1..0000000000000 --- a/executor/windowfuncs/builder.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package windowfuncs - -import ( - "github.com/pingcap/errors" - "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/expression/aggregation" - "github.com/pingcap/tidb/sessionctx" -) - -// Build builds window functions according to the window functions description. -func Build(sctx sessionctx.Context, windowFuncDesc *aggregation.WindowFuncDesc, ordinal int) (WindowFunc, error) { - aggDesc := aggregation.NewAggFuncDesc(sctx, windowFuncDesc.Name, windowFuncDesc.Args, false) - agg := aggfuncs.Build(sctx, aggDesc, ordinal) - if agg == nil { - return nil, errors.New("window evaluator only support aggregation functions without frame now") - } - return &aggWithoutFrame{agg: agg}, nil -} diff --git a/executor/windowfuncs/window_funcs.go b/executor/windowfuncs/window_funcs.go deleted file mode 100644 index 3c502afc23830..0000000000000 --- a/executor/windowfuncs/window_funcs.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package windowfuncs - -import ( - "unsafe" - - "github.com/pingcap/tidb/executor/aggfuncs" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/util/chunk" -) - -// PartialResult represents data structure to store the partial result for the -// aggregate functions. Here we use unsafe.Pointer to allow the partial result -// to be any type. -type PartialResult unsafe.Pointer - -// WindowFunc is the interface for processing window functions. -type WindowFunc interface { - // ProcessOneChunk processes one chunk. - ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) - // ExhaustResult exhausts result to the result chunk. - ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) - // HasRemainingResults checks if there are some remained results to be exhausted. - HasRemainingResults() bool - // AllocPartialResult allocates a specific data structure to store the partial result. - AllocPartialResult() PartialResult -} - -// aggWithoutFrame deals with agg functions with no frame specification. -type aggWithoutFrame struct { - agg aggfuncs.AggFunc - remained int64 -} - -type partialResult4AggWithoutFrame struct { - result aggfuncs.PartialResult -} - -// ProcessOneChunk implements the WindowFunc interface. -func (wf *aggWithoutFrame) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) { - p := (*partialResult4AggWithoutFrame)(pr) - err := wf.agg.UpdatePartialResult(sctx, rows, p.result) - if err != nil { - return nil, err - } - wf.remained += int64(len(rows)) - rows = rows[:0] - return rows, nil -} - -// ExhaustResult implements the WindowFunc interface. -func (wf *aggWithoutFrame) ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, dest *chunk.Chunk, pr PartialResult) ([]chunk.Row, error) { - rows = rows[:0] - p := (*partialResult4AggWithoutFrame)(pr) - for wf.remained > 0 && dest.RemainedRows(dest.NumCols()-1) > 0 { - err := wf.agg.AppendFinalResult2Chunk(sctx, p.result, dest) - if err != nil { - return rows, err - } - wf.remained-- - } - if wf.remained == 0 { - wf.agg.ResetPartialResult(p.result) - } - return rows, nil -} - -// AllocPartialResult implements the WindowFunc interface. -func (wf *aggWithoutFrame) AllocPartialResult() PartialResult { - return PartialResult(&partialResult4AggWithoutFrame{wf.agg.AllocPartialResult()}) -} - -// HasRemainingResults implements the WindowFunc interface. -func (wf *aggWithoutFrame) HasRemainingResults() bool { - return wf.remained > 0 -}