
executor: introduce a new execution framework for aggregate functions #6852

Merged · 24 commits · Jun 29, 2018

Conversation

@zz-jason (Member) commented Jun 17, 2018

What have you changed? (mandatory)

Introduce a new interface named AggFunc, defined in executor/aggfuncs/aggfuncs.go, to refactor the execution framework of aggregate functions. The main usage of the new execution framework is:

  1. use AllocPartialResult() to allocate the struct that stores the partial result for every aggregate function
  2. use UpdatePartialResult() to update the partial result for every aggregate function, no matter whether the input is original or partial data. The input partialBytes will be converted to the specific partial result struct before the update.
  3. use ResetPartialResult() to reset or reinitialize the partial result for every aggregate function. The input partialBytes will be converted to the specific partial result struct before reinitialization.
  4. use AppendFinalResult2Chunk() to finalize the partial result into the input chk. The input partialBytes will be converted to the specific partial result struct before finalization for every group.
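Not the actual TiDB code, but the four-step lifecycle above can be sketched as a self-contained Go program, with chunk.Row reduced to a one-column Row struct and partialBytes replaced by a plain interface{}:

```go
package main

import "fmt"

// Row stands in for chunk.Row; here each row carries one float64 column.
type Row struct{ val float64 }

// AggFunc mirrors the four methods described above, simplified to a plain
// interface{} partial result instead of a byte slice.
type AggFunc interface {
	AllocPartialResult() interface{}
	UpdatePartialResult(pr interface{}, rowsInGroup []Row) error
	ResetPartialResult(pr interface{})
	AppendFinalResult2Chunk(pr interface{}, out *[]float64) error
}

// avgPartial is the per-group state for AVG: a running sum and row count.
type avgPartial struct {
	sum   float64
	count int64
}

// avgOriginal consumes original (not partial) input rows.
type avgOriginal struct{}

func (avgOriginal) AllocPartialResult() interface{} { return &avgPartial{} }

func (avgOriginal) UpdatePartialResult(pr interface{}, rowsInGroup []Row) error {
	p := pr.(*avgPartial)
	for _, r := range rowsInGroup { // one call handles a whole batch of rows
		p.sum += r.val
		p.count++
	}
	return nil
}

func (avgOriginal) ResetPartialResult(pr interface{}) {
	*pr.(*avgPartial) = avgPartial{}
}

func (avgOriginal) AppendFinalResult2Chunk(pr interface{}, out *[]float64) error {
	p := pr.(*avgPartial)
	*out = append(*out, p.sum/float64(p.count))
	return nil
}

// demo runs two groups through the full lifecycle and returns the
// finalized results.
func demo() []float64 {
	var f AggFunc = avgOriginal{}
	pr := f.AllocPartialResult()
	f.UpdatePartialResult(pr, []Row{{1}, {2}, {3}})
	var out []float64
	f.AppendFinalResult2Chunk(pr, &out)
	f.ResetPartialResult(pr)
	f.UpdatePartialResult(pr, []Row{{10}})
	f.AppendFinalResult2Chunk(pr, &out)
	return out
}

func main() {
	fmt.Println(demo()) // [2 10]
}
```

Note how ResetPartialResult lets the same partial-result allocation serve the next group, which is the point of step 3.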

The main improvements are:

  1. by calling UpdatePartialResult() with a []chunk.Row, we reduce the total number of function calls, which saves a lot of time. For stream aggregation, the input data for an aggregate function are stored sequentially in the input []chunk.Row, which further improves CPU cache performance.
  2. by calling AllocPartialResult() to allocate a specific struct that stores the partial result for every aggregate function, we reduce the redundant memory usage of the old struct AggEvaluateContext.

Use aggfuncs.Build to create an AggFunc according to the AggFuncDesc. For now:

  1. only some implementations of AVG are supported
  2. the new execution framework is only used in StreamAggExec, when possible
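As a hedged illustration of that Build-style dispatch (the real aggfuncs.Build also switches on the argument's type; all names below are stand-ins, not the actual TiDB code):

```go
package main

// AggMode stands in for aggregation.AggFunctionMode; the constant names
// mirror the aggregation modes but the code is illustrative.
type AggMode int

const (
	CompleteMode AggMode = iota
	Partial1Mode
	Partial2Mode
	FinalMode
)

// AggFunc's methods are elided here; what matters is the dispatch shape.
type AggFunc interface{}

type avgOriginal struct{} // reads original input rows (Complete/Partial1)
type avgPartial struct{}  // merges partial results (Partial2/Final)

// buildAvg sketches how a builder can pick an implementation from the
// aggregation mode. A nil return means "fall back to the old framework".
func buildAvg(mode AggMode) AggFunc {
	switch mode {
	case CompleteMode, Partial1Mode:
		return avgOriginal{}
	case Partial2Mode, FinalMode:
		return avgPartial{}
	}
	return nil
}

func main() {
	if buildAvg(CompleteMode) == nil {
		panic("CompleteMode should be supported")
	}
}
```

The nil fallback matches the partial-support note above: executors only take the new path when every aggregate function in the plan has a new-framework implementation.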

What are the types of the changes (mandatory)?

  • Improvement (non-breaking change which is an improvement to an existing feature)

How has this PR been tested (mandatory)?

  • unit test
  • explain test

Does this PR affect documentation (docs/docs-cn) update? (optional)

No

Benchmark result if necessary (optional)

test sql:

mysql root@172.16.10.112:tpch> desc select avg(L_QUANTITY) from (select * from lineitem union all select * from lineitem) tmp;
+----------------+--------------+-------------------------------+------+-----------------------------------------------------+--------------+
| id             | parents      | children                      | task | operator info                                       | count        |
+----------------+--------------+-------------------------------+------+-----------------------------------------------------+--------------+
| TableScan_23   |              |                               | cop  | table:lineitem, range:[-inf,+inf], keep order:false | 59986052.00  |
| TableReader_24 | Union_21     |                               | root | data:TableScan_23                                   | 59986052.00  |
| TableScan_26   |              |                               | cop  | table:lineitem, range:[-inf,+inf], keep order:false | 59986052.00  |
| TableReader_27 | Union_21     |                               | root | data:TableScan_26                                   | 59986052.00  |
| Union_21       | StreamAgg_13 | TableReader_24,TableReader_27 | root |                                                     | 119972104.00 |
| StreamAgg_13   |              | Union_21                      | root | funcs:avg(tmp.l_quantity)                           | 1.00         |
+----------------+--------------+-------------------------------+------+-----------------------------------------------------+--------------+
6 rows in set
Time: 0.017s

Before this PR:

mysql root@172.16.10.112:tpch> select avg(L_QUANTITY) from (select * from lineitem union all select * from lineitem) tmp;
+-----------------+
| avg(L_QUANTITY) |
+-----------------+
| 25.501562       |
+-----------------+
1 row in set
Time: 54.508s

After this PR:

mysql root@172.16.10.112:tpch> select avg(L_QUANTITY) from (select * from lineitem union all select * from lineitem) tmp;
+-----------------+
| avg(L_QUANTITY) |
+-----------------+
| 25.501562       |
+-----------------+
1 row in set
Time: 27.767s

The performance gain is about 96% (54.5s before vs 27.8s after, nearly 2×).

@zz-jason
Member Author

/run-all-tests

@winoros (Member) commented Jun 20, 2018

Are the Before and After results swapped?

@zz-jason
Member Author

@winoros updated

@zz-jason added the sig/execution and type/enhancement labels on Jun 21, 2018

@zz-jason
Member Author

@XuHuaiyu @winoros @lamxTyler PTAL

@zz-jason
Member Author

@lysu PTAL

func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) {
func (e *StreamAggExec) appendResult2Chunk(chk *chunk.Chunk) error {
if e.newAggFuncs != nil {
fmt.Printf("StreamAggExec.appendResult2Chunk: use new aggfunc\n")
Contributor:

Remove the debug log.

type AggFunc interface {
// AllocPartialResult allocates a specific data structure to store the
// partial result, initializes it, and converts it to a bype slice to return
// back. Aggregate operator implementations, no mater whether it's a hash or
Contributor:

s/ mater/ matter

"github.com/pingcap/tidb/expression/aggregation"
)

// Build is used to build a specific AggFunc implementation according to the
Contributor:

Add a . at the end of this comment; same for the other comments.

Member Author:

done

newAggFuncs = append(newAggFuncs, newAggFunc)
}
}
if len(newAggFuncs) == len(v.AggFuncs) {
Contributor:

Add a comment for this check.


type baseAggFunc struct {
input []expression.Expression
output []int
Contributor:

add comments for these two args.

func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) error {
if e.inputRow != e.inputIter.End() {
return nil
}

if e.newAggFuncs != nil {
Contributor:

Why do we need to call consumeGroupRows here?

Member Author:

Before calling fetchChildIfNecessary, we may have some unconsumed rows stored in e.childrenResults[0]. We should consume them before calling e.children[0].Next, which resets e.childrenResults[0] before execution.

Contributor:

Putting this check between line 279 and line 280 may be better?

Member Author:

No. If we put this check at that position, we would have to call consumeGroupRows() for every input row.
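The trade-off discussed here, consuming buffered group rows once per fetch rather than once per input row, can be sketched with stand-in types (all names below are illustrative, not the actual TiDB code):

```go
package main

// The child result buffer is reused across fetches, so rows still buffered
// for the current group must be aggregated before the next fetch
// overwrites the buffer.
type streamAgg struct {
	childRows []float64 // reused buffer, reset by each fetch from the child
	consumed  float64   // running aggregate over the consumed group rows
}

// consumeGroupRows drains the buffered rows into the aggregate state.
func (e *streamAgg) consumeGroupRows() {
	for _, v := range e.childRows {
		e.consumed += v
	}
	e.childRows = e.childRows[:0]
}

// fetchChildIfNecessary consumes buffered rows exactly once per fetch,
// instead of checking once per input row, then refills the buffer (the
// refill simulates what e.children[0].Next would do).
func (e *streamAgg) fetchChildIfNecessary(next []float64) {
	e.consumeGroupRows() // drain before the buffer is reset
	e.childRows = append(e.childRows, next...)
}

func main() {
	e := &streamAgg{childRows: []float64{1, 2}}
	e.fetchChildIfNecessary([]float64{3})
	if e.consumed != 3 {
		panic("rows 1 and 2 should have been consumed before the refill")
	}
}
```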

// input byte slice to the specific data structure which stores the partial
// result and then calculates the final result and append that final result
// to the chunk provided.
AppendFinalResult2Chunk(sctx sessionctx.Context, partialBytes []byte, chk *chunk.Chunk) error
Contributor:

s/ AppendFinalResult2Chunk/ GetFinalResult

Member Author:

I prefer the original name, which indicates that the result is appended to the output chunk.

// back. Aggregate operator implementations, no mater whether it's a hash or
// stream implementation, should hold this byte slice for further operations
// like: "ResetPartialResult", "UpdatePartialResult".
AllocPartialResult() []byte
Contributor:

It seems that we need another struct which contains partialResultBytes to handle the hashagg evaluation and aggfunc with distinct?

Member Author:

No need for now. We can just add a map field to a specific aggregate function implementation: during UpdatePartialResult we use that map to deduplicate the input, and in ResetPartialResult we reset that map.
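A sketch of that deduplication scheme, simplified to float64 inputs (the field and method names are illustrative, not the actual TiDB implementation):

```go
package main

// avgDistinct keeps a map to drop duplicate inputs within a group during
// UpdatePartialResult, and clears it in ResetPartialResult.
type avgDistinct struct {
	deDuper map[float64]bool // drops duplicate inputs within a group
	sum     float64
	count   int64
}

func newAvgDistinct() *avgDistinct {
	// The map must be initialized before use.
	return &avgDistinct{deDuper: make(map[float64]bool)}
}

// UpdatePartialResult skips values already seen in this group.
func (a *avgDistinct) UpdatePartialResult(rowsInGroup []float64) {
	for _, v := range rowsInGroup {
		if a.deDuper[v] {
			continue
		}
		a.deDuper[v] = true
		a.sum += v
		a.count++
	}
}

// ResetPartialResult clears the aggregate state and the dedup map.
func (a *avgDistinct) ResetPartialResult() {
	a.deDuper = make(map[float64]bool)
	a.sum, a.count = 0, 0
}

func main() {
	a := newAvgDistinct()
	a.UpdatePartialResult([]float64{1, 1, 2})
	if a.sum != 3 || a.count != 2 {
		panic("duplicates should be skipped")
	}
}
```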

@@ -100,6 +100,7 @@ func (e *avgDedup4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGr

type avgOriginal4Decimal struct {
baseAvgDecimal
deDuper map[types.MyDecimal]bool
Contributor:

deDuper should be initialized

@@ -80,11 +81,20 @@ func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, output []int) AggFunc {
case aggregation.CompleteMode, aggregation.Partial1Mode:
switch aggFuncDesc.Args[0].GetType().Tp {
Contributor:

We should consider all the input types; using EvalType here may be better?

e.AggFuncs = append(e.AggFuncs, aggDesc.GetAggFunc())
newAggFunc := aggfuncs.Build(aggDesc, []int{i})
Contributor:

Why do we need to pass a slice, since there is only one element in it?

// input PartialResult to the specific data structure which stores the
// partial result and then calculates the final result and append that
// final result to the chunk provided.
AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error
Contributor:

s/ AppendFinalResult2Chunk/ GetFinalResult

Member Author:

I prefer the original name, which indicates that the result is appended to the output chunk.

type baseAggFunc struct {
// input stores the input arguments for an aggregate function, we should
// call input.EvalXXX to get the actual input data for this function.
input []expression.Expression
Contributor:

  1. s/ input/ args may be clearer.
  2. we do not need to define output as a slice,
    since we only use it to append the final result to a chunk.

@XuHuaiyu
Contributor

Do we need a GetPartialResult func which may be used by mocktikv?

@zz-jason
Member Author

@XuHuaiyu If we decide to use it only in the final or complete mode, we don't need to add GetPartialResult; mocktikv can just use the old aggregate funcs.

@XuHuaiyu
Contributor

PTAL @coocood

@lysu (Contributor) commented Jun 28, 2018

/run-all-tests tidb-test=pr/559


// for the new execution framework of aggregate functions
newAggFuncs []aggfuncs.AggFunc
partialResults []aggfuncs.PartialResult
Member:

Why do we need to hold partialResults here instead of in each AggFunc?

Member Author:

It's better to let aggregate function implementations be stateless. Otherwise, we would have to allocate an aggregate function for every group, which is worse when we use it in the hash aggregate operator.
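The statelessness argument can be illustrated with a stand-in COUNT implementation: one shared AggFunc value, one partial result per group key (all names below are illustrative, not the actual TiDB code):

```go
package main

// countPartial is the per-group state; countFunc itself holds no state,
// so a single value can safely serve every group.
type countPartial struct{ n int64 }

type countFunc struct{} // stateless: safe to share across all groups

func (countFunc) AllocPartialResult() *countPartial { return &countPartial{} }

func (countFunc) UpdatePartialResult(p *countPartial, rows int) { p.n += int64(rows) }

// batch is one (group key, row count) unit of input.
type batch struct {
	key  string
	rows int
}

// hashAgg aggregates row counts per group with one shared countFunc value,
// keeping the partial results in a map owned by the operator.
func hashAgg(batches []batch) map[string]int64 {
	f := countFunc{} // allocated once, not once per group
	partials := make(map[string]*countPartial)
	for _, b := range batches {
		p, ok := partials[b.key]
		if !ok {
			p = f.AllocPartialResult()
			partials[b.key] = p
		}
		f.UpdatePartialResult(p, b.rows)
	}
	out := make(map[string]int64, len(partials))
	for k, p := range partials {
		out[k] = p.n
	}
	return out
}

func main() {
	res := hashAgg([]batch{{"a", 2}, {"b", 1}, {"a", 3}})
	if res["a"] != 5 || res["b"] != 1 {
		panic("unexpected per-group counts")
	}
}
```

This is why the operator, not the AggFunc, holds the partialResults slice or map.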

Member:

got it.

@XuHuaiyu
Contributor

LGTM

@zz-jason added the status/LGT1 (Indicates that a PR has LGTM 1.) label on Jun 29, 2018
@coocood (Member) commented Jun 29, 2018

LGTM

@coocood added the status/LGT2 label and removed the status/LGT1 label on Jun 29, 2018