From 3e88f0073cda3821b1ba89dda445f88c3a7738c3 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 13 Aug 2018 19:19:13 +0800 Subject: [PATCH 1/6] executor: use new aggregate framework for HashAggExec --- executor/aggregate.go | 175 +++++++++++++++++------------------------- executor/builder.go | 33 +++++--- 2 files changed, 92 insertions(+), 116 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 5a1613a31768e..cca04801e5243 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -19,7 +19,6 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -31,16 +30,16 @@ import ( "golang.org/x/net/context" ) -type aggCtxsMapper map[string][]*aggregation.AggEvaluateContext +type aggCtxsMapper map[string][]aggfuncs.PartialResult // baseHashAggWorker stores the common attributes of HashAggFinalWorker and HashAggPartialWorker. type baseHashAggWorker struct { finishCh <-chan struct{} - aggFuncs []aggregation.Aggregation + aggFuncs []aggfuncs.AggFunc maxChunkSize int } -func newBaseHashAggWorker(finishCh <-chan struct{}, aggFuncs []aggregation.Aggregation, maxChunkSize int) baseHashAggWorker { +func newBaseHashAggWorker(finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) baseHashAggWorker { return baseHashAggWorker{ finishCh: finishCh, aggFuncs: aggFuncs, @@ -91,7 +90,7 @@ type AfFinalResult struct { // HashAggExec deals with all the aggregate functions. // It is built from the Aggregate Plan. When Next() is called, it reads all the data from Src -// and updates all the items in AggFuncs. +// and updates all the items in PartialAggFuncs. // The parallel execution flow is as the following graph shows: // // +-------------+ @@ -133,17 +132,16 @@ type AfFinalResult struct { type HashAggExec struct { baseExecutor - prepared bool - sc *stmtctx.StatementContext - AggFuncs []aggregation.Aggregation - aggCtxsMap aggCtxsMapper - groupMap *mvmap.MVMap - groupIterator *mvmap.Iterator - mutableRow chunk.MutRow - rowBuffer []types.Datum - GroupByItems []expression.Expression - groupKey []byte - groupVals [][]byte + prepared bool + sc *stmtctx.StatementContext + PartialAggFuncs []aggfuncs.AggFunc + FinalAggFuncs []aggfuncs.AggFunc + partialResultMap aggCtxsMapper + groupMap *mvmap.MVMap + groupIterator *mvmap.Iterator + GroupByItems []expression.Expression + groupKey []byte + groupVals [][]byte // After we support parallel execution for aggregation functions with distinct, // we can remove this attribute. @@ -174,30 +172,21 @@ type HashAggInput struct { // HashAggIntermData indicates the intermediate data of aggregation execution. type HashAggIntermData struct { - groupKeys [][]byte - cursor int - groupCtxMap aggCtxsMapper + groupKeys [][]byte + cursor int + partialResultMap aggCtxsMapper } -// ToRows converts HashAggInterData to DatumRows. -func (d *HashAggIntermData) ToRows(sc *stmtctx.StatementContext, rows [][]types.Datum, aggFuncs []aggregation.Aggregation, maxChunkSize int) (_ [][]types.Datum, reachEnd bool) { - if len(rows) == maxChunkSize { - return rows, false +// getPartialResultBatch fetches a batch of partial results from HashAggIntermData. 
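// The AggFunc contract this series programs against, as a minimal sketch
// reconstructed from the call sites in these patches (the authoritative
// definition lives in executor/aggfuncs; the exact PartialResult type is an
// assumption here):
//
//	type PartialResult unsafe.Pointer
//
//	type AggFunc interface {
//		// AllocPartialResult allocates the mutable per-group state;
//		// the AggFunc itself stays stateless.
//		AllocPartialResult() PartialResult
//		// ResetPartialResult clears pr so it can be reused.
//		ResetPartialResult(pr PartialResult)
//		// UpdatePartialResult folds rowsInGroup into pr.
//		UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error
//		// MergePartialResult merges src into dst; final workers use it
//		// to combine results shipped from the partial workers.
//		MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) error
//		// AppendFinalResult2Chunk evaluates pr and appends it to chk.
//		AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error
//	}
//
// getPartialResultBatch below hands batches of these PartialResults, keyed
// by group key, from partial workers to final workers.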
+func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys [][]byte, reachEnd bool) { + if len(prs) == maxChunkSize { + return prs, nil, false } + keyStart := d.cursor for ; d.cursor < len(d.groupKeys); d.cursor++ { - groupKey := d.groupKeys[d.cursor] - row := make([]types.Datum, 0, len(aggFuncs)*2) - aggCtxs := d.groupCtxMap[string(groupKey)] - for i, f := range aggFuncs { - for _, d := range f.GetPartialResult(aggCtxs[i]) { - row = append(row, d) - } - } - // Append groupKey as the last element. - row = append(row, types.NewBytesDatum(groupKey)) - rows = append(rows, row) + prs = append(prs, d.partialResultMap[string(d.groupKeys[d.cursor])]) } - return rows, true + return prs, d.groupKeys[keyStart:d.cursor], true } // Close implements the Executor Close interface. @@ -206,7 +195,7 @@ func (e *HashAggExec) Close() error { e.childResult = nil e.groupMap = nil e.groupIterator = nil - e.aggCtxsMap = nil + e.partialResultMap = nil return nil } // `Close` may be called after `Open` without calling `Next` in test. @@ -245,9 +234,7 @@ func (e *HashAggExec) Open(ctx context.Context) error { func (e *HashAggExec) initForUnparallelExec() { e.groupMap = mvmap.NewMVMap() e.groupIterator = e.groupMap.NewIterator() - e.aggCtxsMap = make(aggCtxsMapper, 0) - e.mutableRow = chunk.MutRowFromTypes(e.retTypes()) - e.rowBuffer = make([]types.Datum, 0, e.Schema().Len()) + e.partialResultMap = make(aggCtxsMapper, 0) e.groupKey = make([]byte, 0, 8) e.groupVals = make([][]byte, 0, 8) e.childResult = e.children[0].newChunk() @@ -276,14 +263,8 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { // Init partial workers. for i := 0; i < partialConcurrency; i++ { - // Aggregation may retain some status variables, - // so we need to clone the AggFuncs to avoid data race on these variables. - newAggFuncs := make([]aggregation.Aggregation, len(e.AggFuncs)) - for i := range newAggFuncs { - newAggFuncs[i] = e.AggFuncs[i].Clone(ctx) - } w := HashAggPartialWorker{ - baseHashAggWorker: newBaseHashAggWorker(e.finishCh, newAggFuncs, e.maxChunkSize), + baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.maxChunkSize), inputCh: e.partialInputChs[i], outputChs: e.partialOutputChs, giveBackCh: e.inputCh, @@ -303,7 +284,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { // Init final workers. 
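// Note on the deleted Clone() above: sharing e.PartialAggFuncs across the
// partial workers is safe only because an AggFunc in the new framework holds
// no per-group state; everything mutable sits behind the PartialResult each
// worker allocates for itself. A hypothetical COUNT under that contract
// (illustration only, not the real aggfuncs implementation):
//
//	type countFunc struct{} // stateless, shareable across goroutines
//
//	func (*countFunc) AllocPartialResult() aggfuncs.PartialResult {
//		return aggfuncs.PartialResult(unsafe.Pointer(new(int64)))
//	}
//
//	func (*countFunc) UpdatePartialResult(_ sessionctx.Context, rows []chunk.Row, pr aggfuncs.PartialResult) error {
//		*(*int64)(pr) += int64(len(rows)) // state lives behind pr, not in countFunc
//		return nil
//	}
//
// The final workers below share e.FinalAggFuncs the same way.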
for i := 0; i < finalConcurrency; i++ { e.finalWorkers[i] = HashAggFinalWorker{ - baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.newFinalAggFuncs(ctx), e.maxChunkSize), + baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.FinalAggFuncs, e.maxChunkSize), aggCtxsMap: make(aggCtxsMapper, 0), groupSet: mvmap.NewMVMap(), groupVals: make([][]byte, 0, 8), @@ -317,17 +298,6 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { } } -func (e *HashAggExec) newFinalAggFuncs(ctx sessionctx.Context) (newAggFuncs []aggregation.Aggregation) { - newAggFuncs = make([]aggregation.Aggregation, 0, len(e.AggFuncs)) - idx := 0 - for _, af := range e.AggFuncs { - var aggFunc aggregation.Aggregation - idx, aggFunc = af.GetFinalAggFunc(ctx, idx) - newAggFuncs = append(newAggFuncs, aggFunc) - } - return newAggFuncs -} - func (w *HashAggPartialWorker) getChildInput() bool { select { case <-w.finishCh: @@ -374,9 +344,9 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s if err != nil { return errors.Trace(err) } - aggEvalCtxs := w.getContext(sc, groupKey, w.aggCtxsMap) + partialResults := w.getPartialResult(sc, groupKey, w.aggCtxsMap) for i, af := range w.aggFuncs { - if err = af.Update(aggEvalCtxs[i], sc, row); err != nil { + if err = af.UpdatePartialResult(ctx, []chunk.Row{row}, partialResults[i]); err != nil { return errors.Trace(err) } } @@ -402,8 +372,8 @@ func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, f continue } w.outputChs[i] <- &HashAggIntermData{ - groupKeys: groupKeysSlice[i], - groupCtxMap: w.aggCtxsMap, + groupKeys: groupKeysSlice[i], + partialResultMap: w.aggCtxsMap, } } } @@ -427,16 +397,16 @@ func (w *HashAggPartialWorker) getGroupKey(sc *stmtctx.StatementContext, row chu return w.groupKey, errors.Trace(err) } -func (w baseHashAggWorker) getContext(sc *stmtctx.StatementContext, groupKey []byte, mapper aggCtxsMapper) []*aggregation.AggEvaluateContext { - aggCtxs, ok := mapper[string(groupKey)] +func (w baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey []byte, mapper aggCtxsMapper) []aggfuncs.PartialResult { + partialResults, ok := mapper[string(groupKey)] if !ok { - aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(w.aggFuncs)) + partialResults = make([]aggfuncs.PartialResult, 0, len(w.aggFuncs)) for _, af := range w.aggFuncs { - aggCtxs = append(aggCtxs, af.CreateContext(sc)) + partialResults = append(partialResults, af.AllocPartialResult()) } - mapper[string(groupKey)] = aggCtxs + mapper[string(groupKey)] = partialResults } - return aggCtxs + return partialResults } func (w *HashAggFinalWorker) getPartialInput() (input *HashAggIntermData, ok bool) { @@ -451,30 +421,32 @@ func (w *HashAggFinalWorker) getPartialInput() (input *HashAggIntermData, ok boo return } -func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (err error) { +func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err error) { var ( - input *HashAggIntermData - ok bool - intermDataRowsBuffer [][]types.Datum + input *HashAggIntermData + ok bool + intermDataBuffer [][]aggfuncs.PartialResult + groupKeys [][]byte + sc = sctx.GetSessionVars().StmtCtx ) for { if input, ok = w.getPartialInput(); !ok { return nil } - if intermDataRowsBuffer == nil { - intermDataRowsBuffer = make([][]types.Datum, 0, w.maxChunkSize) + if intermDataBuffer == nil { + intermDataBuffer = make([][]aggfuncs.PartialResult, 0, w.maxChunkSize) } // Consume input in batches, size of every batch is less than 
w.maxChunkSize. for reachEnd := false; !reachEnd; { - intermDataRowsBuffer, reachEnd = input.ToRows(sc, intermDataRowsBuffer[:0], w.aggFuncs, w.maxChunkSize) - for _, row := range intermDataRowsBuffer { - groupKey := row[len(row)-1].GetBytes() + intermDataBuffer, groupKeys, reachEnd = input.getPartialResultBatch(sc, intermDataBuffer[:0], w.aggFuncs, w.maxChunkSize) + for i, groupKey := range groupKeys { if len(w.groupSet.Get(groupKey, w.groupVals[:0])) == 0 { w.groupSet.Put(groupKey, []byte{}) } - aggEvalCtxs := w.getContext(sc, groupKey, w.aggCtxsMap) - for i, af := range w.aggFuncs { - if err = af.Update(aggEvalCtxs[i], sc, chunk.MutRowFromDatums(row).ToRow()); err != nil { + prs := intermDataBuffer[i] + finalPartialResults := w.getPartialResult(sc, groupKey, w.aggCtxsMap) + for j, af := range w.aggFuncs { + if err = af.MergePartialResult(sctx, prs[j], finalPartialResults[j]); err != nil { return errors.Trace(err) } } @@ -483,7 +455,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er } } -func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) { +func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { groupIter := w.groupSet.NewIterator() result, finished := w.receiveFinalResultHolder() if finished { @@ -498,13 +470,10 @@ func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) { } return } - aggCtxs := w.getContext(sc, groupKey, w.aggCtxsMap) - w.rowBuffer = w.rowBuffer[:0] + partialResults := w.getPartialResult(sctx.GetSessionVars().StmtCtx, groupKey, w.aggCtxsMap) for i, af := range w.aggFuncs { - w.rowBuffer = append(w.rowBuffer, af.GetResult(aggCtxs[i])) + af.AppendFinalResult2Chunk(sctx, partialResults[i], result) } - w.mutableRow.SetDatums(w.rowBuffer...) - result.AppendRow(w.mutableRow.ToRow()) if result.NumRows() == w.maxChunkSize { w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} result, finished = w.receiveFinalResultHolder() @@ -529,11 +498,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro defer func() { waitGroup.Done() }() - sc := ctx.GetSessionVars().StmtCtx - if err := w.consumeIntermData(sc); err != nil { + if err := w.consumeIntermData(ctx); err != nil { w.outputCh <- &AfFinalResult{err: errors.Trace(err)} } - w.getFinalResult(sc) + w.getFinalResult(ctx) } // Next implements the Executor Next interface. @@ -665,13 +633,10 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro if groupKey == nil { return nil } - aggCtxs := e.getContexts(groupKey) - e.rowBuffer = e.rowBuffer[:0] - for i, af := range e.AggFuncs { - e.rowBuffer = append(e.rowBuffer, af.GetResult(aggCtxs[i])) + partialResults := e.getPartialResults(groupKey) + for i, af := range e.PartialAggFuncs { + af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk) } - e.mutableRow.SetDatums(e.rowBuffer...) 
- chk.AppendRow(e.mutableRow.ToRow()) if chk.NumRows() == e.maxChunkSize { return nil } @@ -698,9 +663,9 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) { if len(e.groupMap.Get(groupKey, e.groupVals[:0])) == 0 { e.groupMap.Put(groupKey, []byte{}) } - aggCtxs := e.getContexts(groupKey) - for i, af := range e.AggFuncs { - err = af.Update(aggCtxs[i], e.sc, row) + partialResults := e.getPartialResults(groupKey) + for i, af := range e.PartialAggFuncs { + err = af.UpdatePartialResult(e.ctx, []chunk.Row{row}, partialResults[i]) if err != nil { return errors.Trace(err) } @@ -729,17 +694,17 @@ func (e *HashAggExec) getGroupKey(row chunk.Row) ([]byte, error) { return e.groupKey, nil } -func (e *HashAggExec) getContexts(groupKey []byte) []*aggregation.AggEvaluateContext { +func (e *HashAggExec) getPartialResults(groupKey []byte) []aggfuncs.PartialResult { groupKeyString := string(groupKey) - aggCtxs, ok := e.aggCtxsMap[groupKeyString] + partialResults, ok := e.partialResultMap[groupKeyString] if !ok { - aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs)) - for _, af := range e.AggFuncs { - aggCtxs = append(aggCtxs, af.CreateContext(e.ctx.GetSessionVars().StmtCtx)) + partialResults = make([]aggfuncs.PartialResult, 0, len(e.PartialAggFuncs)) + for _, af := range e.PartialAggFuncs { + partialResults = append(partialResults, af.AllocPartialResult()) } - e.aggCtxsMap[groupKeyString] = aggCtxs + e.partialResultMap[groupKeyString] = partialResults } - return aggCtxs + return partialResults } // StreamAggExec deals with all the aggregate functions. diff --git a/executor/builder.go b/executor/builder.go index 7122e5f7a33a1..76afc53481f10 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -927,13 +927,16 @@ func (b *executorBuilder) buildHashAgg(v *plan.PhysicalHashAgg) Executor { b.err = errors.Trace(b.err) return nil } + if len(v.AggFuncs) == 0 { + return src + } src = b.buildProjBelowAgg(v.AggFuncs, v.GroupByItems, src) sessionVars := b.ctx.GetSessionVars() e := &HashAggExec{ - baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), - sc: sessionVars.StmtCtx, - AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)), - GroupByItems: v.GroupByItems, + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), + sc: sessionVars.StmtCtx, + PartialAggFuncs: make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)), + GroupByItems: v.GroupByItems, } // We take `create table t(a int, b int);` as example. 
// @@ -970,16 +973,21 @@ func (b *executorBuilder) buildHashAgg(v *plan.PhysicalHashAgg) Executor { if finalCon, partialCon := sessionVars.HashAggFinalConcurrency, sessionVars.HashAggPartialConcurrency; finalCon <= 0 || partialCon <= 0 || finalCon == 1 && partialCon == 1 { e.isUnparallelExec = true } + partialOrdinal := 0 for i, aggDesc := range v.AggFuncs { - if !e.isUnparallelExec { - if aggDesc.Mode == aggregation.CompleteMode { - aggDesc.Mode = aggregation.Partial1Mode - } else { - aggDesc.Mode = aggregation.Partial2Mode + if e.isUnparallelExec { + e.PartialAggFuncs = append(e.PartialAggFuncs, aggfuncs.Build(b.ctx, aggDesc, i)) + } else { + ordinal := []int{partialOrdinal} + partialOrdinal++ + if aggDesc.Name == ast.AggFuncAvg { + ordinal = append(ordinal, partialOrdinal+1) + partialOrdinal++ } + finalDesc := aggDesc.Split(ordinal) + e.PartialAggFuncs = append(e.PartialAggFuncs, aggfuncs.Build(b.ctx, aggDesc, i)) + e.FinalAggFuncs = append(e.FinalAggFuncs, aggfuncs.Build(b.ctx, finalDesc, i)) } - aggFunc := aggDesc.GetAggFunc(b.ctx) - e.AggFuncs = append(e.AggFuncs, aggFunc) if e.defaultVal != nil { value := aggDesc.GetDefaultValue() e.defaultVal.AppendDatum(i, &value) @@ -996,6 +1004,9 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { b.err = errors.Trace(b.err) return nil } + if len(v.AggFuncs) == 0 { + return src + } src = b.buildProjBelowAgg(v.AggFuncs, v.GroupByItems, src) e := &StreamAggExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), From 26c6b169a75634d49e10739679d432db76d0d137 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 14 Aug 2018 10:53:11 +0800 Subject: [PATCH 2/6] fix bug for first_row/max/min(enum/set) --- config/config.go | 2 +- expression/aggregation/descriptor.go | 3 +++ plan/logical_plan_builder.go | 9 +++++++++ plan/rule_eliminate_projection.go | 4 ++-- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 036ca3977700a..76d7725a4a0bf 100644 --- a/config/config.go +++ b/config/config.go @@ -280,7 +280,7 @@ var defaultConf = Config{ Performance: Performance{ TCPKeepAlive: true, CrossJoin: true, - StatsLease: "3s", + StatsLease: "0s", RunAutoAnalyze: true, StmtCountLimit: 5000, FeedbackProbability: 0.05, diff --git a/expression/aggregation/descriptor.go b/expression/aggregation/descriptor.go index 578c86fc9957a..79099f7109e61 100644 --- a/expression/aggregation/descriptor.go +++ b/expression/aggregation/descriptor.go @@ -345,6 +345,9 @@ func (a *AggFuncDesc) typeInfer4MaxMin(ctx sessionctx.Context) { a.Args[0] = expression.BuildCastFunction(ctx, a.Args[0], tp) } a.RetTp = a.Args[0].GetType() + if a.RetTp.Tp == mysql.TypeEnum || a.RetTp.Tp == mysql.TypeSet { + a.RetTp = &types.FieldType{Tp: mysql.TypeString, Flen: mysql.MaxFieldCharLength} + } } func (a *AggFuncDesc) typeInfer4BitFuncs(ctx sessionctx.Context) { diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index d1536ab17ad8a..15e34752cd965 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/sirupsen/logrus" ) const ( @@ -117,6 +118,9 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega plan4Agg.AggFuncs = append(plan4Agg.AggFuncs, newFunc) schema4Agg.Append(col) } + for _, c := range schema4Agg.Columns { + logrus.Warning(c.RetType) + } plan4Agg.SetChildren(p) 
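// The RetType fixes in this commit (typeInfer4MaxMin above, the first_row
// retype in buildDistinct just below, and the temporary logging here) all
// chase the same class of bug: a chunk column whose declared field type
// disagrees with the values the new aggfuncs framework appends. A
// hypothetical repro for the enum/set case:
//
//	CREATE TABLE te (e ENUM('a','b','c'));
//	INSERT INTO te VALUES ('b'), ('c');
//	SELECT MAX(e) FROM te;
//
// MAX/MIN compare and emit the enum's string value, so keeping TypeEnum as
// the return type would mislabel the result column; the fix widens RetTp to
// TypeString with MaxFieldCharLength. (The repro and mechanism are inferred
// from the change, not taken from the commit message.)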
plan4Agg.GroupByItems = gbyItems plan4Agg.SetSchema(schema4Agg) @@ -594,6 +598,11 @@ func (b *planBuilder) buildDistinct(child LogicalPlan, length int) *LogicalAggre } plan4Agg.SetChildren(child) plan4Agg.SetSchema(child.Schema().Clone()) + // Distinct will be rewritten as first_row, we reset the type here since the return type + // of first_row is not always the same as the column arg of first_row. + for i, col := range plan4Agg.schema.Columns { + col.RetType = plan4Agg.AggFuncs[i].RetTp + } return plan4Agg } diff --git a/plan/rule_eliminate_projection.go b/plan/rule_eliminate_projection.go index 26b7f7fa703ab..a5a9ece522c71 100644 --- a/plan/rule_eliminate_projection.go +++ b/plan/rule_eliminate_projection.go @@ -51,9 +51,9 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool { func resolveColumnAndReplace(origin *expression.Column, replace map[string]*expression.Column) { dst := replace[string(origin.HashCode(nil))] if dst != nil { - colName := origin.ColName + colName, retType := origin.ColName, origin.RetType *origin = *dst - origin.ColName = colName + origin.ColName, origin.RetType = colName, retType } } From 011e5978f9a44f4bd3e014d2ecf909df8105513a Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Aug 2018 10:03:03 +0800 Subject: [PATCH 3/6] fix bug when agg funcs is empty or/and groupByItems is empty --- config/config.go | 2 +- executor/aggregate.go | 6 ++++++ executor/aggregate_test.go | 2 ++ executor/builder.go | 6 ------ plan/logical_plan_builder.go | 4 ---- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index 76d7725a4a0bf..036ca3977700a 100644 --- a/config/config.go +++ b/config/config.go @@ -280,7 +280,7 @@ var defaultConf = Config{ Performance: Performance{ TCPKeepAlive: true, CrossJoin: true, - StatsLease: "0s", + StatsLease: "3s", RunAutoAnalyze: true, StmtCountLimit: 5000, FeedbackProbability: 0.05, diff --git a/executor/aggregate.go b/executor/aggregate.go index cca04801e5243..45cfc73b85943 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -474,6 +474,9 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { for i, af := range w.aggFuncs { af.AppendFinalResult2Chunk(sctx, partialResults[i], result) } + if len(w.aggFuncs) == 0 { + result.SetNumVirtualRows(result.NumRows() + 1) + } if result.NumRows() == w.maxChunkSize { w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} result, finished = w.receiveFinalResultHolder() @@ -634,6 +637,9 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro return nil } partialResults := e.getPartialResults(groupKey) + if len(e.PartialAggFuncs) == 0 { + chk.SetNumVirtualRows(chk.NumRows() + 1) + } for i, af := range e.PartialAggFuncs { af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk) } diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index f096dfed3066c..af1f3cb1b30c5 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -332,7 +332,9 @@ func (s *testSuite) TestAggregation(c *C) { tk.MustExec("drop table t") tk.MustExec("create table t(a decimal(10, 4))") + tk.MustQuery("select 10 from t group by a").Check(testkit.Rows()) tk.MustExec("insert into t value(0), (-0.9871), (-0.9871)") + tk.MustQuery("select 10 from t group by a").Check(testkit.Rows("10", "10")) tk.MustQuery("select sum(a) from (select a from t union all select a from t) tmp").Check(testkit.Rows("-3.9484")) } diff --git a/executor/builder.go 
b/executor/builder.go index 76afc53481f10..3d766667185bf 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -927,9 +927,6 @@ func (b *executorBuilder) buildHashAgg(v *plan.PhysicalHashAgg) Executor { b.err = errors.Trace(b.err) return nil } - if len(v.AggFuncs) == 0 { - return src - } src = b.buildProjBelowAgg(v.AggFuncs, v.GroupByItems, src) sessionVars := b.ctx.GetSessionVars() e := &HashAggExec{ @@ -1004,9 +1001,6 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { b.err = errors.Trace(b.err) return nil } - if len(v.AggFuncs) == 0 { - return src - } src = b.buildProjBelowAgg(v.AggFuncs, v.GroupByItems, src) e := &StreamAggExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src), diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index 15e34752cd965..5f18b30bff543 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -39,7 +39,6 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" - "github.com/sirupsen/logrus" ) const ( @@ -118,9 +117,6 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega plan4Agg.AggFuncs = append(plan4Agg.AggFuncs, newFunc) schema4Agg.Append(col) } - for _, c := range schema4Agg.Columns { - logrus.Warning(c.RetType) - } plan4Agg.SetChildren(p) plan4Agg.GroupByItems = gbyItems plan4Agg.SetSchema(schema4Agg) From 9e1f1aea973dc252d4e434a062cd1d54b5db2720 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Aug 2018 13:23:02 +0800 Subject: [PATCH 4/6] fix group_concat bug when arg contains null --- executor/aggfuncs/func_group_concat.go | 19 ++++++++----------- executor/aggregate_test.go | 6 ++++-- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/executor/aggfuncs/func_group_concat.go b/executor/aggfuncs/func_group_concat.go index dc9845f4df997..a2cf95824f1d0 100644 --- a/executor/aggfuncs/func_group_concat.go +++ b/executor/aggfuncs/func_group_concat.go @@ -82,29 +82,28 @@ func (e *groupConcat) ResetPartialResult(pr PartialResult) { func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { p := (*partialResult4GroupConcat)(pr) - v, isNull := "", false + v, isNull, preLen := "", false, 0 for _, row := range rowsInGroup { - if p.buffer != nil { + if p.buffer != nil && p.buffer.Len() != 0 { + preLen = p.buffer.Len() p.buffer.WriteString(e.sep) } - isAllNull := true for _, arg := range e.args { v, isNull, err = arg.EvalString(sctx, row) if err != nil { return errors.Trace(err) } if isNull { - continue + break } - isAllNull = false if p.buffer == nil { p.buffer = &bytes.Buffer{} } p.buffer.WriteString(v) } - if isAllNull { + if isNull { if p.buffer != nil { - p.buffer.Truncate(p.buffer.Len() - len(e.sep)) + p.buffer.Truncate(preLen) } continue } @@ -156,7 +155,6 @@ func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsI p := (*partialResult4GroupConcatDistinct)(pr) v, isNull := "", false for _, row := range rowsInGroup { - allIsNull := true p.valsBuf.Reset() for _, arg := range e.args { v, isNull, err = arg.EvalString(sctx, row) @@ -164,12 +162,11 @@ func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsI return errors.Trace(err) } if isNull { - continue + break } - allIsNull = false p.valsBuf.WriteString(v) } - if allIsNull { + if isNull { continue } joinedVals := p.valsBuf.String() diff --git a/executor/aggregate_test.go 
b/executor/aggregate_test.go index af1f3cb1b30c5..2191a3c33746f 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -326,9 +326,11 @@ func (s *testSuite) TestAggregation(c *C) { result.Check(testkit.Rows("0 0 0", "1 1 1")) tk.MustExec("drop table t") - tk.MustExec("create table t(a int)") - tk.MustExec("insert into t value(null)") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t value(null, null)") tk.MustQuery("select group_concat(a), group_concat(distinct a) from t").Check(testkit.Rows(" ")) + tk.MustExec("insert into t value(1, null), (null, 1), (1, 2), (3, 4)") + tk.MustQuery("select group_concat(a, b), group_concat(distinct a,b) from t").Check(testkit.Rows("12,34 12,34")) tk.MustExec("drop table t") tk.MustExec("create table t(a decimal(10, 4))") From be45b8b70964653836474ca289913eff482d1258 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 15 Aug 2018 16:23:22 +0800 Subject: [PATCH 5/6] address comment --- executor/aggregate.go | 46 +++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 45cfc73b85943..85667b22b8ac8 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -30,7 +30,7 @@ import ( "golang.org/x/net/context" ) -type aggCtxsMapper map[string][]aggfuncs.PartialResult +type aggPartialResultMapper map[string][]aggfuncs.PartialResult // baseHashAggWorker stores the common attributes of HashAggFinalWorker and HashAggPartialWorker. type baseHashAggWorker struct { @@ -52,14 +52,14 @@ func newBaseHashAggWorker(finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc, type HashAggPartialWorker struct { baseHashAggWorker - inputCh chan *chunk.Chunk - outputChs []chan *HashAggIntermData - globalOutputCh chan *AfFinalResult - giveBackCh chan<- *HashAggInput - aggCtxsMap aggCtxsMapper - groupByItems []expression.Expression - groupKey []byte - groupValDatums []types.Datum + inputCh chan *chunk.Chunk + outputChs []chan *HashAggIntermData + globalOutputCh chan *AfFinalResult + giveBackCh chan<- *HashAggInput + partialResultsMap aggPartialResultMapper + groupByItems []expression.Expression + groupKey []byte + groupValDatums []types.Datum // chk stores the input data from child, // and is reused by childExec and partial worker. chk *chunk.Chunk @@ -72,7 +72,7 @@ type HashAggFinalWorker struct { rowBuffer []types.Datum mutableRow chunk.MutRow - aggCtxsMap aggCtxsMapper + partialResultMap aggPartialResultMapper groupSet *mvmap.MVMap groupVals [][]byte inputCh chan *HashAggIntermData @@ -136,7 +136,7 @@ type HashAggExec struct { sc *stmtctx.StatementContext PartialAggFuncs []aggfuncs.AggFunc FinalAggFuncs []aggfuncs.AggFunc - partialResultMap aggCtxsMapper + partialResultMap aggPartialResultMapper groupMap *mvmap.MVMap groupIterator *mvmap.Iterator GroupByItems []expression.Expression @@ -174,7 +174,7 @@ type HashAggInput struct { type HashAggIntermData struct { groupKeys [][]byte cursor int - partialResultMap aggCtxsMapper + partialResultMap aggPartialResultMapper } // getPartialResultBatch fetches a batch of partial results from HashAggIntermData.
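// The group_concat fix above follows MySQL semantics: a row is dropped
// entirely when any argument evaluates to NULL, rather than concatenating
// whatever arguments are non-NULL. With rows (1,NULL), (NULL,1), (1,2),
// (3,4), as in the new test:
//
//	SELECT GROUP_CONCAT(a, b) FROM t;  -- "12,34"
//
// That is why UpdatePartialResult records the buffer length (preLen) before
// writing the separator and truncates back to it when a NULL shows up.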
@@ -234,7 +234,7 @@ func (e *HashAggExec) Open(ctx context.Context) error { func (e *HashAggExec) initForUnparallelExec() { e.groupMap = mvmap.NewMVMap() e.groupIterator = e.groupMap.NewIterator() - e.partialResultMap = make(aggCtxsMapper, 0) + e.partialResultMap = make(aggPartialResultMapper, 0) e.groupKey = make([]byte, 0, 8) e.groupVals = make([][]byte, 0, 8) e.childResult = e.children[0].newChunk() @@ -269,7 +269,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { outputChs: e.partialOutputChs, giveBackCh: e.inputCh, globalOutputCh: e.finalOutputCh, - aggCtxsMap: make(aggCtxsMapper, 0), + partialResultsMap: make(aggPartialResultMapper, 0), groupByItems: e.GroupByItems, chk: e.children[0].newChunk(), } @@ -285,7 +285,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { for i := 0; i < finalConcurrency; i++ { e.finalWorkers[i] = HashAggFinalWorker{ baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.FinalAggFuncs, e.maxChunkSize), - aggCtxsMap: make(aggCtxsMapper, 0), + partialResultMap: make(aggPartialResultMapper, 0), groupSet: mvmap.NewMVMap(), groupVals: make([][]byte, 0, 8), inputCh: e.partialOutputChs[i], @@ -327,7 +327,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG if !w.getChildInput() { return } - if err := w.updatePartialResult(ctx, sc, w.chk, len(w.aggCtxsMap)); err != nil { + if err := w.updatePartialResult(ctx, sc, w.chk, len(w.partialResultsMap)); err != nil { w.globalOutputCh <- &AfFinalResult{err: errors.Trace(err)} return } @@ -344,7 +344,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s if err != nil { return errors.Trace(err) } - partialResults := w.getPartialResult(sc, groupKey, w.aggCtxsMap) + partialResults := w.getPartialResult(sc, groupKey, w.partialResultsMap) for i, af := range w.aggFuncs { if err = af.UpdatePartialResult(ctx, []chunk.Row{row}, partialResults[i]); err != nil { return errors.Trace(err) @@ -358,11 +358,11 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s // We only support parallel execution for single-machine, so process of encode and decode can be skipped. 
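// Routing invariant behind shuffleIntermData below: each group key is owned
// by exactly one final worker, so final workers can merge without locks.
// Ownership is simply a hash of the key:
//
//	finalWorkerIdx := int(murmur3.Sum32(groupKeyBytes)) % finalConcurrency
//
// Two partial workers that saw the same group key therefore ship their
// intermediate data for that key to the same final worker, which merges the
// per-worker PartialResults via MergePartialResult.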
func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) { groupKeysSlice := make([][][]byte, finalConcurrency) - for groupKey := range w.aggCtxsMap { + for groupKey := range w.partialResultsMap { groupKeyBytes := []byte(groupKey) finalWorkerIdx := int(murmur3.Sum32(groupKeyBytes)) % finalConcurrency if groupKeysSlice[finalWorkerIdx] == nil { - groupKeysSlice[finalWorkerIdx] = make([][]byte, 0, len(w.aggCtxsMap)/finalConcurrency) + groupKeysSlice[finalWorkerIdx] = make([][]byte, 0, len(w.partialResultsMap)/finalConcurrency) } groupKeysSlice[finalWorkerIdx] = append(groupKeysSlice[finalWorkerIdx], groupKeyBytes) } @@ -373,7 +373,7 @@ func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, f } w.outputChs[i] <- &HashAggIntermData{ groupKeys: groupKeysSlice[i], - partialResultMap: w.aggCtxsMap, + partialResultMap: w.partialResultsMap, } } } @@ -397,7 +397,7 @@ func (w *HashAggPartialWorker) getGroupKey(sc *stmtctx.StatementContext, row chu return w.groupKey, errors.Trace(err) } -func (w baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey []byte, mapper aggCtxsMapper) []aggfuncs.PartialResult { +func (w baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey []byte, mapper aggPartialResultMapper) []aggfuncs.PartialResult { partialResults, ok := mapper[string(groupKey)] if !ok { partialResults = make([]aggfuncs.PartialResult, 0, len(w.aggFuncs)) @@ -444,7 +444,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err err w.groupSet.Put(groupKey, []byte{}) } prs := intermDataBuffer[i] - finalPartialResults := w.getPartialResult(sc, groupKey, w.aggCtxsMap) + finalPartialResults := w.getPartialResult(sc, groupKey, w.partialResultMap) for j, af := range w.aggFuncs { if err = af.MergePartialResult(sctx, prs[j], finalPartialResults[j]); err != nil { return errors.Trace(err) @@ -470,7 +470,7 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { } return } - partialResults := w.getPartialResult(sctx.GetSessionVars().StmtCtx, groupKey, w.aggCtxsMap) + partialResults := w.getPartialResult(sctx.GetSessionVars().StmtCtx, groupKey, w.partialResultMap) for i, af := range w.aggFuncs { af.AppendFinalResult2Chunk(sctx, partialResults[i], result) } From f4b29f3c23807426eaac52b32b780a8a4d34d2ba Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 20 Aug 2018 11:50:31 +0800 Subject: [PATCH 6/6] address comment --- executor/aggregate.go | 10 +++++----- plan/rule_eliminate_projection.go | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 85667b22b8ac8..257b8076ea5ae 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -179,14 +179,14 @@ type HashAggIntermData struct { // getPartialResultBatch fetches a batch of partial results from HashAggIntermData. 
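// Recap of how a final worker consumes these batches (consumeIntermData in
// patch 1): for every group key it looks up, or lazily allocates, its own
// final PartialResult and folds the incoming one in:
//
//	finalPartialResults := w.getPartialResult(sc, groupKey, w.partialResultMap)
//	for j, af := range w.aggFuncs {
//		if err = af.MergePartialResult(sctx, prs[j], finalPartialResults[j]); err != nil {
//			return errors.Trace(err)
//		}
//	}
//
// For COUNT a merge is dst += src; for AVG it adds both the sum and the
// count halves, which is why avg occupies two partial ordinals in builder.go.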
func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys [][]byte, reachEnd bool) { - if len(prs) == maxChunkSize { - return prs, nil, false - } keyStart := d.cursor - for ; d.cursor < len(d.groupKeys); d.cursor++ { + for ; d.cursor < len(d.groupKeys) && len(prs) < maxChunkSize; d.cursor++ { prs = append(prs, d.partialResultMap[string(d.groupKeys[d.cursor])]) } - return prs, d.groupKeys[keyStart:d.cursor], true + if d.cursor == len(d.groupKeys) { + reachEnd = true + } + return prs, d.groupKeys[keyStart:d.cursor], reachEnd } // Close implements the Executor Close interface. diff --git a/plan/rule_eliminate_projection.go b/plan/rule_eliminate_projection.go index a5a9ece522c71..7199522f37572 100644 --- a/plan/rule_eliminate_projection.go +++ b/plan/rule_eliminate_projection.go @@ -17,8 +17,8 @@ import ( "github.com/pingcap/tidb/expression" ) -// canProjectionBeEliminatedLoose checks whether a projection can be eliminated, returns true if -// every expression is a single column. +// canProjectionBeEliminatedLoose checks whether a projection can be eliminated, +// returns true if every expression is a single column. func canProjectionBeEliminatedLoose(p *LogicalProjection) bool { for _, expr := range p.Exprs { _, ok := expr.(*expression.Column) @@ -29,8 +29,8 @@ func canProjectionBeEliminatedLoose(p *LogicalProjection) bool { return true } -// canProjectionBeEliminatedStrict checks whether a projection can be eliminated, returns true if -// the projection just copy its child's output. +// canProjectionBeEliminatedStrict checks whether a projection can be +// eliminated, returns true if the projection just copy its child's output. func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool { if p.Schema().Len() == 0 { return true @@ -83,8 +83,8 @@ func doPhysicalProjectionElimination(p PhysicalPlan) PhysicalPlan { return child } -// eliminatePhysicalProjection should be called after physical optimization to eliminate the redundant projection -// left after logical projection elimination. +// eliminatePhysicalProjection should be called after physical optimization to +// eliminate the redundant projection left after logical projection elimination. func eliminatePhysicalProjection(p PhysicalPlan) PhysicalPlan { oldSchema := p.Schema() newRoot := doPhysicalProjectionElimination(p)
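// Batching contract after the final revision of getPartialResultBatch above:
// the size check now lives inside the loop, so one call appends at most
// maxChunkSize results and reports reachEnd only once the cursor has drained
// d.groupKeys. Expected usage (sketch):
//
//	prs, keys, reachEnd := d.getPartialResultBatch(sc, prs[:0], aggFuncs, maxChunkSize)
//	// len(prs) == len(keys) && len(prs) <= maxChunkSize
//	// reachEnd == (d.cursor == len(d.groupKeys))
//
// The patch-1 version checked the size only on entry, and since the caller
// always passed in a zero-length slice, its loop drained every remaining key
// in a single call.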