Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: init the final recv channel of hash agg with enough chunk #13811

Merged
merged 7 commits into from
Dec 6, 2019
19 changes: 11 additions & 8 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
e.isChildReturnEmpty = true
e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency)
e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency)
for i := 0; i < finalConcurrency; i++ {
e.finalInputCh <- newFirstChunk(e)
}
e.inputCh = make(chan *HashAggInput, partialConcurrency)
e.finishCh = make(chan struct{}, 1)

Expand Down Expand Up @@ -668,28 +671,28 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
if e.executed {
return nil
}
for !chk.IsFull() {
e.finalInputCh <- chk
for {
result, ok := <-e.finalOutputCh
if !ok { // all finalWorkers exited
if !ok {
e.executed = true
if chk.NumRows() > 0 { // but there are some data left
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's remaining data in chk. The e.isChildReturnEmpty must be false. Thus we don't need to return it here.

return nil
}
if e.isChildReturnEmpty && e.defaultVal != nil {
chk.Append(e.defaultVal, 0, 1)
}
e.isChildReturnEmpty = false
return nil
}
if result.err != nil {
return result.err
}
chk.Append(result.chk, 0, result.chk.NumRows())
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
result.chk.Reset()
e.finalInputCh <- result.chk
if chk.NumRows() > 0 {
e.isChildReturnEmpty = false
}
if chk.IsFull() {
return nil
}
}
return nil
}

// unparallelExec executes hash aggregation algorithm in single thread.
Expand Down