From 182658b2a627b253f00b0bc75dae8b9afb8b09d1 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sun, 17 Jun 2018 20:39:07 +0800 Subject: [PATCH 01/14] executor: refactor aggregate functions --- executor/aggfuncs/aggfuncs.go | 61 ++++++ executor/aggfuncs/builder.go | 131 +++++++++++++ executor/aggfuncs/func_avg.go | 347 ++++++++++++++++++++++++++++++++++ executor/aggregate.go | 84 +++++++- executor/builder.go | 11 +- 5 files changed, 623 insertions(+), 11 deletions(-) create mode 100644 executor/aggfuncs/aggfuncs.go create mode 100644 executor/aggfuncs/builder.go create mode 100644 executor/aggfuncs/func_avg.go diff --git a/executor/aggfuncs/aggfuncs.go b/executor/aggfuncs/aggfuncs.go new file mode 100644 index 0000000000000..18dac8bed8795 --- /dev/null +++ b/executor/aggfuncs/aggfuncs.go @@ -0,0 +1,61 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggfuncs + +import ( + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/chunk" +) + +// All the AggFunc implementations are listed here for navigation. +var ( + // All the AggFunc implementations for "COUNT". + // All the AggFunc implementations for "SUM". + // All the AggFunc implementations for "AVG". + _ AggFunc = (*avgDedup4Decimal)(nil) + _ AggFunc = (*avgOriginal4Decimal)(nil) + _ AggFunc = (*avgPartial4Decimal)(nil) + + _ AggFunc = (*avgDedup4Float64)(nil) + _ AggFunc = (*avgOriginal4Float64)(nil) + _ AggFunc = (*avgPartial4Float64)(nil) + + _ AggFunc = (*avgDedup4Float32)(nil) + _ AggFunc = (*avgOriginal4Float32)(nil) + _ AggFunc = (*avgPartial4Float32)(nil) + + // All the AggFunc implementations for "FIRSTROW". + // All the AggFunc implementations for "MAX". + // All the AggFunc implementations for "MIN". + // All the AggFunc implementations for "GROUP_CONCAT". + // All the AggFunc implementations for "BIT_OR". + // All the AggFunc implementations for "BIT_XOR". + // All the AggFunc implementations for "BIT_AND". +) + +// AggFunc is the interface to evaluate aggregate functions. +type AggFunc interface { + AllocPartialResult() []byte + ResetPartialResult(partialBytes []byte) + UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error + + AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error + AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error +} + +type baseAggFunc struct { + input []expression.Expression + output []int +} diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go new file mode 100644 index 0000000000000..98c0d090795b9 --- /dev/null +++ b/executor/aggfuncs/builder.go @@ -0,0 +1,131 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggfuncs + +import ( + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/mysql" +) + +// Build is used to build AggFunc according to the aggFuncDesc +func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + switch aggFuncDesc.Name { + case ast.AggFuncCount: + return buildCount(aggFuncDesc, output) + case ast.AggFuncSum: + return buildSum(aggFuncDesc, output) + case ast.AggFuncAvg: + return buildAvg(aggFuncDesc, output) + case ast.AggFuncFirstRow: + return buildFirstRow(aggFuncDesc, output) + case ast.AggFuncMax: + return buildMax(aggFuncDesc, output) + case ast.AggFuncMin: + return buildMin(aggFuncDesc, output) + case ast.AggFuncGroupConcat: + return buildGroupConcat(aggFuncDesc, output) + case ast.AggFuncBitOr: + return buildBitOr(aggFuncDesc, output) + case ast.AggFuncBitXor: + return buildBitXor(aggFuncDesc, output) + case ast.AggFuncBitAnd: + return buildBitAnd(aggFuncDesc, output) + } + return nil +} + +func buildCount(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildSum(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + base := baseAggFunc{ + input: aggFuncDesc.Args, + output: output, + } + + switch aggFuncDesc.Mode { + // Build avg functions which consume the original data and remove the + // duplicated input of the same group. + case aggregation.DedupMode: + return nil + // TODO: implement UpdatePartialResult for the following structs. + // switch aggFuncDesc.Args[0].GetType().Tp { + // case mysql.TypeDecimal: + // return &avgDedup4Decimal{baseAvgDecimal{base}} + // case mysql.TypeFloat: + // return &avgDedup4Float32{baseAvgFloat32{base}} + // case mysql.TypeDouble: + // return &avgDedup4Float64{baseAvgFloat64{base}} + // } + + // Build avg functions which consume the original data and update their + // partial results. + case aggregation.CompleteMode, aggregation.Partial1Mode: + switch aggFuncDesc.Args[0].GetType().Tp { + case mysql.TypeDecimal: + return &avgOriginal4Decimal{baseAvgDecimal{base}} + case mysql.TypeFloat: + return &avgOriginal4Float32{baseAvgFloat32{base}} + case mysql.TypeDouble: + return &avgOriginal4Float64{baseAvgFloat64{base}} + } + + // Build avg functions which consume the partial result of other agg + // functions and update their partial results. + case aggregation.Partial2Mode, aggregation.FinalMode: + switch aggFuncDesc.Args[1].GetType().Tp { + case mysql.TypeDecimal: + return &avgPartial4Decimal{baseAvgDecimal{base}} + case mysql.TypeFloat: + return &avgPartial4Float32{baseAvgFloat32{base}} + case mysql.TypeDouble: + return &avgPartial4Float64{baseAvgFloat64{base}} + } + } + return nil +} + +func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildMax(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildMin(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildBitOr(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildBitXor(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} + +func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + return nil +} diff --git a/executor/aggfuncs/func_avg.go b/executor/aggfuncs/func_avg.go new file mode 100644 index 0000000000000..63b7ba1cd77ed --- /dev/null +++ b/executor/aggfuncs/func_avg.go @@ -0,0 +1,347 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggfuncs + +import ( + "unsafe" + + "github.com/juju/errors" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" +) + +// All the AggFunc implementations for "AVG" are listed here. +var ( + _ AggFunc = (*avgDedup4Decimal)(nil) + _ AggFunc = (*avgOriginal4Decimal)(nil) + _ AggFunc = (*avgPartial4Decimal)(nil) + + _ AggFunc = (*avgDedup4Float64)(nil) + _ AggFunc = (*avgOriginal4Float64)(nil) + _ AggFunc = (*avgPartial4Float64)(nil) + + _ AggFunc = (*avgDedup4Float32)(nil) + _ AggFunc = (*avgOriginal4Float32)(nil) + _ AggFunc = (*avgPartial4Float32)(nil) +) + +// All the following avg function implementations return the decimal result, +// which store the partial results in "partialResult4AvgDecimal". +// +// "baseAvgDecimal" is wrapped by: +// - "avgDedup4Decimal" +// - "avgOriginal4Decimal" +// - "avgPartial4Decimal" +type baseAvgDecimal struct { + baseAggFunc +} + +type partialResult4AvgDecimal struct { + sum types.MyDecimal + count int64 +} + +func (e *baseAvgDecimal) toPartialResult(partialBytes []byte) *partialResult4AvgDecimal { + return (*partialResult4AvgDecimal)(unsafe.Pointer(&partialBytes[0])) +} + +func (e *baseAvgDecimal) AllocPartialResult() []byte { + partialBytes := make([]byte, unsafe.Sizeof(partialResult4AvgDecimal{})) + e.ResetPartialResult(partialBytes) + return partialBytes +} + +func (e *baseAvgDecimal) ResetPartialResult(partialBytes []byte) { + *e.toPartialResult(partialBytes) = partialResult4AvgDecimal{} +} + +func (e *baseAvgDecimal) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendMyDecimal(e.output[0], &partialResult.sum) + chk.AppendInt64(e.output[1], partialResult.count) + return nil +} + +func (e *baseAvgDecimal) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + if partialResult.count == 0 { + chk.AppendNull(e.output[0]) + return nil + } + decimalCount := types.NewDecFromInt(partialResult.count) + finalResult := new(types.MyDecimal) + err := types.DecimalDiv(&partialResult.sum, decimalCount, finalResult, types.DivFracIncr) + if err != nil { + return errors.Trace(err) + } + chk.AppendMyDecimal(e.output[0], finalResult) + return nil +} + +type avgDedup4Decimal struct { + baseAvgDecimal +} + +func (e *avgDedup4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + return errors.New("not implemented yet") +} + +type avgOriginal4Decimal struct { + baseAvgDecimal +} + +func (e *avgOriginal4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + newSum := new(types.MyDecimal) + for _, row := range rowsInGroup { + input, isNull, err := e.input[0].EvalDecimal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + err = types.DecimalAdd(&partialResult.sum, input, newSum) + if err != nil { + return errors.Trace(err) + } + partialResult.sum = *newSum + partialResult.count++ + } + return nil +} + +type avgPartial4Decimal struct { + baseAvgDecimal +} + +func (e *avgPartial4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + newSum := new(types.MyDecimal) + for _, row := range rowsInGroup { + inputSum, isNull, err := e.input[1].EvalDecimal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + + inputCount, isNull, err := e.input[0].EvalInt(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + + err = types.DecimalAdd(&partialResult.sum, inputSum, newSum) + if err != nil { + return errors.Trace(err) + } + partialResult.sum = *newSum + partialResult.count += inputCount + } + return nil +} + +// All the following avg function implementations return the float64 result, +// which store the partial results in "partialResult4AvgFloat64". +// +// "baseAvgFloat64" is wrapped by: +// - "avgDedup4Float64" +// - "avgOriginal4Float64" +// - "avgPartial4Float64" +type baseAvgFloat64 struct { + baseAggFunc +} + +type partialResult4AvgFloat64 struct { + sum float64 + count int64 +} + +func (e *baseAvgFloat64) toPartialResult(partialBytes []byte) *partialResult4AvgFloat64 { + return (*partialResult4AvgFloat64)(unsafe.Pointer(&partialBytes[0])) +} + +func (e *baseAvgFloat64) AllocPartialResult() []byte { + partialBytes := make([]byte, unsafe.Sizeof(partialResult4AvgFloat64{})) + e.ResetPartialResult(partialBytes) + return partialBytes +} + +func (e *baseAvgFloat64) ResetPartialResult(partialBytes []byte) { + *e.toPartialResult(partialBytes) = partialResult4AvgFloat64{} +} + +func (e *baseAvgFloat64) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendFloat64(e.output[0], partialResult.sum) + chk.AppendInt64(e.output[1], partialResult.count) + return nil +} + +func (e *baseAvgFloat64) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendFloat64(e.output[0], partialResult.sum/float64(partialResult.count)) + return nil +} + +type avgDedup4Float64 struct { + baseAvgFloat64 +} + +func (e *avgDedup4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + return errors.New("not implemented yet") +} + +type avgOriginal4Float64 struct { + baseAvgFloat64 +} + +func (e *avgOriginal4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + for _, row := range rowsInGroup { + input, isNull, err := e.input[0].EvalReal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + partialResult.sum += input + partialResult.count++ + } + return nil +} + +type avgPartial4Float64 struct { + baseAvgFloat64 +} + +func (e *avgPartial4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + for _, row := range rowsInGroup { + inputSum, isNull, err := e.input[1].EvalReal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + + inputCount, isNull, err := e.input[0].EvalInt(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + partialResult.sum += inputSum + partialResult.count += inputCount + } + return nil +} + +// All the following avg function implementations return the float32 result, +// which store the partial results in "partialResult4AvgFloat32". +// +// "baseAvgFloat32" is wrapped by: +// - "avgDedup4Float32" +// - "avgOriginal4Float32" +// - "avgPartial4Float32" +type baseAvgFloat32 struct { + baseAggFunc +} + +type partialResult4AvgFloat32 struct { + sum float32 + count int64 +} + +func (e *baseAvgFloat32) toPartialResult(partialBytes []byte) *partialResult4AvgFloat32 { + return (*partialResult4AvgFloat32)(unsafe.Pointer(&partialBytes[0])) +} + +func (e *baseAvgFloat32) AllocPartialResult() []byte { + partialBytes := make([]byte, unsafe.Sizeof(partialResult4AvgFloat32{})) + e.ResetPartialResult(partialBytes) + return partialBytes +} + +func (e *baseAvgFloat32) ResetPartialResult(partialBytes []byte) { + *e.toPartialResult(partialBytes) = partialResult4AvgFloat32{} +} + +func (e *baseAvgFloat32) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendFloat32(e.output[0], partialResult.sum) + chk.AppendInt64(e.output[1], partialResult.count) + return nil +} + +func (e *baseAvgFloat32) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendFloat32(e.output[0], partialResult.sum/float32(partialResult.count)) + return nil +} + +type avgDedup4Float32 struct { + baseAvgFloat32 +} + +func (e *avgDedup4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + return errors.New("not implemented yet") +} + +type avgOriginal4Float32 struct { + baseAvgFloat32 +} + +func (e *avgOriginal4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + for _, row := range rowsInGroup { + input, isNull, err := e.input[0].EvalReal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + partialResult.sum += float32(input) + partialResult.count++ + } + return nil +} + +type avgPartial4Float32 struct { + baseAvgFloat32 +} + +func (e *avgPartial4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + for _, row := range rowsInGroup { + inputSum, isNull, err := e.input[1].EvalReal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + + inputCount, isNull, err := e.input[0].EvalInt(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull { + continue + } + partialResult.sum += float32(inputSum) + partialResult.count += inputCount + } + return nil +} diff --git a/executor/aggregate.go b/executor/aggregate.go index ab94b2f138d98..5985e20475e52 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -14,7 +14,9 @@ package executor import ( + "fmt" "github.com/juju/errors" + "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/mysql" @@ -189,11 +191,15 @@ type StreamAggExec struct { curGroupKey []types.Datum tmpGroupKey []types.Datum - // for chunk execution. inputIter *chunk.Iterator4Chunk inputRow chunk.Row mutableRow chunk.MutRow rowBuffer []types.Datum + + // for the new execution framework of aggregate functions + newAggFuncs []aggfuncs.AggFunc + partialBytes [][]byte + groupRows []chunk.Row } // Open implements the Executor Open interface. @@ -209,9 +215,16 @@ func (e *StreamAggExec) Open(ctx context.Context) error { e.mutableRow = chunk.MutRowFromTypes(e.retTypes()) e.rowBuffer = make([]types.Datum, 0, e.Schema().Len()) - e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs)) - for _, agg := range e.AggFuncs { - e.aggCtxs = append(e.aggCtxs, agg.CreateContext(e.ctx.GetSessionVars().StmtCtx)) + if e.newAggFuncs != nil { + e.partialBytes = make([][]byte, 0, len(e.newAggFuncs)) + for _, newAggFunc := range e.newAggFuncs { + e.partialBytes = append(e.partialBytes, newAggFunc.AllocPartialResult()) + } + } else { + e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs)) + for _, agg := range e.AggFuncs { + e.aggCtxs = append(e.aggCtxs, agg.CreateContext(e.ctx.GetSessionVars().StmtCtx)) + } } return nil @@ -242,13 +255,24 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e return errors.Trace(err) } if meetNewGroup { - e.appendResult2Chunk(chk) - } - for i, af := range e.AggFuncs { - err := af.Update(e.aggCtxs[i], e.StmtCtx, e.inputRow) + err := e.consumeGroupRows() if err != nil { return errors.Trace(err) } + err = e.appendResult2Chunk(chk) + if err != nil { + return errors.Trace(err) + } + } + if e.newAggFuncs != nil { + e.groupRows = append(e.groupRows, e.inputRow) + } else { + for i, af := range e.AggFuncs { + err := af.Update(e.aggCtxs[i], e.StmtCtx, e.inputRow) + if err != nil { + return errors.Trace(err) + } + } } if meetNewGroup { e.inputRow = e.inputIter.Next() @@ -259,11 +283,33 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e return nil } +func (e *StreamAggExec) consumeGroupRows() error { + if len(e.groupRows) == 0 { + return nil + } + + for i, newAggFunc := range e.newAggFuncs { + err := newAggFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialBytes[i]) + if err != nil { + return errors.Trace(err) + } + } + e.groupRows = e.groupRows[:0] + return nil +} + func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) error { if e.inputRow != e.inputIter.End() { return nil } + if e.newAggFuncs != nil { + err := e.consumeGroupRows() + if err != nil { + return errors.Trace(err) + } + } + err := e.children[0].Next(ctx, e.childrenResults[0]) if err != nil { return errors.Trace(err) @@ -271,7 +317,10 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch // No more data. if e.childrenResults[0].NumRows() == 0 { if e.hasData || len(e.GroupByItems) == 0 { - e.appendResult2Chunk(chk) + err := e.appendResult2Chunk(chk) + if err != nil { + return errors.Trace(err) + } } e.executed = true return nil @@ -285,7 +334,21 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch // appendResult2Chunk appends result of all the aggregation functions to the // result chunk, and reset the evaluation context for each aggregation. -func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) { +func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error { + if e.newAggFuncs != nil { + fmt.Printf("StreamAggExec.appendResult2Chunk: use new aggfunc\n") + for i, newAggFunc := range e.newAggFuncs { + err := newAggFunc.AppendFinalResult2Chunk(e.ctx, e.partialBytes[i], chk) + if err != nil { + return errors.Trace(err) + } + newAggFunc.ResetPartialResult(e.partialBytes[i]) + } + if len(e.newAggFuncs) == 0 { + chk.SetNumVirtualRows(chk.NumRows() + 1) + } + return nil + } e.rowBuffer = e.rowBuffer[:0] for i, af := range e.AggFuncs { e.rowBuffer = append(e.rowBuffer, af.GetResult(e.aggCtxs[i])) @@ -293,6 +356,7 @@ func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) { } e.mutableRow.SetDatums(e.rowBuffer...) chk.AppendRow(e.mutableRow.ToRow()) + return nil } // meetNewGroup returns a value that represents if the new group is different from last group. diff --git a/executor/builder.go b/executor/builder.go index de86e37018476..ea59153af1ac2 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/infoschema" @@ -852,8 +853,16 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)), GroupByItems: v.GroupByItems, } - for _, aggDesc := range v.AggFuncs { + newAggFuncs := make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)) + for i, aggDesc := range v.AggFuncs { e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc()) + newAggFunc := aggfuncs.Build(aggDesc, []int{i}) + if newAggFunc != nil { + newAggFuncs = append(newAggFuncs, newAggFunc) + } + } + if len(newAggFuncs) == len(v.AggFuncs) { + e.newAggFuncs = newAggFuncs } metrics.ExecutorCounter.WithLabelValues("StreamAggExec").Inc() return e From 01ec36943246d0c76e156cbb94804487ff6bc17b Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Wed, 20 Jun 2018 12:49:11 +0800 Subject: [PATCH 02/14] change type name --- executor/aggfuncs/builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 98c0d090795b9..070ffd7d70970 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -79,7 +79,7 @@ func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { // partial results. case aggregation.CompleteMode, aggregation.Partial1Mode: switch aggFuncDesc.Args[0].GetType().Tp { - case mysql.TypeDecimal: + case mysql.TypeNewDecimal: return &avgOriginal4Decimal{baseAvgDecimal{base}} case mysql.TypeFloat: return &avgOriginal4Float32{baseAvgFloat32{base}} @@ -91,7 +91,7 @@ func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { // functions and update their partial results. case aggregation.Partial2Mode, aggregation.FinalMode: switch aggFuncDesc.Args[1].GetType().Tp { - case mysql.TypeDecimal: + case mysql.TypeNewDecimal: return &avgPartial4Decimal{baseAvgDecimal{base}} case mysql.TypeFloat: return &avgPartial4Float32{baseAvgFloat32{base}} From 7f1fd3a73105e5591688262b65aa554f045a434f Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Thu, 21 Jun 2018 22:06:00 +0800 Subject: [PATCH 03/14] handle distinct in partial1 and complete mode --- executor/aggfuncs/builder.go | 16 +++++++++++++--- executor/aggfuncs/func_avg.go | 18 +++++++++++++++--- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 070ffd7d70970..ae8c60152ae4b 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -17,6 +17,7 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/types" ) // Build is used to build AggFunc according to the aggFuncDesc @@ -80,11 +81,20 @@ func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { case aggregation.CompleteMode, aggregation.Partial1Mode: switch aggFuncDesc.Args[0].GetType().Tp { case mysql.TypeNewDecimal: - return &avgOriginal4Decimal{baseAvgDecimal{base}} + if aggFuncDesc.HasDistinct { + return &avgOriginal4Decimal{baseAvgDecimal{base}, make(map[types.MyDecimal]bool)} + } + return &avgOriginal4Decimal{baseAvgDecimal{base}, nil} case mysql.TypeFloat: - return &avgOriginal4Float32{baseAvgFloat32{base}} + if aggFuncDesc.HasDistinct { + return &avgOriginal4Float32{baseAvgFloat32{base}, make(map[float32]bool)} + } + return &avgOriginal4Float32{baseAvgFloat32{base}, nil} case mysql.TypeDouble: - return &avgOriginal4Float64{baseAvgFloat64{base}} + if aggFuncDesc.HasDistinct { + return &avgOriginal4Float64{baseAvgFloat64{base}, make(map[float64]bool)} + } + return &avgOriginal4Float64{baseAvgFloat64{base}, nil} } // Build avg functions which consume the partial result of other agg diff --git a/executor/aggfuncs/func_avg.go b/executor/aggfuncs/func_avg.go index 63b7ba1cd77ed..fbb13f2c86b4f 100644 --- a/executor/aggfuncs/func_avg.go +++ b/executor/aggfuncs/func_avg.go @@ -100,6 +100,7 @@ func (e *avgDedup4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGr type avgOriginal4Decimal struct { baseAvgDecimal + deDuper map[types.MyDecimal]bool } func (e *avgOriginal4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { @@ -109,7 +110,7 @@ func (e *avgOriginal4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsI input, isNull, err := e.input[0].EvalDecimal(sctx, row) if err != nil { return errors.Trace(err) - } else if isNull { + } else if isNull || (e.deDuper != nil && e.deDuper[*input]) { continue } err = types.DecimalAdd(&partialResult.sum, input, newSum) @@ -118,6 +119,9 @@ func (e *avgOriginal4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsI } partialResult.sum = *newSum partialResult.count++ + if e.deDuper != nil { + e.deDuper[*input] = true + } } return nil } @@ -207,6 +211,7 @@ func (e *avgDedup4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGr type avgOriginal4Float64 struct { baseAvgFloat64 + deDuper map[float64]bool } func (e *avgOriginal4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { @@ -215,11 +220,14 @@ func (e *avgOriginal4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsI input, isNull, err := e.input[0].EvalReal(sctx, row) if err != nil { return errors.Trace(err) - } else if isNull { + } else if isNull || (e.deDuper != nil && e.deDuper[input]) { continue } partialResult.sum += input partialResult.count++ + if e.deDuper != nil { + e.deDuper[input] = true + } } return nil } @@ -303,6 +311,7 @@ func (e *avgDedup4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGr type avgOriginal4Float32 struct { baseAvgFloat32 + deDuper map[float32]bool } func (e *avgOriginal4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { @@ -311,11 +320,14 @@ func (e *avgOriginal4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsI input, isNull, err := e.input[0].EvalReal(sctx, row) if err != nil { return errors.Trace(err) - } else if isNull { + } else if isNull || (e.deDuper != nil && e.deDuper[float32(input)]) { continue } partialResult.sum += float32(input) partialResult.count++ + if e.deDuper != nil { + e.deDuper[float32(input)] = true + } } return nil } From e5acefb943683a79511bdab8a6894a694eabced5 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Thu, 21 Jun 2018 22:49:30 +0800 Subject: [PATCH 04/14] handle 0 count --- executor/aggfuncs/func_avg.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/executor/aggfuncs/func_avg.go b/executor/aggfuncs/func_avg.go index fbb13f2c86b4f..413ea1de673b6 100644 --- a/executor/aggfuncs/func_avg.go +++ b/executor/aggfuncs/func_avg.go @@ -197,7 +197,11 @@ func (e *baseAvgFloat64) AppendPartialResult2Chunk(sctx sessionctx.Context, part func (e *baseAvgFloat64) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { partialResult := e.toPartialResult(partialBytes) - chk.AppendFloat64(e.output[0], partialResult.sum/float64(partialResult.count)) + if partialResult.count == 0 { + chk.AppendNull(e.output[0]) + } else { + chk.AppendFloat64(e.output[0], partialResult.sum/float64(partialResult.count)) + } return nil } @@ -297,7 +301,11 @@ func (e *baseAvgFloat32) AppendPartialResult2Chunk(sctx sessionctx.Context, part func (e *baseAvgFloat32) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { partialResult := e.toPartialResult(partialBytes) - chk.AppendFloat32(e.output[0], partialResult.sum/float32(partialResult.count)) + if partialResult.count == 0 { + chk.AppendNull(e.output[0]) + } else { + chk.AppendFloat32(e.output[0], partialResult.sum/float32(partialResult.count)) + } return nil } From dca132a128a41a6a234e8b2414c158529bdc2d00 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sat, 23 Jun 2018 20:41:07 +0800 Subject: [PATCH 05/14] add implementation of SUM --- executor/aggfuncs/aggfuncs.go | 2 ++ executor/aggfuncs/builder.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/executor/aggfuncs/aggfuncs.go b/executor/aggfuncs/aggfuncs.go index 18dac8bed8795..2d5dd993fb075 100644 --- a/executor/aggfuncs/aggfuncs.go +++ b/executor/aggfuncs/aggfuncs.go @@ -23,6 +23,8 @@ import ( var ( // All the AggFunc implementations for "COUNT". // All the AggFunc implementations for "SUM". + _ AggFunc = (*sum4Decimal)(nil) + // All the AggFunc implementations for "AVG". _ AggFunc = (*avgDedup4Decimal)(nil) _ AggFunc = (*avgOriginal4Decimal)(nil) diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index ae8c60152ae4b..87881d6a6eb32 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -52,6 +52,36 @@ func buildCount(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { } func buildSum(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { + base := baseAggFunc{ + input: aggFuncDesc.Args, + output: output, + } + + switch aggFuncDesc.Mode { + // Build sum functions which consume the original data and remove the + // duplicated input of the same group. + case aggregation.DedupMode: + return nil + + // Build sum functions which consume the original data and update their + // partial results. + case aggregation.CompleteMode, aggregation.Partial1Mode: + switch aggFuncDesc.Args[0].GetType().Tp { + case mysql.TypeNewDecimal: + if aggFuncDesc.HasDistinct { + return &sum4Decimal{base, make(map[types.MyDecimal]bool)} + } + return &sum4Decimal{base, nil} + } + + // Build sum functions which consume the partial result of other agg + // functions and update their partial results. + case aggregation.Partial2Mode, aggregation.FinalMode: + switch aggFuncDesc.Args[0].GetType().Tp { + case mysql.TypeNewDecimal: + return &sum4Decimal{base, nil} + } + } return nil } From cacad7c27ceb80d7bcb3da80c3cfe27dac3e5817 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sat, 23 Jun 2018 20:43:13 +0800 Subject: [PATCH 06/14] add missing file --- executor/aggfuncs/func_sum.go | 85 +++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 executor/aggfuncs/func_sum.go diff --git a/executor/aggfuncs/func_sum.go b/executor/aggfuncs/func_sum.go new file mode 100644 index 0000000000000..9af3a62fba66b --- /dev/null +++ b/executor/aggfuncs/func_sum.go @@ -0,0 +1,85 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggfuncs + +import ( + "unsafe" + + "github.com/juju/errors" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" +) + +// All the AggFunc implementations for "SUM" are listed here. +var ( + _ AggFunc = (*sum4Decimal)(nil) +) + +type sum4Decimal struct { + baseAggFunc + deDuper map[types.MyDecimal]bool +} + +type partialResult4SumDecimal struct { + sum types.MyDecimal +} + +func (e *sum4Decimal) toPartialResult(partialBytes []byte) *partialResult4SumDecimal { + return (*partialResult4SumDecimal)(unsafe.Pointer(&partialBytes[0])) +} + +func (e *sum4Decimal) AllocPartialResult() []byte { + partialBytes := make([]byte, unsafe.Sizeof(partialResult4SumDecimal{})) + e.ResetPartialResult(partialBytes) + return partialBytes +} + +func (e *sum4Decimal) ResetPartialResult(partialBytes []byte) { + *e.toPartialResult(partialBytes) = partialResult4SumDecimal{} +} + +func (e *sum4Decimal) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendMyDecimal(e.output[0], &partialResult.sum) + return nil +} + +func (e *sum4Decimal) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { + partialResult := e.toPartialResult(partialBytes) + chk.AppendMyDecimal(e.output[0], &partialResult.sum) + return nil +} + +func (e *sum4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { + partialResult := e.toPartialResult(partialBytes) + newSum := new(types.MyDecimal) + for _, row := range rowsInGroup { + input, isNull, err := e.input[0].EvalDecimal(sctx, row) + if err != nil { + return errors.Trace(err) + } else if isNull || (e.deDuper != nil && e.deDuper[*input]) { + continue + } + err = types.DecimalAdd(&partialResult.sum, input, newSum) + if err != nil { + return errors.Trace(err) + } + partialResult.sum = *newSum + if e.deDuper != nil { + e.deDuper[*input] = true + } + } + return nil +} From d3a1ceda325454afbf2681b1ffdafa51996b0dc9 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sun, 24 Jun 2018 14:22:01 +0800 Subject: [PATCH 07/14] only introduce the framework in this PR --- executor/aggfuncs/aggfuncs.go | 59 +++--- executor/aggfuncs/builder.go | 96 ++------- executor/aggfuncs/func_avg.go | 367 ---------------------------------- 3 files changed, 45 insertions(+), 477 deletions(-) delete mode 100644 executor/aggfuncs/func_avg.go diff --git a/executor/aggfuncs/aggfuncs.go b/executor/aggfuncs/aggfuncs.go index 2d5dd993fb075..b697b941b167c 100644 --- a/executor/aggfuncs/aggfuncs.go +++ b/executor/aggfuncs/aggfuncs.go @@ -21,39 +21,46 @@ import ( // All the AggFunc implementations are listed here for navigation. var ( - // All the AggFunc implementations for "COUNT". - // All the AggFunc implementations for "SUM". - _ AggFunc = (*sum4Decimal)(nil) - - // All the AggFunc implementations for "AVG". - _ AggFunc = (*avgDedup4Decimal)(nil) - _ AggFunc = (*avgOriginal4Decimal)(nil) - _ AggFunc = (*avgPartial4Decimal)(nil) - - _ AggFunc = (*avgDedup4Float64)(nil) - _ AggFunc = (*avgOriginal4Float64)(nil) - _ AggFunc = (*avgPartial4Float64)(nil) - - _ AggFunc = (*avgDedup4Float32)(nil) - _ AggFunc = (*avgOriginal4Float32)(nil) - _ AggFunc = (*avgPartial4Float32)(nil) - - // All the AggFunc implementations for "FIRSTROW". - // All the AggFunc implementations for "MAX". - // All the AggFunc implementations for "MIN". - // All the AggFunc implementations for "GROUP_CONCAT". - // All the AggFunc implementations for "BIT_OR". - // All the AggFunc implementations for "BIT_XOR". - // All the AggFunc implementations for "BIT_AND". +// All the AggFunc implementations for "COUNT" are listed here. +// All the AggFunc implementations for "SUM" are listed here. +// All the AggFunc implementations for "AVG" are listed here. +// All the AggFunc implementations for "FIRSTROW" are listed here. +// All the AggFunc implementations for "MAX" are listed here. +// All the AggFunc implementations for "MIN" are listed here. +// All the AggFunc implementations for "GROUP_CONCAT" are listed here. +// All the AggFunc implementations for "BIT_OR" are listed here. +// All the AggFunc implementations for "BIT_XOR" are listed here. +// All the AggFunc implementations for "BIT_AND" are listed here. ) -// AggFunc is the interface to evaluate aggregate functions. +// AggFunc is the interface to evaluate the aggregate functions. type AggFunc interface { + // AllocPartialResult allocates a specific data structure to store the + // partial result, initializes it, and converts it to a bype slice to return + // back. Aggregate operator implementations, no mater whether it's a hash or + // stream implementation, should hold this byte slice for further operations + // like: "ResetPartialResult", "UpdatePartialResult". AllocPartialResult() []byte + + // ResetPartialResult resets the partial result to the original state for a + // specific aggregate function. It converts the input byte slice to the + // specific data structure which stores the partial result and then reset + // every field to the proper original state. ResetPartialResult(partialBytes []byte) + + // UpdatePartialResult updates the specific partial result for an aggregate + // function using the input rows which all belonging to the same data group. + // It converts the input byte slice to the specific data structure which + // stores the partial result and then iterates on the input rows and update + // that partial result according to the functionality and the state of the + // aggregate function. UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error - AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error + // AppendFinalResult2Chunk finalizes the partial result and append the + // final result to the input chunk. Like other operations, it converts the + // input byte slice to the specific data structure which stores the partial + // result and then calculates the final result and append that final result + // to the chunk provided. AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error } diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 87881d6a6eb32..f0342930e1a4f 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -20,7 +20,8 @@ import ( "github.com/pingcap/tidb/types" ) -// Build is used to build AggFunc according to the aggFuncDesc +// Build is used to build a specific AggFunc implementation according to the +// input aggFuncDesc func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { switch aggFuncDesc.Name { case ast.AggFuncCount: @@ -47,125 +48,52 @@ func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "COUNT". func buildCount(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "SUM". func buildSum(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { - base := baseAggFunc{ - input: aggFuncDesc.Args, - output: output, - } - - switch aggFuncDesc.Mode { - // Build sum functions which consume the original data and remove the - // duplicated input of the same group. - case aggregation.DedupMode: - return nil - - // Build sum functions which consume the original data and update their - // partial results. - case aggregation.CompleteMode, aggregation.Partial1Mode: - switch aggFuncDesc.Args[0].GetType().Tp { - case mysql.TypeNewDecimal: - if aggFuncDesc.HasDistinct { - return &sum4Decimal{base, make(map[types.MyDecimal]bool)} - } - return &sum4Decimal{base, nil} - } - - // Build sum functions which consume the partial result of other agg - // functions and update their partial results. - case aggregation.Partial2Mode, aggregation.FinalMode: - switch aggFuncDesc.Args[0].GetType().Tp { - case mysql.TypeNewDecimal: - return &sum4Decimal{base, nil} - } - } return nil } +// buildCount builds the AggFunc implementation for function "AVG". func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { - base := baseAggFunc{ - input: aggFuncDesc.Args, - output: output, - } - - switch aggFuncDesc.Mode { - // Build avg functions which consume the original data and remove the - // duplicated input of the same group. - case aggregation.DedupMode: - return nil - // TODO: implement UpdatePartialResult for the following structs. - // switch aggFuncDesc.Args[0].GetType().Tp { - // case mysql.TypeDecimal: - // return &avgDedup4Decimal{baseAvgDecimal{base}} - // case mysql.TypeFloat: - // return &avgDedup4Float32{baseAvgFloat32{base}} - // case mysql.TypeDouble: - // return &avgDedup4Float64{baseAvgFloat64{base}} - // } - - // Build avg functions which consume the original data and update their - // partial results. - case aggregation.CompleteMode, aggregation.Partial1Mode: - switch aggFuncDesc.Args[0].GetType().Tp { - case mysql.TypeNewDecimal: - if aggFuncDesc.HasDistinct { - return &avgOriginal4Decimal{baseAvgDecimal{base}, make(map[types.MyDecimal]bool)} - } - return &avgOriginal4Decimal{baseAvgDecimal{base}, nil} - case mysql.TypeFloat: - if aggFuncDesc.HasDistinct { - return &avgOriginal4Float32{baseAvgFloat32{base}, make(map[float32]bool)} - } - return &avgOriginal4Float32{baseAvgFloat32{base}, nil} - case mysql.TypeDouble: - if aggFuncDesc.HasDistinct { - return &avgOriginal4Float64{baseAvgFloat64{base}, make(map[float64]bool)} - } - return &avgOriginal4Float64{baseAvgFloat64{base}, nil} - } - - // Build avg functions which consume the partial result of other agg - // functions and update their partial results. - case aggregation.Partial2Mode, aggregation.FinalMode: - switch aggFuncDesc.Args[1].GetType().Tp { - case mysql.TypeNewDecimal: - return &avgPartial4Decimal{baseAvgDecimal{base}} - case mysql.TypeFloat: - return &avgPartial4Float32{baseAvgFloat32{base}} - case mysql.TypeDouble: - return &avgPartial4Float64{baseAvgFloat64{base}} - } - } return nil } +// buildCount builds the AggFunc implementation for function "FIRST_ROW". func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "MAX". func buildMax(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "MIN". func buildMin(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "GROUP_CONCAT". func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "BIT_OR". func buildBitOr(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "BIT_XOR". func buildBitXor(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } +// buildCount builds the AggFunc implementation for function "BIT_AND". func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { return nil } diff --git a/executor/aggfuncs/func_avg.go b/executor/aggfuncs/func_avg.go deleted file mode 100644 index 413ea1de673b6..0000000000000 --- a/executor/aggfuncs/func_avg.go +++ /dev/null @@ -1,367 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package aggfuncs - -import ( - "unsafe" - - "github.com/juju/errors" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" -) - -// All the AggFunc implementations for "AVG" are listed here. -var ( - _ AggFunc = (*avgDedup4Decimal)(nil) - _ AggFunc = (*avgOriginal4Decimal)(nil) - _ AggFunc = (*avgPartial4Decimal)(nil) - - _ AggFunc = (*avgDedup4Float64)(nil) - _ AggFunc = (*avgOriginal4Float64)(nil) - _ AggFunc = (*avgPartial4Float64)(nil) - - _ AggFunc = (*avgDedup4Float32)(nil) - _ AggFunc = (*avgOriginal4Float32)(nil) - _ AggFunc = (*avgPartial4Float32)(nil) -) - -// All the following avg function implementations return the decimal result, -// which store the partial results in "partialResult4AvgDecimal". -// -// "baseAvgDecimal" is wrapped by: -// - "avgDedup4Decimal" -// - "avgOriginal4Decimal" -// - "avgPartial4Decimal" -type baseAvgDecimal struct { - baseAggFunc -} - -type partialResult4AvgDecimal struct { - sum types.MyDecimal - count int64 -} - -func (e *baseAvgDecimal) toPartialResult(partialBytes []byte) *partialResult4AvgDecimal { - return (*partialResult4AvgDecimal)(unsafe.Pointer(&partialBytes[0])) -} - -func (e *baseAvgDecimal) AllocPartialResult() []byte { - partialBytes := make([]byte, unsafe.Sizeof(partialResult4AvgDecimal{})) - e.ResetPartialResult(partialBytes) - return partialBytes -} - -func (e *baseAvgDecimal) ResetPartialResult(partialBytes []byte) { - *e.toPartialResult(partialBytes) = partialResult4AvgDecimal{} -} - -func (e *baseAvgDecimal) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - chk.AppendMyDecimal(e.output[0], &partialResult.sum) - chk.AppendInt64(e.output[1], partialResult.count) - return nil -} - -func (e *baseAvgDecimal) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - if partialResult.count == 0 { - chk.AppendNull(e.output[0]) - return nil - } - decimalCount := types.NewDecFromInt(partialResult.count) - finalResult := new(types.MyDecimal) - err := types.DecimalDiv(&partialResult.sum, decimalCount, finalResult, types.DivFracIncr) - if err != nil { - return errors.Trace(err) - } - chk.AppendMyDecimal(e.output[0], finalResult) - return nil -} - -type avgDedup4Decimal struct { - baseAvgDecimal -} - -func (e *avgDedup4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - return errors.New("not implemented yet") -} - -type avgOriginal4Decimal struct { - baseAvgDecimal - deDuper map[types.MyDecimal]bool -} - -func (e *avgOriginal4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - newSum := new(types.MyDecimal) - for _, row := range rowsInGroup { - input, isNull, err := e.input[0].EvalDecimal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull || (e.deDuper != nil && e.deDuper[*input]) { - continue - } - err = types.DecimalAdd(&partialResult.sum, input, newSum) - if err != nil { - return errors.Trace(err) - } - partialResult.sum = *newSum - partialResult.count++ - if e.deDuper != nil { - e.deDuper[*input] = true - } - } - return nil -} - -type avgPartial4Decimal struct { - baseAvgDecimal -} - -func (e *avgPartial4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - newSum := new(types.MyDecimal) - for _, row := range rowsInGroup { - inputSum, isNull, err := e.input[1].EvalDecimal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull { - continue - } - - inputCount, isNull, err := e.input[0].EvalInt(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull { - continue - } - - err = types.DecimalAdd(&partialResult.sum, inputSum, newSum) - if err != nil { - return errors.Trace(err) - } - partialResult.sum = *newSum - partialResult.count += inputCount - } - return nil -} - -// All the following avg function implementations return the float64 result, -// which store the partial results in "partialResult4AvgFloat64". -// -// "baseAvgFloat64" is wrapped by: -// - "avgDedup4Float64" -// - "avgOriginal4Float64" -// - "avgPartial4Float64" -type baseAvgFloat64 struct { - baseAggFunc -} - -type partialResult4AvgFloat64 struct { - sum float64 - count int64 -} - -func (e *baseAvgFloat64) toPartialResult(partialBytes []byte) *partialResult4AvgFloat64 { - return (*partialResult4AvgFloat64)(unsafe.Pointer(&partialBytes[0])) -} - -func (e *baseAvgFloat64) AllocPartialResult() []byte { - partialBytes := make([]byte, unsafe.Sizeof(partialResult4AvgFloat64{})) - e.ResetPartialResult(partialBytes) - return partialBytes -} - -func (e *baseAvgFloat64) ResetPartialResult(partialBytes []byte) { - *e.toPartialResult(partialBytes) = partialResult4AvgFloat64{} -} - -func (e *baseAvgFloat64) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - chk.AppendFloat64(e.output[0], partialResult.sum) - chk.AppendInt64(e.output[1], partialResult.count) - return nil -} - -func (e *baseAvgFloat64) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - if partialResult.count == 0 { - chk.AppendNull(e.output[0]) - } else { - chk.AppendFloat64(e.output[0], partialResult.sum/float64(partialResult.count)) - } - return nil -} - -type avgDedup4Float64 struct { - baseAvgFloat64 -} - -func (e *avgDedup4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - return errors.New("not implemented yet") -} - -type avgOriginal4Float64 struct { - baseAvgFloat64 - deDuper map[float64]bool -} - -func (e *avgOriginal4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - for _, row := range rowsInGroup { - input, isNull, err := e.input[0].EvalReal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull || (e.deDuper != nil && e.deDuper[input]) { - continue - } - partialResult.sum += input - partialResult.count++ - if e.deDuper != nil { - e.deDuper[input] = true - } - } - return nil -} - -type avgPartial4Float64 struct { - baseAvgFloat64 -} - -func (e *avgPartial4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - for _, row := range rowsInGroup { - inputSum, isNull, err := e.input[1].EvalReal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull { - continue - } - - inputCount, isNull, err := e.input[0].EvalInt(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull { - continue - } - partialResult.sum += inputSum - partialResult.count += inputCount - } - return nil -} - -// All the following avg function implementations return the float32 result, -// which store the partial results in "partialResult4AvgFloat32". -// -// "baseAvgFloat32" is wrapped by: -// - "avgDedup4Float32" -// - "avgOriginal4Float32" -// - "avgPartial4Float32" -type baseAvgFloat32 struct { - baseAggFunc -} - -type partialResult4AvgFloat32 struct { - sum float32 - count int64 -} - -func (e *baseAvgFloat32) toPartialResult(partialBytes []byte) *partialResult4AvgFloat32 { - return (*partialResult4AvgFloat32)(unsafe.Pointer(&partialBytes[0])) -} - -func (e *baseAvgFloat32) AllocPartialResult() []byte { - partialBytes := make([]byte, unsafe.Sizeof(partialResult4AvgFloat32{})) - e.ResetPartialResult(partialBytes) - return partialBytes -} - -func (e *baseAvgFloat32) ResetPartialResult(partialBytes []byte) { - *e.toPartialResult(partialBytes) = partialResult4AvgFloat32{} -} - -func (e *baseAvgFloat32) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - chk.AppendFloat32(e.output[0], partialResult.sum) - chk.AppendInt64(e.output[1], partialResult.count) - return nil -} - -func (e *baseAvgFloat32) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - if partialResult.count == 0 { - chk.AppendNull(e.output[0]) - } else { - chk.AppendFloat32(e.output[0], partialResult.sum/float32(partialResult.count)) - } - return nil -} - -type avgDedup4Float32 struct { - baseAvgFloat32 -} - -func (e *avgDedup4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - return errors.New("not implemented yet") -} - -type avgOriginal4Float32 struct { - baseAvgFloat32 - deDuper map[float32]bool -} - -func (e *avgOriginal4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - for _, row := range rowsInGroup { - input, isNull, err := e.input[0].EvalReal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull || (e.deDuper != nil && e.deDuper[float32(input)]) { - continue - } - partialResult.sum += float32(input) - partialResult.count++ - if e.deDuper != nil { - e.deDuper[float32(input)] = true - } - } - return nil -} - -type avgPartial4Float32 struct { - baseAvgFloat32 -} - -func (e *avgPartial4Float32) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - for _, row := range rowsInGroup { - inputSum, isNull, err := e.input[1].EvalReal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull { - continue - } - - inputCount, isNull, err := e.input[0].EvalInt(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull { - continue - } - partialResult.sum += float32(inputSum) - partialResult.count += inputCount - } - return nil -} From cf34a1666354659b661fb716b241f903bc0af0d9 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sun, 24 Jun 2018 14:23:40 +0800 Subject: [PATCH 08/14] remove useless code --- executor/aggfuncs/func_sum.go | 85 ----------------------------------- 1 file changed, 85 deletions(-) delete mode 100644 executor/aggfuncs/func_sum.go diff --git a/executor/aggfuncs/func_sum.go b/executor/aggfuncs/func_sum.go deleted file mode 100644 index 9af3a62fba66b..0000000000000 --- a/executor/aggfuncs/func_sum.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package aggfuncs - -import ( - "unsafe" - - "github.com/juju/errors" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" -) - -// All the AggFunc implementations for "SUM" are listed here. -var ( - _ AggFunc = (*sum4Decimal)(nil) -) - -type sum4Decimal struct { - baseAggFunc - deDuper map[types.MyDecimal]bool -} - -type partialResult4SumDecimal struct { - sum types.MyDecimal -} - -func (e *sum4Decimal) toPartialResult(partialBytes []byte) *partialResult4SumDecimal { - return (*partialResult4SumDecimal)(unsafe.Pointer(&partialBytes[0])) -} - -func (e *sum4Decimal) AllocPartialResult() []byte { - partialBytes := make([]byte, unsafe.Sizeof(partialResult4SumDecimal{})) - e.ResetPartialResult(partialBytes) - return partialBytes -} - -func (e *sum4Decimal) ResetPartialResult(partialBytes []byte) { - *e.toPartialResult(partialBytes) = partialResult4SumDecimal{} -} - -func (e *sum4Decimal) AppendPartialResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - chk.AppendMyDecimal(e.output[0], &partialResult.sum) - return nil -} - -func (e *sum4Decimal) AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error { - partialResult := e.toPartialResult(partialBytes) - chk.AppendMyDecimal(e.output[0], &partialResult.sum) - return nil -} - -func (e *sum4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error { - partialResult := e.toPartialResult(partialBytes) - newSum := new(types.MyDecimal) - for _, row := range rowsInGroup { - input, isNull, err := e.input[0].EvalDecimal(sctx, row) - if err != nil { - return errors.Trace(err) - } else if isNull || (e.deDuper != nil && e.deDuper[*input]) { - continue - } - err = types.DecimalAdd(&partialResult.sum, input, newSum) - if err != nil { - return errors.Trace(err) - } - partialResult.sum = *newSum - if e.deDuper != nil { - e.deDuper[*input] = true - } - } - return nil -} From b1335c36da5a8e1257784273816dda7b35b26cd8 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sun, 24 Jun 2018 14:26:16 +0800 Subject: [PATCH 09/14] remove debug log --- executor/aggregate.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 5985e20475e52..b5ff9c8fc5153 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -14,7 +14,6 @@ package executor import ( - "fmt" "github.com/juju/errors" "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" @@ -336,7 +335,6 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch // result chunk, and reset the evaluation context for each aggregation. func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error { if e.newAggFuncs != nil { - fmt.Printf("StreamAggExec.appendResult2Chunk: use new aggfunc\n") for i, newAggFunc := range e.newAggFuncs { err := newAggFunc.AppendFinalResult2Chunk(e.ctx, e.partialBytes[i], chk) if err != nil { From dbbef246a9eef4a91de52a8f9a12d890f6c53deb Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Sun, 24 Jun 2018 14:32:14 +0800 Subject: [PATCH 10/14] fix ci --- executor/aggfuncs/builder.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index f0342930e1a4f..29b2dd2d6ecc0 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -16,8 +16,6 @@ package aggfuncs import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/expression/aggregation" - "github.com/pingcap/tidb/mysql" - "github.com/pingcap/tidb/types" ) // Build is used to build a specific AggFunc implementation according to the From 5f1782287e2fddd8769f3dc0d40db5374c961105 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Mon, 25 Jun 2018 16:39:47 +0800 Subject: [PATCH 11/14] address comment --- executor/aggfuncs/aggfuncs.go | 11 ++++++++--- executor/aggfuncs/builder.go | 2 +- executor/builder.go | 4 ++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/executor/aggfuncs/aggfuncs.go b/executor/aggfuncs/aggfuncs.go index b697b941b167c..05a57a4694f5a 100644 --- a/executor/aggfuncs/aggfuncs.go +++ b/executor/aggfuncs/aggfuncs.go @@ -37,8 +37,8 @@ var ( type AggFunc interface { // AllocPartialResult allocates a specific data structure to store the // partial result, initializes it, and converts it to a bype slice to return - // back. Aggregate operator implementations, no mater whether it's a hash or - // stream implementation, should hold this byte slice for further operations + // back. Aggregate operator implementation, no matter it's a hash or stream + // implementation, should hold this byte slice for the further operations // like: "ResetPartialResult", "UpdatePartialResult". AllocPartialResult() []byte @@ -65,6 +65,11 @@ type AggFunc interface { } type baseAggFunc struct { - input []expression.Expression + // input stores the input arguments for an aggregate function, we should + // call input.EvalXXX to get the actual input data for this function. + input []expression.Expression + + // output stores the ordinal of the columns in the output chunk, which is + // used to append the partial or final result of this function. output []int } diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 29b2dd2d6ecc0..27e456ead4870 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -19,7 +19,7 @@ import ( ) // Build is used to build a specific AggFunc implementation according to the -// input aggFuncDesc +// input aggFuncDesc. func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { switch aggFuncDesc.Name { case ast.AggFuncCount: diff --git a/executor/builder.go b/executor/builder.go index abb68dc64deae..86fb4a42e4fa1 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -928,6 +928,10 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { newAggFuncs = append(newAggFuncs, newAggFunc) } } + + // Once we have successfully build all the aggregate functions to the new + // aggregate function execution framework, we can store them to the stream + // aggregate operator to indicate it using the new execution framework. if len(newAggFuncs) == len(v.AggFuncs) { e.newAggFuncs = newAggFuncs } From aa71782cd325f89c3a385a46729273a38c4d0147 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Mon, 25 Jun 2018 18:39:43 +0800 Subject: [PATCH 12/14] address comment --- executor/aggfuncs/aggfuncs.go | 37 +++++++++++++++++++++-------------- executor/aggregate.go | 16 +++++++-------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/executor/aggfuncs/aggfuncs.go b/executor/aggfuncs/aggfuncs.go index 05a57a4694f5a..9d31941570f6f 100644 --- a/executor/aggfuncs/aggfuncs.go +++ b/executor/aggfuncs/aggfuncs.go @@ -14,6 +14,8 @@ package aggfuncs import ( + "unsafe" + "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" @@ -33,35 +35,40 @@ var ( // All the AggFunc implementations for "BIT_AND" are listed here. ) +// PartialResult represents data structure to store the partial result for the +// aggregate functions. Here we use unsafe.Pointer to allow the partial result +// to be any type. +type PartialResult unsafe.Pointer + // AggFunc is the interface to evaluate the aggregate functions. type AggFunc interface { // AllocPartialResult allocates a specific data structure to store the - // partial result, initializes it, and converts it to a bype slice to return - // back. Aggregate operator implementation, no matter it's a hash or stream - // implementation, should hold this byte slice for the further operations - // like: "ResetPartialResult", "UpdatePartialResult". - AllocPartialResult() []byte + // partial result, initializes it, and converts it to PartialResult to + // return back. Aggregate operator implementation, no matter it's a hash + // or stream, should hold this allocated PartialResult for the further + // operations like: "ResetPartialResult", "UpdatePartialResult". + AllocPartialResult() PartialResult // ResetPartialResult resets the partial result to the original state for a - // specific aggregate function. It converts the input byte slice to the + // specific aggregate function. It converts the input PartialResult to the // specific data structure which stores the partial result and then reset // every field to the proper original state. - ResetPartialResult(partialBytes []byte) + ResetPartialResult(pr PartialResult) // UpdatePartialResult updates the specific partial result for an aggregate // function using the input rows which all belonging to the same data group. - // It converts the input byte slice to the specific data structure which - // stores the partial result and then iterates on the input rows and update - // that partial result according to the functionality and the state of the + // It converts the PartialResult to the specific data structure which stores + // the partial result and then iterates on the input rows and update that + // partial result according to the functionality and the state of the // aggregate function. - UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error + UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error // AppendFinalResult2Chunk finalizes the partial result and append the // final result to the input chunk. Like other operations, it converts the - // input byte slice to the specific data structure which stores the partial - // result and then calculates the final result and append that final result - // to the chunk provided. - AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error + // input PartialResult to the specific data structure which stores the + // partial result and then calculates the final result and append that + // final result to the chunk provided. + AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error } type baseAggFunc struct { diff --git a/executor/aggregate.go b/executor/aggregate.go index b5ff9c8fc5153..eab7036ac8c11 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -196,9 +196,9 @@ type StreamAggExec struct { rowBuffer []types.Datum // for the new execution framework of aggregate functions - newAggFuncs []aggfuncs.AggFunc - partialBytes [][]byte - groupRows []chunk.Row + newAggFuncs []aggfuncs.AggFunc + partialResults []aggfuncs.PartialResult + groupRows []chunk.Row } // Open implements the Executor Open interface. @@ -215,9 +215,9 @@ func (e *StreamAggExec) Open(ctx context.Context) error { e.rowBuffer = make([]types.Datum, 0, e.Schema().Len()) if e.newAggFuncs != nil { - e.partialBytes = make([][]byte, 0, len(e.newAggFuncs)) + e.partialResults = make([]aggfuncs.PartialResult, 0, len(e.newAggFuncs)) for _, newAggFunc := range e.newAggFuncs { - e.partialBytes = append(e.partialBytes, newAggFunc.AllocPartialResult()) + e.partialResults = append(e.partialResults, newAggFunc.AllocPartialResult()) } } else { e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs)) @@ -288,7 +288,7 @@ func (e *StreamAggExec) consumeGroupRows() error { } for i, newAggFunc := range e.newAggFuncs { - err := newAggFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialBytes[i]) + err := newAggFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResults[i]) if err != nil { return errors.Trace(err) } @@ -336,11 +336,11 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error { if e.newAggFuncs != nil { for i, newAggFunc := range e.newAggFuncs { - err := newAggFunc.AppendFinalResult2Chunk(e.ctx, e.partialBytes[i], chk) + err := newAggFunc.AppendFinalResult2Chunk(e.ctx, e.partialResults[i], chk) if err != nil { return errors.Trace(err) } - newAggFunc.ResetPartialResult(e.partialBytes[i]) + newAggFunc.ResetPartialResult(e.partialResults[i]) } if len(e.newAggFuncs) == 0 { chk.SetNumVirtualRows(chk.NumRows() + 1) From f3b005f28810745787576dcc44d974c5fb2388eb Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Tue, 26 Jun 2018 18:54:43 +0800 Subject: [PATCH 13/14] address comment --- executor/aggfuncs/aggfuncs.go | 12 +++++----- executor/aggfuncs/builder.go | 42 +++++++++++++++++------------------ executor/builder.go | 2 +- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/executor/aggfuncs/aggfuncs.go b/executor/aggfuncs/aggfuncs.go index 9d31941570f6f..7e82e2c11ff75 100644 --- a/executor/aggfuncs/aggfuncs.go +++ b/executor/aggfuncs/aggfuncs.go @@ -72,11 +72,11 @@ type AggFunc interface { } type baseAggFunc struct { - // input stores the input arguments for an aggregate function, we should - // call input.EvalXXX to get the actual input data for this function. - input []expression.Expression + // args stores the input arguments for an aggregate function, we should + // call arg.EvalXXX to get the actual input data for this function. + args []expression.Expression - // output stores the ordinal of the columns in the output chunk, which is - // used to append the partial or final result of this function. - output []int + // ordinal stores the ordinal of the columns in the output chunk, which is + // used to append the final result of this function. + ordinal int } diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 27e456ead4870..09fab4126eeee 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -20,78 +20,78 @@ import ( // Build is used to build a specific AggFunc implementation according to the // input aggFuncDesc. -func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func Build(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { switch aggFuncDesc.Name { case ast.AggFuncCount: - return buildCount(aggFuncDesc, output) + return buildCount(aggFuncDesc, ordinal) case ast.AggFuncSum: - return buildSum(aggFuncDesc, output) + return buildSum(aggFuncDesc, ordinal) case ast.AggFuncAvg: - return buildAvg(aggFuncDesc, output) + return buildAvg(aggFuncDesc, ordinal) case ast.AggFuncFirstRow: - return buildFirstRow(aggFuncDesc, output) + return buildFirstRow(aggFuncDesc, ordinal) case ast.AggFuncMax: - return buildMax(aggFuncDesc, output) + return buildMax(aggFuncDesc, ordinal) case ast.AggFuncMin: - return buildMin(aggFuncDesc, output) + return buildMin(aggFuncDesc, ordinal) case ast.AggFuncGroupConcat: - return buildGroupConcat(aggFuncDesc, output) + return buildGroupConcat(aggFuncDesc, ordinal) case ast.AggFuncBitOr: - return buildBitOr(aggFuncDesc, output) + return buildBitOr(aggFuncDesc, ordinal) case ast.AggFuncBitXor: - return buildBitXor(aggFuncDesc, output) + return buildBitXor(aggFuncDesc, ordinal) case ast.AggFuncBitAnd: - return buildBitAnd(aggFuncDesc, output) + return buildBitAnd(aggFuncDesc, ordinal) } return nil } // buildCount builds the AggFunc implementation for function "COUNT". -func buildCount(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildCount(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "SUM". -func buildSum(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildSum(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "AVG". -func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "FIRST_ROW". -func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "MAX". -func buildMax(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildMax(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "MIN". -func buildMin(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildMin(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "GROUP_CONCAT". -func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "BIT_OR". -func buildBitOr(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildBitOr(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "BIT_XOR". -func buildBitXor(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildBitXor(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } // buildCount builds the AggFunc implementation for function "BIT_AND". -func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { +func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { return nil } diff --git a/executor/builder.go b/executor/builder.go index 86fb4a42e4fa1..34394cef6833b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -923,7 +923,7 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { newAggFuncs := make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)) for i, aggDesc := range v.AggFuncs { e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc()) - newAggFunc := aggfuncs.Build(aggDesc, []int{i}) + newAggFunc := aggfuncs.Build(aggDesc, i) if newAggFunc != nil { newAggFuncs = append(newAggFuncs, newAggFunc) } From 0d859db77ad1861641c8fbe0286670ea90a36fd2 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Fri, 29 Jun 2018 13:45:25 +0800 Subject: [PATCH 14/14] addres comment --- executor/aggregate.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index eab7036ac8c11..0b5336392fc7b 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -278,6 +278,12 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e return nil } } + if e.newAggFuncs != nil { + err := e.consumeGroupRows() + if err != nil { + return errors.Trace(err) + } + } } return nil } @@ -302,13 +308,6 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch return nil } - if e.newAggFuncs != nil { - err := e.consumeGroupRows() - if err != nil { - return errors.Trace(err) - } - } - err := e.children[0].Next(ctx, e.childrenResults[0]) if err != nil { return errors.Trace(err)