diff --git a/executor/builder.go b/executor/builder.go index 9aff073e5a59c..81151e7efd03c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1897,7 +1897,6 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec e := &WindowExec{baseExecutor: base, windowFunc: agg, partialResult: agg.AllocPartialResult(), - resultColIdx: resultColIdx, groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), } return e diff --git a/executor/window.go b/executor/window.go index e814be65bfaea..d80a413c75bd3 100644 --- a/executor/window.go +++ b/executor/window.go @@ -27,17 +27,17 @@ import ( type WindowExec struct { baseExecutor - groupChecker *groupChecker - inputIter *chunk.Iterator4Chunk - inputRow chunk.Row - groupRows []chunk.Row - childResults []*chunk.Chunk - windowFunc aggfuncs.AggFunc - partialResult aggfuncs.PartialResult - resultColIdx int - executed bool - meetNewGroup bool - remainingRows int64 + groupChecker *groupChecker + inputIter *chunk.Iterator4Chunk + inputRow chunk.Row + groupRows []chunk.Row + childResults []*chunk.Chunk + windowFunc aggfuncs.AggFunc + partialResult aggfuncs.PartialResult + executed bool + meetNewGroup bool + remainingRowsInGroup int64 + remainingRowsInChunk int } // Close implements the Executor Close interface. @@ -57,13 +57,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - if e.meetNewGroup && e.remainingRows > 0 { + if e.meetNewGroup && e.remainingRowsInGroup > 0 { err := e.appendResult2Chunk(chk) if err != nil { return err } } - for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(e.resultColIdx) > 0) { + for !e.executed && (chk.NumRows() == 0 || e.remainingRowsInChunk > 0) { err := e.consumeOneGroup(ctx, chk) if err != nil { e.executed = true @@ -84,7 +84,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro return errors.Trace(err) } if e.meetNewGroup { - err := e.consumeGroupRows(chk) + err := e.consumeGroupRows() if err != nil { return errors.Trace(err) } @@ -102,7 +102,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { +func (e *WindowExec) consumeGroupRows() error { if len(e.groupRows) == 0 { return nil } @@ -110,7 +110,7 @@ func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error { if err != nil { return errors.Trace(err) } - e.remainingRows += int64(len(e.groupRows)) + e.remainingRowsInGroup += int64(len(e.groupRows)) e.groupRows = e.groupRows[:0] return nil } @@ -120,8 +120,8 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk return nil } - // Before fetching a new batch of input, we should consume the last groupChecker. - err = e.consumeGroupRows(chk) + // Before fetching a new batch of input, we should consume the last group rows. + err = e.consumeGroupRows() if err != nil { return errors.Trace(err) } @@ -147,15 +147,16 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk // appendResult2Chunk appends result of the window function to the result chunk. func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { e.copyChk(chk) - for e.remainingRows > 0 && chk.RemainedRows(e.resultColIdx) > 0 { + for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 { // TODO: We can extend the agg func interface to avoid the `for` loop here. err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk) if err != nil { return err } - e.remainingRows-- + e.remainingRowsInGroup-- + e.remainingRowsInChunk-- } - if e.remainingRows == 0 { + if e.remainingRowsInGroup == 0 { e.windowFunc.ResetPartialResult(e.partialResult) } return nil @@ -167,6 +168,7 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) { } childResult := e.childResults[0] e.childResults = e.childResults[1:] + e.remainingRowsInChunk = childResult.NumRows() columns := e.Schema().Columns[:len(e.Schema().Columns)-1] for i, col := range columns { chk.CopyColumns(childResult, i, col.Index) diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index aa68763eec57d..dd459d757b1be 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -542,8 +542,3 @@ func readTime(buf []byte) types.Time { Fsp: fsp, } } - -// RemainedRows returns the number of rows needs to be appended in specific column. -func (c *Chunk) RemainedRows(colIdx int) int { - return c.columns[0].length - c.columns[colIdx].length -}