From 02199a1e569fa1d990c755c673688e3f41578614 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Thu, 28 Nov 2019 22:56:37 +0800 Subject: [PATCH 1/3] executor: init the final recv channel of hash agg with enough chunk --- executor/aggregate.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index a1e7b696387c7..f4a6addacc327 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -256,6 +256,9 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { e.isChildReturnEmpty = true e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency) e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency) + for i := 0; i < finalConcurrency; i++ { + e.finalInputCh <- newFirstChunk(e) + } e.inputCh = make(chan *HashAggInput, partialConcurrency) e.finishCh = make(chan struct{}, 1) @@ -652,28 +655,28 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error if e.executed { return nil } - for !chk.IsFull() { - e.finalInputCh <- chk + for { result, ok := <-e.finalOutputCh - if !ok { // all finalWorkers exited + if !ok { e.executed = true - if chk.NumRows() > 0 { // but there are some data left - return nil - } if e.isChildReturnEmpty && e.defaultVal != nil { chk.Append(e.defaultVal, 0, 1) } - e.isChildReturnEmpty = false return nil } if result.err != nil { return result.err } + chk.Append(result.chk, 0, result.chk.NumRows()) + result.chk.Reset() + e.finalInputCh <- result.chk if chk.NumRows() > 0 { e.isChildReturnEmpty = false } + if chk.IsFull() { + return nil + } } - return nil } // unparallelExec executes hash aggregation algorithm in single thread. From 1e5bb2de03c1af06bfb0e18258e1dd9daa128f55 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Wed, 4 Dec 2019 17:30:06 +0800 Subject: [PATCH 2/3] address comments --- executor/aggregate.go | 21 ++++----- executor/executor_required_rows_test.go | 61 ------------------------- 2 files changed, 8 insertions(+), 74 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 9b0e2bfff8e7e..4eb6c1ef430ed 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -90,8 +90,9 @@ type HashAggFinalWorker struct { // AfFinalResult indicates aggregation functions final result. type AfFinalResult struct { - chk *chunk.Chunk - err error + chk *chunk.Chunk + err error + giveBackCh chan *chunk.Chunk } // HashAggExec deals with all the aggregate functions. @@ -150,7 +151,6 @@ type HashAggExec struct { finishCh chan struct{} finalOutputCh chan *AfFinalResult - finalInputCh chan *chunk.Chunk partialOutputChs []chan *HashAggIntermData inputCh chan *HashAggInput partialInputChs []chan *chunk.Chunk @@ -271,10 +271,6 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { partialConcurrency := sessionVars.HashAggPartialConcurrency e.isChildReturnEmpty = true e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency) - e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency) - for i := 0; i < finalConcurrency; i++ { - e.finalInputCh <- newFirstChunk(e) - } e.inputCh = make(chan *HashAggInput, partialConcurrency) e.finishCh = make(chan struct{}, 1) @@ -319,11 +315,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { groupSet: set.NewStringSet(), inputCh: e.partialOutputChs[i], outputCh: e.finalOutputCh, - finalResultHolderCh: e.finalInputCh, + finalResultHolderCh: make(chan *chunk.Chunk, 1), rowBuffer: make([]types.Datum, 0, e.Schema().Len()), mutableRow: chunk.MutRowFromTypes(retTypes(e)), groupKeys: make([][]byte, 0, 8), } + e.finalWorkers[i].finalResultHolderCh <- newFirstChunk(e) } } @@ -550,7 +547,7 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { } } } - w.outputCh <- &AfFinalResult{chk: result} + w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} } func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) { @@ -683,13 +680,11 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error if result.err != nil { return result.err } - chk.Append(result.chk, 0, result.chk.NumRows()) + chk.SwapColumns(result.chk) result.chk.Reset() - e.finalInputCh <- result.chk + result.giveBackCh <- result.chk if chk.NumRows() > 0 { e.isChildReturnEmpty = false - } - if chk.IsFull() { return nil } } diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 348b043f883de..90df8e91bdcb9 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -675,67 +675,6 @@ func (s *testExecSuite) TestStreamAggRequiredRows(c *C) { } } -func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) { - maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize - testCases := []struct { - totalRows int - aggFunc string - requiredRows []int - expectedRows []int - expectedRowsDS []int - gen func(valType *types.FieldType) interface{} - }{ - { - totalRows: maxChunkSize, - aggFunc: ast.AggFuncSum, - requiredRows: []int{1, 2, 3, 4, 5, 6, 7}, - expectedRows: []int{1, 2, 3, 4, 5, 6, 7}, - expectedRowsDS: []int{maxChunkSize, 0}, - gen: divGenerator(1), - }, - { - totalRows: maxChunkSize * 3, - aggFunc: ast.AggFuncAvg, - requiredRows: []int{1, 3}, - expectedRows: []int{1, 2}, - expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0}, - gen: divGenerator(maxChunkSize), - }, - { - totalRows: maxChunkSize * 3, - aggFunc: ast.AggFuncAvg, - requiredRows: []int{maxChunkSize, maxChunkSize}, - expectedRows: []int{maxChunkSize, maxChunkSize / 2}, - expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0}, - gen: divGenerator(2), - }, - } - - for _, hasDistinct := range []bool{false, true} { - for _, testCase := range testCases { - sctx := defaultCtx() - ctx := context.Background() - ds := newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen) - childCols := ds.Schema().Columns - schema := expression.NewSchema(childCols...) - groupBy := []expression.Expression{childCols[1]} - aggFunc, err := aggregation.NewAggFuncDesc(sctx, testCase.aggFunc, []expression.Expression{childCols[0]}, hasDistinct) - c.Assert(err, IsNil) - aggFuncs := []*aggregation.AggFuncDesc{aggFunc} - exec := buildHashAggExecutor(sctx, ds, schema, aggFuncs, groupBy) - c.Assert(exec.Open(ctx), IsNil) - chk := newFirstChunk(exec) - for i := range testCase.requiredRows { - chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) - c.Assert(exec.Next(ctx, chk), IsNil) - c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) - } - c.Assert(exec.Close(), IsNil) - c.Assert(ds.checkNumNextCalled(), IsNil) - } - } -} - func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) { justReturn1 := func(valType *types.FieldType) interface{} { switch valType.Tp { From f36bcef0cf1ea6c2965509c80d76a1c52fbdc068 Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Fri, 6 Dec 2019 16:59:44 +0800 Subject: [PATCH 3/3] fix sqllogic test --- executor/aggregate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 4eb6c1ef430ed..574ee516c33cf 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -540,7 +540,7 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { result.SetNumVirtualRows(result.NumRows() + 1) } if result.IsFull() { - w.outputCh <- &AfFinalResult{chk: result} + w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} result, finished = w.receiveFinalResultHolder() if finished { return