Skip to content

Commit

Permalink
planner: enforce projection push down (#25450) (#25741)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 22, 2021
1 parent fb9cbb3 commit 9094b44
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 108 deletions.
2 changes: 1 addition & 1 deletion executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableReader(c *C) {

tk.MustExec("SET tidb_enforce_mpp=1")
tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'")
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
l, r := rand.Intn(400), rand.Intn(400)
if l > r {
l, r = r, l
Expand Down
26 changes: 19 additions & 7 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,13 +2011,25 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
if !ok {
return nil, true, nil
}
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
proj.SetSchema(p.schema)
return []PhysicalPlan{proj}, true, nil
newProps := []*property.PhysicalProperty{newProp}
// generate a mpp task candidate if enforced mpp
if newProp.TaskTp != property.MppTaskType && p.SCtx().GetSessionVars().IsMPPEnforced() && p.canPushToCop(kv.TiFlash) &&
expression.CanExprsPushDown(p.SCtx().GetSessionVars().StmtCtx, p.Exprs, p.SCtx().GetClient(), kv.TiFlash) {
mppProp := newProp.CloneEssentialFields()
mppProp.TaskTp = property.MppTaskType
newProps = append(newProps, mppProp)
}
ret := make([]PhysicalPlan, 0, len(newProps))
for _, newProp := range newProps {
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, newProp)
proj.SetSchema(p.schema)
ret = append(ret, proj)
}
return ret, true, nil
}

func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3207,7 +3207,7 @@ func (s *testIntegrationSerialSuite) TestPushDownProjectionForMPP(c *C) {
}
}

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_opt_broadcast_join=0;")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_opt_broadcast_join=0; set @@tidb_enforce_mpp=1;")

var input []string
var output []struct {
Expand Down
Loading

0 comments on commit 9094b44

Please sign in to comment.