Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx committed Jan 11, 2019
1 parent 8a1d0ff commit c655de1
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 27 deletions.
1 change: 0 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1897,7 +1897,6 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
e := &WindowExec{baseExecutor: base,
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
resultColIdx: resultColIdx,
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
}
return e
Expand Down
44 changes: 23 additions & 21 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (
type WindowExec struct {
baseExecutor

groupChecker *groupChecker
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
resultColIdx int
executed bool
meetNewGroup bool
remainingRows int64
groupChecker *groupChecker
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
executed bool
meetNewGroup bool
remainingRowsInGroup int64
remainingRowsInChunk int
}

// Close implements the Executor Close interface.
Expand All @@ -57,13 +57,13 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.meetNewGroup && e.remainingRows > 0 {
if e.meetNewGroup && e.remainingRowsInGroup > 0 {
err := e.appendResult2Chunk(chk)
if err != nil {
return err
}
}
for !e.executed && (chk.NumRows() == 0 || chk.RemainedRows(e.resultColIdx) > 0) {
for !e.executed && (chk.NumRows() == 0 || e.remainingRowsInChunk > 0) {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
e.executed = true
Expand All @@ -84,7 +84,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return errors.Trace(err)
}
if e.meetNewGroup {
err := e.consumeGroupRows(chk)
err := e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
Expand All @@ -102,15 +102,15 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) error {
func (e *WindowExec) consumeGroupRows() error {
if len(e.groupRows) == 0 {
return nil
}
err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult)
if err != nil {
return errors.Trace(err)
}
e.remainingRows += int64(len(e.groupRows))
e.remainingRowsInGroup += int64(len(e.groupRows))
e.groupRows = e.groupRows[:0]
return nil
}
Expand All @@ -120,8 +120,8 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
return nil
}

// Before fetching a new batch of input, we should consume the last groupChecker.
err = e.consumeGroupRows(chk)
// Before fetching a new batch of input, we should consume the last group rows.
err = e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
Expand All @@ -147,15 +147,16 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error {
e.copyChk(chk)
for e.remainingRows > 0 && chk.RemainedRows(e.resultColIdx) > 0 {
for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk)
if err != nil {
return err
}
e.remainingRows--
e.remainingRowsInGroup--
e.remainingRowsInChunk--
}
if e.remainingRows == 0 {
if e.remainingRowsInGroup == 0 {
e.windowFunc.ResetPartialResult(e.partialResult)
}
return nil
Expand All @@ -167,6 +168,7 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) {
}
childResult := e.childResults[0]
e.childResults = e.childResults[1:]
e.remainingRowsInChunk = childResult.NumRows()
columns := e.Schema().Columns[:len(e.Schema().Columns)-1]
for i, col := range columns {
chk.CopyColumns(childResult, i, col.Index)
Expand Down
5 changes: 0 additions & 5 deletions util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,3 @@ func readTime(buf []byte) types.Time {
Fsp: fsp,
}
}

// RemainedRows returns the number of rows needs to be appended in specific column.
func (c *Chunk) RemainedRows(colIdx int) int {
return c.columns[0].length - c.columns[colIdx].length
}

0 comments on commit c655de1

Please sign in to comment.