Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: export elements in cteClass for late pkg move. #55429

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,17 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) {
case *logicalop.LogicalPartitionUnionAll:
c.collectPredicateColumnsForUnionAll(&x.LogicalUnionAll)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.Cte.seedPartLogicalPlan)
if x.Cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.Cte.recursivePartLogicalPlan)
// Visit SeedPartLogicalPlan and RecursivePartLogicalPlan first.
c.collectFromPlan(x.Cte.SeedPartLogicalPlan)
if x.Cte.RecursivePartLogicalPlan != nil {
c.collectFromPlan(x.Cte.RecursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.Cte.seedPartLogicalPlan.Schema().Columns
seedColumns := x.Cte.SeedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.Cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.Cte.recursivePartLogicalPlan.Schema().Columns
if x.Cte.RecursivePartLogicalPlan != nil {
recursiveColumns = x.Cte.RecursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2483,7 +2483,7 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo
if storeTp != kv.TiFlash {
return false
}
if c.Cte.recursivePartLogicalPlan != nil || !c.Cte.seedPartLogicalPlan.CanPushToCop(storeTp) {
if c.Cte.RecursivePartLogicalPlan != nil || !c.Cte.SeedPartLogicalPlan.CanPushToCop(storeTp) {
return false
}
return true
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2914,7 +2914,7 @@ func findBestTask4LogicalCTE(p *LogicalCTE, prop *property.PhysicalProperty, cou
return base.InvalidTask, 1, nil
}
// The physical plan has been build when derive stats.
pcte := PhysicalCTE{SeedPlan: p.Cte.seedPartPhysicalPlan, RecurPlan: p.Cte.recursivePartPhysicalPlan, CTE: p.Cte, cteAsName: p.CteAsName, cteName: p.CteName}.Init(p.SCtx(), p.StatsInfo())
pcte := PhysicalCTE{SeedPlan: p.Cte.SeedPartPhysicalPlan, RecurPlan: p.Cte.RecursivePartPhysicalPlan, CTE: p.Cte, cteAsName: p.CteAsName, cteName: p.CteName}.Init(p.SCtx(), p.StatsInfo())
pcte.SetSchema(p.Schema())
if prop.IsFlashProp() && prop.CTEProducerStatus == property.AllCTECanMpp {
pcte.readerReceiver = PhysicalExchangeReceiver{IsCTEReader: true}.Init(p.SCtx(), p.StatsInfo())
Expand Down
79 changes: 40 additions & 39 deletions pkg/planner/core/logical_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,25 @@ func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
type CTEClass struct {
// The union between seed part and recursive part is DISTINCT or DISTINCT ALL.
IsDistinct bool
// seedPartLogicalPlan and recursivePartLogicalPlan are the logical plans for the seed part and recursive part of this CTE.
seedPartLogicalPlan base.LogicalPlan
recursivePartLogicalPlan base.LogicalPlan
// seedPartPhysicalPlan and recursivePartPhysicalPlan are the physical plans for the seed part and recursive part of this CTE.
seedPartPhysicalPlan base.PhysicalPlan
recursivePartPhysicalPlan base.PhysicalPlan
// SeedPartLogicalPlan and RecursivePartLogicalPlan are the logical plans for the seed part and recursive part of this CTE.
SeedPartLogicalPlan base.LogicalPlan
// RecursivePartLogicalPlan is nil if this CTE is not a recursive CTE.
RecursivePartLogicalPlan base.LogicalPlan
// SeedPartPhysicalPlan and RecursivePartPhysicalPlan are the physical plans for the seed part and recursive part of this CTE.
SeedPartPhysicalPlan base.PhysicalPlan
RecursivePartPhysicalPlan base.PhysicalPlan
// storageID for this CTE.
IDForStorage int
// optFlag is the optFlag for the whole CTE.
optFlag uint64
// OptFlag is the OptFlag for the whole CTE.
OptFlag uint64
HasLimit bool
LimitBeg uint64
LimitEnd uint64
IsInApply bool
// pushDownPredicates may be push-downed by different references.
pushDownPredicates []expression.Expression
// PushDownPredicates may be push-downed by different references.
PushDownPredicates []expression.Expression
ColumnMap map[string]*expression.Column
isOuterMostCTE bool
IsOuterMostCTE bool
}

const emptyCTEClassSize = int64(unsafe.Sizeof(CTEClass{}))
Expand All @@ -83,14 +84,14 @@ func (cc *CTEClass) MemoryUsage() (sum int64) {
}

sum = emptyCTEClassSize
if cc.seedPartPhysicalPlan != nil {
sum += cc.seedPartPhysicalPlan.MemoryUsage()
if cc.SeedPartPhysicalPlan != nil {
sum += cc.SeedPartPhysicalPlan.MemoryUsage()
}
if cc.recursivePartPhysicalPlan != nil {
sum += cc.recursivePartPhysicalPlan.MemoryUsage()
if cc.RecursivePartPhysicalPlan != nil {
sum += cc.RecursivePartPhysicalPlan.MemoryUsage()
}

for _, expr := range cc.pushDownPredicates {
for _, expr := range cc.PushDownPredicates {
sum += expr.MemoryUsage()
}
for key, val := range cc.ColumnMap {
Expand All @@ -105,11 +106,11 @@ func (cc *CTEClass) MemoryUsage() (sum int64) {

// PredicatePushDown implements base.LogicalPlan.<1st> interface.
func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
if p.Cte.recursivePartLogicalPlan != nil {
if p.Cte.RecursivePartLogicalPlan != nil {
// Doesn't support recursive CTE yet.
return predicates, p.Self()
}
if !p.Cte.isOuterMostCTE {
if !p.Cte.IsOuterMostCTE {
return predicates, p.Self()
}
pushedPredicates := make([]expression.Expression, len(predicates))
Expand All @@ -126,15 +127,15 @@ func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *op
}
}
if len(pushedPredicates) == 0 {
p.Cte.pushDownPredicates = append(p.Cte.pushDownPredicates, expression.NewOne())
p.Cte.PushDownPredicates = append(p.Cte.PushDownPredicates, expression.NewOne())
return predicates, p.Self()
}
newPred := make([]expression.Expression, 0, len(predicates))
for i := range pushedPredicates {
newPred = append(newPred, pushedPredicates[i].Clone())
ruleutil.ResolveExprAndReplace(newPred[i], p.Cte.ColumnMap)
}
p.Cte.pushDownPredicates = append(p.Cte.pushDownPredicates, expression.ComposeCNFCondition(p.SCtx().GetExprCtx(), newPred...))
p.Cte.PushDownPredicates = append(p.Cte.PushDownPredicates, expression.ComposeCNFCondition(p.SCtx().GetExprCtx(), newPred...))
return predicates, p.Self()
}

Expand Down Expand Up @@ -180,43 +181,43 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression
}

var err error
if p.Cte.seedPartPhysicalPlan == nil {
if p.Cte.SeedPartPhysicalPlan == nil {
// Build push-downed predicates.
if len(p.Cte.pushDownPredicates) > 0 {
newCond := expression.ComposeDNFCondition(p.SCtx().GetExprCtx(), p.Cte.pushDownPredicates...)
newSel := logicalop.LogicalSelection{Conditions: []expression.Expression{newCond}}.Init(p.SCtx(), p.Cte.seedPartLogicalPlan.QueryBlockOffset())
newSel.SetChildren(p.Cte.seedPartLogicalPlan)
p.Cte.seedPartLogicalPlan = newSel
p.Cte.optFlag |= flagPredicatePushDown
if len(p.Cte.PushDownPredicates) > 0 {
newCond := expression.ComposeDNFCondition(p.SCtx().GetExprCtx(), p.Cte.PushDownPredicates...)
newSel := logicalop.LogicalSelection{Conditions: []expression.Expression{newCond}}.Init(p.SCtx(), p.Cte.SeedPartLogicalPlan.QueryBlockOffset())
newSel.SetChildren(p.Cte.SeedPartLogicalPlan)
p.Cte.SeedPartLogicalPlan = newSel
p.Cte.OptFlag |= flagPredicatePushDown
}
p.Cte.seedPartLogicalPlan, p.Cte.seedPartPhysicalPlan, _, err = doOptimize(context.TODO(), p.SCtx(), p.Cte.optFlag, p.Cte.seedPartLogicalPlan)
p.Cte.SeedPartLogicalPlan, p.Cte.SeedPartPhysicalPlan, _, err = doOptimize(context.TODO(), p.SCtx(), p.Cte.OptFlag, p.Cte.SeedPartLogicalPlan)
if err != nil {
return nil, err
}
}
if p.OnlyUsedAsStorage {
p.SetChildren(p.Cte.seedPartLogicalPlan)
p.SetChildren(p.Cte.SeedPartLogicalPlan)
}
resStat := p.Cte.seedPartPhysicalPlan.StatsInfo()
resStat := p.Cte.SeedPartPhysicalPlan.StatsInfo()
// Changing the pointer so that SeedStat in LogicalCTETable can get the new stat.
*p.SeedStat = *resStat
p.SetStats(&property.StatsInfo{
RowCount: resStat.RowCount,
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
for i, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += resStat.ColNDVs[p.Cte.seedPartLogicalPlan.Schema().Columns[i].UniqueID]
p.StatsInfo().ColNDVs[col.UniqueID] += resStat.ColNDVs[p.Cte.SeedPartLogicalPlan.Schema().Columns[i].UniqueID]
}
if p.Cte.recursivePartLogicalPlan != nil {
if p.Cte.recursivePartPhysicalPlan == nil {
p.Cte.recursivePartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.SCtx(), p.Cte.optFlag, p.Cte.recursivePartLogicalPlan)
if p.Cte.RecursivePartLogicalPlan != nil {
if p.Cte.RecursivePartPhysicalPlan == nil {
p.Cte.RecursivePartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.SCtx(), p.Cte.OptFlag, p.Cte.RecursivePartLogicalPlan)
if err != nil {
return nil, err
}
}
recurStat := p.Cte.recursivePartLogicalPlan.StatsInfo()
recurStat := p.Cte.RecursivePartLogicalPlan.StatsInfo()
for i, col := range selfSchema.Columns {
p.StatsInfo().ColNDVs[col.UniqueID] += recurStat.ColNDVs[p.Cte.recursivePartLogicalPlan.Schema().Columns[i].UniqueID]
p.StatsInfo().ColNDVs[col.UniqueID] += recurStat.ColNDVs[p.Cte.RecursivePartLogicalPlan.Schema().Columns[i].UniqueID]
}
if p.Cte.IsDistinct {
p.StatsInfo().RowCount, _ = cardinality.EstimateColsNDVWithMatchedLen(p.Schema().Columns, p.Schema(), p.StatsInfo())
Expand All @@ -238,9 +239,9 @@ func (p *LogicalCTE) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]ba

// ExtractCorrelatedCols implements the base.LogicalPlan.<15th> interface.
func (p *LogicalCTE) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.seedPartLogicalPlan)
if p.Cte.recursivePartLogicalPlan != nil {
corCols = append(corCols, coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.recursivePartLogicalPlan)...)
corCols := coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.SeedPartLogicalPlan)
if p.Cte.RecursivePartLogicalPlan != nil {
corCols = append(corCols, coreusage.ExtractCorrelatedCols4LogicalPlan(p.Cte.RecursivePartLogicalPlan)...)
}
return corCols
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4208,14 +4208,14 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
if cte.cteClass == nil {
cte.cteClass = &CTEClass{
IsDistinct: cte.isDistinct,
seedPartLogicalPlan: cte.seedLP,
recursivePartLogicalPlan: cte.recurLP,
SeedPartLogicalPlan: cte.seedLP,
RecursivePartLogicalPlan: cte.recurLP,
IDForStorage: cte.storageID,
optFlag: cte.optFlag,
OptFlag: cte.optFlag,
HasLimit: hasLimit,
LimitBeg: limitBeg,
LimitEnd: limitEnd,
pushDownPredicates: make([]expression.Expression, 0),
PushDownPredicates: make([]expression.Expression, 0),
ColumnMap: make(map[string]*expression.Column),
}
}
Expand Down Expand Up @@ -5121,9 +5121,9 @@ func setIsInApplyForCTE(p base.LogicalPlan, apSchema *expression.Schema) {
if len(coreusage.ExtractCorColumnsBySchema4LogicalPlan(p, apSchema)) > 0 {
x.Cte.IsInApply = true
}
setIsInApplyForCTE(x.Cte.seedPartLogicalPlan, apSchema)
if x.Cte.recursivePartLogicalPlan != nil {
setIsInApplyForCTE(x.Cte.recursivePartLogicalPlan, apSchema)
setIsInApplyForCTE(x.Cte.SeedPartLogicalPlan, apSchema)
if x.Cte.RecursivePartLogicalPlan != nil {
setIsInApplyForCTE(x.Cte.RecursivePartLogicalPlan, apSchema)
}
default:
for _, child := range p.Children() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func GetDBTableInfo(visitInfo []visitInfo) []stmtctx.TableEntry {
return tables
}

// GetOptFlag gets the optFlag of the PlanBuilder.
// GetOptFlag gets the OptFlag of the PlanBuilder.
func (b *PlanBuilder) GetOptFlag() uint64 {
if b.isSampling {
// Disable logical optimization to avoid the optimizer
Expand Down
10 changes: 5 additions & 5 deletions pkg/planner/core/recheck_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func findCTEs(
cte := cteReader.Cte
if !isRootTree {
// Set it to false since it's referenced by other CTEs.
cte.isOuterMostCTE = false
cte.IsOuterMostCTE = false
}
if visited.Has(cte.IDForStorage) {
return
}
visited.Insert(cte.IDForStorage)
// Set it when we meet it first time.
cte.isOuterMostCTE = isRootTree
findCTEs(cte.seedPartLogicalPlan, visited, false)
if cte.recursivePartLogicalPlan != nil {
findCTEs(cte.recursivePartLogicalPlan, visited, false)
cte.IsOuterMostCTE = isRootTree
findCTEs(cte.SeedPartLogicalPlan, visited, false)
if cte.RecursivePartLogicalPlan != nil {
findCTEs(cte.RecursivePartLogicalPlan, visited, false)
}
return
}
Expand Down