cherry pick pingcap#35443 to release-5.2
Signed-off-by: ti-srebot <[email protected]>
tiancaiamao authored and ti-srebot committed Jun 28, 2022
1 parent 3279c08 commit 22dcbdb
Showing 7 changed files with 154 additions and 8 deletions.
100 changes: 100 additions & 0 deletions executor/aggregate_test.go
@@ -1529,3 +1529,103 @@ func (s *testSerialSuite) TestAggInDisk(c *C) {
     tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t;").Check(testkit.Rows("0"))
     tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows())
 }
+<<<<<<< HEAD
+=======
+
+func TestRandomPanicAggConsume(t *testing.T) {
+    store, clean := testkit.CreateMockStore(t)
+    defer clean()
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("set @@tidb_max_chunk_size=32")
+    tk.MustExec("set @@tidb_init_chunk_size=1")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(a int)")
+    for i := 0; i <= 1000; i++ {
+        tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i))
+    }
+
+    fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic"
+    require.NoError(t, failpoint.Enable(fpName, "5%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")"))
+    defer func() {
+        require.NoError(t, failpoint.Disable(fpName))
+    }()
+
+    // Test 10 times panic for each AggExec.
+    var res sqlexec.RecordSet
+    for i := 1; i <= 10; i++ {
+        var err error
+        for err == nil {
+            // Test paralleled hash agg.
+            res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a")
+            if err == nil {
+                _, err = session.GetRows4Test(context.Background(), tk.Session(), res)
+                require.NoError(t, res.Close())
+            }
+        }
+        require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
+
+        err = nil
+        for err == nil {
+            // Test unparalleled hash agg.
+            res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t")
+            if err == nil {
+                _, err = session.GetRows4Test(context.Background(), tk.Session(), res)
+                require.NoError(t, res.Close())
+            }
+        }
+        require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
+
+        err = nil
+        for err == nil {
+            // Test stream agg.
+            res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t")
+            if err == nil {
+                _, err = session.GetRows4Test(context.Background(), tk.Session(), res)
+                require.NoError(t, res.Close())
+            }
+        }
+        require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
+    }
+}
+
+func TestIssue35295(t *testing.T) {
+    store, clean := testkit.CreateMockStore(t)
+    defer clean()
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t100")
+    // This bug only happens on partition prune mode = 'static'
+    tk.MustExec("set @@tidb_partition_prune_mode = 'static'")
+    tk.MustExec(`CREATE TABLE t100 (
+ID bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+col1 int(10) NOT NULL DEFAULT '0' COMMENT 'test',
+money bigint(20) NOT NULL COMMENT 'test',
+logtime datetime NOT NULL COMMENT '记录时间',
+PRIMARY KEY (ID,logtime)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='test'
+PARTITION BY RANGE COLUMNS(logtime) (
+PARTITION p20220608 VALUES LESS THAN ("20220609"),
+PARTITION p20220609 VALUES LESS THAN ("20220610"),
+PARTITION p20220610 VALUES LESS THAN ("20220611"),
+PARTITION p20220611 VALUES LESS THAN ("20220612"),
+PARTITION p20220612 VALUES LESS THAN ("20220613"),
+PARTITION p20220613 VALUES LESS THAN ("20220614"),
+PARTITION p20220614 VALUES LESS THAN ("20220615"),
+PARTITION p20220615 VALUES LESS THAN ("20220616"),
+PARTITION p20220616 VALUES LESS THAN ("20220617"),
+PARTITION p20220617 VALUES LESS THAN ("20220618"),
+PARTITION p20220618 VALUES LESS THAN ("20220619"),
+PARTITION p20220619 VALUES LESS THAN ("20220620"),
+PARTITION p20220620 VALUES LESS THAN ("20220621"),
+PARTITION p20220621 VALUES LESS THAN ("20220622"),
+PARTITION p20220622 VALUES LESS THAN ("20220623"),
+PARTITION p20220623 VALUES LESS THAN ("20220624"),
+PARTITION p20220624 VALUES LESS THAN ("20220625")
+);`)
+    tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-09 00:00:00');")
+    tk.MustExec("insert into t100(col1,money,logtime) values (100,10,'2022-06-10 00:00:00');")
+    tk.MustQuery("SELECT /*+STREAM_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+    tk.MustQuery("SELECT /*+HASH_AGG()*/ col1,sum(money) FROM t100 WHERE logtime>='2022-06-09 00:00:00' AND col1=100 ;").Check(testkit.Rows("100 20"))
+}
+>>>>>>> d99b35822... *: only add default value for final aggregation to fix the aggregate push down (partition) union case (#35443)
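A note on the failpoint term used in the test above: `5%panic("...")` tells pingcap/failpoint to panic at the injection site with 5% probability on each evaluation, which is why the test loops until an execution finally fails. A minimal sketch of the enable/inject pattern, with a hypothetical failpoint name, and keeping in mind that the Inject marker only fires in a failpoint-enabled build (after failpoint-ctl rewriting):

package main

import (
    "fmt"

    "github.com/pingcap/failpoint"
)

// consume stands in for the memory-accounting code that the test's real
// failpoint (executor/ConsumeRandomPanic) guards.
func consume() {
    // With the term `5%panic("boom")` enabled, roughly 1 in 20 passes
    // through this marker panics; otherwise it is a no-op.
    failpoint.Inject("MaybePanic", func() {})
    fmt.Println("consumed without panic")
}

func main() {
    // Enable takes the package-path-qualified name, as fpName does above.
    if err := failpoint.Enable("main/MaybePanic", `5%panic("boom")`); err != nil {
        panic(err)
    }
    defer func() { _ = failpoint.Disable("main/MaybePanic") }()
    consume()
}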
10 changes: 8 additions & 2 deletions executor/builder.go
@@ -1307,7 +1307,9 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
     if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
         e.defaultVal = nil
     } else {
-        e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+        if v.IsFinalAgg() {
+            e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+        }
     }
     for _, aggDesc := range v.AggFuncs {
         if aggDesc.HasDistinct || len(aggDesc.OrderByItems) > 0 {
@@ -1363,10 +1365,14 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Executor
         groupChecker: newVecGroupChecker(b.ctx, v.GroupByItems),
         aggFuncs:     make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),
     }
+
     if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
         e.defaultVal = nil
     } else {
-        e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+        // Only do this for final agg, see issue #35295, #30923
+        if v.IsFinalAgg() {
+            e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
+        }
     }
     for i, aggDesc := range v.AggFuncs {
         aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
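Both builder hunks above encode the same rule, once for hash aggregation and once for stream aggregation. A condensed restatement (a sketch, not the literal TiDB code): a scalar aggregation, i.e. one without GROUP BY, must still emit one row over empty input (count() returns 0), but that default row may only come from the final phase. After the union push-down later in this diff, a partial aggregation runs beneath the union for every partition, and if each of those also emitted a default row, the upper aggregation would combine the spurious rows into a wrong result.

// Sketch of the shared condition, assuming the surrounding builder context.
needDefaultRow := len(v.GroupByItems) == 0 &&
    !aggregation.IsAllFirstRow(v.AggFuncs) &&
    v.IsFinalAgg() // partial aggregations must stay empty on empty input
if needDefaultRow {
    e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
} else {
    e.defaultVal = nil
}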
2 changes: 0 additions & 2 deletions expression/aggregation/descriptor.go
@@ -112,8 +112,6 @@ func (a *AggFuncDesc) Split(ordinal []int) (partialAggDesc, finalAggDesc *AggFuncDesc) {
         partialAggDesc.Mode = Partial1Mode
     } else if a.Mode == FinalMode {
         partialAggDesc.Mode = Partial2Mode
-    } else {
-        panic("Error happened during AggFuncDesc.Split, the AggFunctionMode is not CompleteMode or FinalMode.")
     }
     finalAggDesc = &AggFuncDesc{
         Mode: FinalMode, // We only support FinalMode now in final phase.
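The else-panic branch is removed because Split can now legitimately see descriptors that are already partial: the union push-down later in this diff rewrites modes before the aggregation is split again. For reference, a sketch of the mode vocabulary this patch relies on:

// CompleteMode: single phase; raw rows in, final result out.
// Partial1Mode: first phase of a split; raw rows in, partial state out.
// Partial2Mode: middle phase; partial state in, partial state out.
// FinalMode:    last phase; partial state in, final result out.
//
// Split maps the mode of the lower half as below; with the panic gone,
// any other incoming mode is simply left unchanged:
switch a.Mode {
case CompleteMode:
    partialAggDesc.Mode = Partial1Mode
case FinalMode:
    partialAggDesc.Mode = Partial2Mode
}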
2 changes: 1 addition & 1 deletion planner/core/physical_plans.go
@@ -1021,7 +1021,7 @@ type basePhysicalAgg struct {
     MppPartitionCols []*property.MPPPartitionColumn
 }
 
-func (p *basePhysicalAgg) isFinalAgg() bool {
+func (p *basePhysicalAgg) IsFinalAgg() bool {
     if len(p.AggFuncs) > 0 {
         if p.AggFuncs[0].Mode == aggregation.FinalMode || p.AggFuncs[0].Mode == aggregation.CompleteMode {
             return true
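The rename from isFinalAgg to IsFinalAgg is not cosmetic: the builder changes above live in package executor, so the method must be exported to be callable across the package boundary, roughly as in this sketch:

// executor/builder.go (sketch): v is a *plannercore.PhysicalHashAgg or
// *plannercore.PhysicalStreamAgg; the unexported name would not compile here.
if v.IsFinalAgg() {
    e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}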
16 changes: 16 additions & 0 deletions planner/core/rule_aggregation_push_down.go
@@ -390,6 +390,22 @@ func (a *aggregationPushDownSolver) tryAggPushDownForUnion(union *LogicalUnionAll
         }
     }
     pushedAgg := a.splitPartialAgg(agg)
+<<<<<<< HEAD
+=======
+    if pushedAgg == nil {
+        return nil
+    }
+
+    // Update the agg mode for the pushed down aggregation.
+    for _, aggFunc := range pushedAgg.AggFuncs {
+        if aggFunc.Mode == aggregation.CompleteMode {
+            aggFunc.Mode = aggregation.Partial1Mode
+        } else if aggFunc.Mode == aggregation.FinalMode {
+            aggFunc.Mode = aggregation.Partial2Mode
+        }
+    }
+
+>>>>>>> d99b35822... *: only add default value for final aggregation to fix the aggregate push down (partition) union case (#35443)
     newChildren := make([]LogicalPlan, 0, len(union.Children()))
     for _, child := range union.Children() {
         newChild, err := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
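Taken together with the builder change, the push-down now yields a two-level plan in which only the top level may produce a default row. A sketch of the plan shapes for a query like the ones in TestIssue35295:

// Before push-down:
//   Aggregation(CompleteMode) -> PartitionUnion -> TableReader (per partition)
// After push-down:
//   Aggregation(FinalMode) -> PartitionUnion -> Aggregation(Partial1Mode) -> TableReader
//
// The mode rewrite above guarantees the lower aggregations are partial, so
// IsFinalAgg() is false for them and the builder gives them no defaultVal.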
4 changes: 2 additions & 2 deletions planner/core/rule_eliminate_projection.go
@@ -44,14 +44,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
     // passing down the aggregation mode to TiFlash.
     if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
         if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-            if physicalAgg.isFinalAgg() {
+            if physicalAgg.IsFinalAgg() {
                 return false
             }
         }
     }
     if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
         if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
-            if physicalAgg.isFinalAgg() {
+            if physicalAgg.IsFinalAgg() {
                 return false
             }
         }
     }
28 changes: 27 additions & 1 deletion planner/core/task.go
@@ -1584,8 +1584,22 @@ func BuildFinalModeAggregation(
             }
         }
 
+<<<<<<< HEAD
         finalAggFunc.HasDistinct = true
         finalAggFunc.Mode = aggregation.CompleteMode
+=======
+        finalAggFunc.OrderByItems = byItems
+        finalAggFunc.HasDistinct = aggFunc.HasDistinct
+        // In logical optimize phase, the Agg->PartitionUnion->TableReader may become
+        // Agg1->PartitionUnion->Agg2->TableReader, and the Agg2 is a partial aggregation.
+        // So in the push down here, we need to add a new if-condition check:
+        // If the original agg mode is partial already, the finalAggFunc's mode becomes Partial2.
+        if aggFunc.Mode == aggregation.CompleteMode {
+            finalAggFunc.Mode = aggregation.CompleteMode
+        } else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+            finalAggFunc.Mode = aggregation.Partial2Mode
+        }
+>>>>>>> d99b35822... *: only add default value for final aggregation to fix the aggregate push down (partition) union case (#35443)
     } else {
         if aggregation.NeedCount(finalAggFunc.Name) {
             if isMPPTask && finalAggFunc.Name == ast.AggFuncCount {
@@ -1642,8 +1656,20 @@ func BuildFinalModeAggregation(
         partial.AggFuncs = append(partial.AggFuncs, aggFunc)
     }
 
+<<<<<<< HEAD
     finalAggFunc.Mode = aggregation.FinalMode
     funcMap[aggFunc] = finalAggFunc
+=======
+    // In logical optimize phase, the Agg->PartitionUnion->TableReader may become
+    // Agg1->PartitionUnion->Agg2->TableReader, and the Agg2 is a partial aggregation.
+    // So in the push down here, we need to add a new if-condition check:
+    // If the original agg mode is partial already, the finalAggFunc's mode becomes Partial2.
+    if aggFunc.Mode == aggregation.CompleteMode {
+        finalAggFunc.Mode = aggregation.FinalMode
+    } else if aggFunc.Mode == aggregation.Partial1Mode || aggFunc.Mode == aggregation.Partial2Mode {
+        finalAggFunc.Mode = aggregation.Partial2Mode
+    }
+>>>>>>> d99b35822... *: only add default value for final aggregation to fix the aggregate push down (partition) union case (#35443)
 }
 
 finalAggFunc.Args = args
@@ -1699,7 +1725,7 @@ func (p *basePhysicalAgg) convertAvgForMPP() *PhysicalProjection {
     }
     // no avgs
     // for final agg, always add project due to incompatibility between TiDB and TiFlash
-    if len(p.schema.Columns) == len(newSchema.Columns) && !p.isFinalAgg() {
+    if len(p.schema.Columns) == len(newSchema.Columns) && !p.IsFinalAgg() {
         return nil
     }
     // add remaining columns to exprs
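The two BuildFinalModeAggregation hunks apply the same mapping in the distinct and non-distinct branches. A hypothetical helper folding both together (the name finalModeFor is illustrative, not TiDB API):

// finalModeFor returns the mode for the upper (final-side) aggregate
// function, given the mode the function had before splitting.
func finalModeFor(orig aggregation.AggFunctionMode, hasDistinct bool) aggregation.AggFunctionMode {
    switch orig {
    case aggregation.CompleteMode:
        if hasDistinct {
            // Distinct aggregates cannot be split into two phases.
            return aggregation.CompleteMode
        }
        return aggregation.FinalMode
    case aggregation.Partial1Mode, aggregation.Partial2Mode:
        // The function already belongs to a partial aggregation (e.g. the
        // Agg2 produced by the union push-down), so the upper half must
        // stay partial as well.
        return aggregation.Partial2Mode
    default:
        return orig
    }
}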