executor: introduce a new execution framework for aggregate functions #6852

Merged Jun 29, 2018 · 24 commits (changes shown from 17 commits)
182658b
executor: refactor aggregate functions
zz-jason Jun 17, 2018
01ec369
change type name
zz-jason Jun 20, 2018
faa3f7d
Merge branch 'master' into dev/refactor-agg
zz-jason Jun 21, 2018
1ebf4d5
Merge branch 'master' into dev/refactor-agg
zz-jason Jun 21, 2018
038135a
Merge branch 'master' into dev/refactor-agg
zz-jason Jun 21, 2018
7f1fd3a
handle distinct in partial1 and complete mode
zz-jason Jun 21, 2018
e5acefb
handle 0 count
zz-jason Jun 21, 2018
e25d273
Merge branch 'master' into dev/refactor-agg
zz-jason Jun 22, 2018
dca132a
add implementation of SUM
zz-jason Jun 23, 2018
81c812b
Merge branch 'dev/refactor-agg' of https://github.com/zz-jason/tidb i…
zz-jason Jun 23, 2018
cacad7c
add missing file
zz-jason Jun 23, 2018
d3a1ced
only introduce the framework in this PR
zz-jason Jun 24, 2018
cf34a16
remove useless code
zz-jason Jun 24, 2018
b1335c3
remove debug log
zz-jason Jun 24, 2018
ce821f9
Merge branch 'master' into dev/refactor-agg
zz-jason Jun 24, 2018
dbbef24
fix ci
zz-jason Jun 24, 2018
57f57b8
Merge branch 'dev/refactor-agg' of https://github.com/zz-jason/tidb i…
zz-jason Jun 24, 2018
5f17822
address comment
zz-jason Jun 25, 2018
aa71782
address comment
zz-jason Jun 25, 2018
f3b005f
address comment
zz-jason Jun 26, 2018
c7f7144
Merge branch 'master' into dev/refactor-agg
lysu Jun 28, 2018
66801be
Merge branch 'master' into dev/refactor-agg
zz-jason Jun 28, 2018
0d859db
addres comment
zz-jason Jun 29, 2018
7f8134c
Merge branch 'master' into dev/refactor-agg
coocood Jun 29, 2018
70 changes: 70 additions & 0 deletions executor/aggfuncs/aggfuncs.go
@@ -0,0 +1,70 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

// All the AggFunc implementations are listed here for navigation.
var (
// All the AggFunc implementations for "COUNT" are listed here.
// All the AggFunc implementations for "SUM" are listed here.
// All the AggFunc implementations for "AVG" are listed here.
// All the AggFunc implementations for "FIRSTROW" are listed here.
// All the AggFunc implementations for "MAX" are listed here.
// All the AggFunc implementations for "MIN" are listed here.
// All the AggFunc implementations for "GROUP_CONCAT" are listed here.
// All the AggFunc implementations for "BIT_OR" are listed here.
// All the AggFunc implementations for "BIT_XOR" are listed here.
// All the AggFunc implementations for "BIT_AND" are listed here.
)

// AggFunc is the interface to evaluate the aggregate functions.
type AggFunc interface {
// AllocPartialResult allocates a specific data structure to store the
// partial result, initializes it, and converts it to a byte slice to
// return. Aggregate operator implementations, no mater whether it's a hash or
Contributor:
s/ mater/ matter

// stream implementation, should hold this byte slice for further operations
// like: "ResetPartialResult", "UpdatePartialResult".
AllocPartialResult() []byte
Contributor:
It seems that we need another struct which contains partialResultBytes to handle the hashagg evaluation and aggfunc with distinct?

Member Author:
no need for now, we can just add a map field in a specific aggregate function implementation, during the execution of UpdatePartialResult we use that map to deduplicate the input, when ResetPartialResult, we reset that map.
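The map-based deduplication the author describes could be sketched like this toy example (a hypothetical countDistinct with int values standing in for datums — not the PR's code):

```go
package main

import "fmt"

// countDistinct sketches the approach described above: UpdatePartialResult
// uses a map to deduplicate the input, and ResetPartialResult resets that
// map along with the counter.
type countDistinct struct {
	seen map[int]struct{}
	cnt  int
}

func newCountDistinct() *countDistinct {
	return &countDistinct{seen: make(map[int]struct{})}
}

func (c *countDistinct) UpdatePartialResult(rowsInGroup []int) {
	for _, v := range rowsInGroup {
		if _, dup := c.seen[v]; dup {
			continue // value already seen in this group: skip it
		}
		c.seen[v] = struct{}{}
		c.cnt++
	}
}

func (c *countDistinct) ResetPartialResult() {
	c.seen = make(map[int]struct{})
	c.cnt = 0
}

func main() {
	c := newCountDistinct()
	c.UpdatePartialResult([]int{1, 1, 2, 3, 3})
	fmt.Println(c.cnt) // 3
	c.ResetPartialResult()
	fmt.Println(c.cnt) // 0
}
```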


// ResetPartialResult resets the partial result to the original state for a
// specific aggregate function. It converts the input byte slice to the
// specific data structure which stores the partial result and then resets
// every field to the proper original state.
ResetPartialResult(partialBytes []byte)

// UpdatePartialResult updates the specific partial result for an aggregate
// function using the input rows, which all belong to the same data group.
// It converts the input byte slice to the specific data structure which
// stores the partial result, then iterates on the input rows and updates
// that partial result according to the functionality and the state of the
// aggregate function.
UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partialBytes []byte) error

// AppendFinalResult2Chunk finalizes the partial result and appends the
// final result to the input chunk. Like other operations, it converts the
// input byte slice to the specific data structure which stores the partial
// result, then calculates the final result and appends that final result
// to the chunk provided.
AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error
Contributor:
s/ AppendFinalResult2Chunk/ GetFinalResult

Member Author:
I prefer the original name, which indicates the result is appended to the output chunk

}

type baseAggFunc struct {
input []expression.Expression
output []int
Contributor:
add comments for these two args.

}
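Taken together, the interface above amounts to a four-step lifecycle: allocate a byte slice holding the partial state, update it per group of rows, finalize into the output, and reset. A toy, self-contained sketch of one possible implementation (hypothetical name countOriginal; simplified to int rows and an encoding/binary counter rather than the typed structures and chunk.Row the real TiDB code operates on):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// countOriginal is a toy AggFunc-style COUNT: the partial result is a
// uint64 counter serialized into the byte slice handed out by
// AllocPartialResult, so every later call just decodes, mutates, and
// re-encodes that slice.
type countOriginal struct{}

func (c countOriginal) AllocPartialResult() []byte {
	return make([]byte, 8) // zeroed 8-byte counter
}

func (c countOriginal) ResetPartialResult(partial []byte) {
	binary.LittleEndian.PutUint64(partial, 0)
}

func (c countOriginal) UpdatePartialResult(rowsInGroup []int, partial []byte) {
	n := binary.LittleEndian.Uint64(partial)
	n += uint64(len(rowsInGroup)) // COUNT: one per input row
	binary.LittleEndian.PutUint64(partial, n)
}

func (c countOriginal) FinalResult(partial []byte) uint64 {
	return binary.LittleEndian.Uint64(partial)
}

func main() {
	var f countOriginal
	p := f.AllocPartialResult()
	f.UpdatePartialResult([]int{1, 2, 3}, p)
	f.UpdatePartialResult([]int{4, 5}, p)
	fmt.Println(f.FinalResult(p)) // 5
	f.ResetPartialResult(p)
	fmt.Println(f.FinalResult(p)) // 0
}
```

The byte slice is the only state the operator has to hold between calls, which is what lets a hash or stream aggregate treat every AggFunc uniformly.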
97 changes: 97 additions & 0 deletions executor/aggfuncs/builder.go
@@ -0,0 +1,97 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression/aggregation"
)

// Build is used to build a specific AggFunc implementation according to the
Contributor:
add . at the end of this comment.
so as the other comments

Member Author:
done

// input aggFuncDesc
func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
switch aggFuncDesc.Name {
case ast.AggFuncCount:
return buildCount(aggFuncDesc, output)
case ast.AggFuncSum:
return buildSum(aggFuncDesc, output)
case ast.AggFuncAvg:
return buildAvg(aggFuncDesc, output)
case ast.AggFuncFirstRow:
return buildFirstRow(aggFuncDesc, output)
case ast.AggFuncMax:
return buildMax(aggFuncDesc, output)
case ast.AggFuncMin:
return buildMin(aggFuncDesc, output)
case ast.AggFuncGroupConcat:
return buildGroupConcat(aggFuncDesc, output)
case ast.AggFuncBitOr:
return buildBitOr(aggFuncDesc, output)
case ast.AggFuncBitXor:
return buildBitXor(aggFuncDesc, output)
case ast.AggFuncBitAnd:
return buildBitAnd(aggFuncDesc, output)
}
return nil
}

// buildCount builds the AggFunc implementation for function "COUNT".
func buildCount(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildSum builds the AggFunc implementation for function "SUM".
func buildSum(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildAvg builds the AggFunc implementation for function "AVG".
func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildFirstRow builds the AggFunc implementation for function "FIRST_ROW".
func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildMax builds the AggFunc implementation for function "MAX".
func buildMax(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildMin builds the AggFunc implementation for function "MIN".
func buildMin(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildGroupConcat builds the AggFunc implementation for function "GROUP_CONCAT".
func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildBitOr builds the AggFunc implementation for function "BIT_OR".
func buildBitOr(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildBitXor builds the AggFunc implementation for function "BIT_XOR".
func buildBitXor(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}

// buildBitAnd builds the AggFunc implementation for function "BIT_AND".
func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
return nil
}
82 changes: 72 additions & 10 deletions executor/aggregate.go
@@ -15,6 +15,7 @@ package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/mysql"
@@ -189,11 +190,15 @@ type StreamAggExec struct {
curGroupKey []types.Datum
tmpGroupKey []types.Datum

// for chunk execution.
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
mutableRow chunk.MutRow
rowBuffer []types.Datum

// for the new execution framework of aggregate functions
newAggFuncs []aggfuncs.AggFunc
partialBytes [][]byte
groupRows []chunk.Row
}

// Open implements the Executor Open interface.
@@ -209,9 +214,16 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
e.mutableRow = chunk.MutRowFromTypes(e.retTypes())
e.rowBuffer = make([]types.Datum, 0, e.Schema().Len())

e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs))
for _, agg := range e.AggFuncs {
e.aggCtxs = append(e.aggCtxs, agg.CreateContext(e.ctx.GetSessionVars().StmtCtx))
if e.newAggFuncs != nil {
e.partialBytes = make([][]byte, 0, len(e.newAggFuncs))
for _, newAggFunc := range e.newAggFuncs {
e.partialBytes = append(e.partialBytes, newAggFunc.AllocPartialResult())
}
} else {
e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs))
for _, agg := range e.AggFuncs {
e.aggCtxs = append(e.aggCtxs, agg.CreateContext(e.ctx.GetSessionVars().StmtCtx))
}
}

return nil
@@ -242,13 +254,24 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e
return errors.Trace(err)
}
if meetNewGroup {
e.appendResult2Chunk(chk)
}
for i, af := range e.AggFuncs {
err := af.Update(e.aggCtxs[i], e.StmtCtx, e.inputRow)
err := e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
err = e.appendResult2Chunk(chk)
if err != nil {
return errors.Trace(err)
}
}
if e.newAggFuncs != nil {
e.groupRows = append(e.groupRows, e.inputRow)
} else {
for i, af := range e.AggFuncs {
err := af.Update(e.aggCtxs[i], e.StmtCtx, e.inputRow)
if err != nil {
return errors.Trace(err)
}
}
}
if meetNewGroup {
e.inputRow = e.inputIter.Next()
@@ -259,19 +282,44 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e
return nil
}

func (e *StreamAggExec) consumeGroupRows() error {
if len(e.groupRows) == 0 {
return nil
}

for i, newAggFunc := range e.newAggFuncs {
err := newAggFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialBytes[i])
if err != nil {
return errors.Trace(err)
}
}
e.groupRows = e.groupRows[:0]
return nil
}

func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) error {
if e.inputRow != e.inputIter.End() {
return nil
}

if e.newAggFuncs != nil {
Contributor:
why we need to consumeGroupRows here?

Member Author:
before calling fetchChildIfNecessary, we may have some unconsumed rows stored in e.childrenResults[0], we should consume them before calling e.children[0].Next, which will reset e.childrenResults[0] before execution.

Contributor:
put this check between line 279 and line 280 may be better?

Member Author:
No, if we put this check to that position, we have to call consumeGroupRows() for every input row.

err := e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
}

err := e.children[0].Next(ctx, e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
// No more data.
if e.childrenResults[0].NumRows() == 0 {
if e.hasData || len(e.GroupByItems) == 0 {
e.appendResult2Chunk(chk)
err := e.appendResult2Chunk(chk)
if err != nil {
return errors.Trace(err)
}
}
e.executed = true
return nil
@@ -285,14 +333,28 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch

// appendResult2Chunk appends result of all the aggregation functions to the
// result chunk, and reset the evaluation context for each aggregation.
func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) {
func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error {
if e.newAggFuncs != nil {
for i, newAggFunc := range e.newAggFuncs {
err := newAggFunc.AppendFinalResult2Chunk(e.ctx, e.partialBytes[i], chk)
if err != nil {
return errors.Trace(err)
}
newAggFunc.ResetPartialResult(e.partialBytes[i])
}
if len(e.newAggFuncs) == 0 {
chk.SetNumVirtualRows(chk.NumRows() + 1)
}
return nil
}
e.rowBuffer = e.rowBuffer[:0]
for i, af := range e.AggFuncs {
e.rowBuffer = append(e.rowBuffer, af.GetResult(e.aggCtxs[i]))
af.ResetContext(e.ctx.GetSessionVars().StmtCtx, e.aggCtxs[i])
}
e.mutableRow.SetDatums(e.rowBuffer...)
chk.AppendRow(e.mutableRow.ToRow())
return nil
}

// meetNewGroup returns a value that represents if the new group is different from last group.
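The control flow this diff adds to StreamAggExec — buffer the rows of the current group, consume the buffer into the partial result when a new group begins, then emit the final result and reset — can be illustrated with a toy stream COUNT (hypothetical streamCount helper with strings standing in for group keys; the real executor works on chunks and AggFunc partial results):

```go
package main

import "fmt"

// streamCount walks rows sorted by group key, buffering each group's
// rows in groupRows and flushing them into the partial result (a plain
// counter here) whenever the group key changes, mirroring
// consumeGroupRows / appendResult2Chunk / ResetPartialResult above.
func streamCount(rows []string) []int {
	var out []int
	var groupRows []string
	count := 0 // partial result for COUNT

	consume := func() { // consumeGroupRows: fold the buffer into the partial result
		count += len(groupRows)
		groupRows = groupRows[:0]
	}

	for i, r := range rows {
		if i > 0 && r != rows[i-1] { // meetNewGroup
			consume()
			out = append(out, count) // appendResult2Chunk
			count = 0                // ResetPartialResult
		}
		groupRows = append(groupRows, r)
	}
	if len(rows) > 0 { // flush the last group at end of input
		consume()
		out = append(out, count)
	}
	return out
}

func main() {
	fmt.Println(streamCount([]string{"a", "a", "b", "b", "b", "c"})) // [2 3 1]
}
```

Buffering a whole group before calling consume is also why fetchChildIfNecessary must flush the buffer before the child chunk is reused, as the thread above explains.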
11 changes: 10 additions & 1 deletion executor/builder.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
@@ -919,8 +920,16 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor {
AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)),
GroupByItems: v.GroupByItems,
}
for _, aggDesc := range v.AggFuncs {
newAggFuncs := make([]aggfuncs.AggFunc, 0, len(v.AggFuncs))
for i, aggDesc := range v.AggFuncs {
e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc())
newAggFunc := aggfuncs.Build(aggDesc, []int{i})
Contributor:
why do we need to pass a slice
since there is only one element in the slice?

if newAggFunc != nil {
newAggFuncs = append(newAggFuncs, newAggFunc)
}
}
if len(newAggFuncs) == len(v.AggFuncs) {
Contributor:
Add a comment for this check.

e.newAggFuncs = newAggFuncs
}
metrics.ExecutorCounter.WithLabelValues("StreamAggExec").Inc()
return e
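The `len(newAggFuncs) == len(v.AggFuncs)` check is an all-or-nothing switch: the new framework is enabled only when every aggregate in the plan can be built by aggfuncs.Build; otherwise the executor keeps the old path. A toy sketch of that fallback decision (hypothetical build/selectFuncs helpers, not the real builder code):

```go
package main

import "fmt"

// build returns an implementation name for functions the new framework
// supports, or "" for those it does not, mirroring the nil return of
// aggfuncs.Build for not-yet-implemented functions.
func build(name string) string {
	switch name {
	case "count", "sum":
		return "new-" + name
	}
	return ""
}

// selectFuncs applies the all-or-nothing check from buildStreamAgg: the
// new implementations are used only if every aggregate built successfully.
func selectFuncs(descs []string) ([]string, bool) {
	newFuncs := make([]string, 0, len(descs))
	for _, d := range descs {
		if f := build(d); f != "" {
			newFuncs = append(newFuncs, f)
		}
	}
	if len(newFuncs) == len(descs) {
		return newFuncs, true
	}
	return nil, false // at least one aggregate unsupported: fall back
}

func main() {
	_, ok := selectFuncs([]string{"count", "avg"})
	fmt.Println(ok) // false: "avg" not implemented yet, keep the old path
	funcs, ok := selectFuncs([]string{"count", "sum"})
	fmt.Println(ok, funcs)
}
```

Mixing old and new implementations in one executor would complicate the state handling, so the builder flips the whole operator at once.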