executor: support new aggregate framework for HashAggExec #7268

Merged
merged 8 commits into from
Aug 21, 2018

Conversation

XuHuaiyu
Contributor

@XuHuaiyu XuHuaiyu commented Aug 3, 2018

What have you changed? (mandatory)

This PR adds support for the new aggregate framework to HashAggExec.
Since HashAggExec already supports parallel execution, we introduce
MergePartialResult to merge the partial results directly in the final phase,
which avoids the cost of converting each PartialResult to a chunk.
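
For readers unfamiliar with the idea, the merge step described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `avgPartial`, `update`, `merge`, and `result` are hypothetical names, and AVG is used only because its partial state (a sum and a count) is easy to show.

```go
package main

import "fmt"

// avgPartial is a hypothetical partial result for AVG: a running sum and count.
type avgPartial struct {
	sum   float64
	count int64
}

// update folds one input value into a partial result (partial phase).
func update(p *avgPartial, v float64) {
	p.sum += v
	p.count++
}

// merge folds src into dst directly (final phase), without first
// converting src into an intermediate row or chunk representation.
func merge(src, dst *avgPartial) {
	dst.sum += src.sum
	dst.count += src.count
}

// result computes the final AVG; ok is false for an empty group.
func result(p *avgPartial) (avg float64, ok bool) {
	if p.count == 0 {
		return 0, false
	}
	return p.sum / float64(p.count), true
}

func main() {
	var a, b, final avgPartial
	for _, v := range []float64{1, 2, 3} {
		update(&a, v) // first partial worker
	}
	for _, v := range []float64{4, 5} {
		update(&b, v) // second partial worker
	}
	merge(&a, &final)
	merge(&b, &final)
	avg, _ := result(&final)
	fmt.Println(avg)
}
```

The point of the sketch is that the final phase works on the in-memory partial state itself, so no encode/decode round trip is needed between the two phases.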

Please review the following PRs before this PR:

What is the type of the changes? (mandatory)

  • Improvement (non-breaking change which is an improvement to an existing feature)

How has this PR been tested? (mandatory)

Existing tests.

Does this PR affect documentation (docs/docs-cn) update? (mandatory)

no

Does this PR affect tidb-ansible update? (mandatory)

no

Does this PR need to be added to the release notes? (mandatory)

#6952

Refer to a related PR or issue link (optional)

Benchmark result if necessary (optional)

Add a few positive/negative examples (optional)

@XuHuaiyu XuHuaiyu added type/enhancement The issue or PR belongs to an enhancement. status/WIP sig/execution SIG execution labels Aug 3, 2018
@XuHuaiyu XuHuaiyu added this to the 2.1 milestone Aug 3, 2018
@shenli
Member

shenli commented Aug 5, 2018

@XuHuaiyu This PR is too big. Would you please split it into a few small ones?

@XuHuaiyu
Contributor Author

XuHuaiyu commented Aug 6, 2018

I'll split this PR into small ones:

  1. refine aggFunc.Build to buildFinalFunc for parallel execution (see "expression, executor: add a new interface MergePartialResult for the new aggregation framework", #7281).
  2. add AppendPartialResult2Chunk for AggFunc interface.
  3. modify the evaluate logic using new interface for hash agg.
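
As a rough picture of the evaluation flow these steps lead to, here is a simplified two-phase hash aggregation sketch. It is an assumption-laden illustration rather than the executor's real structure: `countPartial`, `partialWorker`, and `parallelCount` are hypothetical names, and a single merge loop stands in for the final phase.

```go
package main

import (
	"fmt"
	"sync"
)

// countPartial is a hypothetical partial result for COUNT(*).
type countPartial struct{ n int64 }

// partialWorker aggregates one shard of group keys into its own map.
func partialWorker(shard []string) map[string]*countPartial {
	m := make(map[string]*countPartial)
	for _, key := range shard {
		p, ok := m[key]
		if !ok {
			p = &countPartial{}
			m[key] = p
		}
		p.n++
	}
	return m
}

// parallelCount runs one partial worker per shard, then merges every
// partial map into a final map keyed by group key.
func parallelCount(shards [][]string) map[string]int64 {
	partials := make([]map[string]*countPartial, len(shards))
	var wg sync.WaitGroup
	for i, shard := range shards {
		wg.Add(1)
		go func(i int, shard []string) {
			defer wg.Done()
			partials[i] = partialWorker(shard) // each goroutine writes only its own slot
		}(i, shard)
	}
	wg.Wait()

	final := make(map[string]int64)
	for _, m := range partials {
		for key, p := range m {
			final[key] += p.n // merge step of the final phase
		}
	}
	return final
}

func main() {
	res := parallelCount([][]string{{"a", "b", "a"}, {"b", "c"}})
	fmt.Println(res["a"], res["b"], res["c"])
}
```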

@zz-jason
Member

@XuHuaiyu All the dependencies are satisfied. Is this PR ready to be reviewed?

@XuHuaiyu
Contributor Author

yes, I'm removing the useless code. @zz-jason

@XuHuaiyu
Contributor Author

/run-all-tests

@XuHuaiyu XuHuaiyu added the priority/P1 The issue has P1 priority. label Aug 15, 2018
@XuHuaiyu
Contributor Author

/run-all-tests

@XuHuaiyu
Contributor Author

/run-common-test tidb-test=pr/599
/run-integration-common-test tidb-test=pr/599

 for _, arg := range e.args {
 	v, isNull, err = arg.EvalString(sctx, row)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	if isNull {
-		continue
+		break
Member

Why change here?

Contributor Author

@@ -31,16 +30,16 @@ import (
"golang.org/x/net/context"
)

-type aggCtxsMapper map[string][]*aggregation.AggEvaluateContext
+type aggCtxsMapper map[string][]aggfuncs.PartialResult
Member

Do we need to change its name to something like partialResultMapper?
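
To make the suggested name concrete, here is a minimal sketch of what such a mapper holds: one slice of partial results per encoded group key. The `partialResult` struct and the `getOrCreate` helper are hypothetical stand-ins for illustration, not the types used in this PR.

```go
package main

import "fmt"

// partialResult is a hypothetical stand-in for aggfuncs.PartialResult.
type partialResult struct{ count int64 }

// partialResultMapper maps an encoded group key to one partial result
// per aggregate function (name taken from the review suggestion).
type partialResultMapper map[string][]*partialResult

// getOrCreate returns the partial results for groupKey, allocating
// numFuncs fresh results the first time the key is seen.
func (m partialResultMapper) getOrCreate(groupKey string, numFuncs int) []*partialResult {
	if prs, ok := m[groupKey]; ok {
		return prs
	}
	prs := make([]*partialResult, numFuncs)
	for i := range prs {
		prs[i] = &partialResult{}
	}
	m[groupKey] = prs
	return prs
}

func main() {
	m := make(partialResultMapper)
	for _, key := range []string{"x", "y", "x"} {
		prs := m.getOrCreate(key, 1)
		prs[0].count++
	}
	fmt.Println(m["x"][0].count, m["y"][0].count)
}
```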

@XuHuaiyu
Contributor Author

PTAL @winoros @zz-jason


@@ -51,9 +51,9 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
func resolveColumnAndReplace(origin *expression.Column, replace map[string]*expression.Column) {
dst := replace[string(origin.HashCode(nil))]
if dst != nil {
-colName := origin.ColName
+colName, retType := origin.ColName, origin.RetType
Member

why make this change?

Contributor Author

e.g.

tidb> desc select distinct a from t2 ;
+---------------------+----------+------+-------------------------------------------------------------+
| id                  | count    | task | operator info                                               |
+---------------------+----------+------+-------------------------------------------------------------+
| HashAgg_4           | 8000.00  | root | group by:test.t2.a, funcs:firstrow(test.t2.a)               |
| └─TableReader_8     | 10000.00 | root | data:TableScan_7                                            |
|   └─TableScan_7     | 10000.00 | cop  | table:t2, range:[-inf,+inf], keep order:false, stats:pseudo |
+---------------------+----------+------+-------------------------------------------------------------+
3 rows in set (0.00 sec)

During logical plan building there is a Projection between HashAgg_4 and TableReader_8,
whose schema is [a (type enum)].
We reset the return type of origin (which comes from HashAgg_4's schema) here so that the return type of HashAgg_4 is the result of type inference rather than the return type of the Projection.

@@ -594,6 +594,11 @@ func (b *planBuilder) buildDistinct(child LogicalPlan, length int) *LogicalAggre
}
plan4Agg.SetChildren(child)
plan4Agg.SetSchema(child.Schema().Clone())
// Distinct will be rewritten as first_row, we reset the type here since the return type
// of first_row is not always the same as the column arg of first_row.
for i, col := range plan4Agg.schema.Columns {
Member

should we make the same change for all the places that create the schema of aggregate operator?

Contributor Author

We reset the aggregate's column types here because it is not correct to set the schema of plan4Agg to the child's schema on line 596.

Wherever the aggregate's schema is set correctly, the same change is not needed.

@@ -345,6 +345,9 @@ func (a *AggFuncDesc) typeInfer4MaxMin(ctx sessionctx.Context) {
a.Args[0] = expression.BuildCastFunction(ctx, a.Args[0], tp)
}
a.RetTp = a.Args[0].GetType()
if a.RetTp.Tp == mysql.TypeEnum || a.RetTp.Tp == mysql.TypeSet {
Member

should we also check TypeBit?

Contributor Author

TypeBit is already checked in buildMaxMin and buildFirstRow.
I tried to check it here as well, but that caused other problems.
I'll try to move the TypeBit check here separately.

return rows, false
// getPartialResultBatch fetches a batch of partial results from HashAggIntermData.
func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys [][]byte, reachEnd bool) {
if len(prs) == maxChunkSize {
Member

@zz-jason zz-jason Aug 16, 2018

It seems that this function returns all the (group key, partial result) pairs back to the caller. Can this check be removed?

@@ -73,7 +72,7 @@ type HashAggFinalWorker struct {

rowBuffer []types.Datum
mutableRow chunk.MutRow
-aggCtxsMap aggCtxsMapper
+partialResultMap aggPartialResultMapper
groupSet *mvmap.MVMap
Member

seems groupSet can be declared as map[string]struct{}, or just use executor.aggfuncs.stringSet?

We can leave it to the future PRs.
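
For reference, the map-based set suggested above could look roughly like this. It is a sketch only: the `insert` and `exist` method names are illustrative and are not claimed to match the existing stringSet in aggfuncs.

```go
package main

import "fmt"

// stringSet is a set of strings; the empty struct values occupy no space.
type stringSet map[string]struct{}

// insert adds key to the set; inserting an existing key is a no-op.
func (s stringSet) insert(key string) { s[key] = struct{}{} }

// exist reports whether key is in the set.
func (s stringSet) exist(key string) bool {
	_, ok := s[key]
	return ok
}

func main() {
	groupSet := make(stringSet)
	groupSet.insert("group-1")
	fmt.Println(groupSet.exist("group-1"), groupSet.exist("group-2"))
}
```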

@shenli
Member

shenli commented Aug 19, 2018

@XuHuaiyu Please address the comments.

@XuHuaiyu
Contributor Author

/run-all-tests tidb-test=pr/599

@XuHuaiyu
Contributor Author

/run-common-test tidb-test=pr/599
/run-integration-common-test tidb-test=pr/599

Member

@zz-jason zz-jason left a comment

LGTM

@shenli shenli added the status/LGT1 Indicates that a PR has LGTM 1. label Aug 20, 2018
@XuHuaiyu
Contributor Author

PTAL @winoros

Member

@winoros winoros left a comment

lgtm
After this PR, if we run group_concat on a sorted source, will the result still be unordered?

@XuHuaiyu
Contributor Author

@winoros
For parallel hash agg, yes.
We need to implement

group_concat([ORDER BY {unsigned_integer | col_name | expr}
                 [ASC | DESC] [,col_name ...]])

to make the result ordered.
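
A small sketch of why the order is lost, under a stated assumption: rows are handed to partial workers round-robin (the real distribution may differ) and the partial results are concatenated worker by worker. The helper names below are hypothetical; `concatOrdered` only mimics what an ORDER BY inside group_concat would guarantee.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// splitRoundRobin distributes rows across n workers in arrival order.
// This distribution policy is an assumption made for the example.
func splitRoundRobin(rows []string, n int) [][]string {
	shards := make([][]string, n)
	for i, r := range rows {
		shards[i%n] = append(shards[i%n], r)
	}
	return shards
}

// concatUnordered joins the per-worker values worker by worker, so the
// original row order is not preserved even if the source was sorted.
func concatUnordered(shards [][]string) string {
	var parts []string
	for _, shard := range shards {
		parts = append(parts, shard...)
	}
	return strings.Join(parts, ",")
}

// concatOrdered sorts all values before joining them.
func concatOrdered(shards [][]string) string {
	var parts []string
	for _, shard := range shards {
		parts = append(parts, shard...)
	}
	sort.Strings(parts)
	return strings.Join(parts, ",")
}

func main() {
	shards := splitRoundRobin([]string{"a", "b", "c", "d"}, 2)
	fmt.Println(concatUnordered(shards))
	fmt.Println(concatOrdered(shards))
}
```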

@XuHuaiyu XuHuaiyu added status/LGT2 Indicates that a PR has LGTM 2. and removed status/LGT1 Indicates that a PR has LGTM 1. labels Aug 21, 2018
@XuHuaiyu XuHuaiyu merged commit 4836aa3 into pingcap:master Aug 21, 2018
@XuHuaiyu XuHuaiyu deleted the hash_agg branch December 12, 2018 09:03
Labels
priority/P1 The issue has P1 priority. sig/execution SIG execution status/LGT2 Indicates that a PR has LGTM 2. type/enhancement The issue or PR belongs to an enhancement.
4 participants