Skip to content

Commit

Permalink
planner: move logical datasource, table-scan, index-scan, tikv-gather…
Browse files Browse the repository at this point in the history
… to logicalop pkg (#56291)

ref #51664, ref #52714
  • Loading branch information
AilinKid authored Sep 26, 2024
1 parent 5697675 commit d7f9027
Show file tree
Hide file tree
Showing 53 changed files with 862 additions and 763 deletions.
4 changes: 2 additions & 2 deletions pkg/planner/cardinality/selectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestSelectivity(t *testing.T) {
require.NoErrorf(t, err, "for building plan, expr %s", err, tt.exprs)

sel := p.(base.LogicalPlan).Children()[0].(*logicalop.LogicalSelection)
ds := sel.Children()[0].(*plannercore.DataSource)
ds := sel.Children()[0].(*logicalop.DataSource)

histColl := statsTbl.GenerateHistCollFromColumnInfo(ds.TableInfo, ds.Schema().Columns)

Expand Down Expand Up @@ -515,7 +515,7 @@ func TestDNFCondSelectivity(t *testing.T) {
require.NoErrorf(t, err, "error %v, for building plan, sql %s", err, tt)

sel := p.(base.LogicalPlan).Children()[0].(*logicalop.LogicalSelection)
ds := sel.Children()[0].(*plannercore.DataSource)
ds := sel.Children()[0].(*logicalop.DataSource)

histColl := statsTbl.GenerateHistCollFromColumnInfo(ds.TableInfo, ds.Schema().Columns)

Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cardinality/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestTraceDebugSelectivity(t *testing.T) {
require.NoError(t, err)

sel := p.(base.LogicalPlan).Children()[0].(*logicalop.LogicalSelection)
ds := sel.Children()[0].(*plannercore.DataSource)
ds := sel.Children()[0].(*logicalop.DataSource)

dsSchemaCols = append(dsSchemaCols, ds.Schema().Columns)
selConditions = append(selConditions, sel.Conditions)
Expand Down
10 changes: 5 additions & 5 deletions pkg/planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (*ImplTiKVSingleReadGather) Match(_ *memo.GroupExpr, _ *property.PhysicalPr
// OnImplement implements ImplementationRule OnImplement interface.
func (*ImplTiKVSingleReadGather) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicProp := expr.Group.Prop
sg := expr.ExprNode.(*plannercore.TiKVSingleGather)
sg := expr.ExprNode.(*logicalop.TiKVSingleGather)
if sg.IsIndexGather {
reader := plannercore.GetPhysicalIndexReader(sg, logicProp.Schema, logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt), reqProp)
return []memo.Implementation{impl.NewIndexReaderImpl(reader, sg.Source)}, nil
Expand All @@ -190,14 +190,14 @@ type ImplTableScan struct {

// Match implements ImplementationRule Match interface.
func (*ImplTableScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
ts := expr.ExprNode.(*plannercore.LogicalTableScan)
ts := expr.ExprNode.(*logicalop.LogicalTableScan)
return prop.IsSortItemEmpty() || (len(prop.SortItems) == 1 && ts.HandleCols != nil && prop.SortItems[0].Col.EqualColumn(ts.HandleCols.GetCol(0)))
}

// OnImplement implements ImplementationRule OnImplement interface.
func (*ImplTableScan) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicProp := expr.Group.Prop
logicalScan := expr.ExprNode.(*plannercore.LogicalTableScan)
logicalScan := expr.ExprNode.(*logicalop.LogicalTableScan)
ts := plannercore.GetPhysicalScan4LogicalTableScan(logicalScan, logicProp.Schema, logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsSortItemEmpty() {
ts.KeepOrder = true
Expand All @@ -213,13 +213,13 @@ type ImplIndexScan struct {

// Match implements ImplementationRule Match interface.
func (*ImplIndexScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
is := expr.ExprNode.(*plannercore.LogicalIndexScan)
is := expr.ExprNode.(*logicalop.LogicalIndexScan)
return is.MatchIndexProp(prop)
}

// OnImplement implements ImplementationRule OnImplement interface.
func (*ImplIndexScan) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicalScan := expr.ExprNode.(*plannercore.LogicalIndexScan)
logicalScan := expr.ExprNode.(*logicalop.LogicalIndexScan)
is := plannercore.GetPhysicalIndexScan4LogicalIndexScan(logicalScan, expr.Group.Prop.Schema, expr.Group.Prop.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsSortItemEmpty() {
is.KeepOrder = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestPreparePossibleProperties(t *testing.T) {
require.NoError(t, err)

// collect the target columns: f, a
ds, ok := logic.Children()[0].Children()[0].(*plannercore.DataSource)
ds, ok := logic.Children()[0].Children()[0].(*logicalop.DataSource)
require.True(t, ok)

var columnF, columnA *expression.Column
Expand Down
18 changes: 9 additions & 9 deletions pkg/planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ func NewRulePushSelDownTableScan() Transformation {
// the key ranges of the `ts` operator.
func (*PushSelDownTableScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection)
ts := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalTableScan)
ts := old.Children[0].GetExpr().ExprNode.(*logicalop.LogicalTableScan)
if ts.HandleCols == nil {
return nil, false, false, nil
}
accesses, remained := ranger.DetachCondsForColumn(ts.SCtx().GetRangerCtx(), sel.Conditions, ts.HandleCols.GetCol(0))
if accesses == nil {
return nil, false, false, nil
}
newTblScan := plannercore.LogicalTableScan{
newTblScan := logicalop.LogicalTableScan{
Source: ts.Source,
HandleCols: ts.HandleCols,
AccessConds: ts.AccessConds.Shallow(),
Expand Down Expand Up @@ -251,7 +251,7 @@ func NewRulePushSelDownIndexScan() Transformation {
// or just keep the two GroupExprs unchanged.
func (*PushSelDownIndexScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection)
is := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalIndexScan)
is := old.Children[0].GetExpr().ExprNode.(*logicalop.LogicalIndexScan)
if len(is.IdxCols) == 0 {
return nil, false, false, nil
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func (*PushSelDownIndexScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.G
}
}
// TODO: `res` still has some unused fields: EqOrInCount, IsDNFCond.
newIs := plannercore.LogicalIndexScan{
newIs := logicalop.LogicalIndexScan{
Source: is.Source,
IsDoubleRead: is.IsDoubleRead,
EqCondCount: res.EqCondCount,
Expand Down Expand Up @@ -332,7 +332,7 @@ func NewRulePushSelDownTiKVSingleGather() Transformation {
// 2. `remainedSel -> newTg -> pushedSel -> any`
func (*PushSelDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection)
sg := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
sg := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].Children[0].Group
var pushed, remained []expression.Expression
sctx := sg.SCtx()
Expand Down Expand Up @@ -378,7 +378,7 @@ func NewRuleEnumeratePaths() Transformation {

// OnTransform implements Transformation interface.
func (*EnumeratePaths) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
ds := old.GetExpr().ExprNode.(*plannercore.DataSource)
ds := old.GetExpr().ExprNode.(*logicalop.DataSource)
gathers := ds.Convert2Gathers()
for _, gather := range gathers {
expr := memo.Convert2GroupExpr(gather)
Expand Down Expand Up @@ -437,7 +437,7 @@ func (r *PushAggDownGather) Match(expr *memo.ExprIter) bool {
func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation)
aggSchema := old.GetExpr().Group.Prop.Schema
gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
gather := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].GetExpr().Children[0]
// The old Aggregation should stay unchanged for other transformation.
// So we build a new LogicalAggregation for the partialAgg.
Expand Down Expand Up @@ -1416,7 +1416,7 @@ func (r *PushTopNDownTiKVSingleGather) Match(expr *memo.ExprIter) bool {
func (r *PushTopNDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
topN := old.GetExpr().ExprNode.(*logicalop.LogicalTopN)
topNSchema := old.Children[0].Group.Prop.Schema
gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
gather := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].GetExpr().Children[0]

particalTopN := logicalop.LogicalTopN{
Expand Down Expand Up @@ -1853,7 +1853,7 @@ func (r *PushLimitDownTiKVSingleGather) Match(expr *memo.ExprIter) bool {
func (r *PushLimitDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
limit := old.GetExpr().ExprNode.(*logicalop.LogicalLimit)
limitSchema := old.Children[0].Group.Prop.Schema
gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
gather := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].GetExpr().Children[0]

particalLimit := logicalop.LogicalLimit{
Expand Down
6 changes: 1 addition & 5 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ go_library(
"indexmerge_path.go",
"indexmerge_unfinished_path.go",
"initialize.go",
"logical_datasource.go",
"logical_index_scan.go",
"logical_initialize.go",
"logical_plan_builder.go",
"logical_plans.go",
"logical_table_scan.go",
"logical_tikv_single_gather.go",
"memtable_infoschema_extractor.go",
"memtable_predicate_extractor.go",
"mock.go",
Expand Down Expand Up @@ -137,13 +133,13 @@ go_library(
"//pkg/planner/core/resolve",
"//pkg/planner/core/rule",
"//pkg/planner/core/rule/util",
"//pkg/planner/funcdep",
"//pkg/planner/planctx",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/planner/util/coreusage",
"//pkg/planner/util/costusage",
"//pkg/planner/util/debugtrace",
"//pkg/planner/util/domainmisc",
"//pkg/planner/util/fixcontrol",
"//pkg/planner/util/optimizetrace",
"//pkg/planner/util/optimizetrace/logicaltrace",
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestGroupNDVs(t *testing.T) {
for i := 1; i < len(v.Children()); i++ {
stack = append(stack, v.Children()[i])
}
case *core.DataSource:
case *logicalop.DataSource:
if len(stack) == 0 {
traversed = true
} else {
Expand Down
16 changes: 8 additions & 8 deletions pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *columnStatsUsageCollector) updateColMapFromExpressions(col *expression.
c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list))
}

func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *DataSource) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *logicalop.DataSource) {
// Skip all system tables.
if filter.IsSystemSchema(ds.DBName.L) {
return
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *columnStatsUsageCollector) collectPredicateColumnsForUnionAll(p *logica
}
}

func (c *columnStatsUsageCollector) addHistNeededColumns(ds *DataSource) {
func (c *columnStatsUsageCollector) addHistNeededColumns(ds *logicalop.DataSource) {
c.visitedPhysTblIDs.Insert(int(ds.PhysicalTableID))
if c.collectMode&collectHistNeededColumns == 0 {
return
Expand Down Expand Up @@ -232,12 +232,12 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) {
}
if c.collectMode&collectPredicateColumns != 0 {
switch x := lp.(type) {
case *DataSource:
case *logicalop.DataSource:
c.collectPredicateColumnsForDataSource(x)
case *LogicalIndexScan:
case *logicalop.LogicalIndexScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
case *logicalop.LogicalTableScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *logicalop.LogicalProjection:
Expand Down Expand Up @@ -336,11 +336,11 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) {
// Since c.visitedPhysTblIDs is also collected here and needs to be collected even collectHistNeededColumns is not set,
// so we do the c.collectMode check in addHistNeededColumns() after collecting c.visitedPhysTblIDs.
switch x := lp.(type) {
case *DataSource:
case *logicalop.DataSource:
c.addHistNeededColumns(x)
case *LogicalIndexScan:
case *logicalop.LogicalIndexScan:
c.addHistNeededColumns(x.Source)
case *LogicalTableScan:
case *logicalop.LogicalTableScan:
c.addHistNeededColumns(x.Source)
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func init() {
utilfuncp.FindBestTask4LogicalCTETable = findBestTask4LogicalCTETable
utilfuncp.FindBestTask4LogicalMemTable = findBestTask4LogicalMemTable
utilfuncp.FindBestTask4LogicalTableDual = findBestTask4LogicalTableDual
utilfuncp.FindBestTask4LogicalDataSource = findBestTask4LogicalDataSource
utilfuncp.FindBestTask4LogicalShowDDLJobs = findBestTask4LogicalShowDDLJobs
utilfuncp.ExhaustPhysicalPlans4LogicalCTE = exhaustPhysicalPlans4LogicalCTE
utilfuncp.ExhaustPhysicalPlans4LogicalSort = exhaustPhysicalPlans4LogicalSort
Expand All @@ -62,10 +63,16 @@ func init() {
utilfuncp.GetEstimatedProbeCntFromProbeParents = getEstimatedProbeCntFromProbeParents
utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp

utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan
utilfuncp.AttachPlan2Task = attachPlan2Task
utilfuncp.WindowIsTopN = windowIsTopN
utilfuncp.DoOptimize = doOptimize
utilfuncp.IsSingleScan = isSingleScan
utilfuncp.WindowIsTopN = windowIsTopN
utilfuncp.AttachPlan2Task = attachPlan2Task
utilfuncp.AddPrefix4ShardIndexes = addPrefix4ShardIndexes
utilfuncp.DeriveStats4DataSource = deriveStats4DataSource
utilfuncp.ApplyPredicateSimplification = applyPredicateSimplification
utilfuncp.DeriveStats4LogicalIndexScan = deriveStats4LogicalIndexScan
utilfuncp.DeriveStats4LogicalTableScan = deriveStats4LogicalTableScan
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
12 changes: 6 additions & 6 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func getIndexJoinByOuterIdx(p *logicalop.LogicalJoin, prop *property.PhysicalPro
// hasDitryWrite: whether the inner child contains dirty data.
// zippedChildren: [Projection, Aggregation, Selection]
type indexJoinInnerChildWrapper struct {
ds *DataSource
ds *logicalop.DataSource
hasDitryWrite bool
zippedChildren []base.LogicalPlan
}
Expand All @@ -762,7 +762,7 @@ func extractIndexJoinInnerChildPattern(p *logicalop.LogicalJoin, innerChild base
childLoop:
for curChild := innerChild; curChild != nil; curChild = nextChild(curChild) {
switch child := curChild.(type) {
case *DataSource:
case *logicalop.DataSource:
wrapper.ds = child
break childLoop
case *logicalop.LogicalProjection, *logicalop.LogicalSelection, *logicalop.LogicalAggregation:
Expand Down Expand Up @@ -826,7 +826,7 @@ func buildIndexJoinInner2TableScan(
ranges = indexJoinResult.chosenRanges
} else {
pkMatched := false
pkCol := ds.getPKIsHandleCol()
pkCol := ds.GetPKIsHandleCol()
if pkCol == nil {
return nil
}
Expand Down Expand Up @@ -1181,7 +1181,7 @@ func constructInnerIndexScanTask(
isPartition: ds.PartitionDefIdx != nil,
physicalTableID: ds.PhysicalTableID,
tblColHists: ds.TblColHists,
pkIsHandleCol: ds.getPKIsHandleCol(),
pkIsHandleCol: ds.GetPKIsHandleCol(),
}.Init(ds.SCtx(), ds.QueryBlockOffset())
cop := &CopTask{
indexPlan: is,
Expand Down Expand Up @@ -1338,7 +1338,7 @@ func constructInnerIndexScanTask(
// There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols
//
// Step2: build other inner plan node to task
func constructIndexJoinInnerSideTaskWithAggCheck(p *logicalop.LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
func constructIndexJoinInnerSideTaskWithAggCheck(p *logicalop.LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *logicalop.DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
var la *logicalop.LogicalAggregation
var canPushAggToCop bool
if len(wrapper.zippedChildren) > 0 {
Expand Down Expand Up @@ -2431,7 +2431,7 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo
ret := true
for _, ch := range p.Children() {
switch c := ch.(type) {
case *DataSource:
case *logicalop.DataSource:
validDs := false
indexMergeIsIntersection := false
for _, path := range c.PossibleAccessPaths {
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/exhaust_physical_plans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func rewriteSimpleExpr(ctx expression.BuildContext, str string, schema *expressi
}

type indexJoinContext struct {
dataSourceNode *DataSource
dataSourceNode *logicalop.DataSource
dsNames types.NameSlice
path *util.AccessPath
joinNode *logicalop.LogicalJoin
Expand All @@ -66,7 +66,7 @@ func prepareForAnalyzeLookUpFilters() *indexJoinContext {
}()
ctx.GetSessionVars().PlanID.Store(-1)
joinNode := logicalop.LogicalJoin{}.Init(ctx.GetPlanCtx(), 0)
dataSourceNode := DataSource{}.Init(ctx.GetPlanCtx(), 0)
dataSourceNode := logicalop.DataSource{}.Init(ctx.GetPlanCtx(), 0)
dsSchema := expression.NewSchema()
var dsNames types.NameSlice
dsSchema.Append(&expression.Column{
Expand Down
Loading

0 comments on commit d7f9027

Please sign in to comment.