-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: introduce a new execution framework for aggregate functions #6852
Changes from 19 commits
182658b
01ec369
faa3f7d
1ebf4d5
038135a
7f1fd3a
e5acefb
e25d273
dca132a
81c812b
cacad7c
d3a1ced
cf34a16
b1335c3
ce821f9
dbbef24
57f57b8
5f17822
aa71782
f3b005f
c7f7144
66801be
0d859db
7f8134c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
// Copyright 2018 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package aggfuncs | ||
|
||
import ( | ||
"unsafe" | ||
|
||
"github.com/pingcap/tidb/expression" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/util/chunk" | ||
) | ||
|
||
// All the AggFunc implementations are listed here for navigation. | ||
var ( | ||
// All the AggFunc implementations for "COUNT" are listed here. | ||
// All the AggFunc implementations for "SUM" are listed here. | ||
// All the AggFunc implementations for "AVG" are listed here. | ||
// All the AggFunc implementations for "FIRSTROW" are listed here. | ||
// All the AggFunc implementations for "MAX" are listed here. | ||
// All the AggFunc implementations for "MIN" are listed here. | ||
// All the AggFunc implementations for "GROUP_CONCAT" are listed here. | ||
// All the AggFunc implementations for "BIT_OR" are listed here. | ||
// All the AggFunc implementations for "BIT_XOR" are listed here. | ||
// All the AggFunc implementations for "BIT_AND" are listed here. | ||
) | ||
|
||
// PartialResult represents data structure to store the partial result for the | ||
// aggregate functions. Here we use unsafe.Pointer to allow the partial result | ||
// to be any type. | ||
type PartialResult unsafe.Pointer | ||
|
||
// AggFunc is the interface to evaluate the aggregate functions. | ||
type AggFunc interface { | ||
// AllocPartialResult allocates a specific data structure to store the | ||
// partial result, initializes it, and converts it to PartialResult to | ||
// return back. Aggregate operator implementation, no matter it's a hash | ||
// or stream, should hold this allocated PartialResult for the further | ||
// operations like: "ResetPartialResult", "UpdatePartialResult". | ||
AllocPartialResult() PartialResult | ||
|
||
// ResetPartialResult resets the partial result to the original state for a | ||
// specific aggregate function. It converts the input PartialResult to the | ||
// specific data structure which stores the partial result and then reset | ||
// every field to the proper original state. | ||
ResetPartialResult(pr PartialResult) | ||
|
||
// UpdatePartialResult updates the specific partial result for an aggregate | ||
// function using the input rows which all belonging to the same data group. | ||
// It converts the PartialResult to the specific data structure which stores | ||
// the partial result and then iterates on the input rows and update that | ||
// partial result according to the functionality and the state of the | ||
// aggregate function. | ||
UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error | ||
|
||
// AppendFinalResult2Chunk finalizes the partial result and append the | ||
// final result to the input chunk. Like other operations, it converts the | ||
// input PartialResult to the specific data structure which stores the | ||
// partial result and then calculates the final result and append that | ||
// final result to the chunk provided. | ||
AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error | ||
} | ||
|
||
type baseAggFunc struct { | ||
// input stores the input arguments for an aggregate function, we should | ||
// call input.EvalXXX to get the actual input data for this function. | ||
input []expression.Expression | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
// output stores the ordinal of the columns in the output chunk, which is | ||
// used to append the partial or final result of this function. | ||
output []int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add comments for these two args. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright 2018 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package aggfuncs | ||
|
||
import ( | ||
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/expression/aggregation" | ||
) | ||
|
||
// Build is used to build a specific AggFunc implementation according to the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// input aggFuncDesc. | ||
func Build(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
switch aggFuncDesc.Name { | ||
case ast.AggFuncCount: | ||
return buildCount(aggFuncDesc, output) | ||
case ast.AggFuncSum: | ||
return buildSum(aggFuncDesc, output) | ||
case ast.AggFuncAvg: | ||
return buildAvg(aggFuncDesc, output) | ||
case ast.AggFuncFirstRow: | ||
return buildFirstRow(aggFuncDesc, output) | ||
case ast.AggFuncMax: | ||
return buildMax(aggFuncDesc, output) | ||
case ast.AggFuncMin: | ||
return buildMin(aggFuncDesc, output) | ||
case ast.AggFuncGroupConcat: | ||
return buildGroupConcat(aggFuncDesc, output) | ||
case ast.AggFuncBitOr: | ||
return buildBitOr(aggFuncDesc, output) | ||
case ast.AggFuncBitXor: | ||
return buildBitXor(aggFuncDesc, output) | ||
case ast.AggFuncBitAnd: | ||
return buildBitAnd(aggFuncDesc, output) | ||
} | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "COUNT". | ||
func buildCount(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "SUM". | ||
func buildSum(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "AVG". | ||
func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "FIRST_ROW". | ||
func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "MAX". | ||
func buildMax(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "MIN". | ||
func buildMin(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "GROUP_CONCAT". | ||
func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "BIT_OR". | ||
func buildBitOr(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "BIT_XOR". | ||
func buildBitXor(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "BIT_AND". | ||
func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc { | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ package executor | |
|
||
import ( | ||
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/executor/aggfuncs" | ||
"github.com/pingcap/tidb/expression" | ||
"github.com/pingcap/tidb/expression/aggregation" | ||
"github.com/pingcap/tidb/mysql" | ||
|
@@ -189,11 +190,15 @@ type StreamAggExec struct { | |
curGroupKey []types.Datum | ||
tmpGroupKey []types.Datum | ||
|
||
// for chunk execution. | ||
inputIter *chunk.Iterator4Chunk | ||
inputRow chunk.Row | ||
mutableRow chunk.MutRow | ||
rowBuffer []types.Datum | ||
|
||
// for the new execution framework of aggregate functions | ||
newAggFuncs []aggfuncs.AggFunc | ||
partialResults []aggfuncs.PartialResult | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to hold There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'e better to let aggregate function implementations to be stateless. If not so, we have to allocate an aggregate function for every group, this is worse when we use it in the hash aggregate operator. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, ClickHouse also has the same aggregate function framework: https://github.com/yandex/ClickHouse/blob/master/dbms/src/AggregateFunctions/AggregateFunctionAvg.h, and so does the Impala: https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/udf/udf.h There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it. |
||
groupRows []chunk.Row | ||
} | ||
|
||
// Open implements the Executor Open interface. | ||
|
@@ -209,9 +214,16 @@ func (e *StreamAggExec) Open(ctx context.Context) error { | |
e.mutableRow = chunk.MutRowFromTypes(e.retTypes()) | ||
e.rowBuffer = make([]types.Datum, 0, e.Schema().Len()) | ||
|
||
e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs)) | ||
for _, agg := range e.AggFuncs { | ||
e.aggCtxs = append(e.aggCtxs, agg.CreateContext(e.ctx.GetSessionVars().StmtCtx)) | ||
if e.newAggFuncs != nil { | ||
e.partialResults = make([]aggfuncs.PartialResult, 0, len(e.newAggFuncs)) | ||
for _, newAggFunc := range e.newAggFuncs { | ||
e.partialResults = append(e.partialResults, newAggFunc.AllocPartialResult()) | ||
} | ||
} else { | ||
e.aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.AggFuncs)) | ||
for _, agg := range e.AggFuncs { | ||
e.aggCtxs = append(e.aggCtxs, agg.CreateContext(e.ctx.GetSessionVars().StmtCtx)) | ||
} | ||
} | ||
|
||
return nil | ||
|
@@ -242,13 +254,24 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e | |
return errors.Trace(err) | ||
} | ||
if meetNewGroup { | ||
e.appendResult2Chunk(chk) | ||
} | ||
for i, af := range e.AggFuncs { | ||
err := af.Update(e.aggCtxs[i], e.StmtCtx, e.inputRow) | ||
err := e.consumeGroupRows() | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
err = e.appendResult2Chunk(chk) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
if e.newAggFuncs != nil { | ||
e.groupRows = append(e.groupRows, e.inputRow) | ||
} else { | ||
for i, af := range e.AggFuncs { | ||
err := af.Update(e.aggCtxs[i], e.StmtCtx, e.inputRow) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
} | ||
if meetNewGroup { | ||
e.inputRow = e.inputIter.Next() | ||
|
@@ -259,19 +282,44 @@ func (e *StreamAggExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) e | |
return nil | ||
} | ||
|
||
func (e *StreamAggExec) consumeGroupRows() error { | ||
if len(e.groupRows) == 0 { | ||
return nil | ||
} | ||
|
||
for i, newAggFunc := range e.newAggFuncs { | ||
err := newAggFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResults[i]) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
e.groupRows = e.groupRows[:0] | ||
return nil | ||
} | ||
|
||
func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) error { | ||
if e.inputRow != e.inputIter.End() { | ||
return nil | ||
} | ||
|
||
if e.newAggFuncs != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why we need to consumeGroupRows here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. before calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put this check between line 279 and line 280 may be better? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, if we put this check to that position, we have to call |
||
err := e.consumeGroupRows() | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
|
||
err := e.children[0].Next(ctx, e.childrenResults[0]) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
// No more data. | ||
if e.childrenResults[0].NumRows() == 0 { | ||
if e.hasData || len(e.GroupByItems) == 0 { | ||
e.appendResult2Chunk(chk) | ||
err := e.appendResult2Chunk(chk) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
} | ||
e.executed = true | ||
return nil | ||
|
@@ -285,14 +333,28 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch | |
|
||
// appendResult2Chunk appends result of all the aggregation functions to the | ||
// result chunk, and reset the evaluation context for each aggregation. | ||
func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) { | ||
func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error { | ||
if e.newAggFuncs != nil { | ||
for i, newAggFunc := range e.newAggFuncs { | ||
err := newAggFunc.AppendFinalResult2Chunk(e.ctx, e.partialResults[i], chk) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
newAggFunc.ResetPartialResult(e.partialResults[i]) | ||
} | ||
if len(e.newAggFuncs) == 0 { | ||
chk.SetNumVirtualRows(chk.NumRows() + 1) | ||
} | ||
return nil | ||
} | ||
e.rowBuffer = e.rowBuffer[:0] | ||
for i, af := range e.AggFuncs { | ||
e.rowBuffer = append(e.rowBuffer, af.GetResult(e.aggCtxs[i])) | ||
af.ResetContext(e.ctx.GetSessionVars().StmtCtx, e.aggCtxs[i]) | ||
} | ||
e.mutableRow.SetDatums(e.rowBuffer...) | ||
chk.AppendRow(e.mutableRow.ToRow()) | ||
return nil | ||
} | ||
|
||
// meetNewGroup returns a value that represents if the new group is different from last group. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import ( | |
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/distsql" | ||
"github.com/pingcap/tidb/domain" | ||
"github.com/pingcap/tidb/executor/aggfuncs" | ||
"github.com/pingcap/tidb/expression" | ||
"github.com/pingcap/tidb/expression/aggregation" | ||
"github.com/pingcap/tidb/infoschema" | ||
|
@@ -919,8 +920,20 @@ func (b *executorBuilder) buildStreamAgg(v *plan.PhysicalStreamAgg) Executor { | |
AggFuncs: make([]aggregation.Aggregation, 0, len(v.AggFuncs)), | ||
GroupByItems: v.GroupByItems, | ||
} | ||
for _, aggDesc := range v.AggFuncs { | ||
newAggFuncs := make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)) | ||
for i, aggDesc := range v.AggFuncs { | ||
e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc()) | ||
newAggFunc := aggfuncs.Build(aggDesc, []int{i}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to pass a slice |
||
if newAggFunc != nil { | ||
newAggFuncs = append(newAggFuncs, newAggFunc) | ||
} | ||
} | ||
|
||
// Once we have successfully build all the aggregate functions to the new | ||
// aggregate function execution framework, we can store them to the stream | ||
// aggregate operator to indicate it using the new execution framework. | ||
if len(newAggFuncs) == len(v.AggFuncs) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comment for this check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comment for this check. |
||
e.newAggFuncs = newAggFuncs | ||
} | ||
metrics.ExecutorCounter.WithLabelValues("StreamAggExec").Inc() | ||
return e | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/ AppendFinalResult2Chunk/ GetFinalResult
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer the original name, which indicates the result is appended to the output chunk