Skip to content

Commit

Permalink
let window exec only handles agg without frame now.
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx committed Jan 11, 2019
1 parent cec25b3 commit 8a1d0ff
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 138 deletions.
14 changes: 8 additions & 6 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/windowfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -1888,14 +1887,17 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
for _, item := range v.PartitionBy {
groupByItems = append(groupByItems, item.Col)
}
windowFunc, err := windowfuncs.Build(b.ctx, v.WindowFuncDesc, len(v.Schema().Columns)-1)
if err != nil {
b.err = err
aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false)
resultColIdx := len(v.Schema().Columns) - 1
agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx)
if agg == nil {
b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now"))
return nil
}
e := &WindowExec{baseExecutor: base,
windowFunc: windowFunc,
partialResult: windowFunc.AllocPartialResult(),
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
resultColIdx: resultColIdx,
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
}
return e
Expand Down
40 changes: 27 additions & 13 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/executor/windowfuncs"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/util/chunk"
)

// WindowExec is the executor for window functions.
// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now.
type WindowExec struct {
baseExecutor

Expand All @@ -32,10 +32,12 @@ type WindowExec struct {
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc windowfuncs.WindowFunc
partialResult windowfuncs.PartialResult
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
resultColIdx int
executed bool
meetNewGroup bool
remainingRows int64
}

// Close implements the Executor Close interface.
Expand All @@ -55,13 +57,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.meetNewGroup && e.windowFunc.HasRemainingResults() {
if e.meetNewGroup && e.remainingRows > 0 {
err := e.appendResult2Chunk(chk)
if err != nil {
return err
}
}
for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(chk.NumCols()-1) > 0) {
for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(e.resultColIdx) > 0) {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
e.executed = true
Expand Down Expand Up @@ -104,10 +106,13 @@ func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error {
if len(e.groupRows) == 0 {
return nil
}
e.copyChk(chk)
var err error
e.groupRows, err = e.windowFunc.ProcessOneChunk(e.ctx, e.groupRows, chk, e.partialResult)
return err
err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult)
if err != nil {
return errors.Trace(err)
}
e.remainingRows += int64(len(e.groupRows))
e.groupRows = e.groupRows[:0]
return nil
}

func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -142,9 +147,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error {
e.copyChk(chk)
var err error
e.groupRows, err = e.windowFunc.ExhaustResult(e.ctx, e.groupRows, chk, e.partialResult)
return err
for e.remainingRows > 0 && chk.RemainedRows(e.resultColIdx) > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk)
if err != nil {
return err
}
e.remainingRows--
}
if e.remainingRows == 0 {
e.windowFunc.ResetPartialResult(e.partialResult)
}
return nil
}

func (e *WindowExec) copyChk(chk *chunk.Chunk) {
Expand Down
31 changes: 0 additions & 31 deletions executor/windowfuncs/builder.go

This file was deleted.

88 changes: 0 additions & 88 deletions executor/windowfuncs/window_funcs.go

This file was deleted.

0 comments on commit 8a1d0ff

Please sign in to comment.