Skip to content

Commit

Permalink
Merge pull request #12 from winoros/fix-apply
Browse files Browse the repository at this point in the history
planner: let scope only be affected when check FDs
  • Loading branch information
AilinKid authored Mar 17, 2022
2 parents 548ff66 + 2e5249e commit 030a6ba
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 30 deletions.
24 changes: 0 additions & 24 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4775,30 +4775,6 @@ func (b *PlanBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, t
for i := outerPlan.Schema().Len(); i < ap.Schema().Len(); i++ {
ap.names[i] = types.EmptyName
}
// build the join correlated equal condition for apply join, this equal condition is used for deriving the transitive FD between outer and inner side.
correlatedCols := ExtractCorrelatedCols4LogicalPlan(innerPlan)
deduplicateCorrelatedCols := make(map[int64]*expression.CorrelatedColumn)
for _, cc := range correlatedCols {
if _, ok := deduplicateCorrelatedCols[cc.UniqueID]; !ok {
deduplicateCorrelatedCols[cc.UniqueID] = cc
}
}
// for case like select (select t1.a from t2) from t1. <t1.a> will be assigned with new UniqueID after sub query projection is built.
// we should distinguish them out, building the equivalence relationship from inner <t1.a> == outer <t1.a> in the apply-join for FD derivation.
for _, cc := range deduplicateCorrelatedCols {
// for every correlated column, find the connection with the inner newly built column.
for _, col := range innerPlan.Schema().Columns {
if cc.UniqueID == col.CorrelatedColUniqueID {
ccc := &cc.Column
cond, err := expression.NewFunction(b.ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), ccc, col)
if err != nil {
// give up connection building for not interfering old logic.
return ap
}
ap.EqualConditions = append(ap.EqualConditions, cond.(*expression.ScalarFunction))
}
}
}
return ap
}

Expand Down
50 changes: 44 additions & 6 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,24 +184,25 @@ func (p *LogicalJoin) Shallow() *LogicalJoin {
func (p *LogicalJoin) ExtractFD() *fd.FDSet {
switch p.JoinType {
case InnerJoin:
return p.extractFDForInnerJoin()
return p.extractFDForInnerJoin(nil)
case LeftOuterJoin, RightOuterJoin:
return p.extractFDForOuterJoin()
return p.extractFDForOuterJoin(nil)
case SemiJoin:
return p.extractFDForSemiJoin()
return p.extractFDForSemiJoin(nil)
default:
return &fd.FDSet{HashCodeToUniqueID: make(map[string]int)}
}
}

func (p *LogicalJoin) extractFDForSemiJoin() *fd.FDSet {
func (p *LogicalJoin) extractFDForSemiJoin(filtersFromApply []expression.Expression) *fd.FDSet {
// 1: since semi join will keep the part or all rows of the outer table, it's outer FD can be saved.
// 2: the un-projected column will be left for the upper layer projection or already be pruned from bottom up.
outerFD, _ := p.children[0].ExtractFD(), p.children[1].ExtractFD()
fds := outerFD

eqCondSlice := expression.ScalarFuncs2Exprs(p.EqualConditions)
allConds := append(eqCondSlice, p.OtherConditions...)
allConds = append(allConds, filtersFromApply...)
notNullColsFromFilters := extractNotNullFromConds(allConds, p)

constUniqueIDs := extractConstantCols(p.LeftConditions, p.SCtx(), fds)
Expand All @@ -212,14 +213,15 @@ func (p *LogicalJoin) extractFDForSemiJoin() *fd.FDSet {
return fds
}

func (p *LogicalJoin) extractFDForInnerJoin() *fd.FDSet {
func (p *LogicalJoin) extractFDForInnerJoin(filtersFromApply []expression.Expression) *fd.FDSet {
leftFD, rightFD := p.children[0].ExtractFD(), p.children[1].ExtractFD()
fds := leftFD
fds.MakeCartesianProduct(rightFD)

eqCondSlice := expression.ScalarFuncs2Exprs(p.EqualConditions)
// some join eq conditions are stored in the OtherConditions.
allConds := append(eqCondSlice, p.OtherConditions...)
allConds = append(allConds, filtersFromApply...)
notNullColsFromFilters := extractNotNullFromConds(allConds, p)

constUniqueIDs := extractConstantCols(allConds, p.SCtx(), fds)
Expand Down Expand Up @@ -251,7 +253,7 @@ func (p *LogicalJoin) extractFDForInnerJoin() *fd.FDSet {
return fds
}

func (p *LogicalJoin) extractFDForOuterJoin() *fd.FDSet {
func (p *LogicalJoin) extractFDForOuterJoin(filtersFromApply []expression.Expression) *fd.FDSet {
outerFD, innerFD := p.children[0].ExtractFD(), p.children[1].ExtractFD()
innerCondition := p.RightConditions
outerCondition := p.LeftConditions
Expand All @@ -273,6 +275,7 @@ func (p *LogicalJoin) extractFDForOuterJoin() *fd.FDSet {
allConds := append(eqCondSlice, p.OtherConditions...)
allConds = append(allConds, innerCondition...)
allConds = append(allConds, outerCondition...)
allConds = append(allConds, filtersFromApply...)
notNullColsFromFilters := extractNotNullFromConds(allConds, p)

filterFD := &fd.FDSet{HashCodeToUniqueID: make(map[string]int)}
Expand Down Expand Up @@ -1033,6 +1036,41 @@ func (la *LogicalApply) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
return corCols
}

func (la *LogicalApply) ExtractFD() *fd.FDSet {
innerPlan := la.children[1]
// build the join correlated equal condition for apply join, this equal condition is used for deriving the transitive FD between outer and inner side.
correlatedCols := ExtractCorrelatedCols4LogicalPlan(innerPlan)
deduplicateCorrelatedCols := make(map[int64]*expression.CorrelatedColumn)
for _, cc := range correlatedCols {
if _, ok := deduplicateCorrelatedCols[cc.UniqueID]; !ok {
deduplicateCorrelatedCols[cc.UniqueID] = cc
}
}
eqCond := make([]expression.Expression, 0, 4)
// for case like select (select t1.a from t2) from t1. <t1.a> will be assigned with new UniqueID after sub query projection is built.
// we should distinguish them out, building the equivalence relationship from inner <t1.a> == outer <t1.a> in the apply-join for FD derivation.
for _, cc := range deduplicateCorrelatedCols {
// for every correlated column, find the connection with the inner newly built column.
for _, col := range innerPlan.Schema().Columns {
if cc.UniqueID == col.CorrelatedColUniqueID {
ccc := &cc.Column
cond := expression.NewFunctionInternal(la.ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), ccc, col)
eqCond = append(eqCond, cond.(*expression.ScalarFunction))
}
}
}
switch la.JoinType {
case InnerJoin:
return la.extractFDForInnerJoin(eqCond)
case LeftOuterJoin, RightOuterJoin:
return la.extractFDForOuterJoin(eqCond)
case SemiJoin:
return la.extractFDForSemiJoin(eqCond)
default:
return &fd.FDSet{HashCodeToUniqueID: make(map[string]int)}
}
}

// LogicalMaxOneRow checks if a query returns no more than one row.
type LogicalMaxOneRow struct {
baseLogicalPlan
Expand Down

0 comments on commit 030a6ba

Please sign in to comment.